From d80dd1b0bda0a6e19959d7346d9043cfa0e332a8 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 1 Aug 2024 17:55:42 +0800 Subject: [PATCH] docs(deferring): reorder deferring paragraphs --- .../authoring-and-scheduling/deferring.rst | 114 +++++++++--------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index 8336a5ba7ead31..45be9280369d0c 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -97,59 +97,6 @@ When writing a deferrable operators these are the main points to consider: return -Exiting deferred task from Triggers -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - .. versionadded:: 2.10.0 - -If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute ``end_from_trigger`` with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker. - -Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task. - -.. code-block:: python - - class WaitFiveHourSensorAsync(BaseSensorOperator): - # this sensor always exits from trigger. - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.end_from_trigger = True - - def execute(self, context: Context) -> NoReturn: - self.defer( - method_name=None, - trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger), - ) - - -``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events: - -.. code-block:: python - - - class WaitFiveHourTrigger(BaseTrigger): - def __init__(self, duration: timedelta, *, end_from_trigger: bool = False): - super().__init__() - self.duration = duration - self.end_from_trigger = end_from_trigger - - def serialize(self) -> tuple[str, dict[str, Any]]: - return ( - "your_module.WaitFiveHourTrigger", - {"duration": self.duration, "end_from_trigger": self.end_from_trigger}, - ) - - async def run(self) -> AsyncIterator[TriggerEvent]: - await asyncio.sleep(self.duration.total_seconds()) - if self.end_from_trigger: - yield TaskSuccessEvent() - else: - yield TriggerEvent({"duration": self.duration}) - -In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator. - -.. note:: - Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases. - Writing Triggers ~~~~~~~~~~~~~~~~ @@ -346,7 +293,9 @@ In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as `` return -The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define ``start_from_trigger`` and ``trigger_kwargs`` in the ``__init__`` method. **Note that you don't need to define both of them to use this feature, but you need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not have any effect. The entire ``__init__`` method will be skipped when mapping a task whose ``start_from_trigger`` is set to True. The scheduler will use the provided ``start_from_trigger`` and ``trigger_kwargs`` from ``partial`` and ``expand`` (fallbacks to the ones from class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won't be resolved at this stage. After the trigger has finished executing, the task may be sent back to the worker to execute the ``next_method``, or the task instance may end directly. If the task is sent back to the worker, the arguments in the ``__init__`` method will still take effect before the ``next_method``is executed, but they will not affect the execution of the trigger. +The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define ``start_from_trigger`` and ``trigger_kwargs`` in the ``__init__`` method. **Note that you don't need to define both of them to use this feature, but you need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not have any effect. The entire ``__init__`` method will be skipped when mapping a task whose ``start_from_trigger`` is set to True. The scheduler will use the provided ``start_from_trigger`` and ``trigger_kwargs`` from ``partial`` and ``expand`` (fallbacks to the ones from class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won't be resolved at this stage. + +After the trigger has finished executing, the task may be sent back to the worker to execute the ``next_method``, or the task instance may end directly. (Refer to :ref:`Exiting deferred task from Triggers`) If the task is sent back to the worker, the arguments in the ``__init__`` method will still take effect before the ``next_method``is executed, but they will not affect the execution of the trigger. .. code-block:: python @@ -395,6 +344,63 @@ This will be expanded into 2 tasks, with their "hours" arguments set to 1 and 2 trigger_kwargs=[{"hours": 1}, {"hours": 2}] ) + +.. _deferring/exiting_from_trigger: + +Exiting deferred task from Triggers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + .. versionadded:: 2.10.0 + +If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute ``end_from_trigger`` with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker. + +Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task. + +.. code-block:: python + + class WaitFiveHourSensorAsync(BaseSensorOperator): + # this sensor always exits from trigger. + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.end_from_trigger = True + + def execute(self, context: Context) -> NoReturn: + self.defer( + method_name=None, + trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger), + ) + + +``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events: + +.. code-block:: python + + + class WaitFiveHourTrigger(BaseTrigger): + def __init__(self, duration: timedelta, *, end_from_trigger: bool = False): + super().__init__() + self.duration = duration + self.end_from_trigger = end_from_trigger + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + "your_module.WaitFiveHourTrigger", + {"duration": self.duration, "end_from_trigger": self.end_from_trigger}, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: + await asyncio.sleep(self.duration.total_seconds()) + if self.end_from_trigger: + yield TaskSuccessEvent() + else: + yield TriggerEvent({"duration": self.duration}) + +In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator. + +.. note:: + Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases. + + High Availability -----------------