Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out and handle 'params' in mapped operator #26100

Merged
merged 2 commits into from
Oct 6, 2022

Conversation

uranusjr
Copy link
Member

@uranusjr uranusjr commented Sep 1, 2022

Allow using params in expanding kwargs. Fix #24014.

Syntax unlocked:

# Two tasks with params={"a": 1} and params={"b": 2}.
MyOperator.partial(task_id="t").expand(params=[{"a": 1}, {"b": 2}])

# Two tasks with params={"a": 0, "b": 1} and params={"a": 0, "b": 2}.
# DAG-level params are also merged. If there are duplicated keys, the
# priority is mapped -> task-partial -> DAG.
MyOperator.partial(task_id="t", params={"a": 0}).expand(params=[{"b": 1}, {"b": 2}])

@uranusjr uranusjr added this to the Airflow 2.4.0 milestone Sep 1, 2022
@uranusjr uranusjr force-pushed the aip-42-task-param-expand branch 5 times, most recently from 5824f87 to 7b2115b Compare September 7, 2022 21:59
Copy link
Member Author

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some notes explaining the implementation, hopefully this is easier to review with them.

@@ -162,6 +165,7 @@ class OperatorPartial:
"""

operator_class: Type["BaseOperator"]
params: Union[ParamsDict, dict]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be able to expand a task against params, we need to be able to “merge” default DAG- and task-level params with the user-mapped params. So params is split out of other partial kwargs and treated specially. Partial kwargs should never contain the "params" key.

@@ -256,7 +256,6 @@ def partial(
partial_kwargs.setdefault("end_date", end_date)
partial_kwargs.setdefault("owner", owner)
partial_kwargs.setdefault("email", email)
partial_kwargs.setdefault("params", default_params)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we don’t set the default params in partial_kwargs

return OperatorPartial(
operator_class=operator_class,
kwargs=partial_kwargs,
params=partial_params,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

… but pass them to this separate attribute.

@@ -363,7 +363,6 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> XComArg:
task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
if task_group:
task_id = task_group.child_id(task_id)
params = partial_kwargs.pop("params", None) or default_params
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this line is deleted since it’s now no-op.

Comment on lines -568 to 573
# Ordering is significant; mapped kwargs should override partial ones.

# If params appears in the mapped kwargs, we need to merge it into the
# partial params, overriding existing keys.
params = copy.copy(self.params)
with contextlib.suppress(KeyError):
params.update(mapped_kwargs["params"])

# Ordering is significant; mapped kwargs should override partial ones,
# and the specially handled params should be respected.
return {
"task_id": self.task_id,
"dag": self.dag,
"task_group": self.task_group,
"params": self.params,
"start_date": self.start_date,
"end_date": self.end_date,
**self.partial_kwargs,
**mapped_kwargs,
"params": params,
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual merging happens here. The merged params is put last so it overrides the incomplete mapped_kwargs["params"].

Comment on lines 1186 to 1196
"""Template all attributes listed in template_fields.
"""Template all attributes listed in *self.template_fields*.

This mutates the attributes in-place and is irreversible.

:param context: Dict with values to apply on content
:param jinja_env: Jinja environment
:param context: Context dict with values to apply on content.
:param jinja_env: Jinja environment to use for rendering.
"""
if not jinja_env:
jinja_env = self.get_template_env()
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
return self
return None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not change any behaviour, I only did it so the behaviour is easier to explain…

Comment on lines 2184 to 2188
original_task = self.task
rendered_task = self.task.render_template_fields(context)
if rendered_task is None: # Compatibility -- custom renderer, assume unmapped.
return self.task
original_task, self.task = self.task, rendered_task
if rendered_task is not None: # Mapped operator, assign unmapped task.
self.task = rendered_task
return original_task
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

… here. Previously it’s difficult to follow when rendered_task is None and when it’s not; now it’s simple—a mapped operator (MappedOperator subclases) performs unmapping and returns the unmapped task, while a non-mapped operator (BaseOperator subclasses) return None because no unmapping is needed.

@uranusjr
Copy link
Member Author

uranusjr commented Sep 27, 2022

This is going to conflict with #26702 and I want to get that one in first for 2.4.1. Resolved, should be ready for 2.5.0.

It turns out we need to update the template context a bit after
unmapping. This also fixes a bug that context["task"] pointed to the
mapped task but context["ti"].task is the unmapped task. They now both
point to the unmapped one.
@uranusjr uranusjr marked this pull request as ready for review September 29, 2022 03:42
@uranusjr uranusjr merged commit 0ec9651 into apache:main Oct 6, 2022
@uranusjr uranusjr deleted the aip-42-task-param-expand branch October 6, 2022 04:47
@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Oct 18, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:dynamic-task-mapping AIP-42 type:new-feature Changelog: New Features
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Task mapping against 'params'
4 participants