Skip to content

Commit

Permalink
docs(deferring): reorder deferring paragraphs
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Aug 1, 2024
1 parent e839032 commit d80dd1b
Showing 1 changed file with 60 additions and 54 deletions.
114 changes: 60 additions & 54 deletions docs/apache-airflow/authoring-and-scheduling/deferring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -97,59 +97,6 @@ When writing a deferrable operators these are the main points to consider:
return
Exiting deferred task from Triggers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. versionadded:: 2.10.0

If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute ``end_from_trigger`` with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker.

Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task.

.. code-block:: python
class WaitFiveHourSensorAsync(BaseSensorOperator):
# this sensor always exits from trigger.
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
)
``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:

.. code-block:: python
class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "end_from_trigger": self.end_from_trigger},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})
In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator.

.. note::
Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.

Writing Triggers
~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -346,7 +293,9 @@ In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as ``
return
The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define ``start_from_trigger`` and ``trigger_kwargs`` in the ``__init__`` method. **Note that you don't need to define both of them to use this feature, but you need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not have any effect. The entire ``__init__`` method will be skipped when mapping a task whose ``start_from_trigger`` is set to True. The scheduler will use the provided ``start_from_trigger`` and ``trigger_kwargs`` from ``partial`` and ``expand`` (fallbacks to the ones from class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won't be resolved at this stage. After the trigger has finished executing, the task may be sent back to the worker to execute the ``next_method``, or the task instance may end directly. If the task is sent back to the worker, the arguments in the ``__init__`` method will still take effect before the ``next_method``is executed, but they will not affect the execution of the trigger.
The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define ``start_from_trigger`` and ``trigger_kwargs`` in the ``__init__`` method. **Note that you don't need to define both of them to use this feature, but you need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not have any effect. The entire ``__init__`` method will be skipped when mapping a task whose ``start_from_trigger`` is set to True. The scheduler will use the provided ``start_from_trigger`` and ``trigger_kwargs`` from ``partial`` and ``expand`` (fallbacks to the ones from class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won't be resolved at this stage.
After the trigger has finished executing, the task may be sent back to the worker to execute the ``next_method``, or the task instance may end directly. (Refer to :ref:`Exiting deferred task from Triggers<deferring/exiting_from_trigger>`) If the task is sent back to the worker, the arguments in the ``__init__`` method will still take effect before the ``next_method``is executed, but they will not affect the execution of the trigger.
.. code-block:: python
Expand Down Expand Up @@ -395,6 +344,63 @@ This will be expanded into 2 tasks, with their "hours" arguments set to 1 and 2
trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)
.. _deferring/exiting_from_trigger:
Exiting deferred task from Triggers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. versionadded:: 2.10.0
If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute ``end_from_trigger`` with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker.

Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task.

.. code-block:: python
class WaitFiveHourSensorAsync(BaseSensorOperator):
# this sensor always exits from trigger.
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
)
``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:

.. code-block:: python
class WaitFiveHourTrigger(BaseTrigger):
def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
super().__init__()
self.duration = duration
self.end_from_trigger = end_from_trigger
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"your_module.WaitFiveHourTrigger",
{"duration": self.duration, "end_from_trigger": self.end_from_trigger},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
await asyncio.sleep(self.duration.total_seconds())
if self.end_from_trigger:
yield TaskSuccessEvent()
else:
yield TriggerEvent({"duration": self.duration})
In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator.

.. note::
Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases.


High Availability
-----------------

Expand Down

0 comments on commit d80dd1b

Please sign in to comment.