From 6ebcbe4be0e41f9dd2acf142894c022b715e841d Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 20:47:37 -0600 Subject: [PATCH 1/2] add host validation for url scheme Signed-off-by: Flaviu Vadan --- scripts/service.py | 1 + src/hera/events/service.py | 3 +++ src/hera/workflows/service.py | 3 +++ 3 files changed, 7 insertions(+) diff --git a/scripts/service.py b/scripts/service.py index a480f605b..05a6d2ed8 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -415,6 +415,7 @@ def __init__( namespace: Optional[str] = None, ): self.host = cast(str, host or global_config.host) + assert self.host.startswith("http") or self.host.startswith("https"), "The host scheme is required for service usage" self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl self.token = token or global_config.token self.namespace = namespace or global_config.namespace diff --git a/src/hera/events/service.py b/src/hera/events/service.py index 5b4135455..051298b7a 100644 --- a/src/hera/events/service.py +++ b/src/hera/events/service.py @@ -37,6 +37,9 @@ def __init__( namespace: Optional[str] = None, ): self.host = cast(str, host or global_config.host) + assert self.host.startswith("http") or self.host.startswith( + "https" + ), "The host scheme is required for service usage" self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl self.token = token or global_config.token self.namespace = namespace or global_config.namespace diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index bec5baf74..a7adc9287 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -59,6 +59,9 @@ def __init__( namespace: Optional[str] = None, ): self.host = cast(str, host or global_config.host) + assert self.host.startswith("http") or self.host.startswith( + "https" + ), "The host scheme is required for service usage" self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl self.token = token or global_config.token self.namespace = namespace or global_config.namespace From 4d520c8fc11083cb4e026d0bdbc8bd8c0e047f4b Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Thu, 25 May 2023 21:42:12 -0600 Subject: [PATCH 2/2] add assertion in the APIs Signed-off-by: Flaviu Vadan --- .../workflows/upstream/coinflip.upstream.yaml | 3 +- scripts/service.py | 5 +- src/hera/events/service.py | 31 +++++++++- src/hera/workflows/service.py | 57 ++++++++++++++++++- tests/test_workflow.py | 2 + 5 files changed, 89 insertions(+), 9 deletions(-) diff --git a/examples/workflows/upstream/coinflip.upstream.yaml b/examples/workflows/upstream/coinflip.upstream.yaml index b97e6a011..19de8a4aa 100644 --- a/examples/workflows/upstream/coinflip.upstream.yaml +++ b/examples/workflows/upstream/coinflip.upstream.yaml @@ -9,8 +9,7 @@ metadata: generateName: coinflip- annotations: workflows.argoproj.io/description: | - This is an example of coin flip defined as a sequence of conditional steps. - You can also run it in Python: https://couler-proj.github.io/couler/examples/#coin-flip + This is an example of coin flip defined as a sequence of conditional steps.\ spec: entrypoint: coinflip templates: diff --git a/scripts/service.py b/scripts/service.py index 05a6d2ed8..2a9c43cde 100644 --- a/scripts/service.py +++ b/scripts/service.py @@ -177,6 +177,7 @@ def __str__(self) -> str: return f""" {signature} + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.{self.method}( url={req_url}, params={params}, @@ -406,6 +407,9 @@ def get_service_def() -> str: from hera.shared import global_config from typing import Optional, cast +def valid_host_scheme(host: str) -> bool: + return host.startswith("http://") or host.startswith("https://") + class {models_type}Service: def __init__( self, @@ -415,7 +419,6 @@ def __init__( namespace: Optional[str] = None, ): self.host = cast(str, host or global_config.host) - assert self.host.startswith("http") or self.host.startswith("https"), "The host scheme is required for service usage" self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl self.token = token or global_config.token self.namespace = namespace or global_config.namespace diff --git a/src/hera/events/service.py b/src/hera/events/service.py index 051298b7a..27bcfa9ba 100644 --- a/src/hera/events/service.py +++ b/src/hera/events/service.py @@ -28,6 +28,10 @@ from hera.shared import global_config +def valid_host_scheme(host: str) -> bool: + return host.startswith("http://") or host.startswith("https://") + + class EventsService: def __init__( self, @@ -37,9 +41,6 @@ def __init__( namespace: Optional[str] = None, ): self.host = cast(str, host or global_config.host) - assert self.host.startswith("http") or self.host.startswith( - "https" - ), "The host scheme is required for service usage" self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl self.token = token or global_config.token self.namespace = namespace or global_config.namespace @@ -57,6 +58,7 @@ def list_event_sources( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> EventSourceList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/event-sources/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -83,6 +85,7 @@ def list_event_sources( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional[str] = None) -> EventSource: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/event-sources/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -101,6 +104,7 @@ def create_event_source(self, req: CreateEventSourceRequest, namespace: Optional raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventSource: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/event-sources/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -119,6 +123,7 @@ def get_event_source(self, name: str, namespace: Optional[str] = None) -> EventS def update_event_source( self, name: str, req: UpdateEventSourceRequest, namespace: Optional[str] = None ) -> EventSource: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/event-sources/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -147,6 +152,7 @@ def delete_event_source( propagation_policy: Optional[str] = None, dry_run: Optional[list] = None, ) -> EventSourceDeletedResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/event-sources/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -170,6 +176,7 @@ def delete_event_source( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] = None) -> EventResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/events/{namespace}/{discriminator}").format( discriminator=discriminator, namespace=namespace if namespace is not None else self.namespace @@ -188,6 +195,7 @@ def receive_event(self, discriminator: str, req: Item, namespace: Optional[str] raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_info(self) -> InfoResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/info"), params=None, @@ -214,6 +222,7 @@ def list_sensors( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> SensorList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/sensors/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -240,6 +249,7 @@ def list_sensors( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = None) -> Sensor: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/sensors/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -258,6 +268,7 @@ def create_sensor(self, req: CreateSensorRequest, namespace: Optional[str] = Non raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_sensor(self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None) -> Sensor: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/sensors/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -274,6 +285,7 @@ def get_sensor(self, name: str, namespace: Optional[str] = None, resource_versio raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def update_sensor(self, name: str, req: UpdateSensorRequest, namespace: Optional[str] = None) -> Sensor: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/sensors/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -302,6 +314,7 @@ def delete_sensor( propagation_policy: Optional[str] = None, dry_run: Optional[list] = None, ) -> DeleteSensorResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/sensors/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -337,6 +350,7 @@ def watch_event_sources( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> EventSourceWatchEvent: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/stream/event-sources/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -380,6 +394,7 @@ def event_sources_logs( limit_bytes: Optional[str] = None, insecure_skip_tls_verify_backend: Optional[bool] = None, ) -> EventsourceLogEntry: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/stream/event-sources/{namespace}/logs").format( namespace=namespace if namespace is not None else self.namespace @@ -423,6 +438,7 @@ def watch_events( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> Event: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/stream/events/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -461,6 +477,7 @@ def watch_sensors( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> SensorWatchEvent: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/stream/sensors/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -503,6 +520,7 @@ def sensors_logs( limit_bytes: Optional[str] = None, insecure_skip_tls_verify_backend: Optional[bool] = None, ) -> SensorLogEntry: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/stream/sensors/{namespace}/logs").format( namespace=namespace if namespace is not None else self.namespace @@ -533,6 +551,7 @@ def sensors_logs( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_user_info(self) -> GetUserInfoResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/userinfo"), params=None, @@ -547,6 +566,7 @@ def get_user_info(self) -> GetUserInfoResponse: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_version(self) -> Version: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/version"), params=None, @@ -570,6 +590,7 @@ def get_artifact_file( namespace: Optional[str] = None, ) -> str: """Get an artifact.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin( self.host, @@ -595,6 +616,7 @@ def get_artifact_file( def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an output artifact by UID.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format( uid=uid, nodeId=node_id, artifactName=artifact_name @@ -612,6 +634,7 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an output artifact.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format( name=name, @@ -632,6 +655,7 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an input artifact by UID.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "input-artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format( uid=uid, nodeId=node_id, artifactName=artifact_name @@ -649,6 +673,7 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an input artifact.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "input-artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format( name=name, diff --git a/src/hera/workflows/service.py b/src/hera/workflows/service.py index a7adc9287..e05c5b5af 100644 --- a/src/hera/workflows/service.py +++ b/src/hera/workflows/service.py @@ -50,6 +50,10 @@ ) +def valid_host_scheme(host: str) -> bool: + return host.startswith("http://") or host.startswith("https://") + + class WorkflowsService: def __init__( self, @@ -59,9 +63,6 @@ def __init__( namespace: Optional[str] = None, ): self.host = cast(str, host or global_config.host) - assert self.host.startswith("http") or self.host.startswith( - "https" - ), "The host scheme is required for service usage" self.verify_ssl = verify_ssl if verify_ssl is not None else global_config.verify_ssl self.token = token or global_config.token self.namespace = namespace or global_config.namespace @@ -79,6 +80,7 @@ def list_archived_workflows( continue_: Optional[str] = None, name_prefix: Optional[str] = None, ) -> WorkflowList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/archived-workflows"), params={ @@ -104,6 +106,7 @@ def list_archived_workflows( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def list_archived_workflow_label_keys(self) -> LabelKeys: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/archived-workflows-label-keys"), params=None, @@ -129,6 +132,7 @@ def list_archived_workflow_label_values( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> LabelValues: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/archived-workflows-label-values"), params={ @@ -153,6 +157,7 @@ def list_archived_workflow_label_values( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_archived_workflow(self, uid: str) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/archived-workflows/{uid}").format(uid=uid), params=None, @@ -167,6 +172,7 @@ def get_archived_workflow(self, uid: str) -> Workflow: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/archived-workflows/{uid}").format(uid=uid), params=None, @@ -181,6 +187,7 @@ def delete_archived_workflow(self, uid: str) -> ArchivedWorkflowDeletedResponse: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequest) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/archived-workflows/{uid}/resubmit").format(uid=uid), params=None, @@ -197,6 +204,7 @@ def resubmit_archived_workflow(self, uid: str, req: ResubmitArchivedWorkflowRequ raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def retry_archived_workflow(self, uid: str, req: RetryArchivedWorkflowRequest) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/archived-workflows/{uid}/retry").format(uid=uid), params=None, @@ -224,6 +232,7 @@ def list_cluster_workflow_templates( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> ClusterWorkflowTemplateList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/cluster-workflow-templates"), params={ @@ -248,6 +257,7 @@ def list_cluster_workflow_templates( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateRequest) -> ClusterWorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/cluster-workflow-templates"), params=None, @@ -264,6 +274,7 @@ def create_cluster_workflow_template(self, req: ClusterWorkflowTemplateCreateReq raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest) -> ClusterWorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/cluster-workflow-templates/lint"), params=None, @@ -282,6 +293,7 @@ def lint_cluster_workflow_template(self, req: ClusterWorkflowTemplateLintRequest def get_cluster_workflow_template( self, name: str, resource_version: Optional[str] = None ) -> ClusterWorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name), params={"getOptions.resourceVersion": resource_version}, @@ -298,6 +310,7 @@ def get_cluster_workflow_template( def update_cluster_workflow_template( self, name: str, req: ClusterWorkflowTemplateUpdateRequest ) -> ClusterWorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name), params=None, @@ -323,6 +336,7 @@ def delete_cluster_workflow_template( propagation_policy: Optional[str] = None, dry_run: Optional[list] = None, ) -> ClusterWorkflowTemplateDeleteResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/cluster-workflow-templates/{name}").format(name=name), params={ @@ -356,6 +370,7 @@ def list_cron_workflows( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> CronWorkflowList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -382,6 +397,7 @@ def list_cron_workflows( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -400,6 +416,7 @@ def create_cron_workflow(self, req: CreateCronWorkflowRequest, namespace: Option raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[str] = None) -> CronWorkflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}/lint").format( namespace=namespace if namespace is not None else self.namespace @@ -420,6 +437,7 @@ def lint_cron_workflow(self, req: LintCronWorkflowRequest, namespace: Optional[s def get_cron_workflow( self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None ) -> CronWorkflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -438,6 +456,7 @@ def get_cron_workflow( def update_cron_workflow( self, name: str, req: UpdateCronWorkflowRequest, namespace: Optional[str] = None ) -> CronWorkflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -466,6 +485,7 @@ def delete_cron_workflow( propagation_policy: Optional[str] = None, dry_run: Optional[list] = None, ) -> CronWorkflowDeletedResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -491,6 +511,7 @@ def delete_cron_workflow( def resume_cron_workflow( self, name: str, req: CronWorkflowResumeRequest, namespace: Optional[str] = None ) -> CronWorkflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}/{name}/resume").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -511,6 +532,7 @@ def resume_cron_workflow( def suspend_cron_workflow( self, name: str, req: CronWorkflowSuspendRequest, namespace: Optional[str] = None ) -> CronWorkflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/cron-workflows/{namespace}/{name}/suspend").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -529,6 +551,7 @@ def suspend_cron_workflow( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_info(self) -> InfoResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/info"), params=None, @@ -543,6 +566,7 @@ def get_info(self) -> InfoResponse: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_user_info(self) -> GetUserInfoResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/userinfo"), params=None, @@ -557,6 +581,7 @@ def get_user_info(self) -> GetUserInfoResponse: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def get_version(self) -> Version: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/version"), params=None, @@ -583,6 +608,7 @@ def list_workflow_templates( limit: Optional[str] = None, continue_: Optional[str] = None, ) -> WorkflowTemplateList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/workflow-templates/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -611,6 +637,7 @@ def list_workflow_templates( def create_workflow_template( self, req: WorkflowTemplateCreateRequest, namespace: Optional[str] = None ) -> WorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/workflow-templates/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -631,6 +658,7 @@ def create_workflow_template( def lint_workflow_template( self, req: WorkflowTemplateLintRequest, namespace: Optional[str] = None ) -> WorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/workflow-templates/{namespace}/lint").format( namespace=namespace if namespace is not None else self.namespace @@ -651,6 +679,7 @@ def lint_workflow_template( def get_workflow_template( self, name: str, namespace: Optional[str] = None, resource_version: Optional[str] = None ) -> WorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/workflow-templates/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -669,6 +698,7 @@ def get_workflow_template( def update_workflow_template( self, name: str, req: WorkflowTemplateUpdateRequest, namespace: Optional[str] = None ) -> WorkflowTemplate: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflow-templates/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -697,6 +727,7 @@ def delete_workflow_template( propagation_policy: Optional[str] = None, dry_run: Optional[list] = None, ) -> WorkflowTemplateDeleteResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/workflow-templates/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -733,6 +764,7 @@ def list_workflows( continue_: Optional[str] = None, fields: Optional[str] = None, ) -> WorkflowList: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/workflows/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -760,6 +792,7 @@ def list_workflows( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/workflows/{namespace}").format( namespace=namespace if namespace is not None else self.namespace @@ -778,6 +811,7 @@ def create_workflow(self, req: WorkflowCreateRequest, namespace: Optional[str] = raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/workflows/{namespace}/lint").format( namespace=namespace if namespace is not None else self.namespace @@ -796,6 +830,7 @@ def lint_workflow(self, req: WorkflowLintRequest, namespace: Optional[str] = Non raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def submit_workflow(self, req: WorkflowSubmitRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.post( url=urljoin(self.host, "api/v1/workflows/{namespace}/submit").format( namespace=namespace if namespace is not None else self.namespace @@ -820,6 +855,7 @@ def get_workflow( resource_version: Optional[str] = None, fields: Optional[str] = None, ) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -847,6 +883,7 @@ def delete_workflow( dry_run: Optional[list] = None, force: Optional[bool] = None, ) -> WorkflowDeleteResponse: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.delete( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -888,6 +925,7 @@ def workflow_logs( grep: Optional[str] = None, selector: Optional[str] = None, ) -> V1alpha1LogEntry: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/log").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -918,6 +956,7 @@ def workflow_logs( raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/resubmit").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -936,6 +975,7 @@ def resubmit_workflow(self, name: str, req: WorkflowResubmitRequest, namespace: raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/resume").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -954,6 +994,7 @@ def resume_workflow(self, name: str, req: WorkflowResumeRequest, namespace: Opti raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/retry").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -972,6 +1013,7 @@ def retry_workflow(self, name: str, req: WorkflowRetryRequest, namespace: Option raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/set").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -990,6 +1032,7 @@ def set_workflow(self, name: str, req: WorkflowSetRequest, namespace: Optional[s raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/stop").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -1008,6 +1051,7 @@ def stop_workflow(self, name: str, req: WorkflowStopRequest, namespace: Optional raise Exception(f"Server returned status code {resp.status_code} with error: {resp.json()}") def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Optional[str] = None) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/suspend").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -1028,6 +1072,7 @@ def suspend_workflow(self, name: str, req: WorkflowSuspendRequest, namespace: Op def terminate_workflow( self, name: str, req: WorkflowTerminateRequest, namespace: Optional[str] = None ) -> Workflow: + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.put( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/terminate").format( name=name, namespace=namespace if namespace is not None else self.namespace @@ -1064,6 +1109,7 @@ def pod_logs( selector: Optional[str] = None, ) -> V1alpha1LogEntry: """DEPRECATED: Cannot work via HTTP if podName is an empty string. Use WorkflowLogs.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "api/v1/workflows/{namespace}/{name}/{podName}/log").format( name=name, podName=pod_name, namespace=namespace if namespace is not None else self.namespace @@ -1102,6 +1148,7 @@ def get_artifact_file( namespace: Optional[str] = None, ) -> str: """Get an artifact.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin( self.host, @@ -1127,6 +1174,7 @@ def get_artifact_file( def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an output artifact by UID.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format( uid=uid, nodeId=node_id, artifactName=artifact_name @@ -1144,6 +1192,7 @@ def get_output_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) def get_output_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an output artifact.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format( name=name, @@ -1164,6 +1213,7 @@ def get_output_artifact(self, name: str, node_id: str, artifact_name: str, names def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) -> str: """Get an input artifact by UID.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "input-artifacts-by-uid/{uid}/{nodeId}/{artifactName}").format( uid=uid, nodeId=node_id, artifactName=artifact_name @@ -1181,6 +1231,7 @@ def get_input_artifact_by_uid(self, uid: str, node_id: str, artifact_name: str) def get_input_artifact(self, name: str, node_id: str, artifact_name: str, namespace: Optional[str] = None) -> str: """Get an input artifact.""" + assert valid_host_scheme(self.host), "The host scheme is required for service usage" resp = requests.get( url=urljoin(self.host, "input-artifacts/{namespace}/{name}/{nodeId}/{artifactName}").format( name=name, diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 13ee03966..cbedf0aa5 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -63,6 +63,7 @@ def _transform_cron_workflow(obj): def test_hera_output(module_name): # GIVEN global_config.reset() + global_config.host = "http://hera.testing" workflow = importlib.import_module(f"examples.workflows.{module_name}").w yaml_path = Path(hera_examples.__file__).parent / f"{module_name.replace('_', '-')}.yaml" @@ -80,6 +81,7 @@ def test_hera_output(module_name): def test_hera_output_upstream(module_name): # GIVEN global_config.reset() + global_config.host = "http://hera.testing" workflow = importlib.import_module(f"examples.workflows.upstream.{module_name}").w yaml_path = Path(hera_upstream_examples.__file__).parent / f"{module_name.replace('_', '-')}.yaml" upstream_yaml_path = (