
Commit c02e877: formatted with yapf
zhxu73 committed Aug 13, 2020
1 parent 6b255d3

Showing 8 changed files with 230 additions and 82 deletions.
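This commit is a pure formatting pass: yapf rewrites layout without changing behavior. As a rough sketch of reproducing such a pass programmatically (assuming the "pep8" base style; the repository's actual yapf configuration is not part of this diff):

# A minimal sketch of a yapf formatting pass. The "pep8" style below is
# an assumption; the project's real style configuration is not shown here.
from yapf.yapflib.yapf_api import FormatCode

source = "def f(a,b):\n    return a+b\n"

# Recent yapf releases return the formatted source plus a flag that says
# whether anything changed.
formatted, changed = FormatCode(source, style_config="pep8")
print(formatted)  # def f(a, b):\n    return a + b\n
print(changed)    # True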
71 changes: 49 additions & 22 deletions service/argo/common.py
@@ -7,23 +7,27 @@
from django.conf import settings

from service.argo.rest_api import ArgoAPIClient
-from service.argo.exception import (BaseWorkflowDirNotExist,
-                                    ProviderWorkflowDirNotExist,
-                                    WorkflowFileNotExist,
-                                    WorkflowFileNotYAML,
-                                    ArgoConfigFileNotExist,
-                                    ArgoConfigFileNotYAML,
-                                    ArgoConfigFileError
-                                    )
+from service.argo.exception import (
+    BaseWorkflowDirNotExist, ProviderWorkflowDirNotExist, WorkflowFileNotExist,
+    WorkflowFileNotYAML, ArgoConfigFileNotExist, ArgoConfigFileNotYAML,
+    ArgoConfigFileError
+)


class ArgoContext:
"""
Context that the Argo Workflow should be executing in
"""

-    def __init__(self, api_host=None, api_port=None, token=None,
-                 namespace=None, ssl_verify=None, config=None):
+    def __init__(
+        self,
+        api_host=None,
+        api_port=None,
+        token=None,
+        namespace=None,
+        ssl_verify=None,
+        config=None
+    ):
"""
Create a context to execute ArgoWorkflow
@@ -68,10 +72,18 @@ def client(self):
Returns:
ArgoAPIClient: an API client with the config from this context
"""
-        return ArgoAPIClient(self._api_host, self._api_port, self._token, self._namespace,
-                             verify=self._ssl_verify)
-
-def _find_provider_dir(base_directory, provider_uuid, default_provider="default"):
+        return ArgoAPIClient(
+            self._api_host,
+            self._api_port,
+            self._token,
+            self._namespace,
+            verify=self._ssl_verify
+        )
+
+
+def _find_provider_dir(
+    base_directory, provider_uuid, default_provider="default"
+):
"""
Check if the provider workflow directory exists
@@ -90,12 +102,16 @@ def _find_provider_dir(base_directory, provider_uuid, default_provider="default"
"""
try:
# find provider directory
-        provider_dirs = [entry for entry in os.listdir(base_directory)
-                         if entry == provider_uuid]
+        provider_dirs = [
+            entry
+            for entry in os.listdir(base_directory) if entry == provider_uuid
+        ]
# try default provider if given provider dir does not exist
if not provider_dirs and default_provider:
-            provider_dirs = [entry for entry in os.listdir(base_directory)
-                             if entry == default_provider]
+            provider_dirs = [
+                entry for entry in os.listdir(base_directory)
+                if entry == default_provider
+            ]
if not provider_dirs:
raise ProviderWorkflowDirNotExist(provider_uuid)

@@ -104,6 +120,7 @@ def _find_provider_dir(base_directory, provider_uuid, default_provider="default"
except OSError:
raise BaseWorkflowDirNotExist(base_directory)

+
def _find_workflow_file(provider_dir_path, filename, provider_uuid):
"""
Find the path of the workflow file, and check if the file exists
@@ -122,8 +139,10 @@ def _find_workflow_file(provider_dir_path, filename, provider_uuid):
"""
try:
# find workflow file
-        wf_files = [entry for entry in os.listdir(provider_dir_path)
-                    if entry == filename]
+        wf_files = [
+            entry
+            for entry in os.listdir(provider_dir_path) if entry == filename
+        ]
if not wf_files:
raise WorkflowFileNotExist(provider_uuid, filename)

@@ -133,6 +152,7 @@ def _find_workflow_file(provider_dir_path, filename, provider_uuid):
except OSError:
raise ProviderWorkflowDirNotExist(provider_uuid)

+
def argo_lookup_workflow(base_directory, filename, provider_uuid):
"""
Lookup workflow by name and cloud provider
@@ -150,7 +170,9 @@ def argo_lookup_workflow(base_directory, filename, provider_uuid):
ArgoWorkflow: JSON object representing the workflow if found, None otherwise
"""
provider_dir_path = _find_provider_dir(base_directory, provider_uuid)
-    wf_file_path = _find_workflow_file(provider_dir_path, filename, provider_uuid)
+    wf_file_path = _find_workflow_file(
+        provider_dir_path, filename, provider_uuid
+    )

try:
# read workflow definition
@@ -163,6 +185,7 @@ def argo_lookup_workflow(base_directory, filename, provider_uuid):

return wf_def

+
def argo_lookup_yaml_file(base_directory, filename, provider_uuid):
"""
Lookup yaml file by filename and cloud provider and read the yaml file
@@ -180,7 +203,9 @@ def argo_lookup_yaml_file(base_directory, filename, provider_uuid):
ArgoWorkflow: JSON object representing the workflow if found, None otherwise
"""
provider_dir_path = _find_provider_dir(base_directory, provider_uuid)
-    wf_file_path = _find_workflow_file(provider_dir_path, filename, provider_uuid)
+    wf_file_path = _find_workflow_file(
+        provider_dir_path, filename, provider_uuid
+    )

try:
# read workflow definition
@@ -193,6 +218,7 @@ def argo_lookup_yaml_file(base_directory, filename, provider_uuid):

return wf_def

+
def read_argo_config(config_file_path=None, provider_uuid=None):
"""
Read configuration for Argo.
@@ -244,6 +270,7 @@ def read_argo_config(config_file_path=None, provider_uuid=None):
except yaml.YAMLError:
raise ArgoConfigFileNotYAML(config_file_path)

+
def argo_context_from_config(config_file_path=None):
"""
Construct an ArgoContext from a config file
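Taken together, the helpers in this file load a workflow definition for a provider and build a client for the Argo API. A usage sketch under stated assumptions (the config path, base directory, and provider UUID below are invented placeholders, not values from this repository):

# Illustrative caller of the helpers above; every literal here is a
# made-up placeholder.
from service.argo.common import argo_context_from_config, argo_lookup_workflow

context = argo_context_from_config("/etc/atmosphere/argo_config.yml")
wf_def = argo_lookup_workflow(
    "/opt/argo_workflows", "instance_deploy.yml", "example-provider-uuid"
)
client = context.client()  # ArgoAPIClient configured from the context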
12 changes: 12 additions & 0 deletions service/argo/exception.py
@@ -2,61 +2,73 @@
Exceptions
"""

+
class ArgoBaseException(Exception):
"""
Base exception for Argo related errors
"""

+
class ResponseNotJSON(ArgoBaseException):
"""
Response of a HTTP request is not JSON
"""

+
class BaseWorkflowDirNotExist(ArgoBaseException):
"""
Base directory for workflow files does not exist.
"""

+
class ProviderWorkflowDirNotExist(ArgoBaseException):
"""
Workflow directory for the provider does not exist.
"""

+
class WorkflowFileNotExist(ArgoBaseException):
"""
Workflow definition file (yaml file) does not exist
"""

+
class WorkflowFileNotYAML(ArgoBaseException):
"""
Unable to parse workflow definition file as YAML
"""

+
class ArgoConfigFileNotExist(ArgoBaseException):
"""
Configuration file for Argo does not exist
"""

+
class ArgoConfigFileNotYAML(ArgoBaseException):
"""
Configuration file for Argo is not yaml
"""

+
class ArgoConfigFileError(ArgoBaseException):
"""
Error in config file
"""

+
class WorkflowDataFileNotExist(ArgoBaseException):
"""
Data file does not exist
"""

+
class WorkflowFailed(ArgoBaseException):
"""
Workflow complete with "Failed"
"""

+
class WorkflowErrored(ArgoBaseException):
"""
Workflow complete with "Error"
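Because every exception above derives from ArgoBaseException, callers can handle specific failures first and fall back to the base class. A brief illustrative sketch (the lookup arguments are placeholders):

# Hypothetical caller; the directory, filename, and UUID are placeholders.
from service.argo.common import argo_lookup_workflow
from service.argo.exception import (
    ArgoBaseException, WorkflowFileNotExist, WorkflowFileNotYAML
)

try:
    wf_def = argo_lookup_workflow(
        "/opt/argo_workflows", "instance_deploy.yml", "example-provider-uuid"
    )
except WorkflowFileNotExist:
    pass  # no workflow definition file for this provider
except WorkflowFileNotYAML:
    pass  # the file exists but could not be parsed as YAML
except ArgoBaseException:
    pass  # any other Argo-related failure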
80 changes: 58 additions & 22 deletions service/argo/instance_deploy.py
@@ -14,12 +14,12 @@


def argo_deploy_instance(
-        provider_uuid,
-        instance_uuid,
-        server_ip,
-        username,
-        timezone,
-):
+    provider_uuid,
+    instance_uuid,
+    server_ip,
+    username,
+    timezone,
+):
"""
run Argo workflow to deploy an instance
@@ -33,12 +33,17 @@ def argo_deploy_instance(
exc: exception thrown
"""
try:
-        wf_data = _get_workflow_data(provider_uuid, server_ip, username, timezone)
-
-        wf, status = argo_workflow_exec("instance_deploy.yml", provider_uuid,
-                                        wf_data,
-                                        config_file_path=settings.ARGO_CONFIG_FILE_PATH,
-                                        wait=True)
+        wf_data = _get_workflow_data(
+            provider_uuid, server_ip, username, timezone
+        )
+
+        wf, status = argo_workflow_exec(
+            "instance_deploy.yml",
+            provider_uuid,
+            wf_data,
+            config_file_path=settings.ARGO_CONFIG_FILE_PATH,
+            wait=True
+        )

# dump logs
_dump_deploy_logs(wf, username, instance_uuid)
@@ -51,9 +56,12 @@ def argo_deploy_instance(
raise WorkflowErrored(wf.wf_name)
raise WorkflowFailed(wf.wf_name)
except Exception as exc:
celery_logger.debug("ARGO, argo_deploy_instance(), {}, {}".format(type(exc), exc))
+        celery_logger.debug(
+            "ARGO, argo_deploy_instance(), {}, {}".format(type(exc), exc)
+        )
raise exc

+
def _get_workflow_data(provider_uuid, server_ip, username, timezone):
"""
Generate the data structure to be passed to the workflow
@@ -67,16 +75,34 @@ def _get_workflow_data(provider_uuid, server_ip, username, timezone):
dict: {"arguments": {"parameters": [{"name": "", "value": ""}]}}
"""
wf_data = {"arguments": {"parameters": []}}
wf_data["arguments"]["parameters"].append({"name": "server-ip", "value": server_ip})
wf_data["arguments"]["parameters"].append({"name": "user", "value": username})
wf_data["arguments"]["parameters"].append(
{
"name": "server-ip",
"value": server_ip
}
)
wf_data["arguments"]["parameters"].append(
{
"name": "user",
"value": username
}
)
wf_data["arguments"]["parameters"].append({"name": "tz", "value": timezone})

# read zoneinfo from argo config
-    config = read_argo_config(settings.ARGO_CONFIG_FILE_PATH, provider_uuid=provider_uuid)
-    wf_data["arguments"]["parameters"].append({"name": "zoneinfo", "value": config["zoneinfo"]})
+    config = read_argo_config(
+        settings.ARGO_CONFIG_FILE_PATH, provider_uuid=provider_uuid
+    )
+    wf_data["arguments"]["parameters"].append(
+        {
+            "name": "zoneinfo",
+            "value": config["zoneinfo"]
+        }
+    )

return wf_data

+
def _get_workflow_data_for_temp(provider_uuid, server_ip, username, timezone):
"""
Generate the data structure to be passed to the workflow.
@@ -96,7 +122,9 @@ def _get_workflow_data_for_temp(provider_uuid, server_ip, username, timezone):
wf_data.append("tz={}".format(timezone))

# read zoneinfo from argo config
-    config = read_argo_config(settings.ARGO_CONFIG_FILE_PATH, provider_uuid=provider_uuid)
+    config = read_argo_config(
+        settings.ARGO_CONFIG_FILE_PATH, provider_uuid=provider_uuid
+    )
wf_data.append("zoneinfo={}".format(config["zoneinfo"]))

return wf_data
@@ -116,8 +144,12 @@ def _create_deploy_log_dir(username, instance_uuid, timestamp):
Returns:
str: path to the directory to dump logs
"""
-    base_dir = os.path.abspath(os.path.join(
-        os.path.dirname(atmosphere.__file__), "..", "logs", "atmosphere_deploy.d"))
+    base_dir = os.path.abspath(
+        os.path.join(
+            os.path.dirname(atmosphere.__file__), "..", "logs",
+            "atmosphere_deploy.d"
+        )
+    )

# create base dir if missing
if not os.path.isdir(base_dir):
@@ -130,6 +162,7 @@ def _create_deploy_log_dir(username, instance_uuid, timestamp):

return directory

+
def _dump_deploy_logs(wf, username, instance_uuid):
"""
Dump workflow logs locally
@@ -163,6 +196,9 @@ def _dump_deploy_logs(wf, username, instance_uuid):
log_filename = os.path.join(log_dir, node_name + ".log")
wf.dump_pod_logs(context, node_name, log_filename)
except Exception as exc:
celery_logger.debug("ARGO, failed to dump logs for workflow {}, {}"
.format(wf.wf_name, type(exc)))
+        celery_logger.debug(
+            "ARGO, failed to dump logs for workflow {}, {}".format(
+                wf.wf_name, type(exc)
+            )
+        )
celery_logger.debug(exc)
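For reference, _get_workflow_data assembles the parameter payload in the shape documented by its docstring. A sketch of the resulting dict for invented inputs (server_ip "10.0.0.5", username "alice", timezone "America/Phoenix", and an Argo config whose zoneinfo entry is "America/Phoenix"):

# Shape of the dict built by _get_workflow_data; all values are invented.
wf_data = {
    "arguments": {
        "parameters": [
            {"name": "server-ip", "value": "10.0.0.5"},
            {"name": "user", "value": "alice"},
            {"name": "tz", "value": "America/Phoenix"},
            {"name": "zoneinfo", "value": "America/Phoenix"},
        ]
    }
}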
(Diffs for the remaining 5 changed files are not shown here.)
