Skip to content

Commit

Permalink
[Backport] Deprecate implicit default DAG schedule (#41469)
Browse files Browse the repository at this point in the history
Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
  • Loading branch information
uranusjr and potiuk committed Aug 20, 2024
1 parent 21b3f7c commit a9c7d1a
Show file tree
Hide file tree
Showing 194 changed files with 1,022 additions and 667 deletions.
2 changes: 1 addition & 1 deletion airflow/example_dags/example_dynamic_task_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(dag_id="example_dynamic_task_mapping", start_date=datetime(2022, 3, 4)) as dag:
with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

@task
def add_one(x: int):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def execute(self, context):

with DAG(
dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
schedule=None,
start_date=datetime(2022, 3, 4),
catchup=False,
):
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

with DAG(
dag_id="example_setup_teardown",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_setup_teardown_taskflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

with DAG(
dag_id="example_setup_teardown_taskflow",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_short_circuit_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.utils.trigger_rule import TriggerRule


@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
def example_short_circuit_decorator():
# [START howto_operator_short_circuit]
@task.short_circuit()
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

with DAG(
dag_id="example_short_circuit_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 2 additions & 0 deletions airflow/example_dags/example_skip_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

import pendulum
Expand Down Expand Up @@ -63,6 +64,7 @@ def create_test_pipeline(suffix, trigger_rule):

with DAG(
dag_id="example_skip_dag",
schedule=datetime.timedelta(days=1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# [START howto_task_group]
with DAG(
dag_id="example_task_group",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_task_group_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def task_group_function(value: int) -> None:
# Executing Tasks and TaskGroups
with DAG(
dag_id="example_task_group_decorator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
8 changes: 7 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,12 @@ def __init__(
self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule))
self.schedule_interval = self.timetable.summary
elif isinstance(schedule, ArgNotSet):
warnings.warn(
"Creating a DAG with an implicit schedule is deprecated, and will stop working "
"in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.",
RemovedInAirflow3Warning,
stacklevel=2,
)
self.timetable = create_timetable(schedule, self.timezone)
self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL
else:
Expand Down Expand Up @@ -3647,7 +3653,7 @@ def get_serialized_fields(cls):
"auto_register",
"fail_stop",
}
cls.__serialized_fields = frozenset(vars(DAG(dag_id="test"))) - exclusion_list
cls.__serialized_fields = frozenset(vars(DAG(dag_id="test", schedule=None))) - exclusion_list
return cls.__serialized_fields

def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeInfoType:
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,7 @@ def serialize_dag(cls, dag: DAG) -> dict:
@classmethod
def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
"""Deserializes a DAG from a JSON object."""
dag = SerializedDAG(dag_id=encoded_dag["_dag_id"])
dag = SerializedDAG(dag_id=encoded_dag["_dag_id"], schedule=None)

for k, v in encoded_dag.items():
if k == "_downstream_task_ids":
Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@


def create_context(task) -> Context:
dag = DAG(dag_id="dag")
dag = DAG(dag_id="dag", schedule=None)
execution_date = timezone.datetime(
2016, 1, 1, 1, 0, 0, tzinfo=timezone.parse_timezone("Europe/Amsterdam")
)
Expand Down
37 changes: 19 additions & 18 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,18 @@ def configured_app(minimal_app_for_api):

with DAG(
DAG_ID,
schedule=None,
start_date=datetime(2020, 6, 15),
doc_md="details",
params={"foo": 1},
tags=["example"],
) as dag:
EmptyOperator(task_id=TASK_ID)

with DAG(DAG2_ID, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md
with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md
EmptyOperator(task_id=TASK_ID)

with DAG(DAG3_ID) as dag3: # DAG start_date set to None
with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None
EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12))

dag_bag = DagBag(os.devnull, include_examples=False)
Expand Down Expand Up @@ -988,10 +989,10 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer):
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1", tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None, tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None, tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", schedule=None, tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand All @@ -1016,10 +1017,10 @@ def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
# test filter by dag_id
dag1 = DAG(dag_id="TEST_DAG_1")
dag2 = DAG(dag_id="TEST_DAG_2")
dag3 = DAG(dag_id="SAMPLE_DAG_1")
dag4 = DAG(dag_id="SAMPLE_DAG_2")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None)
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None)
dag3 = DAG(dag_id="SAMPLE_DAG_1", schedule=None)
dag4 = DAG(dag_id="SAMPLE_DAG_2", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down Expand Up @@ -1938,10 +1939,10 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session):
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1", tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None, tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None, tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", schedule=None, tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down Expand Up @@ -1971,10 +1972,10 @@ def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
# test filter by dag_id
dag1 = DAG(dag_id="TEST_DAG_1")
dag2 = DAG(dag_id="TEST_DAG_2")
dag3 = DAG(dag_id="SAMPLE_DAG_1")
dag4 = DAG(dag_id="SAMPLE_DAG_2")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None)
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None)
dag3 = DAG(dag_id="SAMPLE_DAG_1", schedule=None)
dag4 = DAG(dag_id="SAMPLE_DAG_2", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def teardown_method(self) -> None:
clear_db_xcom()

def _create_dag(self):
with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": self.default_time}) as dag:
with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag:
CustomOperator(task_id="TEST_SINGLE_LINK", bash_command="TEST_LINK_VALUE")
CustomOperator(
task_id="TEST_MULTIPLE_LINK", bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"]
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu

# Recreate DAG without tasks
dagbag = self.app.dag_bag
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
dag = DAG(self.DAG_ID, schedule=None, start_date=timezone.parse(self.default_time))
del dagbag.dags[self.DAG_ID]
dagbag.bag_dag(dag=dag, root_dag=dag)

Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class TestTaskEndpoint:

@pytest.fixture(scope="class")
def setup_dag(self, configured_app):
with DAG(self.dag_id, start_date=self.task1_start_date, doc_md="details") as dag:
with DAG(self.dag_id, schedule=None, start_date=self.task1_start_date, doc_md="details") as dag:
task1 = EmptyOperator(task_id=self.task_id, params={"foo": "bar"})
task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date)

with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag:
with DAG(self.mapped_dag_id, schedule=None, start_date=self.task1_start_date) as mapped_dag:
EmptyOperator(task_id=self.task_id3)
# Use the private _expand() method to avoid the empty kwargs check.
# We don't care about how the operator runs here, only its presence.
Expand Down
3 changes: 2 additions & 1 deletion tests/api_connexion/schemas/test_dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timedelta

import pendulum
import pytest
Expand Down Expand Up @@ -158,6 +158,7 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer):
def test_serialize_test_dag_detail_schema(url_safe_serializer):
dag = DAG(
dag_id="test_dag",
schedule=timedelta(days=1),
start_date=datetime(2020, 6, 19),
doc_md="docs",
orientation="LR",
Expand Down
6 changes: 5 additions & 1 deletion tests/api_experimental/common/test_delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ def setup_dag_models(self, for_sub_dag=False):

task = EmptyOperator(
task_id="dummy",
dag=DAG(dag_id=self.key, default_args={"start_date": timezone.datetime(2022, 1, 1)}),
dag=DAG(
dag_id=self.key,
schedule=None,
default_args={"start_date": timezone.datetime(2022, 1, 1)},
),
owner="airflow",
)

Expand Down
16 changes: 12 additions & 4 deletions tests/api_experimental/common/test_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_trigger_dag_dag_not_found(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
dag_id = "dag_run_exist"
dag = DAG(dag_id)
dag = DAG(dag_id, schedule=None)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_run_mock.find_duplicate.return_value = DagRun()
Expand Down Expand Up @@ -90,7 +90,11 @@ def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, da
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_too_early_start_date"
dag = DAG(dag_id, default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag = DAG(
dag_id=dag_id,
schedule=None,
default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)},
)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag

Expand All @@ -100,7 +104,11 @@ def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_valid_start_date"
dag = DAG(dag_id, default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag = DAG(
dag_id=dag_id,
schedule=None,
default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)},
)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_bag_mock.dags_hash = {}
Expand All @@ -120,7 +128,7 @@ def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_conf(self, dag_bag_mock, conf, expected_conf):
dag_id = "trigger_dag_with_conf"
dag = DAG(dag_id)
dag = DAG(dag_id, schedule=None)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag

