From 255a9f242dc9462f46174160f10c8774e96aa0e9 Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 19:18:36 -0600 Subject: [PATCH 1/9] add special case handling of cron workflow status Signed-off-by: Flaviu Vadan --- scripts/service.py | 26 ++++++++++ src/hera/workflows/service.py | 91 ++++++++++++++++++++++++++++++++--- 2 files changed, 110 insertions(+), 7 deletions(-) diff --git a/scripts/service.py b/scripts/service.py index a480f605b..40ae52049 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -172,6 +172,32 @@ def __str__(self) -> str: ret_val = "str(resp.content)" elif "Response" in self.response.ref: ret_val = f"{self.response}()" + elif "CronWorkflow" in self.response.ref: + # when users schedule cron workflows that have not executed the moment they are scheduled, the response + # does contain `CronWorkflowStatus` but its fields are empty. However, the `CronWorkflowStatus` object, + # while option on `CronWorkflow`, has *required* fields. Here, we overwrite the response with a special + # case that handles setting the `CronWorkflowStatus` to `None` if the response is empty. + return f""" + {signature} + resp = requests.{self.method}( + url={req_url}, + params={params}, + headers={headers}, + data={body}, + verify=self.verify_ssl + ) + + if resp.ok: + resp_json = resp.json() + if "status" in resp_json and resp_json["status"]['active'] is None and resp_json["status"]['lastScheduledTime'] is None and resp_json["status"]['conditions'] is None: + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json['status'] = None + return {self.response}(**resp_json) + else: + raise Exception(f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}") + """ else: ret_val = f"{self.response}(**resp.json())" diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index bec5baf74..1a62f6f04 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -374,7 +374,18 @@ def list_cron_workflows( ) if resp.ok: - return CronWorkflowList(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflowList(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") @@ -392,7 +403,18 @@ def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Option ) if resp.ok: - return CronWorkflow(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflow(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") @@ -410,7 +432,18 @@ def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[s ) if resp.ok: - return CronWorkflow(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflow(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") @@ -428,7 +461,18 @@ def get_cron_workflow( ) if resp.ok: - return CronWorkflow(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflow(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") @@ -448,7 +492,18 @@ def update_cron_workflow( ) if resp.ok: - return CronWorkflow(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflow(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") @@ -501,7 +556,18 @@ def resume_cron_workflow( ) if resp.ok: - return CronWorkflow(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflow(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") @@ -521,7 +587,18 @@ def suspend_cron_workflow( ) if resp.ok: - return CronWorkflow(**resp.json()) + resp_json = resp.json() + if ( + "status" in resp_json + and resp_json["status"]["active"] is None + and resp_json["status"]["lastScheduledTime"] is None + and resp_json["status"]["conditions"] is None + ): + # this is a necessary special case as the status fields cannot be empty on the `CronWorkflowStatus` + # object. So, we overwrite the response with a value that allows the response to pass through safely. + # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. + resp_json["status"] = None + return CronWorkflow(**resp_json) else: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") From 9eba9509604d659adf92c0b35614667767188561 Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 19:54:48 -0600 Subject: [PATCH 2/9] add an exceptions module and expose that to services Signed-off-by: Flaviu Vadan --- scripts/service.py | 15 +- src/hera/events/service.py | 145 ++++++++++------ src/hera/shared/exceptions.py | 56 +++++++ src/hera/workflows/service.py | 301 +++++++++++++++++++++++----------- 4 files changed, 364 insertions(+), 153 deletions(-) create mode 100644 src/hera/shared/exceptions.py diff --git a/scripts/service.py b/scripts/service.py index 40ae52049..d245041b1 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -195,9 +195,11 @@ def __str__(self) -> str: # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json['status'] = None return {self.response}(**resp_json) - else: - raise Exception(f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}") - """ + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}", + ) + """ else: ret_val = f"{self.response}(**resp.json())" @@ -213,8 +215,10 @@ def __str__(self) -> str: if resp.ok: return {ret_val} - else: - raise Exception(f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}", + ) """ @@ -430,6 +434,7 @@ def get_service_def() -> str: import requests from hera.{module}.models import {imports} from hera.shared import global_config +from hera.shared.exceptions import exception_from_status_code from typing import Optional, cast class {models_type}Service: diff --git a/src/hera/events/service.py b/src/hera/events/service.py index 5b4135455..805eb6759 100644 --- a/src/hera/events/service.py +++ b/src/hera/events/service.py @@ -26,6 +26,7 @@ Version, ) from hera.shared import global_config +from hera.shared.exceptions import exception_from_status_code class EventsService: @@ -76,8 +77,10 @@ def list_event_sources( if resp.ok: return EventSourceList(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional[str] = None) -> EventSource: resp = requests.post( @@ -94,8 +97,10 @@ def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional if resp.ok: return EventSource(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventSource: resp = requests.get( @@ -110,8 +115,10 @@ def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventS if resp.ok: return EventSource(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def update_event_source( self, name: str, req: UpdateEventSourceRequest, namespace: Optional[str] = None @@ -130,8 +137,10 @@ def update_event_source( if resp.ok: return EventSource(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_event_source( self, @@ -163,8 +172,10 @@ def delete_event_source( if resp.ok: return EventSourceDeletedResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] = None) -> EventResponse: resp = requests.post( @@ -181,8 +192,10 @@ def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] if resp.ok: return EventResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_info(self) -> InfoResponse: resp = requests.get( @@ -195,8 +208,10 @@ def get_info(self) -> InfoResponse: if resp.ok: return InfoResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_sensors( self, @@ -233,8 +248,10 @@ def list_sensors( if resp.ok: return SensorList(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = None) -> Sensor: resp = requests.post( @@ -251,8 +268,10 @@ def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = Non if resp.ok: return Sensor(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_sensor(self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None) -> Sensor: resp = requests.get( @@ -267,8 +286,10 @@ def get_sensor(self, name: str, namespace: Optional[str] = None, resource_versio if resp.ok: return Sensor(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional[str] = None) -> Sensor: resp = requests.put( @@ -285,8 +306,10 @@ def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional if resp.ok: return Sensor(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_sensor( self, @@ -318,8 +341,10 @@ def delete_sensor( if resp.ok: return DeleteSensorResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def watch_event_sources( self, @@ -356,8 +381,10 @@ def watch_event_sources( if resp.ok: return EventSourceWatchEvent(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def event_sources_logs( self, @@ -404,8 +431,10 @@ def event_sources_logs( if resp.ok: return EventsourceLogEntry(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def watch_events( self, @@ -442,8 +471,10 @@ def watch_events( if resp.ok: return Event(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def watch_sensors( self, @@ -480,8 +511,10 @@ def watch_sensors( if resp.ok: return SensorWatchEvent(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def sensors_logs( self, @@ -526,8 +559,10 @@ def sensors_logs( if resp.ok: return SensorLogEntry(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_user_info(self) -> GetUserInfoResponse: resp = requests.get( @@ -540,8 +575,10 @@ def get_user_info(self) -> GetUserInfoResponse: if resp.ok: return GetUserInfoResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_version(self) -> Version: resp = requests.get( @@ -554,8 +591,10 @@ def get_version(self) -> Version: if resp.ok: return Version(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_artifact_file( self, @@ -587,8 +626,10 @@ def get_artifact_file( if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an output artifact by UID.""" @@ -604,8 +645,10 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an output artifact.""" @@ -624,8 +667,10 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an input artifact by UID.""" @@ -641,8 +686,10 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an input artifact.""" @@ -661,8 +708,10 @@ def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namesp if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) __all__ = ["EventsService"] diff --git a/src/hera/shared/exceptions.py b/src/hera/shared/exceptions.py new file mode 100644 index 000000000..c0f056bb9 --- /dev/null +++ b/src/hera/shared/exceptions.py @@ -0,0 +1,56 @@ +"""Module that holds Hera specific exceptions. + +These are thin wrappers around the core Python `Exception`. +""" +from http import HTTPStatus +from typing import Type + + +class HeraException(Exception): + """Base class for exceptions in this module.""" + + status_code: int + + +class Unauthorized(HeraException): + status_code = HTTPStatus.UNAUTHORIZED.value + + +class BadRequest(HeraException): + status_code = HTTPStatus.BAD_REQUEST.value + + +class Forbidden(HeraException): + status_code = HTTPStatus.FORBIDDEN.value + + +class NotFound(HeraException): + status_code = HTTPStatus.NOT_FOUND.value + + +class NotImplemented(HeraException): + status_code = HTTPStatus.NOT_IMPLEMENTED.value + + +class AlreadyExists(HeraException): + status_code = HTTPStatus.CONFLICT.value + + +class InternalServerError(HeraException): + status_code = HTTPStatus.INTERNAL_SERVER_ERROR.value + + +status_code_to_exception_map: dict[int, Type[HeraException]] = { + Unauthorized.status_code: Unauthorized, + BadRequest.status_code: BadRequest, + Forbidden.status_code: Forbidden, + NotFound.status_code: NotFound, + NotImplemented.status_code: NotImplemented, + AlreadyExists.status_code: AlreadyExists, + InternalServerError.status_code: InternalServerError, +} + + +def exception_from_status_code(status_code: int, msg: str) -> HeraException: + """Returns a `HeraException` mapped from the given status code initialized with the given message""" + return status_code_to_exception_map.get(status_code, HeraException)(msg) diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index 1a62f6f04..535086ebd 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -4,6 +4,7 @@ import requests from hera.shared import global_config +from hera.shared.exceptions import exception_from_status_code from hera.workflows.models import ( ArchivedWorkflowDeletedResponse, ClusterWorkflowTemplate, @@ -97,8 +98,10 @@ def list_archived_workflows( if resp.ok: return WorkflowList(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_archived_workflow_label_keys(self) -> LabelKeys: resp = requests.get( @@ -111,8 +114,10 @@ def list_archived_workflow_label_keys(self) -> LabelKeys: if resp.ok: return LabelKeys(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_archived_workflow_label_values( self, @@ -146,8 +151,10 @@ def list_archived_workflow_label_values( if resp.ok: return LabelValues(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_archived_workflow(self, uid: str) -> Workflow: resp = requests.get( @@ -160,8 +167,10 @@ def get_archived_workflow(self, uid: str) -> Workflow: if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: resp = requests.delete( @@ -174,8 +183,10 @@ def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: if resp.ok: return ArchivedWorkflowDeletedResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequest) -> Workflow: resp = requests.put( @@ -190,8 +201,10 @@ def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequ if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) -> Workflow: resp = requests.put( @@ -206,8 +219,10 @@ def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) - if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_cluster_workflow_templates( self, @@ -241,8 +256,10 @@ def list_cluster_workflow_templates( if resp.ok: return ClusterWorkflowTemplateList(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateRequest) -> ClusterWorkflowTemplate: resp = requests.post( @@ -257,8 +274,10 @@ def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateReq if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest) -> ClusterWorkflowTemplate: resp = requests.post( @@ -273,8 +292,10 @@ def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_cluster_workflow_template( self, name: str, resource_version: Optional[str] = None @@ -289,8 +310,10 @@ def get_cluster_workflow_template( if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def update_cluster_workflow_template( self, name: str, req: ClusterWorkflowTemplateUpdateRequest @@ -307,8 +330,10 @@ def update_cluster_workflow_template( if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_cluster_workflow_template( self, @@ -337,8 +362,10 @@ def delete_cluster_workflow_template( if resp.ok: return ClusterWorkflowTemplateDeleteResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_cron_workflows( self, @@ -386,8 +413,10 @@ def list_cron_workflows( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflowList(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: resp = requests.post( @@ -415,8 +444,10 @@ def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Option # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: resp = requests.post( @@ -444,8 +475,10 @@ def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[s # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_cron_workflow( self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None @@ -473,8 +506,10 @@ def get_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def update_cron_workflow( self, name: str, req: UpdateCronWorkflowRequest, namespace: Optional[str] = None @@ -504,8 +539,10 @@ def update_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_cron_workflow( self, @@ -537,8 +574,10 @@ def delete_cron_workflow( if resp.ok: return CronWorkflowDeletedResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def resume_cron_workflow( self, name: str, req: CronWorkflowResumeRequest, namespace: Optional[str] = None @@ -568,8 +607,10 @@ def resume_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def suspend_cron_workflow( self, name: str, req: CronWorkflowSuspendRequest, namespace: Optional[str] = None @@ -599,8 +640,10 @@ def suspend_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_info(self) -> InfoResponse: resp = requests.get( @@ -613,8 +656,10 @@ def get_info(self) -> InfoResponse: if resp.ok: return InfoResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_user_info(self) -> GetUserInfoResponse: resp = requests.get( @@ -627,8 +672,10 @@ def get_user_info(self) -> GetUserInfoResponse: if resp.ok: return GetUserInfoResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_version(self) -> Version: resp = requests.get( @@ -641,8 +688,10 @@ def get_version(self) -> Version: if resp.ok: return Version(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_workflow_templates( self, @@ -679,8 +728,10 @@ def list_workflow_templates( if resp.ok: return WorkflowTemplateList(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def create_workflow_template( self, req: WorkflowTemplateCreateRequest, namespace: Optional[str] = None @@ -699,8 +750,10 @@ def create_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def lint_workflow_template( self, req: WorkflowTemplateLintRequest, namespace: Optional[str] = None @@ -719,8 +772,10 @@ def lint_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_workflow_template( self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None @@ -737,8 +792,10 @@ def get_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def update_workflow_template( self, name: str, req: WorkflowTemplateUpdateRequest, namespace: Optional[str] = None @@ -757,8 +814,10 @@ def update_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_workflow_template( self, @@ -790,8 +849,10 @@ def delete_workflow_template( if resp.ok: return WorkflowTemplateDeleteResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def list_workflows( self, @@ -830,8 +891,10 @@ def list_workflows( if resp.ok: return WorkflowList(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.post( @@ -848,8 +911,10 @@ def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.post( @@ -866,8 +931,10 @@ def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = Non if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.post( @@ -884,8 +951,10 @@ def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_workflow( self, @@ -906,8 +975,10 @@ def get_workflow( if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def delete_workflow( self, @@ -941,8 +1012,10 @@ def delete_workflow( if resp.ok: return WorkflowDeleteResponse() - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def workflow_logs( self, @@ -988,8 +1061,10 @@ def workflow_logs( if resp.ok: return V1alpha1LogEntry(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1006,8 +1081,10 @@ def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1024,8 +1101,10 @@ def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Opti if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1042,8 +1121,10 @@ def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Option if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1060,8 +1141,10 @@ def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[s if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1078,8 +1161,10 @@ def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1096,8 +1181,10 @@ def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Op if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def terminate_workflow( self, name: str, req: WorkflowTerminateRequest, namespace: Optional[str] = None @@ -1116,8 +1203,10 @@ def terminate_workflow( if resp.ok: return Workflow(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def pod_logs( self, @@ -1163,8 +1252,10 @@ def pod_logs( if resp.ok: return V1alpha1LogEntry(**resp.json()) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_artifact_file( self, @@ -1196,8 +1287,10 @@ def get_artifact_file( if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an output artifact by UID.""" @@ -1213,8 +1306,10 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an output artifact.""" @@ -1233,8 +1328,10 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an input artifact by UID.""" @@ -1250,8 +1347,10 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an input artifact.""" @@ -1270,8 +1369,10 @@ def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namesp if resp.ok: return str(resp.content) - else: - raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with error: {resp.json()}", + ) __all__ = ["WorkflowsService"] From 648ce195215c2975911e3063a694c9f70b1da0de Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 20:33:22 -0600 Subject: [PATCH 3/9] add tempalte invokator parameter/artifact accessor Signed-off-by: Flaviu Vadan --- src/hera/workflows/_mixins.py | 119 ++++++++++++++++++++++++++++++++++ src/hera/workflows/steps.py | 52 ++++++--------- src/hera/workflows/task.py | 43 ++++-------- 3 files changed, 152 insertions(+), 62 deletions(-) diff --git a/src/hera/workflows/_mixins.py b/src/hera/workflows/_mixins.py index fe18e66f4..d7897ab8e 100644 --- a/src/hera/workflows/_mixins.py +++ b/src/hera/workflows/_mixins.py @@ -54,6 +54,7 @@ VolumeMount, ) from hera.workflows.parameter import MISSING, Parameter +from hera.workflows.protocol import Templatable from hera.workflows.resources import Resources from hera.workflows.user_container import UserContainer from hera.workflows.volume import Volume, _BaseVolume @@ -675,6 +676,124 @@ def one(xs: List): raise ValueError("exactly one of ['template', 'template_ref', 'inline'] must be present") return values + def _get_parameters_as(self, name: str, subtype: str) -> Parameter: + """Returns a `Parameter` that represents all the outputs of the specified subtype. + + Parameters + ---------- + name: str + The name of the parameter to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. + + Returns + ------- + Parameter + The parameter, named based on the given `name`, along with a value that references all outputs. + """ + return Parameter(name=name, value=f"{{{{{subtype}.{self.name}.outputs.parameters}}}}") + + def _get_parameter(self, name: str, subtype: str) -> Parameter: + """Attempts to find the specified parameter in the outputs for the specified subtype. + + Notes + ----- + This is specifically designed to be invoked by inheritors. + + Parameters + ---------- + name: str + The name of the parameter to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. + + Returns + ------- + Parameter + The parameter if found. + + Raises + ------ + ValueError + When no outputs can be constructed/no outputs are set. + KeyError + When the artifact is not found. + NotImplementedError + When something else other than an `Parameter` is found for the specified name. + """ + if isinstance(self.template, str): + raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + + # here, we build the template early to verify that we can get the outputs + if isinstance(self.template, Templatable): + template = self.template._build_template() + else: + template = self.template + + # at this point, we know that the template is a `Template` object + if template.outputs is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") + if template.outputs.parameters is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") + parameters = template.outputs.parameters # type: ignore + + obj = next((output for output in parameters if output.name == name), None) + if obj is not None: + return Parameter( + name=obj.name, + value=f"{{{{{subtype}.{self.name}.outputs.parameters.{name}}}}}", + ) + raise KeyError(f"No output parameter named `{name}` found") + + def _get_artifact(self, name: str, subtype: str) -> Artifact: + """Attempts to find the specified artifact in the outputs for the specified subtype. + + Notes + ----- + This is specifically designed to be invoked by inheritors. + + Parameters + ---------- + name: str + The name of the artifact to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. + + Returns + ------- + Artifact + The artifact if found. + + Raises + ------ + ValueError + When no outputs can be constructed/no outputs are set. + KeyError + When the artifact is not found. + NotImplementedError + When something else other than an `Artifact` is found for the specified name. + """ + if isinstance(self.template, str): + raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + + # here, we build the template early to verify that we can get the outputs + if isinstance(self.template, Templatable): + template = self.template._build_template() + else: + template = cast(Template, self.template) + + # at this point, we know that the template is a `Template` object + if template.outputs is None: # type: ignore + raise ValueError(f"Cannot get output artifacts when the template has no outputs: {template}") + elif template.outputs.artifacts is None: # type: ignore + raise ValueError(f"Cannot get output artifacts when the template has no output artifacts: {template}") + artifacts = cast(List[ModelArtifact], template.outputs.artifacts) + + obj = next((output for output in artifacts if output.name == name), None) + if obj is not None: + return Artifact(name=name, path=obj.path, from_=f"{{{{{subtype}.{self.name}.outputs.artifacts.{name}}}}}") + raise KeyError(f"No output artifact named `{name}` found") + def _get_params_from_source(source: Callable) -> Optional[List[Parameter]]: source_signature: Dict[str, Optional[object]] = {} diff --git a/src/hera/workflows/steps.py b/src/hera/workflows/steps.py index 7d01f6e1a..3adb49fc2 100644 --- a/src/hera/workflows/steps.py +++ b/src/hera/workflows/steps.py @@ -15,6 +15,7 @@ TemplateInvocatorSubNodeMixin, TemplateMixin, ) +from hera.workflows.artifact import Artifact from hera.workflows.exceptions import InvalidType from hera.workflows.models import ( Template as _ModelTemplate, @@ -31,8 +32,9 @@ class Step( ParameterMixin, ItemMixin, ): - """Step is used to run a given template. Must be instantiated under a Steps or Parallel context, - or outside of a Workflow. + """ + Step is used to run a given template. Must be instantiated under a Steps or Parallel context, or + outside a Workflow. """ @property @@ -63,46 +65,30 @@ def finished_at(self) -> str: def result(self) -> str: return f"{{{{steps.{self.name}.outputs.result}}}}" - def get_parameters_as(self, name): - """Gets all the output parameters from this task""" - return Parameter(name=name, value=f"{{{{steps.{self.name}.outputs.parameters}}}}") - - def get_parameter(self, name: str) -> Parameter: - """Returns a Parameter from the task's outputs based on the name. + def get_parameters_as(self, name: str) -> Parameter: + """Returns a `Parameter` that represents all the outputs of the specified subtype. Parameters ---------- name: str - The name of the parameter to extract as an output. + The name of the parameter to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. Returns ------- Parameter - Parameter with the same name + The parameter, named based on the given `name`, along with a value that references all outputs. """ - if isinstance(self.template, str): - raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") - - # here, we build the template early to verify that we can get the outputs - if isinstance(self.template, Templatable): - template = self.template._build_template() - else: - template = self.template - - # at this point, we know that the template is a `Template` object - if template.outputs is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") - if template.outputs.parameters is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") - parameters = template.outputs.parameters # type: ignore - - obj = next((output for output in parameters if output.name == name), None) - if obj is not None: - return Parameter( - name=obj.name, - value=f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}", - ) - raise KeyError(f"No output parameter named `{name}` found") + return super()._get_parameters_as(name=name, subtype="steps") + + def get_artifact(self, name: str) -> Artifact: + """Gets an artifact from the outputs of this `Step`""" + return super()._get_artifact(name=name, subtype="steps") + + def get_parameter(self, name: str) -> Parameter: + """Gets a parameter from the outputs of this `Step`""" + return super()._get_parameter(name=name, subtype="steps") def _build_as_workflow_step(self) -> _ModelWorkflowStep: _template = None diff --git a/src/hera/workflows/task.py b/src/hera/workflows/task.py index cd5b0e0fa..1bd2b6eff 100644 --- a/src/hera/workflows/task.py +++ b/src/hera/workflows/task.py @@ -16,6 +16,7 @@ TemplateInvocatorSubNodeMixin, TemplateMixin, ) +from hera.workflows.artifact import Artifact from hera.workflows.models import ( DAGTask as _ModelDAGTask, Template, @@ -147,45 +148,29 @@ def result(self) -> str: return f"{{{{tasks.{self.name}.outputs.result}}}}" def get_parameters_as(self, name: str) -> Parameter: - """Gets all the output parameters from this task""" - return Parameter(name=name, value=f"{{{{tasks.{self.name}.outputs.parameters}}}}") - - def get_parameter(self, name: str) -> Parameter: - """Returns a Parameter from the task's outputs based on the name. + """Returns a `Parameter` that represents all the outputs of the specified subtype. Parameters ---------- name: str - The name of the parameter to extract as an output. + The name of the parameter to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. Returns ------- Parameter - Parameter with the same name + The parameter, named based on the given `name`, along with a value that references all outputs. """ - if isinstance(self.template, str): - raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + return super()._get_parameters_as(name=name, subtype="tasks") - # here, we build the template early to verify that we can get the outputs - if isinstance(self.template, Templatable): - template = self.template._build_template() - else: - template = self.template - - # at this point, we know that the template is a `Template` object - if template.outputs is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") - if template.outputs.parameters is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") - parameters = template.outputs.parameters # type: ignore - - obj = next((output for output in parameters if output.name == name), None) - if obj is not None: - return Parameter( - name=obj.name, - value=f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}", - ) - raise KeyError(f"No output parameter named `{name}` found") + def get_artifact(self, name: str) -> Artifact: + """Gets an artifact from the outputs of this `Task`""" + return super()._get_artifact(name=name, subtype="tasks") + + def get_parameter(self, name: str) -> Parameter: + """Gets a parameter from the outputs of this `Task`""" + return super()._get_parameter(name=name, subtype="tasks") def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task: """Set self as a dependency of `other`.""" From 0abebe08a5793d3bf8a087ddbca6b2581850191d Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 20:37:55 -0600 Subject: [PATCH 4/9] use Dict Signed-off-by: Flaviu Vadan --- src/hera/shared/exceptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hera/shared/exceptions.py b/src/hera/shared/exceptions.py index c0f056bb9..1168d2990 100644 --- a/src/hera/shared/exceptions.py +++ b/src/hera/shared/exceptions.py @@ -3,7 +3,7 @@ These are thin wrappers around the core Python `Exception`. """ from http import HTTPStatus -from typing import Type +from typing import Dict, Type class HeraException(Exception): @@ -40,7 +40,7 @@ class InternalServerError(HeraException): status_code = HTTPStatus.INTERNAL_SERVER_ERROR.value -status_code_to_exception_map: dict[int, Type[HeraException]] = { +status_code_to_exception_map: Dict[int, Type[HeraException]] = { Unauthorized.status_code: Unauthorized, BadRequest.status_code: BadRequest, Forbidden.status_code: Forbidden, From 16bd2f8bd64475c5a8c916dd7724324cce0876f9 Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 20:38:08 -0600 Subject: [PATCH 5/9] use Dict Signed-off-by: Flaviu Vadan --- src/hera/shared/exceptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hera/shared/exceptions.py b/src/hera/shared/exceptions.py index c0f056bb9..1168d2990 100644 --- a/src/hera/shared/exceptions.py +++ b/src/hera/shared/exceptions.py @@ -3,7 +3,7 @@ These are thin wrappers around the core Python `Exception`. """ from http import HTTPStatus -from typing import Type +from typing import Dict, Type class HeraException(Exception): @@ -40,7 +40,7 @@ class InternalServerError(HeraException): status_code = HTTPStatus.INTERNAL_SERVER_ERROR.value -status_code_to_exception_map: dict[int, Type[HeraException]] = { +status_code_to_exception_map: Dict[int, Type[HeraException]] = { Unauthorized.status_code: Unauthorized, BadRequest.status_code: BadRequest, Forbidden.status_code: Forbidden, From 7da48f8c0ac74043f5a52187b1240b3e90500b23 Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 20:41:20 -0600 Subject: [PATCH 6/9] make a proper exceptions module Signed-off-by: Flaviu Vadan --- scripts/service.py | 2 +- src/hera/events/service.py | 2 +- src/hera/{shared/exceptions.py => exceptions/__init__.py} | 0 src/hera/workflows/service.py | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) rename src/hera/{shared/exceptions.py => exceptions/__init__.py} (100%) diff --git a/scripts/service.py b/scripts/service.py index d245041b1..a8e045f69 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -434,7 +434,7 @@ def get_service_def() -> str: import requests from hera.{module}.models import {imports} from hera.shared import global_config -from hera.shared.exceptions import exception_from_status_code +from hera.exceptions import exception_from_status_code from typing import Optional, cast class {models_type}Service: diff --git a/src/hera/events/service.py b/src/hera/events/service.py index 805eb6759..3681b5f9e 100644 --- a/src/hera/events/service.py +++ b/src/hera/events/service.py @@ -25,8 +25,8 @@ UpdateSensorRequest, Version, ) +from hera.exceptions import exception_from_status_code from hera.shared import global_config -from hera.shared.exceptions import exception_from_status_code class EventsService: diff --git a/src/hera/shared/exceptions.py b/src/hera/exceptions/__init__.py similarity index 100% rename from src/hera/shared/exceptions.py rename to src/hera/exceptions/__init__.py diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index 535086ebd..fec0d5bba 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -3,8 +3,8 @@ import requests +from hera.exceptions import exception_from_status_code from hera.shared import global_config -from hera.shared.exceptions import exception_from_status_code from hera.workflows.models import ( ArchivedWorkflowDeletedResponse, ClusterWorkflowTemplate, From 1cf7abf1576403b843501ab8dbca284b4e4c735d Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 21:18:58 -0600 Subject: [PATCH 7/9] extract message from error Signed-off-by: Flaviu Vadan --- scripts/service.py | 4 +- src/hera/events/service.py | 48 ++++++++-------- src/hera/workflows/service.py | 100 +++++++++++++++++----------------- 3 files changed, 76 insertions(+), 76 deletions(-) diff --git a/scripts/service.py b/scripts/service.py index a8e045f69..07520f1eb 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -197,7 +197,7 @@ def __str__(self) -> str: return {self.response}(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}", + f"Server returned status code {{resp.status_code}} with error: {{resp.json()['message']}}", ) """ else: @@ -217,7 +217,7 @@ def __str__(self) -> str: return {ret_val} raise exception_from_status_code( resp.status_code, - f"Server returned status code {{resp.status_code}} with error: {{resp.json()}}", + f"Server returned status code {{resp.status_code}} with error: {{resp.json()['message']}}", ) """ diff --git a/src/hera/events/service.py b/src/hera/events/service.py index 3681b5f9e..4af1ce94f 100644 --- a/src/hera/events/service.py +++ b/src/hera/events/service.py @@ -79,7 +79,7 @@ def list_event_sources( return EventSourceList(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional[str] = None) -> EventSource: @@ -99,7 +99,7 @@ def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional return EventSource(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventSource: @@ -117,7 +117,7 @@ def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventS return EventSource(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def update_event_source( @@ -139,7 +139,7 @@ def update_event_source( return EventSource(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_event_source( @@ -174,7 +174,7 @@ def delete_event_source( return EventSourceDeletedResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] = None) -> EventResponse: @@ -194,7 +194,7 @@ def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] return EventResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_info(self) -> InfoResponse: @@ -210,7 +210,7 @@ def get_info(self) -> InfoResponse: return InfoResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_sensors( @@ -250,7 +250,7 @@ def list_sensors( return SensorList(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = None) -> Sensor: @@ -270,7 +270,7 @@ def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = Non return Sensor(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_sensor(self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None) -> Sensor: @@ -288,7 +288,7 @@ def get_sensor(self, name: str, namespace: Optional[str] = None, resource_versio return Sensor(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional[str] = None) -> Sensor: @@ -308,7 +308,7 @@ def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional return Sensor(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_sensor( @@ -343,7 +343,7 @@ def delete_sensor( return DeleteSensorResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def watch_event_sources( @@ -383,7 +383,7 @@ def watch_event_sources( return EventSourceWatchEvent(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def event_sources_logs( @@ -433,7 +433,7 @@ def event_sources_logs( return EventsourceLogEntry(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def watch_events( @@ -473,7 +473,7 @@ def watch_events( return Event(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def watch_sensors( @@ -513,7 +513,7 @@ def watch_sensors( return SensorWatchEvent(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def sensors_logs( @@ -561,7 +561,7 @@ def sensors_logs( return SensorLogEntry(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_user_info(self) -> GetUserInfoResponse: @@ -577,7 +577,7 @@ def get_user_info(self) -> GetUserInfoResponse: return GetUserInfoResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_version(self) -> Version: @@ -593,7 +593,7 @@ def get_version(self) -> Version: return Version(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_artifact_file( @@ -628,7 +628,7 @@ def get_artifact_file( return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: @@ -647,7 +647,7 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: @@ -669,7 +669,7 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: @@ -688,7 +688,7 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: @@ -710,7 +710,7 @@ def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namesp return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index fec0d5bba..2739c53bf 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -100,7 +100,7 @@ def list_archived_workflows( return WorkflowList(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_archived_workflow_label_keys(self) -> LabelKeys: @@ -116,7 +116,7 @@ def list_archived_workflow_label_keys(self) -> LabelKeys: return LabelKeys(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_archived_workflow_label_values( @@ -153,7 +153,7 @@ def list_archived_workflow_label_values( return LabelValues(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_archived_workflow(self, uid: str) -> Workflow: @@ -169,7 +169,7 @@ def get_archived_workflow(self, uid: str) -> Workflow: return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: @@ -185,7 +185,7 @@ def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: return ArchivedWorkflowDeletedResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequest) -> Workflow: @@ -203,7 +203,7 @@ def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequ return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) -> Workflow: @@ -221,7 +221,7 @@ def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) - return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_cluster_workflow_templates( @@ -258,7 +258,7 @@ def list_cluster_workflow_templates( return ClusterWorkflowTemplateList(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateRequest) -> ClusterWorkflowTemplate: @@ -276,7 +276,7 @@ def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateReq return ClusterWorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest) -> ClusterWorkflowTemplate: @@ -294,7 +294,7 @@ def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest return ClusterWorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_cluster_workflow_template( @@ -312,7 +312,7 @@ def get_cluster_workflow_template( return ClusterWorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def update_cluster_workflow_template( @@ -332,7 +332,7 @@ def update_cluster_workflow_template( return ClusterWorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_cluster_workflow_template( @@ -364,7 +364,7 @@ def delete_cluster_workflow_template( return ClusterWorkflowTemplateDeleteResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_cron_workflows( @@ -415,7 +415,7 @@ def list_cron_workflows( return CronWorkflowList(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: @@ -446,7 +446,7 @@ def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Option return CronWorkflow(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: @@ -477,7 +477,7 @@ def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[s return CronWorkflow(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_cron_workflow( @@ -508,7 +508,7 @@ def get_cron_workflow( return CronWorkflow(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def update_cron_workflow( @@ -541,7 +541,7 @@ def update_cron_workflow( return CronWorkflow(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_cron_workflow( @@ -576,7 +576,7 @@ def delete_cron_workflow( return CronWorkflowDeletedResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def resume_cron_workflow( @@ -609,7 +609,7 @@ def resume_cron_workflow( return CronWorkflow(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def suspend_cron_workflow( @@ -642,7 +642,7 @@ def suspend_cron_workflow( return CronWorkflow(**resp_json) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_info(self) -> InfoResponse: @@ -658,7 +658,7 @@ def get_info(self) -> InfoResponse: return InfoResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_user_info(self) -> GetUserInfoResponse: @@ -674,7 +674,7 @@ def get_user_info(self) -> GetUserInfoResponse: return GetUserInfoResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_version(self) -> Version: @@ -690,7 +690,7 @@ def get_version(self) -> Version: return Version(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_workflow_templates( @@ -730,7 +730,7 @@ def list_workflow_templates( return WorkflowTemplateList(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def create_workflow_template( @@ -752,7 +752,7 @@ def create_workflow_template( return WorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def lint_workflow_template( @@ -774,7 +774,7 @@ def lint_workflow_template( return WorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_workflow_template( @@ -794,7 +794,7 @@ def get_workflow_template( return WorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def update_workflow_template( @@ -816,7 +816,7 @@ def update_workflow_template( return WorkflowTemplate(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_workflow_template( @@ -851,7 +851,7 @@ def delete_workflow_template( return WorkflowTemplateDeleteResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def list_workflows( @@ -893,7 +893,7 @@ def list_workflows( return WorkflowList(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = None) -> Workflow: @@ -913,7 +913,7 @@ def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = None) -> Workflow: @@ -933,7 +933,7 @@ def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = Non return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = None) -> Workflow: @@ -953,7 +953,7 @@ def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_workflow( @@ -977,7 +977,7 @@ def get_workflow( return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def delete_workflow( @@ -1014,7 +1014,7 @@ def delete_workflow( return WorkflowDeleteResponse() raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def workflow_logs( @@ -1063,7 +1063,7 @@ def workflow_logs( return V1alpha1LogEntry(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: Optional[str] = None) -> Workflow: @@ -1083,7 +1083,7 @@ def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Optional[str] = None) -> Workflow: @@ -1103,7 +1103,7 @@ def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Opti return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Optional[str] = None) -> Workflow: @@ -1123,7 +1123,7 @@ def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Option return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[str] = None) -> Workflow: @@ -1143,7 +1143,7 @@ def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[s return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional[str] = None) -> Workflow: @@ -1163,7 +1163,7 @@ def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Optional[str] = None) -> Workflow: @@ -1183,7 +1183,7 @@ def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Op return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def terminate_workflow( @@ -1205,7 +1205,7 @@ def terminate_workflow( return Workflow(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def pod_logs( @@ -1254,7 +1254,7 @@ def pod_logs( return V1alpha1LogEntry(**resp.json()) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_artifact_file( @@ -1289,7 +1289,7 @@ def get_artifact_file( return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: @@ -1308,7 +1308,7 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: @@ -1330,7 +1330,7 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: @@ -1349,7 +1349,7 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: @@ -1371,7 +1371,7 @@ def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namesp return str(resp.content) raise exception_from_status_code( resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()}", + f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", ) From db472151d2b9e86db21e4d37918f2e1d9eba3e1b Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 22:21:11 -0600 Subject: [PATCH 8/9] add handling of json decode errors Signed-off-by: Flaviu Vadan --- scripts/service.py | 31 +- src/hera/events/service.py | 361 +++++++++++----- src/hera/workflows/service.py | 751 +++++++++++++++++++++++++--------- 3 files changed, 839 insertions(+), 304 deletions(-) diff --git a/scripts/service.py b/scripts/service.py index 07520f1eb..a4e6da152 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -195,10 +195,17 @@ def __str__(self) -> str: # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json['status'] = None return {self.response}(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {{resp.status_code}} with error: {{resp.json()['message']}}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {{resp.status_code}} with message: `{{resp.json()['message']}}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {{resp.status_code}} with message: `{{resp.text}}`", + ) """ else: ret_val = f"{self.response}(**resp.json())" @@ -215,10 +222,17 @@ def __str__(self) -> str: if resp.ok: return {ret_val} - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {{resp.status_code}} with error: {{resp.json()['message']}}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {{resp.status_code}} with message: `{{resp.json()['message']}}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {{resp.status_code}} with message: `{{resp.text}}`", + ) """ @@ -432,6 +446,7 @@ def get_service_def() -> str: return """ from urllib.parse import urljoin import requests +import json from hera.{module}.models import {imports} from hera.shared import global_config from hera.exceptions import exception_from_status_code diff --git a/src/hera/events/service.py b/src/hera/events/service.py index 4af1ce94f..49033fe76 100644 --- a/src/hera/events/service.py +++ b/src/hera/events/service.py @@ -1,3 +1,4 @@ +import json from typing import Optional, cast from urllib.parse import urljoin @@ -77,10 +78,17 @@ def list_event_sources( if resp.ok: return EventSourceList(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional[str] = None) -> EventSource: resp = requests.post( @@ -97,10 +105,17 @@ def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional if resp.ok: return EventSource(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventSource: resp = requests.get( @@ -115,10 +130,17 @@ def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventS if resp.ok: return EventSource(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def update_event_source( self, name: str, req: UpdateEventSourceRequest, namespace: Optional[str] = None @@ -137,10 +159,17 @@ def update_event_source( if resp.ok: return EventSource(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_event_source( self, @@ -172,10 +201,17 @@ def delete_event_source( if resp.ok: return EventSourceDeletedResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] = None) -> EventResponse: resp = requests.post( @@ -192,10 +228,17 @@ def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] if resp.ok: return EventResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_info(self) -> InfoResponse: resp = requests.get( @@ -208,10 +251,17 @@ def get_info(self) -> InfoResponse: if resp.ok: return InfoResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_sensors( self, @@ -248,10 +298,17 @@ def list_sensors( if resp.ok: return SensorList(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = None) -> Sensor: resp = requests.post( @@ -268,10 +325,17 @@ def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = Non if resp.ok: return Sensor(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_sensor(self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None) -> Sensor: resp = requests.get( @@ -286,10 +350,17 @@ def get_sensor(self, name: str, namespace: Optional[str] = None, resource_versio if resp.ok: return Sensor(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional[str] = None) -> Sensor: resp = requests.put( @@ -306,10 +377,17 @@ def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional if resp.ok: return Sensor(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_sensor( self, @@ -341,10 +419,17 @@ def delete_sensor( if resp.ok: return DeleteSensorResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def watch_event_sources( self, @@ -381,10 +466,17 @@ def watch_event_sources( if resp.ok: return EventSourceWatchEvent(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def event_sources_logs( self, @@ -431,10 +523,17 @@ def event_sources_logs( if resp.ok: return EventsourceLogEntry(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def watch_events( self, @@ -471,10 +570,17 @@ def watch_events( if resp.ok: return Event(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def watch_sensors( self, @@ -511,10 +617,17 @@ def watch_sensors( if resp.ok: return SensorWatchEvent(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def sensors_logs( self, @@ -559,10 +672,17 @@ def sensors_logs( if resp.ok: return SensorLogEntry(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_user_info(self) -> GetUserInfoResponse: resp = requests.get( @@ -575,10 +695,17 @@ def get_user_info(self) -> GetUserInfoResponse: if resp.ok: return GetUserInfoResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_version(self) -> Version: resp = requests.get( @@ -591,10 +718,17 @@ def get_version(self) -> Version: if resp.ok: return Version(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_artifact_file( self, @@ -626,10 +760,17 @@ def get_artifact_file( if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an output artifact by UID.""" @@ -645,10 +786,17 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an output artifact.""" @@ -667,10 +815,17 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an input artifact by UID.""" @@ -686,10 +841,17 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an input artifact.""" @@ -708,10 +870,17 @@ def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namesp if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) __all__ = ["EventsService"] diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index 2739c53bf..6514fc32a 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -1,3 +1,4 @@ +import json from typing import Optional, cast from urllib.parse import urljoin @@ -98,10 +99,17 @@ def list_archived_workflows( if resp.ok: return WorkflowList(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_archived_workflow_label_keys(self) -> LabelKeys: resp = requests.get( @@ -114,10 +122,17 @@ def list_archived_workflow_label_keys(self) -> LabelKeys: if resp.ok: return LabelKeys(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_archived_workflow_label_values( self, @@ -151,10 +166,17 @@ def list_archived_workflow_label_values( if resp.ok: return LabelValues(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_archived_workflow(self, uid: str) -> Workflow: resp = requests.get( @@ -167,10 +189,17 @@ def get_archived_workflow(self, uid: str) -> Workflow: if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: resp = requests.delete( @@ -183,10 +212,17 @@ def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: if resp.ok: return ArchivedWorkflowDeletedResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequest) -> Workflow: resp = requests.put( @@ -201,10 +237,17 @@ def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequ if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) -> Workflow: resp = requests.put( @@ -219,10 +262,17 @@ def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) - if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_cluster_workflow_templates( self, @@ -256,10 +306,17 @@ def list_cluster_workflow_templates( if resp.ok: return ClusterWorkflowTemplateList(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateRequest) -> ClusterWorkflowTemplate: resp = requests.post( @@ -274,10 +331,17 @@ def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateReq if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest) -> ClusterWorkflowTemplate: resp = requests.post( @@ -292,10 +356,17 @@ def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_cluster_workflow_template( self, name: str, resource_version: Optional[str] = None @@ -310,10 +381,17 @@ def get_cluster_workflow_template( if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def update_cluster_workflow_template( self, name: str, req: ClusterWorkflowTemplateUpdateRequest @@ -330,10 +408,17 @@ def update_cluster_workflow_template( if resp.ok: return ClusterWorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_cluster_workflow_template( self, @@ -362,10 +447,17 @@ def delete_cluster_workflow_template( if resp.ok: return ClusterWorkflowTemplateDeleteResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_cron_workflows( self, @@ -413,10 +505,17 @@ def list_cron_workflows( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflowList(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: resp = requests.post( @@ -444,10 +543,17 @@ def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Option # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: resp = requests.post( @@ -475,10 +581,17 @@ def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[s # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_cron_workflow( self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None @@ -506,10 +619,17 @@ def get_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def update_cron_workflow( self, name: str, req: UpdateCronWorkflowRequest, namespace: Optional[str] = None @@ -539,10 +659,17 @@ def update_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_cron_workflow( self, @@ -574,10 +701,17 @@ def delete_cron_workflow( if resp.ok: return CronWorkflowDeletedResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def resume_cron_workflow( self, name: str, req: CronWorkflowResumeRequest, namespace: Optional[str] = None @@ -607,10 +741,17 @@ def resume_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def suspend_cron_workflow( self, name: str, req: CronWorkflowSuspendRequest, namespace: Optional[str] = None @@ -640,10 +781,17 @@ def suspend_cron_workflow( # See `hera.scripts.service.ServiceEndpoint.__str__` for more details. resp_json["status"] = None return CronWorkflow(**resp_json) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_info(self) -> InfoResponse: resp = requests.get( @@ -656,10 +804,17 @@ def get_info(self) -> InfoResponse: if resp.ok: return InfoResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_user_info(self) -> GetUserInfoResponse: resp = requests.get( @@ -672,10 +827,17 @@ def get_user_info(self) -> GetUserInfoResponse: if resp.ok: return GetUserInfoResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_version(self) -> Version: resp = requests.get( @@ -688,10 +850,17 @@ def get_version(self) -> Version: if resp.ok: return Version(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_workflow_templates( self, @@ -728,10 +897,17 @@ def list_workflow_templates( if resp.ok: return WorkflowTemplateList(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def create_workflow_template( self, req: WorkflowTemplateCreateRequest, namespace: Optional[str] = None @@ -750,10 +926,17 @@ def create_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def lint_workflow_template( self, req: WorkflowTemplateLintRequest, namespace: Optional[str] = None @@ -772,10 +955,17 @@ def lint_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_workflow_template( self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None @@ -792,10 +982,17 @@ def get_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def update_workflow_template( self, name: str, req: WorkflowTemplateUpdateRequest, namespace: Optional[str] = None @@ -814,10 +1011,17 @@ def update_workflow_template( if resp.ok: return WorkflowTemplate(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_workflow_template( self, @@ -849,10 +1053,17 @@ def delete_workflow_template( if resp.ok: return WorkflowTemplateDeleteResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def list_workflows( self, @@ -891,10 +1102,17 @@ def list_workflows( if resp.ok: return WorkflowList(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.post( @@ -911,10 +1129,17 @@ def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.post( @@ -931,10 +1156,17 @@ def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = Non if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.post( @@ -951,10 +1183,17 @@ def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_workflow( self, @@ -975,10 +1214,17 @@ def get_workflow( if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def delete_workflow( self, @@ -1012,10 +1258,17 @@ def delete_workflow( if resp.ok: return WorkflowDeleteResponse() - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def workflow_logs( self, @@ -1061,10 +1314,17 @@ def workflow_logs( if resp.ok: return V1alpha1LogEntry(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1081,10 +1341,17 @@ def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1101,10 +1368,17 @@ def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Opti if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1121,10 +1395,17 @@ def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Option if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1141,10 +1422,17 @@ def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[s if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1161,10 +1449,17 @@ def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Optional[str] = None) -> Workflow: resp = requests.put( @@ -1181,10 +1476,17 @@ def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Op if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def terminate_workflow( self, name: str, req: WorkflowTerminateRequest, namespace: Optional[str] = None @@ -1203,10 +1505,17 @@ def terminate_workflow( if resp.ok: return Workflow(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def pod_logs( self, @@ -1252,10 +1561,17 @@ def pod_logs( if resp.ok: return V1alpha1LogEntry(**resp.json()) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_artifact_file( self, @@ -1287,10 +1603,17 @@ def get_artifact_file( if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an output artifact by UID.""" @@ -1306,10 +1629,17 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an output artifact.""" @@ -1328,10 +1658,17 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an input artifact by UID.""" @@ -1347,10 +1684,17 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an input artifact.""" @@ -1369,10 +1713,17 @@ def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namesp if resp.ok: return str(resp.content) - raise exception_from_status_code( - resp.status_code, - f"Server returned status code {resp.status_code} with error: {resp.json()['message']}", - ) + + try: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.json()['message']}`", + ) + except json.JSONDecodeError: + raise exception_from_status_code( + resp.status_code, + f"Server returned status code {resp.status_code} with message: `{resp.text}`", + ) __all__ = ["WorkflowsService"] From b97a45ce98484e372b2f10e94f99a565304cc45b Mon Sep 17 00:00:00 2001 From: Elliot Gunton Date: Fri, 26 May 2023 14:23:55 +0100 Subject: [PATCH 9/9] Move properties/funcs into TISubNodeMixin (#644) Small PR on top of a WIP to avoid making suggestions/commits --------- Signed-off-by: Elliot Gunton --- src/hera/workflows/_mixins.py | 62 +++++++++++++++++++++++++++++++++++ src/hera/workflows/steps.py | 55 ++----------------------------- src/hera/workflows/task.py | 55 ++----------------------------- 3 files changed, 66 insertions(+), 106 deletions(-) diff --git a/src/hera/workflows/_mixins.py b/src/hera/workflows/_mixins.py index d7897ab8e..872a0e69f 100644 --- a/src/hera/workflows/_mixins.py +++ b/src/hera/workflows/_mixins.py @@ -666,6 +666,45 @@ class TemplateInvocatorSubNodeMixin(BaseMixin): when: Optional[str] = None with_sequence: Optional[Sequence] = None + @property + def _subtype(self) -> str: + raise NotImplementedError + + @property + def id(self) -> str: + """ID of this node.""" + return f"{{{{{self._subtype}.{self.name}.id}}}}" + + @property + def ip(self) -> str: + """IP of this node.""" + return f"{{{{{self._subtype}.{self.name}.ip}}}}" + + @property + def status(self) -> str: + """Status of this node.""" + return f"{{{{{self._subtype}.{self.name}.status}}}}" + + @property + def exit_code(self) -> str: + """ExitCode holds the exit code of a script template.""" + return f"{{{{{self._subtype}.{self.name}.exitCode}}}}" + + @property + def started_at(self) -> str: + """Time at which this node started.""" + return f"{{{{{self._subtype}.{self.name}.startedAt}}}}" + + @property + def finished_at(self) -> str: + """Time at which this node completed.""" + return f"{{{{{self._subtype}.{self.name}.finishedAt}}}}" + + @property + def result(self) -> str: + """Result holds the result (stdout) of a script template.""" + return f"{{{{{self._subtype}.{self.name}.outputs.result}}}}" + @root_validator(pre=False) def _check_values(cls, values): def one(xs: List): @@ -794,6 +833,29 @@ def _get_artifact(self, name: str, subtype: str) -> Artifact: return Artifact(name=name, path=obj.path, from_=f"{{{{{subtype}.{self.name}.outputs.artifacts.{name}}}}}") raise KeyError(f"No output artifact named `{name}` found") + def get_parameters_as(self, name: str) -> Parameter: + """Returns a `Parameter` that represents all the outputs of this subnode. + + Parameters + ---------- + name: str + The name of the parameter to search for. + + Returns + ------- + Parameter + The parameter, named based on the given `name`, along with a value that references all outputs. + """ + return self._get_parameters_as(name=name, subtype=self._subtype) + + def get_artifact(self, name: str) -> Artifact: + """Gets an artifact from the outputs of this subnode""" + return self._get_artifact(name=name, subtype=self._subtype) + + def get_parameter(self, name: str) -> Parameter: + """Gets a parameter from the outputs of this subnode""" + return self._get_parameter(name=name, subtype=self._subtype) + def _get_params_from_source(source: Callable) -> Optional[List[Parameter]]: source_signature: Dict[str, Optional[object]] = {} diff --git a/src/hera/workflows/steps.py b/src/hera/workflows/steps.py index 3adb49fc2..1dee96f5e 100644 --- a/src/hera/workflows/steps.py +++ b/src/hera/workflows/steps.py @@ -15,13 +15,11 @@ TemplateInvocatorSubNodeMixin, TemplateMixin, ) -from hera.workflows.artifact import Artifact from hera.workflows.exceptions import InvalidType from hera.workflows.models import ( Template as _ModelTemplate, WorkflowStep as _ModelWorkflowStep, ) -from hera.workflows.parameter import Parameter from hera.workflows.protocol import Steppable, Templatable @@ -38,57 +36,8 @@ class Step( """ @property - def id(self) -> str: - return f"{{{{steps.{self.name}.id}}}}" - - @property - def ip(self) -> str: - return f"{{{{steps.{self.name}.ip}}}}" - - @property - def status(self) -> str: - return f"{{{{steps.{self.name}.status}}}}" - - @property - def exit_code(self) -> str: - return f"{{{{steps.{self.name}.exitCode}}}}" - - @property - def started_at(self) -> str: - return f"{{{{steps.{self.name}.startedAt}}}}" - - @property - def finished_at(self) -> str: - return f"{{{{steps.{self.name}.finishedAt}}}}" - - @property - def result(self) -> str: - return f"{{{{steps.{self.name}.outputs.result}}}}" - - def get_parameters_as(self, name: str) -> Parameter: - """Returns a `Parameter` that represents all the outputs of the specified subtype. - - Parameters - ---------- - name: str - The name of the parameter to search for. - subtype: str - The inheritor subtype field, used to construct the output artifact `from_` reference. - - Returns - ------- - Parameter - The parameter, named based on the given `name`, along with a value that references all outputs. - """ - return super()._get_parameters_as(name=name, subtype="steps") - - def get_artifact(self, name: str) -> Artifact: - """Gets an artifact from the outputs of this `Step`""" - return super()._get_artifact(name=name, subtype="steps") - - def get_parameter(self, name: str) -> Parameter: - """Gets a parameter from the outputs of this `Step`""" - return super()._get_parameter(name=name, subtype="steps") + def _subtype(self) -> str: + return "steps" def _build_as_workflow_step(self) -> _ModelWorkflowStep: _template = None diff --git a/src/hera/workflows/task.py b/src/hera/workflows/task.py index 1bd2b6eff..247ca1d01 100644 --- a/src/hera/workflows/task.py +++ b/src/hera/workflows/task.py @@ -16,13 +16,11 @@ TemplateInvocatorSubNodeMixin, TemplateMixin, ) -from hera.workflows.artifact import Artifact from hera.workflows.models import ( DAGTask as _ModelDAGTask, Template, ) from hera.workflows.operator import Operator -from hera.workflows.parameter import Parameter from hera.workflows.protocol import Templatable from hera.workflows.workflow_status import WorkflowStatus @@ -120,57 +118,8 @@ def _get_dependency_tasks(self) -> List[str]: return task_names @property - def id(self) -> str: - return f"{{{{tasks.{self.name}.id}}}}" - - @property - def ip(self) -> str: - return f"{{{{tasks.{self.name}.ip}}}}" - - @property - def status(self) -> str: - return f"{{{{tasks.{self.name}.status}}}}" - - @property - def exit_code(self) -> str: - return f"{{{{tasks.{self.name}.exitCode}}}}" - - @property - def started_at(self) -> str: - return f"{{{{tasks.{self.name}.startedAt}}}}" - - @property - def finished_at(self) -> str: - return f"{{{{tasks.{self.name}.finishedAt}}}}" - - @property - def result(self) -> str: - return f"{{{{tasks.{self.name}.outputs.result}}}}" - - def get_parameters_as(self, name: str) -> Parameter: - """Returns a `Parameter` that represents all the outputs of the specified subtype. - - Parameters - ---------- - name: str - The name of the parameter to search for. - subtype: str - The inheritor subtype field, used to construct the output artifact `from_` reference. - - Returns - ------- - Parameter - The parameter, named based on the given `name`, along with a value that references all outputs. - """ - return super()._get_parameters_as(name=name, subtype="tasks") - - def get_artifact(self, name: str) -> Artifact: - """Gets an artifact from the outputs of this `Task`""" - return super()._get_artifact(name=name, subtype="tasks") - - def get_parameter(self, name: str) -> Parameter: - """Gets a parameter from the outputs of this `Task`""" - return super()._get_parameter(name=name, subtype="tasks") + def _subtype(self) -> str: + return "tasks" def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task: """Set self as a dependency of `other`."""