Skip to content

Commit fb2c4b4

Browse files
committed
DW-5997: Adding the check for pending dart actions to workflow_Service.run_triggered_workflow, leaving in /do-manual-trigger entry point because it checks for max concurrency.
1 parent c486afc commit fb2c4b4

File tree

5 files changed

+154
-5
lines changed

5 files changed

+154
-5
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from dart.context.locator import injectable
2+
from dart.model.workflow import WorkflowState, WorkflowInstanceState
3+
from dart.model.action import ActionState
4+
5+
import logging
6+
import boto3
7+
8+
_logger = logging.getLogger(__name__)
9+
10+
@injectable
11+
class PendingActionsCheck(object):
12+
def __init__(self, action_service):
13+
self._action_service = action_service
14+
self._batch_client = boto3.client('batch')
15+
16+
def get_not_completed_workflow_instances(self, workflow_id, workflow_service):
17+
wf = workflow_service.get_workflow(workflow_id, raise_when_missing=False)
18+
if not wf:
19+
_logger.info('Zombie Check: workflow (id={wf_id}) not found. log-info: {log_info}'.
20+
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
21+
return None
22+
23+
if wf.data.state != WorkflowState.ACTIVE:
24+
_logger.info('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state. log-info: {log_info}'.
25+
format(wf_id=workflow_id, log_info=workflow_msg.get('log_info')))
26+
27+
# get all workflow_instances of current workflow:
28+
NOT_COMPLETE_STATES = ['QUEUED', 'RUNNING']
29+
all_wf_instances = workflow_service.find_workflow_instances(workflow_id)
30+
current_wf_instances = [wf for wf in all_wf_instances if wf.data.state in NOT_COMPLETE_STATES]
31+
_logger.info('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}'.format(workflow_id, current_wf_instances))
32+
33+
return current_wf_instances
34+
35+
def get_instance_actions(self, current_wf_instances):
36+
# get all actions of not completed workflow_instances
37+
incomplete_actions = []
38+
action_2_wf_instance = {}
39+
for wf_instance in current_wf_instances:
40+
wf_instance_actions = self._action_service.find_actions(workflow_instance_id=wf_instance.id)
41+
incomplete_actions.extend(wf_instance_actions)
42+
for action in wf_instance_actions:
43+
action_2_wf_instance[action.id] = wf_instance
44+
45+
jobs_2_actions = {}
46+
for action in incomplete_actions:
47+
if action.data.batch_job_id:
48+
jobs_2_actions[action.data.batch_job_id] = action
49+
50+
return incomplete_actions, jobs_2_actions, action_2_wf_instance
51+
52+
def handle_done_batch_jobs_with_not_complete_wf_instances(self, batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service):
53+
for job in batch_jobs.get('jobs'):
54+
# jobs fail + action not-failed => fail workflow instance and action
55+
action = jobs_2_actions[job.get('jobId')]
56+
if action:
57+
wf_instance = action_2_wf_instance[action.id]
58+
if job.get('status') == 'FAILED' and not (action.data.state in ['FAILED', 'COMPLETED']):
59+
_logger.info("Zombie Check: Job {0} is failed but action {0} is not failed/completed. Updating action and workflow_instance to FAILED".format(job.get('jobId'), action.id))
60+
self._action_service.update_action_state(action, ActionState.FAILED, action.data.error_message)
61+
workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.FAILED)
62+
63+
# Jobs complete + action not-failed => mark workflow instance as complete and mark actions as complete
64+
if job.get('status') == 'COMPLETED' and not (action.data.state in ['FAILED', 'COMPLETED']):
65+
_logger.info("Zombie Check: Job {0} is completed but action {0} is not failed/completed. Updating action to COMPLETED".format(job.get("jobId"), action.id))
66+
self._action_service.update_action_state(action, ActionState.COMPLETED, action.data.error_message)
67+
workflow_service.update_workflow_instance_state(wf_instance, WorkflowInstanceState.FAILED)
68+
69+
def find_pending_dart_actions(self, workflow_id, workflow_service):
70+
''' We send workflow_service to avoid cyclical injection from workflow_service '''
71+
current_wf_instances = self.get_not_completed_workflow_instances(workflow_id, workflow_service)
72+
if current_wf_instances:
73+
incomplete_actions, jobs_2_actions, action_2_wf_instance = self.get_instance_actions(current_wf_instances)
74+
batch_job_ids = [job.data.batch_job_id for job in incomplete_actions]
75+
_logger.info("Zombie Check: extract job_ids {0} form incomplete actions {1}".format(batch_job_ids, [act.id for act in incomplete_actions]))
76+
77+
try:
78+
batch_jobs = self._batch_client.describe_jobs(jobs=batch_job_ids)
79+
except Exception as err:
80+
_logger.error("Zombie Check: failed to execute batch's describe_jobs. err = {0}".format(err))
81+
else:
82+
self.handle_done_batch_jobs_with_not_complete_wf_instances(batch_jobs, jobs_2_actions, action_2_wf_instance, workflow_service)
83+
84+

