Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename cluster_policy to task_policy #23468

Merged
merged 2 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions docs/apache-airflow/concepts/cluster-policies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,17 @@ This policy checks if each DAG has at least one tag defined:
Task policies
-------------

Here's an example of enforcing a maximum timeout policy on every task:
Here's an example of enforcing a maximum timeout policy on every task::

.. literalinclude:: /../../tests/cluster_policies/__init__.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using literalinclude from cluster_policies?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk when using literalinclude, it raises redefinition error.

image

Can I create new file tests/cluster_policies/example_task_policy.py for adding that code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just rename the other task_policy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain it more?
If I rename task_policy method (eg. two_days_task_policy), then the code in the document and the contents of the document do not match. I think it is weird.
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you replace it cluster_policy() with "example_task_policy" for example?

There is no reason why task_policy should be named always "task_policy". This is why it was named "cluster_policy" (historically we only had cluster policy). But this is quite ok to have a method named differently and import it as different method name in python. There is a "import x as y" and patching also work this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just thinking of a way to declare a method only in airflow_local_settings.py file...

I will change method name cluster_policy to example_task_policy!

:language: python
:start-after: [START example_task_cluster_policy]
:end-before: [END example_task_cluster_policy]
class TimedOperator(BaseOperator, ABC):
timeout: timedelta


def task_policy(task: TimedOperator):
if task.task_type == 'HivePartitionSensor':
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)

You could also implement to protect against common errors, rather than as technical security controls. For example, don't run tasks without airflow owners:

Expand Down
20 changes: 1 addition & 19 deletions tests/cluster_policies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import ABC
from datetime import timedelta
from typing import Callable, List

from airflow.configuration import conf
Expand Down Expand Up @@ -62,7 +60,7 @@ def _check_task_rules(current_task: BaseOperator):
)


def cluster_policy(task: BaseOperator):
def task_policy(task: BaseOperator):
"""Ensure Tasks have non-default owners."""
_check_task_rules(task)

Expand All @@ -80,22 +78,6 @@ def dag_policy(dag: DAG):

# [END example_dag_cluster_policy]


class TimedOperator(BaseOperator, ABC):
timeout: timedelta


# [START example_task_cluster_policy]
def task_policy(task: TimedOperator):
if task.task_type == 'HivePartitionSensor':
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)


# [END example_task_cluster_policy]


# [START example_task_mutation_hook]
def task_instance_mutation_hook(task_instance: TaskInstance):
if task_instance.try_number >= 1:
Expand Down
6 changes: 3 additions & 3 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ def test_collect_dags_from_db(self):
assert serialized_dag.dag_id == dag.dag_id
assert set(serialized_dag.task_dict) == set(dag.task_dict)

@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
@patch("airflow.settings.task_policy", cluster_policies.task_policy)
def test_task_cluster_policy_violation(self):
"""
test that file processing results in import error when task does not
Expand All @@ -974,7 +974,7 @@ def test_task_cluster_policy_violation(self):
}
assert expected_import_errors == dagbag.import_errors

@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
@patch("airflow.settings.task_policy", cluster_policies.task_policy)
def test_task_cluster_policy_nonstring_owner(self):
"""
test that file processing results in import error when task does not
Expand All @@ -994,7 +994,7 @@ def test_task_cluster_policy_nonstring_owner(self):
}
assert expected_import_errors == dagbag.import_errors

@patch("airflow.settings.task_policy", cluster_policies.cluster_policy)
@patch("airflow.settings.task_policy", cluster_policies.task_policy)
def test_task_cluster_policy_obeyed(self):
"""
test that dag successfully imported without import errors when tasks
Expand Down