From b1bd0015b97a9069710fd00b3200c74ce6e920aa Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Fri, 26 May 2023 10:25:42 -0600 Subject: [PATCH] [#522] Add a wait + synchronous `Workflow.create` (#643) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Pull Request Checklist** - [x] Fixes #522 - [ ] Tests added - [x] Documentation/examples added - [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR title **Description of PR** See #522 for the feature request! --------- Signed-off-by: Flaviu Vadan Co-authored-by: Elliot Gunton --- src/hera/workflows/cron_workflow.py | 2 +- src/hera/workflows/workflow.py | 57 ++++++++++++++++++++++--- src/hera/workflows/workflow_template.py | 2 +- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/hera/workflows/cron_workflow.py b/src/hera/workflows/cron_workflow.py index 72ddb9656..14c02bc58 100644 --- a/src/hera/workflows/cron_workflow.py +++ b/src/hera/workflows/cron_workflow.py @@ -75,7 +75,7 @@ def build(self) -> TWorkflow: status=self.cron_status, ) - def create(self) -> TWorkflow: + def create(self) -> TWorkflow: # type: ignore """Creates the CronWorkflow on the Argo cluster.""" assert self.workflows_service, "workflow service not initialized" assert self.namespace, "workflow namespace not defined" diff --git a/src/hera/workflows/workflow.py b/src/hera/workflows/workflow.py index 1afd09e75..c265e7b89 100644 --- a/src/hera/workflows/workflow.py +++ b/src/hera/workflows/workflow.py @@ -5,6 +5,7 @@ """ from __future__ import annotations +import time from pathlib import Path from types import ModuleType from typing import Any, Dict, List, Optional, Union @@ -44,11 +45,12 @@ WorkflowLintRequest, WorkflowMetadata, WorkflowSpec as _ModelWorkflowSpec, - WorkflowStatus, + WorkflowStatus as _ModelWorkflowStatus, WorkflowTemplateRef, ) from hera.workflows.protocol import Templatable, TTemplate, TWorkflow, VolumeClaimable from hera.workflows.service import WorkflowsService +from hera.workflows.workflow_status import WorkflowStatus _yaml: Optional[ModuleType] = None try: @@ -82,7 +84,7 @@ class Workflow( # Workflow fields - https://argoproj.github.io/argo-workflows/fields/#workflow api_version: Optional[str] = None kind: Optional[str] = None - status: Optional[WorkflowStatus] = None + status: Optional[_ModelWorkflowStatus] = None # ObjectMeta fields - https://argoproj.github.io/argo-workflows/fields/#objectmeta annotations: Optional[Dict[str, str]] = None @@ -330,13 +332,58 @@ def to_yaml(self, *args, **kwargs) -> str: kwargs.setdefault("sort_keys", False) return _yaml.dump(self.to_dict(), *args, **kwargs) - def create(self) -> TWorkflow: - """Creates the Workflow on the Argo cluster.""" + def create(self, wait: bool = False, poll_interval: int = 5) -> TWorkflow: + """Creates the Workflow on the Argo cluster. + + Parameters + ---------- + wait: bool = False + If true then the workflow is created asynchronously and the function returns immediately. + If false then the workflow is created and the function blocks until the workflow is done executing. + poll_interval: int = 5 + The interval in seconds to poll the workflow status if wait is true. Ignored when wait is true. + """ assert self.workflows_service, "workflow service not initialized" assert self.namespace, "workflow namespace not defined" - return self.workflows_service.create_workflow( + + wf = self.workflows_service.create_workflow( WorkflowCreateRequest(workflow=self.build()), namespace=self.namespace ) + # set the workflow name to the name returned by the API, which helps cover the case of users relying on + # `generate_name=True` + self.name = wf.metadata.name + + if wait: + return self.wait(poll_interval=poll_interval) + return wf + + def wait(self, poll_interval: int = 5) -> TWorkflow: + """Waits for the Workflow to complete execution. + + Parameters + ---------- + poll_interval: int = 5 + The interval in seconds to poll the workflow status. + """ + assert self.workflows_service is not None, "workflow service not initialized" + assert self.namespace is not None, "workflow namespace not defined" + assert self.name is not None, "workflow name not defined" + + wf = self.workflows_service.get_workflow(self.name, namespace=self.namespace) + assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}" + + assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}" + assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}" + status = WorkflowStatus.from_argo_status(wf.status.phase) + + # keep polling for workflow status until completed, at the interval dictated by the user + while status == WorkflowStatus.running: + time.sleep(poll_interval) + wf = self.workflows_service.get_workflow(wf.metadata.name, namespace=self.namespace) + assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}" + assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}" + status = WorkflowStatus.from_argo_status(wf.status.phase) + return wf def lint(self) -> TWorkflow: """Lints the Workflow using the Argo cluster.""" diff --git a/src/hera/workflows/workflow_template.py b/src/hera/workflows/workflow_template.py index 59801b325..0cefc5790 100644 --- a/src/hera/workflows/workflow_template.py +++ b/src/hera/workflows/workflow_template.py @@ -31,7 +31,7 @@ def _set_status(cls, v): if v is not None: raise ValueError("status is not a valid field on a WorkflowTemplate") - def create(self) -> TWorkflow: + def create(self) -> TWorkflow: # type: ignore """Creates the WorkflowTemplate on the Argo cluster.""" assert self.workflows_service, "workflow service not initialized" assert self.namespace, "workflow namespace not defined"