src/python/dart/service/trigger.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
class TriggerService(object):
2222
def __init__(self, action_service, datastore_service, workflow_service, manual_trigger_processor,
2323
subscription_batch_trigger_processor, workflow_completion_trigger_processor, event_trigger_processor,
24-
scheduled_trigger_processor, super_trigger_processor, retry_trigger_processor, filter_service,
25-
subscription_service, dart_config):
24+
scheduled_trigger_processor, super_trigger_processor, retry_trigger_processor,
25+
zombie_check_trigger_processor, filter_service, subscription_service, dart_config):
2626
self._action_service = action_service
2727
self._datastore_service = datastore_service
2828
self._workflow_service = workflow_service
@@ -33,6 +33,7 @@ def __init__(self, action_service, datastore_service, workflow_service, manual_t
3333
self._scheduled_trigger_processor = scheduled_trigger_processor
3434
self._super_trigger_processor = super_trigger_processor
3535
self._retry_trigger_processor = retry_trigger_processor
36+
self._zombie_check_trigger_processor = zombie_check_trigger_processor
3637
self._filter_service = filter_service
3738
self._subscription_service = subscription_service
3839
self._nudge_config = dart_config['nudge']
@@ -45,6 +46,7 @@ def __init__(self, action_service, datastore_service, workflow_service, manual_t
4546
scheduled_trigger_processor.trigger_type().name: scheduled_trigger_processor,
4647
super_trigger_processor.trigger_type().name: super_trigger_processor,
4748
retry_trigger_processor.trigger_type().name: retry_trigger_processor,
49+
zombie_check_trigger_processor.trigger_type().name: zombie_check_trigger_processor
4850
}
4951

5052
params_schemas = []
@@ -218,6 +220,9 @@ def delete_trigger_retryable(trigger_id):
218220
db.session.delete(trigger_dao)
219221
db.session.commit()
220222

223+
def check_zombie_workflows(self, workflow_json):
224+
self._zombie_check_trigger_processor.send_evaluation_message(workflow_json)
225+
221226
def trigger_workflow_async(self, workflow_json):
222227
self._manual_trigger_processor.send_evaluation_message(workflow_json)
223228

src/python/dart/service/workflow.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222
@injectable
2323
class WorkflowService(object):
2424
def __init__(self, datastore_service, action_service, trigger_proxy, filter_service, subscription_service,
25-
subscription_element_service, emailer):
25+
subscription_element_service, emailer, pending_actions_check):
2626
self._datastore_service = datastore_service
2727
self._action_service = action_service
2828
self._trigger_proxy = trigger_proxy
2929
self._filter_service = filter_service
3030
self._subscription_service = subscription_service
3131
self._subscription_element_service = subscription_element_service
3232
self._emailer = emailer
33+
self._pending_actions_check = pending_actions_check
3334

