Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#595] Improve artifact/parameter access #639

Merged
merged 14 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions src/hera/workflows/_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
VolumeMount,
)
from hera.workflows.parameter import MISSING, Parameter
from hera.workflows.protocol import Templatable
from hera.workflows.resources import Resources
from hera.workflows.user_container import UserContainer
from hera.workflows.volume import Volume, _BaseVolume
Expand Down Expand Up @@ -670,6 +671,45 @@ class TemplateInvocatorSubNodeMixin(BaseMixin):
when: Optional[str] = None
with_sequence: Optional[Sequence] = None

@property
def _subtype(self) -> str:
raise NotImplementedError

@property
def id(self) -> str:
"""ID of this node."""
return f"{{{{{self._subtype}.{self.name}.id}}}}"

@property
def ip(self) -> str:
"""IP of this node."""
return f"{{{{{self._subtype}.{self.name}.ip}}}}"

@property
def status(self) -> str:
"""Status of this node."""
return f"{{{{{self._subtype}.{self.name}.status}}}}"

@property
def exit_code(self) -> str:
"""ExitCode holds the exit code of a script template."""
return f"{{{{{self._subtype}.{self.name}.exitCode}}}}"

@property
def started_at(self) -> str:
"""Time at which this node started."""
return f"{{{{{self._subtype}.{self.name}.startedAt}}}}"

@property
def finished_at(self) -> str:
"""Time at which this node completed."""
return f"{{{{{self._subtype}.{self.name}.finishedAt}}}}"

@property
def result(self) -> str:
"""Result holds the result (stdout) of a script template."""
return f"{{{{{self._subtype}.{self.name}.outputs.result}}}}"

@root_validator(pre=False)
def _check_values(cls, values):
def one(xs: List):
Expand All @@ -680,6 +720,147 @@ def one(xs: List):
raise ValueError("exactly one of ['template', 'template_ref', 'inline'] must be present")
return values

def _get_parameters_as(self, name: str, subtype: str) -> Parameter:
"""Returns a `Parameter` that represents all the outputs of the specified subtype.

Parameters
----------
name: str
The name of the parameter to search for.
subtype: str
The inheritor subtype field, used to construct the output artifact `from_` reference.

Returns
-------
Parameter
The parameter, named based on the given `name`, along with a value that references all outputs.
"""
return Parameter(name=name, value=f"{{{{{subtype}.{self.name}.outputs.parameters}}}}")

def _get_parameter(self, name: str, subtype: str) -> Parameter:
"""Attempts to find the specified parameter in the outputs for the specified subtype.

Notes
-----
This is specifically designed to be invoked by inheritors.

Parameters
----------
name: str
The name of the parameter to search for.
subtype: str
The inheritor subtype field, used to construct the output artifact `from_` reference.

Returns
-------
Parameter
The parameter if found.

Raises
------
ValueError
When no outputs can be constructed/no outputs are set.
KeyError
When the artifact is not found.
NotImplementedError
When something else other than an `Parameter` is found for the specified name.
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = self.template
Comment on lines +772 to +775
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general point here - we've been finding it hard to work with TemplateRefs that we know output certain parameters, so we might want to relax/change this validation. Something we'll look to address in a later PR, JFYI

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Do you imagine this can actually accommodate template refs by actually fetching the template based on the ref if self.template is a string, and search the params like that?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetching the template

This would be pretty awesome, and along the lines of what we're thinking - if we're able to statically generate stubs from a given WorkflowTemplate, so that we have a class that inherits from a Hera TemplateRef and works with our functions like get_parameter.

class TemplateRef(IOMixin):
    pass  # let's say we've implemented this


from hera.workflow import TemplateRef


class MyTemplateRefStub(TemplateRef):  # we generate this in some hera.workflow.template_ref_stubs module or something
    inputs = [Parameter("a-parameter", default="some-default")]
    outputs = [Parameter("an-output-parameter")]
    ... etc


# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
if template.outputs.parameters is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
parameters = template.outputs.parameters # type: ignore

obj = next((output for output in parameters if output.name == name), None)
if obj is not None:
return Parameter(
name=obj.name,
value=f"{{{{{subtype}.{self.name}.outputs.parameters.{name}}}}}",
)
raise KeyError(f"No output parameter named `{name}` found")

def _get_artifact(self, name: str, subtype: str) -> Artifact:
"""Attempts to find the specified artifact in the outputs for the specified subtype.

Notes
-----
This is specifically designed to be invoked by inheritors.

Parameters
----------
name: str
The name of the artifact to search for.
subtype: str
The inheritor subtype field, used to construct the output artifact `from_` reference.

Returns
-------
Artifact
The artifact if found.

Raises
------
ValueError
When no outputs can be constructed/no outputs are set.
KeyError
When the artifact is not found.
NotImplementedError
When something else other than an `Artifact` is found for the specified name.
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = cast(Template, self.template)

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output artifacts when the template has no outputs: {template}")
elif template.outputs.artifacts is None: # type: ignore
raise ValueError(f"Cannot get output artifacts when the template has no output artifacts: {template}")
artifacts = cast(List[ModelArtifact], template.outputs.artifacts)

