Skip to content
This repository has been archived by the owner on Sep 12, 2022. It is now read-only.

Commit

Permalink
fix linting
Browse files Browse the repository at this point in the history
  • Loading branch information
zhxu73 committed Aug 12, 2020
1 parent ad96979 commit 6b255d3
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 136 deletions.
46 changes: 32 additions & 14 deletions service/argo/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,27 @@
"""

import os
import json
import yaml
import time
from service.argo.rest_api import ArgoAPIClient
from service.argo.exception import *
from django.conf import settings

from threepio import celery_logger as logger
from service.argo.rest_api import ArgoAPIClient
from service.argo.exception import (BaseWorkflowDirNotExist,
ProviderWorkflowDirNotExist,
WorkflowFileNotExist,
WorkflowFileNotYAML,
ArgoConfigFileNotExist,
ArgoConfigFileNotYAML,
ArgoConfigFileError
)


class ArgoContext:
"""
Context that the Argo Workflow should be executing in
"""

def __init__(self, api_host=None, api_port=None, token=None, namespace=None, ssl_verify=None, config=None):
def __init__(self, api_host=None, api_port=None, token=None,
namespace=None, ssl_verify=None, config=None):
"""
Create a context to execute ArgoWorkflow
Expand All @@ -27,7 +33,8 @@ def __init__(self, api_host=None, api_port=None, token=None, namespace=None, ssl
token (str, optional): k8s bearer token. Defaults to None.
namespace (str, optional): k8s namespace for the workflow. Defaults to None.
ssl_verify (bool, optional): whether to verify ssl cert or not. Defaults to None.
config (dict, optional): configuration, serve as a fallback if a config entry is not passed as a parameter. Defaults to None.
config (dict, optional): configuration, serve as a fallback if a
config entry is not passed as a parameter. Defaults to None.
"""
if api_host:
self._api_host = api_host
Expand Down Expand Up @@ -62,7 +69,6 @@ def client(self):
ArgoAPIClient: an API client with the config from this context
"""
return ArgoAPIClient(self._api_host, self._api_port, self._token, self._namespace,
# Argo server currently has self-signed cert
verify=self._ssl_verify)

def _find_provider_dir(base_directory, provider_uuid, default_provider="default"):
Expand All @@ -72,7 +78,8 @@ def _find_provider_dir(base_directory, provider_uuid, default_provider="default"
Args:
base_directory (str): base directory for workflow files
provider_uuid (str): provider uuid
default_provider (str, optional): default provider name. unset if None or "". Defaults to "default".
default_provider (str, optional): default provider name. unset if None
or "". Defaults to "default".
Raises:
ProviderWorkflowDirNotExist: provider workflow directory not exist
Expand All @@ -84,7 +91,7 @@ def _find_provider_dir(base_directory, provider_uuid, default_provider="default"
try:
# find provider directory
provider_dirs = [entry for entry in os.listdir(base_directory)
if entry == provider_uuid]
if entry == provider_uuid]
# try default provider if given provider dir does not exist
if not provider_dirs and default_provider:
provider_dirs = [entry for entry in os.listdir(base_directory)
Expand Down Expand Up @@ -190,11 +197,13 @@ def read_argo_config(config_file_path=None, provider_uuid=None):
"""
Read configuration for Argo.
Read from given path if specified, else read from path specified in the settings.
Only config specific to the provider is returned, if provider uuid is not given, then uses the default one from the config.
Only config specific to the provider is returned, if provider uuid is not
given, then uses the default one from the config.
If there is no provider specific config, uses the default one.
Args:
config_file_path (str, optional): path to the config file. will use the default one from the setting if None. Defaults to None.
config_file_path (str, optional): path to the config file. will use
the default one from the setting if None. Defaults to None.
provider_uuid (str, optional): uuid of the provider. Defaults to None.
Raises:
Expand All @@ -210,9 +219,9 @@ def read_argo_config(config_file_path=None, provider_uuid=None):
# read config file
with open(settings.ARGO_CONFIG_FILE_PATH, "r") as config_file:
all_config = yaml.safe_load(config_file.read())

# validate config
if type(all_config) is not dict:
if isinstance(all_config, dict):
raise ArgoConfigFileError("config root not key-value")
if "default" not in all_config:
raise ArgoConfigFileError("default missing")
Expand All @@ -236,6 +245,15 @@ def read_argo_config(config_file_path=None, provider_uuid=None):
raise ArgoConfigFileNotYAML(config_file_path)

def argo_context_from_config(config_file_path=None):
"""
Construct an ArgoContext from a config file
Args:
config_file_path (str, optional): path to config file. Defaults to None.
Returns:
ArgoContext: argo context
"""
# read configuration from file
config = read_argo_config(config_file_path=config_file_path)

Expand Down
15 changes: 3 additions & 12 deletions service/argo/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,70 +3,61 @@
"""

class ArgoBaseException(Exception):
pass
"""
Base exception for Argo related errors
"""

class ResponseNotJSON(ArgoBaseException):
"""
Response of a HTTP request is not JSON
"""
pass

class BaseWorkflowDirNotExist(ArgoBaseException):
"""
Base directory for workflow files does not exist.
"""
pass

class ProviderWorkflowDirNotExist(ArgoBaseException):
"""
Workflow directory for the provider does not exist.
"""
pass

class WorkflowFileNotExist(ArgoBaseException):
"""
Workflow definition file (yaml file) does not exist
"""
pass

class WorkflowFileNotYAML(ArgoBaseException):
"""
Unable to parse workflow definition file as YAML
"""
pass

class ArgoConfigFileNotExist(ArgoBaseException):
"""
Configuration file for Argo does not exist
"""
pass

class ArgoConfigFileNotYAML(ArgoBaseException):
"""
Configuration file for Argo is not yaml
"""
pass

class ArgoConfigFileError(ArgoBaseException):
"""
Error in config file
"""
pass

class WorkflowDataFileNotExist(ArgoBaseException):
"""
Data file does not exist
"""
pass

class WorkflowFailed(ArgoBaseException):
"""
Workflow complete with "Failed"
"""
pass

class WorkflowErrored(ArgoBaseException):
"""
Workflow complete with "Error"
"""
pass
46 changes: 22 additions & 24 deletions service/argo/instance_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,24 @@
Deploy instance.
"""

import yaml
import json
import os
import time
from django.conf import settings
from threepio import celery_logger
import atmosphere

from service.argo.wf_call import argo_workflow_exec
from service.argo.common import argo_context_from_config, read_argo_config
from service.argo.wf import ArgoWorkflow
from service.argo.exception import WorkflowFailed, WorkflowErrored
import atmosphere

from django.conf import settings

from threepio import celery_logger

def argo_deploy_instance(
provider_uuid,
instance_uuid,
server_ip,
username,
timezone,
):
provider_uuid,
instance_uuid,
server_ip,
username,
timezone,
):
"""
run Argo workflow to deploy an instance
Expand All @@ -39,9 +36,9 @@ def argo_deploy_instance(
wf_data = _get_workflow_data(provider_uuid, server_ip, username, timezone)

wf, status = argo_workflow_exec("instance_deploy.yml", provider_uuid,
wf_data,
config_file_path=settings.ARGO_CONFIG_FILE_PATH,
wait=True)
wf_data,
config_file_path=settings.ARGO_CONFIG_FILE_PATH,
wait=True)

# dump logs
_dump_deploy_logs(wf, username, instance_uuid)
Expand All @@ -52,8 +49,7 @@ def argo_deploy_instance(
if not status.success:
if status.error:
raise WorkflowErrored(wf.wf_name)
else:
raise WorkflowFailed(wf.wf_name)
raise WorkflowFailed(wf.wf_name)
except Exception as exc:
celery_logger.debug("ARGO, argo_deploy_instance(), {}, {}".format(type(exc), exc))
raise exc
Expand Down Expand Up @@ -108,7 +104,8 @@ def _get_workflow_data_for_temp(provider_uuid, server_ip, username, timezone):

def _create_deploy_log_dir(username, instance_uuid, timestamp):
"""
Create directory to dump deploy workflow log, example path: base_dir/username/instance_uuid/timestamp/.
Create directory to dump deploy workflow log,
example path: base_dir/username/instance_uuid/timestamp/.
base directory is created if missing
Args:
Expand All @@ -127,11 +124,11 @@ def _create_deploy_log_dir(username, instance_uuid, timestamp):
os.makedirs(base_dir)

# create deploy log directory if missing
dir = os.path.join(base_dir, username, instance_uuid, timestamp)
if not os.path.isdir(dir):
os.makedirs(dir)
directory = os.path.join(base_dir, username, instance_uuid, timestamp)
if not os.path.isdir(directory):
os.makedirs(directory)

return dir
return directory

def _dump_deploy_logs(wf, username, instance_uuid):
"""
Expand Down Expand Up @@ -166,5 +163,6 @@ def _dump_deploy_logs(wf, username, instance_uuid):
log_filename = os.path.join(log_dir, node_name + ".log")
wf.dump_pod_logs(context, node_name, log_filename)
except Exception as exc:
celery_logger.debug("ARGO, failed to dump logs for workflow {}, {}".format(wf.wf_name, type(exc)))
celery_logger.debug("ARGO, failed to dump logs for workflow {}, {}"
.format(wf.wf_name, type(exc)))
celery_logger.debug(exc)
48 changes: 23 additions & 25 deletions service/argo/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
Client to access Argo REST API
"""

import requests
import json
import requests
from threepio import celery_logger as logger
from service.argo.exception import ResponseNotJSON

Expand Down Expand Up @@ -103,8 +103,8 @@ def get_log_for_pod_in_workflow(self, wf_name, pod_name, container_name="main"):
Returns:
list: a list of lines of logs
"""
api_url = "/api/v1/workflows/{}/{}/{}/log?logOptions.timestamps=true&logOptions.container={}".format(
self._namespace, wf_name, pod_name, container_name)
api_url = "/api/v1/workflows/{}/{}/{}/log?logOptions.timestamps=true&logOptions.container={}"
api_url = api_url.format(self._namespace, wf_name, pod_name, container_name)

resp = self._req("get", api_url, json_resp=False)

Expand Down Expand Up @@ -184,7 +184,7 @@ def update_workflow_template(self, wf_temp_name, wf_temp_def_json):
Returns:
dict: response text as JSON object
"""
api_url = "/api/v1/workflow-templates/" + self._namespace
api_url = "/api/v1/workflow-templates/{}/{}".format(self._namespace, wf_temp_name)

json_data = {}
json_data["namespace"] = self._namespace
Expand Down Expand Up @@ -275,8 +275,7 @@ def _req(self, method, url, json_data={}, additional_headers={}, json_resp=True)
resp.raise_for_status()
if json_resp:
return json.loads(resp.text)
else:
return resp.text
return resp.text
except JSONDecodeError as exc:
msg = "ARGO - REST API, {}, {}".format(type(exc), resp.text)
logger.exception(msg)
Expand Down Expand Up @@ -329,24 +328,23 @@ def verify(self):
return self._verify

def _http_method(method_str):
"""
Return function for given HTTP Method from requests library
"""
Return function for given HTTP Method from requests library
Args:
method_str (str): HTTP method, "get", "post", etc.
Args:
method_str (str): HTTP method, "get", "post", etc.
Returns:
function: requests.get, requests.post, etc. None if no match
"""
if method_str == "get":
return requests.get
elif method_str == "post":
return requests.post
elif method_str == "delete":
return requests.delete
elif method_str == "put":
return requests.put
elif method_str == "options":
return requests.options
else:
return None
Returns:
function: requests.get, requests.post, etc. None if no match
"""
if method_str == "get":
return requests.get
if method_str == "post":
return requests.post
if method_str == "delete":
return requests.delete
if method_str == "put":
return requests.put
if method_str == "options":
return requests.options
return None
Loading

0 comments on commit 6b255d3

Please sign in to comment.