3435
@staticmethod
3536
def save_workflow(workflow, commit=True, flush=False):
@@ -263,6 +264,7 @@ def run_triggered_workflow(self, workflow_msg, trigger_type, trigger_id=None, re
263264
states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
264265
if self.find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
265266
_logger.info('workflow (id={wf_id}) has already reached max concurrency of {concurrency}. log-info: {log_info}'.format(wf_id=wf.id, concurrency=wf.data.concurrency, log_info=workflow_msg.get('log_info')))
267+
self._pending_actions_check.find_pending_dart_actions(wf.id, self)
266268
return
267269

268270
wf_instance = self.save_workflow_instance(
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from dart.context.locator import injectable
2+
from dart.model.trigger import TriggerType
3+
from dart.trigger.base import TriggerProcessor
4+
from dart.model.workflow import WorkflowState, WorkflowInstanceState
5+
from dart.model.action import ActionState
6+
7+
import logging
8+
import boto3
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
zombie_check_trigger = TriggerType(
13+
name='zombie_check',
14+
description='Check if the actions of current workflow instances are not in (FAILED, SUCCESS) states in Batch'
15+
)
16+
17+
@injectable
18+
class ZombieCheckTriggerProcessor(TriggerProcessor):
19+
def __init__(self, trigger_proxy, action_service, workflow_service, pending_actions_check):
20+
self._trigger_proxy = trigger_proxy
21+
self._action_service = action_service
22+
self._workflow_service = workflow_service
23+
self._trigger_type = zombie_check_trigger
24+
self._pending_actions_check = pending_actions_check
25+
self._batch_client = boto3.client('batch')
26+
27+
def trigger_type(self):
28+
return self._trigger_type
29+
30+
def initialize_trigger(self, trigger, trigger_service):
31+
# manual triggers should never be saved, thus never initialized
32+
pass
33+
34+
def update_trigger(self, unmodified_trigger, modified_trigger):
35+
return modified_trigger
36+
37+
def evaluate_message(self, workflow_msg, trigger_service):
38+
""" :type message: dict
39+
:type trigger_service: dart.service.trigger.TriggerService """
40+
41+
workflow_id = workflow_msg.get('workflow_id')
42+
self._pending_actions_check.find_pending_dart_actions(workflow_id, self._workflow_service)
43+
44+
# return an empty list since this is not associated with a particular trigger instance
45+
return []
46+
47+
def teardown_trigger(self, trigger, trigger_service):
48+
pass
49+
50+
def send_evaluation_message(self, workflow_msg):
51+
self._trigger_proxy.process_trigger(self._trigger_type, workflow_msg)
52+
53+

src/python/dart/web/api/workflow.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,17 @@ def trigger_workflow(workflow):
162162
if wf.data.state != WorkflowState.ACTIVE:
163163
return {'results': 'ERROR', 'error_message': 'This workflow is not ACTIVE'}, 400, None
164164

165+
wf_uuid = uuid.uuid4().hex # to avoid uuid serialization issues
166+
current_user_id = current_user.email if hasattr(current_user, 'email') else 'anonymous'
167+
165168
states = [WorkflowInstanceState.QUEUED, WorkflowInstanceState.RUNNING]
166169
if workflow_service().find_workflow_instances_count(wf.id, states) >= wf.data.concurrency:
170+
_logger.info("Checking for Batch 'stuck' workflows, workflow_id={workflow_id} for user={user_id} with uuid={wf_uuid}".
171+
format(workflow_id=workflow.id, user_id=current_user_id, wf_uuid=wf_uuid))
172+
trigger_service().check_zombie_workflows({'workflow_id': workflow.id,
173+
'log_info': {'user_id': current_user_id, 'wf_uuid': wf_uuid}})
167174
return {'results': 'ERROR', 'error_message': 'Max concurrency reached: %s' % wf.data.concurrency}, 400, None
168175

169-
wf_uuid = uuid.uuid4().hex # to avoid uuid serialization issues
170-
current_user_id = current_user.email if hasattr(current_user, 'email') else 'anonymous'
171176
_logger.info("Launching Workflow {workflow_id} for user={user_id} with uuid={wf_uuid}".
172177
format(workflow_id=workflow.id, user_id=current_user_id, wf_uuid=wf_uuid))
173178

0 commit comments

Comments
 (0)