Skip to content

Commit

Permalink
[#522] Add a wait + synchronous Workflow.create (#643)
Browse files Browse the repository at this point in the history
<!-- Thank you for submitting a PR to Hera! 🚀 -->

**Pull Request Checklist**
- [x] Fixes #522
- [ ] Tests added
- [x] Documentation/examples added
- [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR
title
<!-- Also remember to sign off commits or the DCO check will fail on
your PR! -->

**Description of PR**
<!-- If not linked to an issue, please describe your changes here -->

See #522 for the feature request!

<!-- Piece of cake! ✨🍰✨ -->

---------

Signed-off-by: Flaviu Vadan <flaviuvadan@gmail.com>
Co-authored-by: Elliot Gunton <egunton@bloomberg.net>
  • Loading branch information
flaviuvadan and elliotgunton authored May 26, 2023
1 parent da9a0ef commit b1bd001
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/hera/workflows/cron_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def build(self) -> TWorkflow:
status=self.cron_status,
)

def create(self) -> TWorkflow:
def create(self) -> TWorkflow: # type: ignore
"""Creates the CronWorkflow on the Argo cluster."""
assert self.workflows_service, "workflow service not initialized"
assert self.namespace, "workflow namespace not defined"
Expand Down
57 changes: 52 additions & 5 deletions src/hera/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
from __future__ import annotations

import time
from pathlib import Path
from types import ModuleType
from typing import Any, Dict, List, Optional, Union
Expand Down Expand Up @@ -44,11 +45,12 @@
WorkflowLintRequest,
WorkflowMetadata,
WorkflowSpec as _ModelWorkflowSpec,
WorkflowStatus,
WorkflowStatus as _ModelWorkflowStatus,
WorkflowTemplateRef,
)
from hera.workflows.protocol import Templatable, TTemplate, TWorkflow, VolumeClaimable
from hera.workflows.service import WorkflowsService
from hera.workflows.workflow_status import WorkflowStatus

_yaml: Optional[ModuleType] = None
try:
Expand Down Expand Up @@ -82,7 +84,7 @@ class Workflow(
# Workflow fields - https://argoproj.github.io/argo-workflows/fields/#workflow
api_version: Optional[str] = None
kind: Optional[str] = None
status: Optional[WorkflowStatus] = None
status: Optional[_ModelWorkflowStatus] = None

# ObjectMeta fields - https://argoproj.github.io/argo-workflows/fields/#objectmeta
annotations: Optional[Dict[str, str]] = None
Expand Down Expand Up @@ -330,13 +332,58 @@ def to_yaml(self, *args, **kwargs) -> str:
kwargs.setdefault("sort_keys", False)
return _yaml.dump(self.to_dict(), *args, **kwargs)

def create(self) -> TWorkflow:
"""Creates the Workflow on the Argo cluster."""
def create(self, wait: bool = False, poll_interval: int = 5) -> TWorkflow:
"""Creates the Workflow on the Argo cluster.
Parameters
----------
wait: bool = False
If true then the workflow is created asynchronously and the function returns immediately.
If false then the workflow is created and the function blocks until the workflow is done executing.
poll_interval: int = 5
The interval in seconds to poll the workflow status if wait is true. Ignored when wait is true.
"""
assert self.workflows_service, "workflow service not initialized"
assert self.namespace, "workflow namespace not defined"
return self.workflows_service.create_workflow(

wf = self.workflows_service.create_workflow(
WorkflowCreateRequest(workflow=self.build()), namespace=self.namespace
)
# set the workflow name to the name returned by the API, which helps cover the case of users relying on
# `generate_name=True`
self.name = wf.metadata.name

if wait:
return self.wait(poll_interval=poll_interval)
return wf

def wait(self, poll_interval: int = 5) -> TWorkflow:
"""Waits for the Workflow to complete execution.
Parameters
----------
poll_interval: int = 5
The interval in seconds to poll the workflow status.
"""
assert self.workflows_service is not None, "workflow service not initialized"
assert self.namespace is not None, "workflow namespace not defined"
assert self.name is not None, "workflow name not defined"

wf = self.workflows_service.get_workflow(self.name, namespace=self.namespace)
assert wf.metadata.name is not None, f"workflow name not defined for workflow {self.name}"

assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
status = WorkflowStatus.from_argo_status(wf.status.phase)

# keep polling for workflow status until completed, at the interval dictated by the user
while status == WorkflowStatus.running:
time.sleep(poll_interval)
wf = self.workflows_service.get_workflow(wf.metadata.name, namespace=self.namespace)
assert wf.status is not None, f"workflow status not defined for workflow {wf.metadata.name}"
assert wf.status.phase is not None, f"workflow phase not defined for workflow status {wf.status}"
status = WorkflowStatus.from_argo_status(wf.status.phase)
return wf

def lint(self) -> TWorkflow:
"""Lints the Workflow using the Argo cluster."""
Expand Down
2 changes: 1 addition & 1 deletion src/hera/workflows/workflow_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _set_status(cls, v):
if v is not None:
raise ValueError("status is not a valid field on a WorkflowTemplate")

def create(self) -> TWorkflow:
def create(self) -> TWorkflow: # type: ignore
"""Creates the WorkflowTemplate on the Argo cluster."""
assert self.workflows_service, "workflow service not initialized"
assert self.namespace, "workflow namespace not defined"
Expand Down

0 comments on commit b1bd001

Please sign in to comment.