Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Keep compatibility with old FAB versions #41549

Merged
merged 11 commits into from
Aug 20, 2024
30 changes: 21 additions & 9 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import re2
import sqlalchemy_jsonfield
from dateutil.relativedelta import relativedelta
from packaging import version as packaging_version
from sqlalchemy import (
Boolean,
Column,
Expand Down Expand Up @@ -116,6 +117,7 @@
clear_task_instances,
)
from airflow.models.tasklog import LogTemplate
from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
Expand Down Expand Up @@ -936,16 +938,26 @@ def update_old_perm(permission: str):

updated_access_control = {}
for role, perms in access_control.items():
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"):
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
else:
updated_access_control[role] = perms
joaopamaral marked this conversation as resolved.
Show resolved Hide resolved
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm)
for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
elif isinstance(perms, dict):
# Not allow new access control format with old FAB versions
raise AirflowException(
"Please upgrade the FAB provider to a version >= 1.3.0 to allow "
"use the Dag Level Access Control new format."
)
else:
updated_access_control[role] = perms
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
updated_access_control[role] = {update_old_perm(perm) for perm in perms}

return updated_access_control

Expand Down
57 changes: 57 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2539,6 +2539,63 @@ def test_replace_outdated_access_control_actions(self):
assert "permission is deprecated" in str(deprecation_warnings[0].message)
assert "permission is deprecated" in str(deprecation_warnings[1].message)

@pytest.mark.parametrize(
"fab_version, perms, expected_exception, expected_perms",
[
pytest.param(
"1.2.0",
{
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
# will raise error in old FAB with new access control format
},
AirflowException,
None,
id="old_fab_new_access_control_format",
),
pytest.param(
"1.2.0",
{
"role1": [
permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT,
permissions.ACTION_CAN_READ,
],
},
None,
{"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}},
id="old_fab_old_access_control_format",
),
pytest.param(
"1.3.0",
{
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, # old format
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, # new format
},
None,
{
"role1": {
permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
},
id="new_fab_mixed_access_control_format",
),
],
)
def test_access_control_format(self, fab_version, perms, expected_exception, expected_perms):
if expected_exception:
with patch("airflow.models.dag.FAB_VERSION", fab_version):
with pytest.raises(
expected_exception,
match="Please upgrade the FAB provider to a version >= 1.3.0 to allow use the Dag Level Access Control new format.",
):
DAG(dag_id="dag_test", schedule=None, access_control=perms)
else:
with patch("airflow.models.dag.FAB_VERSION", fab_version):
dag = DAG(dag_id="dag_test", schedule=None, access_control=perms)
assert dag.access_control == expected_perms

def test_validate_executor_field_executor_not_configured(self):
dag = DAG("test-dag", schedule=None)
EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
Expand Down