From da9ac3a69eb7f85fc05d3d9f64f292c49cadc099 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sat, 13 Apr 2024 16:34:01 +1200 Subject: [PATCH] Distinguish never-spawned from never-submitted. --- cylc/flow/task_pool.py | 44 +++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2b214d70943..f8573923c46 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1401,6 +1401,12 @@ def spawn_on_output(self, itask, output, forced=False): msg += " suiciding while active" self.remove(c_task, msg) + if suicide: + # Update the DB immediately to ensure a record exists in case of + # very quick removal and respawn, due to suicide triggers. + # See https://github.com/cylc/cylc-flow/issues/6066 + self.workflow_db_mgr.process_queued_ops() + self.remove_if_complete(itask, output) def remove_if_complete( @@ -1555,17 +1561,33 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool: def _get_task_history( self, name: str, point: 'PointBase', flow_nums: Set[int] - ) -> Tuple[int, str, bool]: - """Get history of previous submits for this task.""" + ) -> Tuple[bool, int, str, bool]: + """Get history of previous submits for this task. + + Args: + name: task name + point: task cycle point + flow_nums: task flow numbers + Returns: + never_spawned: if task never spawned before + submit_num: submit number of previous submit + prev_status: task status of previous sumbit + prev_flow_wait: if previous submit was a flow-wait task + + """ info = self.workflow_db_mgr.pri_dao.select_prev_instances( name, str(point) ) try: submit_num: int = max(s[0] for s in info) except ValueError: - # never spawned before in any flow + # never spawned in any flow submit_num = 0 + never_spawned = True + else: + never_spawned = False + # (submit_num could still be zero, if removed before submit) prev_status: str = TASK_STATUS_WAITING prev_flow_wait = False @@ -1582,7 +1604,7 @@ def _get_task_history( # overlap due to merges (they'll have have same snum and # f_wait); keep going to find the finished one, if any. - return submit_num, prev_status, prev_flow_wait + return never_spawned, submit_num, prev_status, prev_flow_wait def _load_historical_outputs(self, itask): """Load a task's historical outputs from the DB.""" @@ -1619,10 +1641,15 @@ def spawn_task( if not self.can_be_spawned(name, point): return None - submit_num, prev_status, prev_flow_wait = ( + never_spawned, submit_num, prev_status, prev_flow_wait = ( self._get_task_history(name, point, flow_nums) ) + if not never_spawned and submit_num == 0: + # Previous spawn suicided before completing any outputs. + LOG.debug(f"{point}/{name} already spawned in this flow") + return None + itask = self._get_task_proxy_db_outputs( point, self.config.get_taskdef(name), @@ -1653,8 +1680,6 @@ def spawn_task( if itask.transient and not force: return None - # (else not previously finishedr, so run it) - if not itask.transient: if (name, point) in self.tasks_to_hold: LOG.info(f"[{itask}] holding (as requested earlier)") @@ -2117,8 +2142,9 @@ def force_trigger_tasks( if not self.can_be_spawned(name, point): continue - submit_num, _prev_status, prev_fwait = self._get_task_history( - name, point, flow_nums) + _, submit_num, _prev_status, prev_fwait = ( + self._get_task_history(name, point, flow_nums) + ) itask = TaskProxy( self.tokens,