From 41a9bff20ab3dc5284fa906d5bb34c39510f14fa Mon Sep 17 00:00:00 2001 From: wanlce Date: Fri, 1 Apr 2022 15:28:46 +0800 Subject: [PATCH 1/4] Fix incorrect data_interval_start after a DAG's schedule changes This PR fixes the incorrect data_interval_start of the first run after a DAG's schedule is changed. * Added _align_to_prev function to align the previous data interval's end to the new schedule. * Renamed _align to _align_to_next. --- airflow/timetables/interval.py | 47 +++++++++++++++++---- tests/timetables/test_interval_timetable.py | 24 +++++++++++ 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index 5ed9cd21d2571..d41e2c227c25f 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -49,7 +49,7 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: """ raise NotImplementedError() - def _align(self, current: DateTime) -> DateTime: + def _align_to_next(self, current: DateTime) -> DateTime: """Align given time to the scheduled. For fixed schedules (e.g. every midnight); this finds the next time that @@ -58,6 +58,21 @@ def _align(self, current: DateTime) -> DateTime: """ raise NotImplementedError() + def _align_to_prev(self, current: DateTime) -> DateTime: + """Align given time to the scheduled. + + For fixed schedules (e.g. every midnight); this finds the prev time that + aligns to the declared time, if the given time does not align. If the + schedule is not fixed (e.g. every hour), the given time is returned. + + It is not enough to just have _align_to_next. When a DAG's schedule + changed, _algin_to_next does not correct the scheduling time in time, + resulting in the first scheduling after the scheduling time has been + changed remaining the same. So we would need to align forward to ensure + that it works correctly after a scheduling time change.
+ """ + raise NotImplementedError() + def _get_next(self, current: DateTime) -> DateTime: """Get the first schedule after the current time.""" raise NotImplementedError() @@ -76,7 +91,7 @@ def next_dagrun_info( if not restriction.catchup: earliest = self._skip_to_latest(earliest) elif earliest is not None: - earliest = self._align(earliest) + earliest = self._align_to_next(earliest) if last_automated_data_interval is None: # First run; schedule the run at the first available time matching # the schedule, and retrospectively create a data interval for it. @@ -84,13 +99,15 @@ def next_dagrun_info( return None start = earliest else: # There's a previous run. + # Alignment is needed when DAG has new schedule interval. + align_last_data_interval_end = self._align_to_prev(last_automated_data_interval.end) if earliest is not None: # Catchup is False or DAG has new start date in the future. # Make sure we get the later one. - start = max(last_automated_data_interval.end, earliest) + start = max(align_last_data_interval_end, earliest) else: # Data interval starts from the end of the previous interval. - start = last_automated_data_interval.end + start = align_last_data_interval_end if restriction.latest is not None and start > restriction.latest: return None end = self._get_next(start) @@ -203,7 +220,7 @@ def _get_prev(self, current: DateTime) -> DateTime: delta = naive - scheduled return convert_to_utc(current.in_timezone(self._timezone) - delta) - def _align(self, current: DateTime) -> DateTime: + def _align_to_next(self, current: DateTime) -> DateTime: """Get the next scheduled time. This is ``current + interval``, unless ``current`` falls right on the @@ -214,6 +231,17 @@ def _align(self, current: DateTime) -> DateTime: return next_time return current + def _align_to_prev(self, current: DateTime) -> DateTime: + """Get the prev scheduled time. + + This is ``current - interval``, unless ``current`` falls right on the + interval boundary, when ``current`` is returned. 
+ """ + prev_time = self._get_prev(current) + if self._get_next(prev_time) != current: + return prev_time + return current + def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: """Bound the earliest time a run can be scheduled. @@ -235,13 +263,13 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: raise AssertionError("next schedule shouldn't be earlier") if earliest is None: return new_start - return max(new_start, self._align(earliest)) + return max(new_start, self._align_to_next(earliest)) def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # Get the last complete period before run_after, e.g. if a DAG run is # scheduled at each midnight, the data interval of a manually triggered # run at 1am 25th is between 0am 24th and 0am 25th. - end = self._get_prev(self._align(run_after)) + end = self._get_prev(self._align_to_next(run_after)) return DataInterval(start=self._get_prev(end), end=end) @@ -299,7 +327,10 @@ def _get_next(self, current: DateTime) -> DateTime: def _get_prev(self, current: DateTime) -> DateTime: return convert_to_utc(current - self._delta) - def _align(self, current: DateTime) -> DateTime: + def _align_to_next(self, current: DateTime) -> DateTime: + return current + + def _align_to_prev(self, current: DateTime) -> DateTime: return current def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 53e5ec352d5e5..43b52ea1e75af 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -36,6 +36,7 @@ CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) +OLD_INTERVAL = DataInterval(start=YESTERDAY, end=CURRENT_TIME) HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) HOURLY_TIMEDELTA_TIMETABLE = 
DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) @@ -62,6 +63,29 @@ def test_no_catchup_first_starts_at_current_time( assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT) +@pytest.mark.parametrize( + "earliest", + [pytest.param(None, id="none"), pytest.param(START_DATE, id="start_date")], +) +@pytest.mark.parametrize( + "catchup", + [pytest.param(True, id="catchup_true"), pytest.param(False, id="catchup_false")], +) +@freezegun.freeze_time(CURRENT_TIME) +def test_new_schedule_interval_next_info_starts_at_new_time( + earliest: Optional[pendulum.DateTime], + catchup: bool, +) -> None: + """First run after DAG has new schedule interval.""" + next_info = CRON_TIMETABLE.next_dagrun_info( + last_automated_data_interval=OLD_INTERVAL, + restriction=TimeRestriction(earliest=earliest, latest=None, catchup=catchup), + ) + expected_start = YESTERDAY + datetime.timedelta(hours=16, minutes=30) + expected_end = CURRENT_TIME + datetime.timedelta(hours=16, minutes=30) + assert next_info == DagRunInfo.interval(start=expected_start, end=expected_end) + + @pytest.mark.parametrize( "timetable", [ From e3271e684a7fdc4680552ca74b4a38f1627d7d69 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 12 Apr 2022 14:59:14 +0800 Subject: [PATCH 2/4] Wrap function name with backticks --- airflow/timetables/interval.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index d41e2c227c25f..54013bea2e226 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -65,11 +65,10 @@ def _align_to_prev(self, current: DateTime) -> DateTime: aligns to the declared time, if the given time does not align. If the schedule is not fixed (e.g. every hour), the given time is returned. - It is not enough to just have _align_to_next. 
When a DAG's schedule - changed, _algin_to_next does not correct the scheduling time in time, - resulting in the first scheduling after the scheduling time has been - changed remaining the same. So we would need to align forward to ensure - that it works correctly after a scheduling time change. + It is not enough to use ``_get_prev(_align_to_next())`` instead. When a + DAG's schedule changes, this alternative would make the first scheduling + after the schedule change remain the same. We need to align forward to + ensure it works correctly in this situation. """ raise NotImplementedError() From 5ece2b3e7aff2da5282b6f137444893872d40e06 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 9 Aug 2022 18:13:22 +0800 Subject: [PATCH 3/4] Add test case for auto run alignment --- tests/timetables/test_interval_timetable.py | 38 +++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 5724e49cf9320..d55efb354d4ec 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -183,3 +183,41 @@ def test_validate_failure(timetable: Timetable, error_message: str) -> None: def test_cron_interval_timezone_from_string(): timetable = CronDataIntervalTimetable("@hourly", "UTC") assert timetable.serialize()['timezone'] == 'UTC' + + +@pytest.mark.parametrize( + "last_data_interval, expected_info", + [ + pytest.param( + DataInterval( + pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + ), + DagRunInfo.interval( + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + ), + id="exact", + ), + pytest.param( + # Previous data interval does not align with the current timetable. + # This is possible if the user edits a DAG with existing runs. 
+ DataInterval( + pendulum.DateTime(2022, 8, 7, 1, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + ), + DagRunInfo.interval( + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + ), + id="changed", + ), + ], +) +def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expected_info: DagRunInfo): + timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + info = timetable.next_dagrun_info( + last_automated_data_interval=last_data_interval, + restriction=TimeRestriction(None, None, True), + ) + assert info == expected_info From 2d61dd316724ca7959dcd551aeba3976379e6461 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 9 Aug 2022 18:13:48 +0800 Subject: [PATCH 4/4] Fix manual run alignment --- airflow/timetables/interval.py | 2 +- tests/timetables/test_interval_timetable.py | 31 +++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index cefd6e5d43135..1c8447d8c1432 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -159,7 +159,7 @@ def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # Get the last complete period before run_after, e.g. if a DAG run is # scheduled at each midnight, the data interval of a manually triggered # run at 1am 25th is between 0am 24th and 0am 25th. 
- end = self._get_prev(self._align_to_next(run_after)) + end = self._align_to_prev(run_after) return DataInterval(start=self._get_prev(end), end=end) diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index d55efb354d4ec..8f73a32909441 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -185,6 +185,37 @@ def test_cron_interval_timezone_from_string(): assert timetable.serialize()['timezone'] == 'UTC' +@pytest.mark.parametrize( + "trigger_at, expected_interval", + [ + # Arbitrary trigger time. + pytest.param( + pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + DataInterval( + pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + ), + id="adhoc", + ), + # Trigger time falls exactly on interval boundary. + pytest.param( + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + DataInterval( + pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + ), + id="exact", + ), + ], +) +def test_cron_infer_manual_data_interval_alignment( + trigger_at: pendulum.DateTime, + expected_interval: DataInterval, +) -> None: + timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + assert timetable.infer_manual_data_interval(run_after=trigger_at) == expected_interval + + @pytest.mark.parametrize( "last_data_interval, expected_info", [