Skip to content

Commit

Permalink
Keep FAB compatibility for versions before 1.3.0 in 2.10 (#41549) (#4…
Browse files Browse the repository at this point in the history
…1809)

* Fix: Keep compatibility with old FAB versions (#41549)

* Fix: Tests after #41549 (Keep compatibility with old FAB versions)

* Fix test_dag and test_dagbag
  • Loading branch information
joaopamaral committed Aug 28, 2024
1 parent d906b51 commit 87b5e61
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 14 deletions.
30 changes: 21 additions & 9 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import re2
import sqlalchemy_jsonfield
from dateutil.relativedelta import relativedelta
from packaging import version as packaging_version
from sqlalchemy import (
Boolean,
Column,
Expand Down Expand Up @@ -116,6 +117,7 @@
clear_task_instances,
)
from airflow.models.tasklog import LogTemplate
from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.secrets.local_filesystem import LocalFilesystemBackend
from airflow.security import permissions
from airflow.settings import json
Expand Down Expand Up @@ -940,16 +942,26 @@ def update_old_perm(permission: str):

updated_access_control = {}
for role, perms in access_control.items():
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"):
updated_access_control[role] = updated_access_control.get(role, {})
if isinstance(perms, (set, list)):
# Support for old-style access_control where only the actions are specified
updated_access_control[role][permissions.RESOURCE_DAG] = set(perms)
else:
updated_access_control[role] = perms
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm)
for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
elif isinstance(perms, dict):
# Not allow new access control format with old FAB versions
raise AirflowException(
"Please upgrade the FAB provider to a version >= 1.3.0 to allow "
"use the Dag Level Access Control new format."
)
else:
updated_access_control[role] = perms
if permissions.RESOURCE_DAG in updated_access_control[role]:
updated_access_control[role][permissions.RESOURCE_DAG] = {
update_old_perm(perm) for perm in updated_access_control[role][permissions.RESOURCE_DAG]
}
updated_access_control[role] = {update_old_perm(perm) for perm in perms}

return updated_access_control

Expand Down
80 changes: 76 additions & 4 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import pytest
import time_machine
from dateutil.relativedelta import relativedelta
from packaging import version as packaging_version
from pendulum.tz.timezone import Timezone
from sqlalchemy import inspect, select
from sqlalchemy.exc import SAWarning
Expand Down Expand Up @@ -85,6 +86,7 @@
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.security import permissions
from airflow.templates import NativeEnvironment, SandboxedEnvironment
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
Expand Down Expand Up @@ -2767,12 +2769,20 @@ def test_replace_outdated_access_control_actions(self):
outdated_permissions = {
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT},
"role2": {permissions.DEPRECATED_ACTION_CAN_DAG_READ, permissions.DEPRECATED_ACTION_CAN_DAG_EDIT},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
"role3": self._get_compatible_access_control(
{permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}
),
}
updated_permissions = {
"role1": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}},
"role2": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
"role1": self._get_compatible_access_control(
{permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}}
),
"role2": self._get_compatible_access_control(
{permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}}
),
"role3": self._get_compatible_access_control(
{permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}
),
}

with pytest.warns(DeprecationWarning) as deprecation_warnings:
Expand All @@ -2789,6 +2799,68 @@ def test_replace_outdated_access_control_actions(self):
assert "permission is deprecated" in str(deprecation_warnings[0].message)
assert "permission is deprecated" in str(deprecation_warnings[1].message)

def _get_compatible_access_control(self, perms):
if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0"):
return perms
return perms.get(permissions.RESOURCE_DAG, set())

@pytest.mark.parametrize(
"fab_version, perms, expected_exception, expected_perms",
[
pytest.param(
"1.2.0",
{
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
# will raise error in old FAB with new access control format
},
AirflowException,
None,
id="old_fab_new_access_control_format",
),
pytest.param(
"1.2.0",
{
"role1": [
permissions.ACTION_CAN_READ,
permissions.ACTION_CAN_EDIT,
permissions.ACTION_CAN_READ,
],
},
None,
{"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}},
id="old_fab_old_access_control_format",
),
pytest.param(
"1.3.0",
{
"role1": {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}, # old format
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}}, # new format
},
None,
{
"role1": {
permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
},
"role3": {permissions.RESOURCE_DAG_RUN: {permissions.ACTION_CAN_CREATE}},
},
id="new_fab_mixed_access_control_format",
),
],
)
def test_access_control_format(self, fab_version, perms, expected_exception, expected_perms):
if expected_exception:
with patch("airflow.models.dag.FAB_VERSION", fab_version):
with pytest.raises(
expected_exception,
match="Please upgrade the FAB provider to a version >= 1.3.0 to allow use the Dag Level Access Control new format.",
):
DAG(dag_id="dag_test", schedule=None, access_control=perms)
else:
with patch("airflow.models.dag.FAB_VERSION", fab_version):
dag = DAG(dag_id="dag_test", schedule=None, access_control=perms)
assert dag.access_control == expected_perms

def test_validate_executor_field_executor_not_configured(self):
dag = DAG("test-dag", schedule=None)
EmptyOperator(task_id="t1", dag=dag, executor="test.custom.executor")
Expand Down
12 changes: 11 additions & 1 deletion tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import pytest
import time_machine
from packaging import version as packaging_version
from sqlalchemy import func
from sqlalchemy.exc import OperationalError

Expand All @@ -40,6 +41,7 @@
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.dates import timezone as tz
from airflow.utils.session import create_session
Expand Down Expand Up @@ -996,11 +998,19 @@ def _sync_perms():
dag.access_control = {"Public": {"can_read"}}
_sync_perms()
mock_sync_perm_for_dag.assert_called_once_with(
"test_example_bash_operator", {"Public": {"DAGs": {"can_read"}}}
"test_example_bash_operator",
{
"Public": {"DAGs": {"can_read"}}
if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0")
else {"can_read"}
},
)

@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
@patch("airflow.www.security_appless.ApplessAirflowSecurityManager")
@pytest.mark.skipif(
packaging_version.parse(FAB_VERSION) < packaging_version.parse("1.3.0"), reason="Requires FAB 1.3.0+"
)
def test_sync_perm_for_dag_with_dict_access_control(self, mock_security_manager):
"""
Test that dagbag._sync_perm_for_dag will call ApplessAirflowSecurityManager.sync_perm_for_dag
Expand Down
7 changes: 7 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import pytest
from dateutil.relativedelta import FR, relativedelta
from kubernetes.client import models as k8s
from packaging import version as packaging_version

import airflow
from airflow.datasets import Dataset
Expand All @@ -58,6 +59,7 @@
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.fab import __version__ as FAB_VERSION
from airflow.security import permissions
from airflow.sensors.bash import BashSensor
from airflow.serialization.dag_dependency import DagDependency
Expand Down Expand Up @@ -246,6 +248,11 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i
}
},
}
if packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0")
else {
"__type": "set",
"__var": [permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT],
}
},
},
"edge_info": {},
Expand Down

0 comments on commit 87b5e61

Please sign in to comment.