Expand Down
5 changes: 4 additions & 1 deletion tests/callbacks/test_callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def test_from_json(self, input, request_class):
if input is None:
ti = TaskInstance(
task=BashOperator(
task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
task_id="test",
bash_command="true",
start_date=datetime.now(),
dag=DAG(dag_id="id", schedule=None),
),
run_id="fake_run",
state=State.RUNNING,
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ def dag_maker(request):
if serialized_marker:
(want_serialized,) = serialized_marker.args or (True,)

from airflow.utils.helpers import NOTSET
from airflow.utils.log.logging_mixin import LoggingMixin

class DagFactory(LoggingMixin):
Expand Down Expand Up @@ -923,6 +924,7 @@ def create_dagrun_after(self, dagrun, **kwargs):
def __call__(
self,
dag_id="test_dag",
schedule=NOTSET,
serialized=want_serialized,
fileloc=None,
processor_subdir=None,
Expand Down Expand Up @@ -951,6 +953,12 @@ def __call__(
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
self.start_date = DEFAULT_DATE
self.kwargs["start_date"] = self.start_date
# Use the schedule passed by the caller if one was given; otherwise fall
# back to a daily default, unless "timetable" or "schedule_interval" is
# already present in the DAG kwargs.
if schedule is not NOTSET:
self.kwargs["schedule"] = schedule
elif "timetable" not in self.kwargs and "schedule_interval" not in self.kwargs:
self.kwargs["schedule"] = timedelta(days=1)
self.dag = DAG(dag_id, **self.kwargs)
self.dag.fileloc = fileloc or request.module.__file__
self.want_serialized = serialized
Expand Down
13 changes: 12 additions & 1 deletion tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.compat import ParseImportError
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_callbacks, clear_db_dags, clear_db_runs, clear_db_serialized_dags
from tests.test_utils.db import (
clear_db_callbacks,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
clear_db_serialized_dags,
)

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -148,7 +154,12 @@ def run_processor_manager_one_loop(self, manager, parent_pipe):
return results
raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")

@pytest.fixture
def clear_parse_import_errors(self):
clear_db_import_errors()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.usefixtures("clear_parse_import_errors")
@conf_vars({("core", "load_examples"): "False"})
def test_remove_file_clears_import_error(self, tmp_path):
path_to_parse = tmp_path / "temp_dag.py"
Expand Down
4 changes: 3 additions & 1 deletion tests/dags/test_cli_triggered_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def success(ti=None, *args, **kwargs):
# DAG tests that tasks ignore all dependencies

dag1 = DAG(
dag_id="test_run_ignores_all_dependencies", default_args=dict(depends_on_past=True, **default_args)
dag_id="test_run_ignores_all_dependencies",
schedule=None,
default_args={"depends_on_past": True, **default_args},
)
dag1_task1 = PythonOperator(task_id="test_run_dependency_task", python_callable=fail, dag=dag1)
dag1_task2 = PythonOperator(task_id="test_run_dependent_task", python_callable=success, dag=dag1)
Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_dagrun_fast_follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


dag_id = "test_dagrun_fast_follow"
dag = DAG(dag_id=dag_id, default_args=args)
dag = DAG(dag_id=dag_id, schedule=None, default_args=args)

# A -> B -> C
task_a = PythonOperator(task_id="A", dag=dag, python_callable=lambda: True)
Expand Down
Loading

0 comments on commit a9c7d1a

Please sign in to comment.