From fb2c4b4c76f7caa432238c91092f18f59999f20b Mon Sep 17 00:00:00 2001
From: Adi Ophir
Date: Mon, 7 Aug 2017 16:42:27 -0500
Subject: [PATCH] DW-5997: Adding the check for pending dart actions to
 workflow_service.run_triggered_workflow, leaving in the /do-manual-trigger
 entry point because it checks for max concurrency.

---
 .../dart/service/pending_actions_check.py | 84 +++++++++++++++++++
 src/python/dart/service/trigger.py        |  9 +-
 src/python/dart/service/workflow.py       |  4 +-
 src/python/dart/trigger/zombie_check.py   | 53 ++++++++++++
 src/python/dart/web/api/workflow.py       |  9 +-
 5 files changed, 154 insertions(+), 5 deletions(-)
 create mode 100644 src/python/dart/service/pending_actions_check.py
 create mode 100644 src/python/dart/trigger/zombie_check.py

diff --git a/src/python/dart/service/pending_actions_check.py b/src/python/dart/service/pending_actions_check.py
new file mode 100644
index 0000000..c00bab1
--- /dev/null
+++ b/src/python/dart/service/pending_actions_check.py
@@ -0,0 +1,84 @@
+from dart.context.locator import injectable
+from dart.model.workflow import WorkflowState, WorkflowInstanceState
+from dart.model.action import ActionState
+
+import logging
+import boto3
+
+_logger = logging.getLogger(__name__)
+
+@injectable
+class PendingActionsCheck(object):
+    def __init__(self, action_service):
+        self._action_service = action_service
+        self._batch_client = boto3.client('batch')
+
+    def get_not_completed_workflow_instances(self, workflow_id, workflow_service):
+        wf = workflow_service.get_workflow(workflow_id, raise_when_missing=False)
+        if not wf:
+            _logger.info('Zombie Check: workflow (id={wf_id}) not found'.
+                         format(wf_id=workflow_id))
+            return None
+
+        if wf.data.state != WorkflowState.ACTIVE:
+            _logger.info('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state'.
+                         format(wf_id=workflow_id))
+
+        # get all workflow_instances of the current workflow:
+        NOT_COMPLETE_STATES = ['QUEUED', 'RUNNING']
+        all_wf_instances = workflow_service.find_workflow_instances(workflow_id)
+        current_wf_instances = [wfi for wfi in all_wf_instances if wfi.data.state in NOT_COMPLETE_STATES]
+        _logger.info('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}'.format(workflow_id, [wfi.id for wfi in current_wf_instances]))
+
+        return current_wf_instances
+
+    def get_instance_actions(self, current_wf_instances):
+        # get all actions of not completed workflow_instances
+        incomplete_actions = []
+        action_2_wf_instance = {}
+        for wf_instance in current_wf_instances:
+            wf_instance_actions = self._action_service.find_actions(workflow_instance_id=wf_instance.id)
+            incomplete_actions.extend(wf_instance_actions)
+            for action in wf_instance_actions:
+                action_2_wf_instance[action.id] = wf_instance
+
+        jobs_2_actions = {}
+        for action in incomplete_actions:
+            if action.data.batch_job_id:
+                jobs_2_actions[action.data.batch_job_id] = action
+
+        return incomplete_actions, jobs_2_actions, action_2_wf_instance
+
+    def handle_done_batch_jobs_with_not_complete_wf_instances(self, batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service):
+        for job in batch_jobs.get('jobs'):
+            # job failed + action not failed/completed => fail the action and its workflow instance
+            action = jobs_2_actions.get(job.get('jobId'))
+            if action:
+                wf_instance = action_2_wf_instance[action.id]
+                if job.get('status') == 'FAILED' and not (action.data.state in ['FAILED', 'COMPLETED']):
+                    _logger.info("Zombie Check: Job {0} is failed but action {1} is not failed/completed. Updating action and workflow_instance to FAILED".format(job.get('jobId'), action.id))
+                    self._action_service.update_action_state(action, ActionState.FAILED, action.data.error_message)
+                    workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.FAILED)
+
+                # job succeeded + action not failed/completed => mark the action and its workflow instance as completed
+                if job.get('status') == 'SUCCEEDED' and not (action.data.state in ['FAILED', 'COMPLETED']):
+                    _logger.info("Zombie Check: Job {0} is succeeded but action {1} is not failed/completed. Updating action to COMPLETED".format(job.get('jobId'), action.id))
+                    self._action_service.update_action_state(action, ActionState.COMPLETED, action.data.error_message)
+                    workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.COMPLETED)
+
+    def find_pending_dart_actions(self, workflow_id, workflow_service):
+        ''' workflow_service is passed in to avoid a cyclic injection dependency with WorkflowService '''
+        current_wf_instances = self.get_not_completed_workflow_instances(workflow_id, workflow_service)
+        if current_wf_instances:
+            incomplete_actions, jobs_2_actions, action_2_wf_instance = self.get_instance_actions(current_wf_instances)
+            batch_job_ids = list(jobs_2_actions.keys())
+            _logger.info("Zombie Check: extracted job_ids {0} from incomplete actions {1}".format(batch_job_ids, [act.id for act in incomplete_actions]))
+
+            try:
+                batch_jobs = self._batch_client.describe_jobs(jobs=batch_job_ids)
+            except Exception as err:
+                _logger.error("Zombie Check: failed to execute batch's describe_jobs. err = {0}".format(err))
+            else:
+                self.handle_done_batch_jobs_with_not_complete_wf_instances(batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service)
+
+
diff --git a/src/python/dart/service/trigger.py b/src/python/dart/service/trigger.py
index 77bff61..e4ed891 100644
--- a/src/python/dart/service/trigger.py
+++ b/src/python/dart/service/trigger.py
@@ -21,8 +21,8 @@ class TriggerService(object):
     def __init__(self, action_service, datastore_service, workflow_service, manual_trigger_processor,
                  subscription_batch_trigger_processor, workflow_completion_trigger_processor, event_trigger_processor,
-                 scheduled_trigger_processor, super_trigger_processor, retry_trigger_processor, filter_service,
-                 subscription_service, dart_config):
+                 scheduled_trigger_processor, super_trigger_processor, retry_trigger_processor,
+                 zombie_check_trigger_processor, filter_service, subscription_service, dart_config):
         self._action_service = action_service
         self._datastore_service = datastore_service
         self._workflow_service = workflow_service
@@ -33,6 +33,7 @@ def __init__(self, action_service, datastore_service, workflow_service, manual_t
         self._scheduled_trigger_processor = scheduled_trigger_processor
         self._super_trigger_processor = super_trigger_processor
         self._retry_trigger_processor = retry_trigger_processor
+        self._zombie_check_trigger_processor = zombie_check_trigger_processor
         self._filter_service = filter_service
         self._subscription_service = subscription_service
         self._nudge_config = dart_config['nudge']
@@ -45,6 +46,7 @@ def __init__(self, action_service, datastore_service, workflow_service, manual_t
             scheduled_trigger_processor.trigger_type().name: scheduled_trigger_processor,
             super_trigger_processor.trigger_type().name: super_trigger_processor,
             retry_trigger_processor.trigger_type().name: retry_trigger_processor,
+            zombie_check_trigger_processor.trigger_type().name: zombie_check_trigger_processor
         }
 
         params_schemas = []
@@ -218,6 +220,9 @@ def delete_trigger_retryable(trigger_id):
             db.session.delete(trigger_dao)
             db.session.commit()
 
+    def check_zombie_workflows(self, workflow_json):
+        self._zombie_check_trigger_processor.send_evaluation_message(workflow_json)
+
     def trigger_workflow_async(self, workflow_json):
         self._manual_trigger_processor.send_evaluation_message(workflow_json)
 
diff --git a/src/python/dart/service/workflow.py b/src/python/dart/service/workflow.py
index 5f8f72b..d19eafe 100644
--- a/src/python/dart/service/workflow.py
+++ b/src/python/dart/service/workflow.py
@@ -22,7 +22,7 @@
 @injectable
 class WorkflowService(object):
     def __init__(self, datastore_service, action_service, trigger_proxy, filter_service, subscription_service,
-                 subscription_element_service, emailer):
+                 subscription_element_service, emailer, pending_actions_check):
         self._datastore_service = datastore_service
         self._action_service = action_service
         self._trigger_proxy = trigger_proxy
@@ -30,6 +30,7 @@ def __init__(self, datastore_service, action_service, trigger_proxy, filter_serv
         self._subscription_service = subscription_service
         self._subscription_element_service = subscription_element_service
         self._emailer = emailer
+        self._pending_actions_check = pending_actions_check
 
     @staticmethod
     def save_workflow(workflow, commit=True, flush=False):
@@ -263,6 +264,7 @@ def run_triggered_workflow(self, workflow_msg, trigger_type, trigger_id=None, re
         states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
         if self.find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
             _logger.info('workflow (id={wf_id}) has already reached max concurrency of {concurrency}. log-info: {log_info}'.format(wf_id=wf.id, concurrency=wf.data.concurrency, log_info=workflow_msg.get('log_info')))
+            self._pending_actions_check.find_pending_dart_actions(wf.id, self)
             return
 
         wf_instance = self.save_workflow_instance(
diff --git a/src/python/dart/trigger/zombie_check.py b/src/python/dart/trigger/zombie_check.py
new file mode 100644
index 0000000..1153451
--- /dev/null
+++ b/src/python/dart/trigger/zombie_check.py
@@ -0,0 +1,53 @@
+from dart.context.locator import injectable
+from dart.model.trigger import TriggerType
+from dart.trigger.base import TriggerProcessor
+from dart.model.workflow import WorkflowState, WorkflowInstanceState
+from dart.model.action import ActionState
+
+import logging
+import boto3
+
+_logger = logging.getLogger(__name__)
+
+zombie_check_trigger = TriggerType(
+    name='zombie_check',
+    description='Checks AWS Batch for jobs that have finished (FAILED/SUCCEEDED) while their dart actions are still incomplete'
+)
+
+@injectable
+class ZombieCheckTriggerProcessor(TriggerProcessor):
+    def __init__(self, trigger_proxy, action_service, workflow_service, pending_actions_check):
+        self._trigger_proxy = trigger_proxy
+        self._action_service = action_service
+        self._workflow_service = workflow_service
+        self._trigger_type = zombie_check_trigger
+        self._pending_actions_check = pending_actions_check
+        self._batch_client = boto3.client('batch')
+
+    def trigger_type(self):
+        return self._trigger_type
+
+    def initialize_trigger(self, trigger, trigger_service):
+        # zombie check triggers should never be saved, thus never initialized
+        pass
+
+    def update_trigger(self, unmodified_trigger, modified_trigger):
+        return modified_trigger
+
+    def evaluate_message(self, workflow_msg, trigger_service):
+        """ :type workflow_msg: dict
+            :type trigger_service: dart.service.trigger.TriggerService """
+
+        workflow_id = workflow_msg.get('workflow_id')
+        self._pending_actions_check.find_pending_dart_actions(workflow_id, self._workflow_service)
+
+        # return an empty list since this is not associated with a particular trigger instance
+        return []
+
+    def teardown_trigger(self, trigger, trigger_service):
+        pass
+
+    def send_evaluation_message(self, workflow_msg):
+        self._trigger_proxy.process_trigger(self._trigger_type, workflow_msg)
+
+
diff --git a/src/python/dart/web/api/workflow.py b/src/python/dart/web/api/workflow.py
index 57de552..c55988a 100644
--- a/src/python/dart/web/api/workflow.py
+++ b/src/python/dart/web/api/workflow.py
@@ -162,12 +162,17 @@ def trigger_workflow(workflow):
     if wf.data.state != WorkflowState.ACTIVE:
         return {'results': 'ERROR', 'error_message': 'This workflow is not ACTIVE'}, 400, None
 
+    wf_uuid = uuid.uuid4().hex  # to avoid uuid serialization issues
+    current_user_id = current_user.email if hasattr(current_user, 'email') else 'anonymous'
+
     states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
     if workflow_service().find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
+        _logger.info("Checking for Batch 'stuck' workflows, workflow_id={workflow_id} for user={user_id} with uuid={wf_uuid}".
+                     format(workflow_id=workflow.id, user_id=current_user_id, wf_uuid=wf_uuid))
+        trigger_service().check_zombie_workflows({'workflow_id': workflow.id,
+                                                  'log_info': {'user_id': current_user_id, 'wf_uuid': wf_uuid}})
         return {'results': 'ERROR', 'error_message': 'Max concurrency reached: %s' % wf.data.concurrency}, 400, None
 
-    wf_uuid = uuid.uuid4().hex  # to avoid uuid serialization issues
-    current_user_id = current_user.email if hasattr(current_user, 'email') else 'anonymous'
     _logger.info("Launching Workflow {workflow_id} for user={user_id} with uuid={wf_uuid}".
                  format(workflow_id=workflow.id, user_id=current_user_id, wf_uuid=wf_uuid))
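
Note on the reconciliation step in pending_actions_check.py: it relies on boto3's batch.describe_jobs, whose terminal job statuses are SUCCEEDED and FAILED. The snippet below is a minimal, hypothetical illustration (not part of the patch) of the response shape that handle_done_batch_jobs_with_not_complete_wf_instances consumes; the job ids shown are made up.

    # Sketch only: shows the assumed describe_jobs response shape, e.g.
    #   {'jobs': [{'jobId': 'a1b2c3', 'status': 'SUCCEEDED', ...},
    #             {'jobId': 'd4e5f6', 'status': 'FAILED', ...}]}
    # Non-terminal statuses (SUBMITTED/PENDING/RUNNABLE/STARTING/RUNNING) are ignored.
    import boto3

    batch_client = boto3.client('batch')
    response = batch_client.describe_jobs(jobs=['a1b2c3', 'd4e5f6'])  # accepts up to 100 job ids per call
    finished = {j['jobId']: j['status'] for j in response['jobs']
                if j['status'] in ('SUCCEEDED', 'FAILED')}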