obj = next((output for output in artifacts if output.name == name), None)
if obj is not None:
return Artifact(name=name, path=obj.path, from_=f"{{{{{subtype}.{self.name}.outputs.artifacts.{name}}}}}")
raise KeyError(f"No output artifact named `{name}` found")

def get_parameters_as(self, name: str) -> Parameter:
"""Returns a `Parameter` that represents all the outputs of this subnode.

Parameters
----------
name: str
The name of the parameter to search for.

Returns
-------
Parameter
The parameter, named based on the given `name`, along with a value that references all outputs.
"""
return self._get_parameters_as(name=name, subtype=self._subtype)

def get_artifact(self, name: str) -> Artifact:
"""Gets an artifact from the outputs of this subnode"""
return self._get_artifact(name=name, subtype=self._subtype)

def get_parameter(self, name: str) -> Parameter:
"""Gets a parameter from the outputs of this subnode"""
return self._get_parameter(name=name, subtype=self._subtype)


def _get_params_from_source(source: Callable) -> Optional[List[Parameter]]:
source_signature: Dict[str, Optional[object]] = {}
Expand Down
75 changes: 5 additions & 70 deletions src/hera/workflows/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
Template as _ModelTemplate,
WorkflowStep as _ModelWorkflowStep,
)
from hera.workflows.parameter import Parameter
from hera.workflows.protocol import Steppable, Templatable


Expand All @@ -31,78 +30,14 @@ class Step(
ParameterMixin,
ItemMixin,
):
"""Step is used to run a given template. Must be instantiated under a Steps or Parallel context,
or outside of a Workflow.
"""
Step is used to run a given template. Must be instantiated under a Steps or Parallel context, or
outside a Workflow.
"""

@property
def id(self) -> str:
return f"{{{{steps.{self.name}.id}}}}"

@property
def ip(self) -> str:
return f"{{{{steps.{self.name}.ip}}}}"

@property
def status(self) -> str:
return f"{{{{steps.{self.name}.status}}}}"

@property
def exit_code(self) -> str:
return f"{{{{steps.{self.name}.exitCode}}}}"

@property
def started_at(self) -> str:
return f"{{{{steps.{self.name}.startedAt}}}}"

@property
def finished_at(self) -> str:
return f"{{{{steps.{self.name}.finishedAt}}}}"

@property
def result(self) -> str:
return f"{{{{steps.{self.name}.outputs.result}}}}"

def get_parameters_as(self, name):
"""Gets all the output parameters from this task"""
return Parameter(name=name, value=f"{{{{steps.{self.name}.outputs.parameters}}}}")

def get_parameter(self, name: str) -> Parameter:
"""Returns a Parameter from the task's outputs based on the name.

Parameters
----------
name: str
The name of the parameter to extract as an output.

Returns
-------
Parameter
Parameter with the same name
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = self.template

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
if template.outputs.parameters is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
parameters = template.outputs.parameters # type: ignore

obj = next((output for output in parameters if output.name == name), None)
if obj is not None:
return Parameter(
name=obj.name,
value=f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}",
)
raise KeyError(f"No output parameter named `{name}` found")
def _subtype(self) -> str:
return "steps"

def _build_as_workflow_step(self) -> _ModelWorkflowStep:
_template = None
Expand Down
70 changes: 2 additions & 68 deletions src/hera/workflows/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
Template,
)
from hera.workflows.operator import Operator
from hera.workflows.parameter import Parameter
from hera.workflows.protocol import Templatable
from hera.workflows.workflow_status import WorkflowStatus

Expand Down Expand Up @@ -119,73 +118,8 @@ def _get_dependency_tasks(self) -> List[str]:
return task_names

@property
def id(self) -> str:
return f"{{{{tasks.{self.name}.id}}}}"

@property
def ip(self) -> str:
return f"{{{{tasks.{self.name}.ip}}}}"

@property
def status(self) -> str:
return f"{{{{tasks.{self.name}.status}}}}"

@property
def exit_code(self) -> str:
return f"{{{{tasks.{self.name}.exitCode}}}}"

@property
def started_at(self) -> str:
return f"{{{{tasks.{self.name}.startedAt}}}}"

@property
def finished_at(self) -> str:
return f"{{{{tasks.{self.name}.finishedAt}}}}"

@property
def result(self) -> str:
return f"{{{{tasks.{self.name}.outputs.result}}}}"

def get_parameters_as(self, name: str) -> Parameter:
"""Gets all the output parameters from this task"""
return Parameter(name=name, value=f"{{{{tasks.{self.name}.outputs.parameters}}}}")

def get_parameter(self, name: str) -> Parameter:
"""Returns a Parameter from the task's outputs based on the name.

Parameters
----------
name: str
The name of the parameter to extract as an output.

Returns
-------
Parameter
Parameter with the same name
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = self.template

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
if template.outputs.parameters is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
parameters = template.outputs.parameters # type: ignore

obj = next((output for output in parameters if output.name == name), None)
if obj is not None:
return Parameter(
name=obj.name,
value=f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}",
)
raise KeyError(f"No output parameter named `{name}` found")
def _subtype(self) -> str:
return "tasks"

def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task:
"""Set self as a dependency of `other`."""
Expand Down