From b28e8bfd035085c9cf6d74df13f87d5dc3f4d8d5 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 1 Aug 2024 21:20:47 +0800 Subject: [PATCH] Allowing DateTimeSensorAsync, FileSensor and TimeSensorAsync to start execution from trigger during dynamic task mapping (#41182) * docs(deferring): reword the doc to enable dynamic task mapping support to start execution from the trigger feature * feat(sensors): add trigger_kwargs argument to DateTimeSensorAsync, FileSensor and TimeSensorAsync * Add start execution from trigger during dynamic task mapping. --- airflow/sensors/date_time.py | 11 +- airflow/sensors/filesystem.py | 5 +- airflow/sensors/time_sensor.py | 6 +- .../authoring-and-scheduling/deferring.rst | 119 ++++++++++-------- 4 files changed, 83 insertions(+), 58 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 31979b9b4b0d7..20a6a484e05a4 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -87,6 +87,8 @@ class DateTimeSensorAsync(DateTimeSensor): :param target_time: datetime after which the job succeeds. (templated) :param start_from_trigger: Start the task directly from the triggerer without going into the worker. + :param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True + during dynamic task mapping. This argument is not used in standard usage. :param end_from_trigger: End the task directly from the triggerer without going into the worker. """ @@ -99,7 +101,14 @@ class DateTimeSensorAsync(DateTimeSensor): ) start_from_trigger = False - def __init__(self, *, start_from_trigger: bool = False, end_from_trigger: bool = False, **kwargs) -> None: + def __init__( + self, + *, + start_from_trigger: bool = False, + end_from_trigger: bool = False, + trigger_kwargs: dict[str, Any] | None = None, + **kwargs, + ) -> None: super().__init__(**kwargs) self.end_from_trigger = end_from_trigger diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py index f8e96725c1cb9..5d32ab07ad4e7 100644 --- a/airflow/sensors/filesystem.py +++ b/airflow/sensors/filesystem.py @@ -21,7 +21,7 @@ import os from functools import cached_property from glob import glob -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Any, Sequence from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -50,6 +50,8 @@ class FileSensor(BaseSensorOperator): :param deferrable: If waiting for completion, whether to defer the task until done, default is ``False``. :param start_from_trigger: Start the task directly from the triggerer without going into the worker. + :param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True + during dynamic task mapping. This argument is not used in standard usage. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -77,6 +79,7 @@ def __init__( recursive=False, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), start_from_trigger: bool = False, + trigger_kwargs: dict[str, Any] | None = None, **kwargs, ): super().__init__(**kwargs) diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index e43e5e08a41d1..6dba2628fce35 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -57,7 +57,10 @@ class TimeSensorAsync(BaseSensorOperator): This frees up a worker slot while it is waiting. :param target_time: time after which the job succeeds + :param start_from_trigger: Start the task directly from the triggerer without going into the worker. :param end_from_trigger: End the task directly from the triggerer without going into the worker. + :param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True + during dynamic task mapping. This argument is not used in standard usage. .. seealso:: For more information on how to use this sensor, take a look at the guide: @@ -76,9 +79,10 @@ class TimeSensorAsync(BaseSensorOperator): def __init__( self, *, + target_time: datetime.time, start_from_trigger: bool = False, + trigger_kwargs: dict[str, Any] | None = None, end_from_trigger: bool = False, - target_time: datetime.time, **kwargs, ) -> None: super().__init__(**kwargs) diff --git a/docs/apache-airflow/authoring-and-scheduling/deferring.rst b/docs/apache-airflow/authoring-and-scheduling/deferring.rst index 51265fae1807c..def9246968b2a 100644 --- a/docs/apache-airflow/authoring-and-scheduling/deferring.rst +++ b/docs/apache-airflow/authoring-and-scheduling/deferring.rst @@ -97,59 +97,6 @@ When writing a deferrable operators these are the main points to consider: return -Exiting deferred task from Triggers -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - .. versionadded:: 2.10.0 - -If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute ``end_from_trigger`` with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker. - -Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task. - -.. code-block:: python - - class WaitFiveHourSensorAsync(BaseSensorOperator): - # this sensor always exits from trigger. - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - self.end_from_trigger = True - - def execute(self, context: Context) -> NoReturn: - self.defer( - method_name=None, - trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger), - ) - - -``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events: - -.. code-block:: python - - - class WaitFiveHourTrigger(BaseTrigger): - def __init__(self, duration: timedelta, *, end_from_trigger: bool = False): - super().__init__() - self.duration = duration - self.end_from_trigger = end_from_trigger - - def serialize(self) -> tuple[str, dict[str, Any]]: - return ( - "your_module.WaitFiveHourTrigger", - {"duration": self.duration, "end_from_trigger": self.end_from_trigger}, - ) - - async def run(self) -> AsyncIterator[TriggerEvent]: - await asyncio.sleep(self.duration.total_seconds()) - if self.end_from_trigger: - yield TaskSuccessEvent() - else: - yield TriggerEvent({"duration": self.duration}) - -In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator. - -.. note:: - Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases. - Writing Triggers ~~~~~~~~~~~~~~~~ @@ -345,7 +292,11 @@ In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as `` # We have no more work to do here. Mark as complete. return -To enable Dynamic Task Mapping support, you can define ``start_from_trigger`` and ``trigger_kwargs`` in the parameter of "__init__". **Note that you don't need to define both of them to use this feature, but you do need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not work. Also, this works different from mapping an operator without ``start_from_trigger`` support. The whole ``__init__`` method will be skipped when mapping an operator whose ``start_from_trigger`` is set to True. Only argument ``trigger_kwargs`` is used and passed into ``trigger_cls``. + +The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define ``start_from_trigger`` and ``trigger_kwargs`` in the ``__init__`` method. **Note that you don't need to define both of them to use this feature, but you need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not have any effect. The entire ``__init__`` method will be skipped when mapping a task whose ``start_from_trigger`` is set to True. The scheduler will use the provided ``start_from_trigger`` and ``trigger_kwargs`` from ``partial`` and ``expand`` (fallbacks to the ones from class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won't be resolved at this stage. + +After the trigger has finished executing, the task may be sent back to the worker to execute the ``next_method``, or the task instance may end directly. (Refer to :ref:`Exiting deferred task from Triggers`) If the task is sent back to the worker, the arguments in the ``__init__`` method will still take effect before the ``next_method`` is executed, but they will not affect the execution of the trigger. + .. code-block:: python @@ -384,7 +335,8 @@ To enable Dynamic Task Mapping support, you can define ``start_from_trigger`` an # We have no more work to do here. Mark as complete. return -These parameters can be mapped using the ``expand`` and ``partial`` methods. Note that XCom values won't be resolved at this stage. + +This will be expanded into 2 tasks, with their "hours" arguments set to 1 and 2 respectively. .. code-block:: python @@ -392,6 +344,63 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not trigger_kwargs=[{"hours": 1}, {"hours": 2}] ) + +.. _deferring/exiting_from_trigger: + +Exiting deferred task from Triggers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + .. versionadded:: 2.10.0 + +If you want to exit your task directly from the triggerer without going into the worker, you can specify the instance level attribute ``end_from_trigger`` with the attributes of your deferrable operator, as discussed above. This can save some resources needed to start a new worker. + +Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task. + +.. code-block:: python + + class WaitFiveHourSensorAsync(BaseSensorOperator): + # this sensor always exits from trigger. + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.end_from_trigger = True + + def execute(self, context: Context) -> NoReturn: + self.defer( + method_name=None, + trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger), + ) + + +``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events: + +.. code-block:: python + + + class WaitFiveHourTrigger(BaseTrigger): + def __init__(self, duration: timedelta, *, end_from_trigger: bool = False): + super().__init__() + self.duration = duration + self.end_from_trigger = end_from_trigger + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + "your_module.WaitFiveHourTrigger", + {"duration": self.duration, "end_from_trigger": self.end_from_trigger}, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: + await asyncio.sleep(self.duration.total_seconds()) + if self.end_from_trigger: + yield TaskSuccessEvent() + else: + yield TriggerEvent({"duration": self.duration}) + +In the above example, the trigger will end the task instance directly if ``end_from_trigger`` is set to ``True`` by yielding ``TaskSuccessEvent``. Otherwise, it will resume the task instance with the method specified in the operator. + +.. note:: + Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated it raises an exception during parsing to indicate this limitation. While writing the custom trigger, ensure that the trigger is not set to end the task instance directly if the listeners are added from plugins. If the ``end_from_trigger`` attribute is changed to different attribute by author of trigger, the DAG parsing would not raise any exception and the listeners dependent on this task would not work. This limitation will be addressed in future releases. + + High Availability -----------------