Skip to content

Commit

Permalink
Fix warnings in subdag tests
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Aug 14, 2024
1 parent 3ad1499 commit a389262
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
4 changes: 2 additions & 2 deletions tests/dags/test_impersonation_subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@

default_args = {"owner": "airflow", "start_date": DEFAULT_DATE, "run_as_user": "airflow_test_user"}

dag = DAG(dag_id="impersonation_subdag", default_args=default_args)
dag = DAG(dag_id="impersonation_subdag", schedule=None, default_args=default_args)


def print_today():
print(f"Today is {timezone.utcnow()}")


subdag = DAG("impersonation_subdag.test_subdag_operation", default_args=default_args)
subdag = DAG("impersonation_subdag.test_subdag_operation", schedule=None, default_args=default_args)


PythonOperator(python_callable=print_today, task_id="exec_python_fn", dag=subdag)
Expand Down
40 changes: 20 additions & 20 deletions tests/operators/test_subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ def test_subdag_name(self):
"""
Subdag names must be {parent_dag}.{subdag task}
"""
dag = DAG("parent", default_args=default_args)
subdag_good = DAG("parent.test", default_args=default_args)
subdag_bad1 = DAG("parent.bad", default_args=default_args)
subdag_bad2 = DAG("bad.test", default_args=default_args)
subdag_bad3 = DAG("bad.bad", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
subdag_good = DAG("parent.test", schedule=None, default_args=default_args)
subdag_bad1 = DAG("parent.bad", schedule=None, default_args=default_args)
subdag_bad2 = DAG("bad.test", schedule=None, default_args=default_args)
subdag_bad3 = DAG("bad.bad", schedule=None, default_args=default_args)

with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
SubDagOperator(task_id="test", dag=dag, subdag=subdag_good)
Expand All @@ -80,8 +80,8 @@ def test_subdag_in_context_manager(self):
"""
Creating a sub DAG within a main DAG's context manager
"""
with DAG("parent", default_args=default_args) as dag:
subdag = DAG("parent.test", default_args=default_args)
with DAG("parent", schedule=None, default_args=default_args) as dag:
subdag = DAG("parent.test", schedule=None, default_args=default_args)
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
op = SubDagOperator(task_id="test", subdag=subdag)

Expand All @@ -108,7 +108,7 @@ def test_subdag_pools(self, dag_maker):
SubDagOperator(task_id="child", dag=dag, subdag=subdag, pool="test_pool_1")

# recreate dag because failed subdagoperator was already added
dag = DAG("parent", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
SubDagOperator(task_id="child", dag=dag, subdag=subdag, pool="test_pool_10")

Expand All @@ -121,8 +121,8 @@ def test_subdag_pools_no_possible_conflict(self):
Subdags and subdag tasks with no pool overlap, should not to query
pools
"""
dag = DAG("parent", default_args=default_args)
subdag = DAG("parent.child", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
subdag = DAG("parent.child", schedule=None, default_args=default_args)

session = airflow.settings.Session()
pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1, include_deferred=False)
Expand All @@ -147,8 +147,8 @@ def test_execute_create_dagrun_wait_until_success(self):
When SubDagOperator executes, it creates a DagRun if there is no existing one
and wait until the DagRun succeeds.
"""
dag = DAG("parent", default_args=default_args)
subdag = DAG("parent.test", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
subdag = DAG("parent.test", schedule=None, default_args=default_args)
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
subdag_task = SubDagOperator(task_id="test", subdag=subdag, dag=dag, poke_interval=1)

Expand Down Expand Up @@ -185,8 +185,8 @@ def test_execute_create_dagrun_with_conf(self):
and wait until the DagRun succeeds.
"""
conf = {"key": "value"}
dag = DAG("parent", default_args=default_args)
subdag = DAG("parent.test", default_args=default_args)
dag = DAG("parent", schedule="@daily", default_args=default_args)
subdag = DAG("parent.test", schedule=None, default_args=default_args)
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
subdag_task = SubDagOperator(task_id="test", subdag=subdag, dag=dag, poke_interval=1, conf=conf)

Expand Down Expand Up @@ -221,8 +221,8 @@ def test_execute_dagrun_failed(self):
"""
When the DagRun failed during the execution, it raises an Airflow Exception.
"""
dag = DAG("parent", default_args=default_args)
subdag = DAG("parent.test", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
subdag = DAG("parent.test", schedule=None, default_args=default_args)
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
subdag_task = SubDagOperator(task_id="test", subdag=subdag, dag=dag, poke_interval=1)

Expand All @@ -247,8 +247,8 @@ def test_execute_skip_if_dagrun_success(self):
"""
When there is an existing DagRun in SUCCESS state, skip the execution.
"""
dag = DAG("parent", default_args=default_args)
subdag = DAG("parent.test", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
subdag = DAG("parent.test", schedule=None, default_args=default_args)

subdag.create_dagrun = Mock()
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
Expand Down Expand Up @@ -367,8 +367,8 @@ def test_subdag_with_propagate_skipped_state(
mock_skip.assert_not_called()

def test_deprecation_warning(self):
dag = DAG("parent", default_args=default_args)
subdag = DAG("parent.test", default_args=default_args)
dag = DAG("parent", schedule=None, default_args=default_args)
subdag = DAG("parent.test", schedule=None, default_args=default_args)
warning_message = """This class is deprecated. Please use `airflow.utils.task_group.TaskGroup`."""

with pytest.warns(DeprecationWarning) as warnings:
Expand Down

0 comments on commit a389262

Please sign in to comment.