Merge pull request #6067 from hjoliver/tweak-suicide
Distinguish never-spawned from never-submitted.
hjoliver committed Apr 29, 2024
2 parents 6f8ce38 + d7305c5 commit 91342c3
Showing 14 changed files with 230 additions and 134 deletions.
1 change: 1 addition & 0 deletions changes.d/6067.fix.md
@@ -0,0 +1 @@
Fixed a bug that sometimes allowed suicide-triggered or manually removed tasks to be added back later.
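
The fix turns on being able to tell "never spawned at all" apart from "spawned earlier but removed before ever submitting". A minimal, standalone sketch of that decision, paraphrasing the guard added to `spawn_task` in the `task_pool.py` diff below (the helper names and the plain-tuple history rows are illustrative, not cylc API):

```python
from typing import Iterable, Tuple


def summarise_history(rows: Iterable[Tuple[int, str, bool]]) -> Tuple[bool, int]:
    """Summarise previous submits of a task.

    Each row is (submit_num, status, flow_wait), loosely mimicking what the
    workflow database returns for earlier instances of the task.
    """
    rows = list(rows)
    never_spawned = not rows
    submit_num = max((r[0] for r in rows), default=0)
    return never_spawned, submit_num


def should_respawn(rows, prev_flow_wait: bool = False) -> bool:
    """Return False if a previous instance was removed (suicide trigger or
    manual removal) before it ever submitted a job."""
    never_spawned, submit_num = summarise_history(rows)
    if not never_spawned and not prev_flow_wait and submit_num == 0:
        # Spawned before, but removed without completing any outputs.
        return False
    return True


assert should_respawn([])                           # never spawned: OK to spawn
assert not should_respawn([(0, "waiting", False)])  # removed pre-submit: skip
assert should_respawn([(1, "succeeded", False)])    # real history: handle as before
```

This also explains why the PR flushes queued DB operations straight after a suicide removal (the `process_queued_ops()` call in the diff below): the guard relies on the removal already being recorded when a very quick respawn attempt arrives.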
3 changes: 2 additions & 1 deletion cylc/flow/task_events_mgr.py
@@ -1788,6 +1788,7 @@ def _reset_job_timers(self, itask):
itask.timeout = None
itask.poll_timer = None
return

ctx = (itask.submit_num, itask.state.status)
if itask.poll_timer and itask.poll_timer.ctx == ctx:
return
@@ -1844,7 +1845,7 @@ def _reset_job_timers(self, itask):
message += '%d*' % (num + 1)
message += '%s,' % intvl_as_str(item)
message += '...'
LOG.info(f"[{itask}] {message}")
LOG.debug(f"[{itask}] {message}")
# Set next poll time
self.check_poll_time(itask)

77 changes: 55 additions & 22 deletions cylc/flow/task_pool.py
@@ -103,7 +103,7 @@ class TaskPool:

ERR_TMPL_NO_TASKID_MATCH = "No matching tasks found: {0}"
ERR_PREFIX_TASK_NOT_ON_SEQUENCE = "Invalid cycle point for task: {0}, {1}"
SUICIDE_MSG = "suicide"
SUICIDE_MSG = "suicide trigger"

def __init__(
self,
@@ -221,7 +221,7 @@ def add_to_pool(self, itask) -> None:
self.active_tasks.setdefault(itask.point, {})
self.active_tasks[itask.point][itask.identity] = itask
self.active_tasks_changed = True
LOG.info(f"[{itask}] added to active task pool")
LOG.debug(f"[{itask}] added to active task pool")

self.create_data_store_elements(itask)

@@ -807,10 +807,11 @@ def remove(self, itask, reason=None):
itask.flow_nums
)

msg = "removed from active task pool"
if reason is None:
msg = "task completed"
msg += ": completed"
else:
msg = f"removed ({reason})"
msg += f": {reason}"

if itask.is_xtrigger_sequential:
self.xtrigger_mgr.sequential_spawn_next.discard(itask.identity)
@@ -837,7 +838,17 @@ def remove(self, itask, reason=None):
# Event-driven final update of task_states table.
# TODO: same for datastore (still updated by scheduler loop)
self.workflow_db_mgr.put_update_task_state(itask)
LOG.info(f"[{itask}] {msg}")

level = logging.DEBUG
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
):
level = logging.WARNING
msg += " - active job orphaned"

LOG.log(level, f"[{itask}] {msg}")
del itask

def get_tasks(self) -> List[TaskProxy]:
@@ -1392,14 +1403,12 @@ def spawn_on_output(self, itask, output, forced=False):
suicide.append(t)

for c_task in suicide:
msg = self.__class__.SUICIDE_MSG
if c_task.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
is_held=False):
msg += " suiciding while active"
self.remove(c_task, msg)
self.remove(c_task, self.__class__.SUICIDE_MSG)

if suicide:
# Update DB now in case of very quick respawn attempt.
# See https://github.com/cylc/cylc-flow/issues/6066
self.workflow_db_mgr.process_queued_ops()

self.remove_if_complete(itask, output)

@@ -1555,17 +1564,33 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

def _get_task_history(
self, name: str, point: 'PointBase', flow_nums: Set[int]
) -> Tuple[int, str, bool]:
"""Get history of previous submits for this task."""
) -> Tuple[bool, int, str, bool]:
"""Get history of previous submits for this task.
Args:
name: task name
point: task cycle point
flow_nums: task flow numbers
Returns:
never_spawned: if task never spawned before
submit_num: submit number of previous submit
prev_status: task status of previous submit
prev_flow_wait: if previous submit was a flow-wait task
"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num: int = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
# never spawned in any flow
submit_num = 0
never_spawned = True
else:
never_spawned = False
# (submit_num could still be zero, if removed before submit)

prev_status: str = TASK_STATUS_WAITING
prev_flow_wait = False
@@ -1582,7 +1607,7 @@ def _get_task_history(
# overlap due to merges (they'll have have same snum and
# f_wait); keep going to find the finished one, if any.

return submit_num, prev_status, prev_flow_wait
return never_spawned, submit_num, prev_status, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
@@ -1619,10 +1644,19 @@ def spawn_task(
if not self.can_be_spawned(name, point):
return None

submit_num, prev_status, prev_flow_wait = (
never_spawned, submit_num, prev_status, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

if (
not never_spawned and
not prev_flow_wait and
submit_num == 0
):
# Previous instance removed before completing any outputs.
LOG.debug(f"Not spawning {point}/{name} - task removed")
return None

itask = self._get_task_proxy_db_outputs(
point,
self.config.get_taskdef(name),
@@ -1653,8 +1687,6 @@ def spawn_task(
if itask.transient and not force:
return None

# (else not previously finishedr, so run it)

if not itask.transient:
if (name, point) in self.tasks_to_hold:
LOG.info(f"[{itask}] holding (as requested earlier)")
@@ -2117,8 +2149,9 @@ def force_trigger_tasks(
if not self.can_be_spawned(name, point):
continue

submit_num, _prev_status, prev_fwait = self._get_task_history(
name, point, flow_nums)
_, submit_num, _prev_status, prev_fwait = (
self._get_task_history(name, point, flow_nums)
)

itask = TaskProxy(
self.tokens,
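The `remove()` change above also reworks how removals are reported: routine removals now log at DEBUG, while removing a task whose job is still preparing, submitted, or running is escalated to WARNING and flagged as orphaning an active job. A standalone sketch of that selection logic (plain string statuses stand in for cylc's task state object; illustrative only):

```python
import logging
from typing import Optional

LOG = logging.getLogger("sketch")

ACTIVE_STATUSES = {"preparing", "submitted", "running"}


def log_removal(task_id: str, status: str, reason: Optional[str] = None) -> None:
    """Log a task-pool removal, escalating when an active job is orphaned."""
    msg = "removed from active task pool"
    msg += ": completed" if reason is None else f": {reason}"

    level = logging.DEBUG
    if status in ACTIVE_STATUSES:
        # The job is still out there somewhere; make the orphaning visible.
        level = logging.WARNING
        msg += " - active job orphaned"

    LOG.log(level, f"[{task_id}] {msg}")


logging.basicConfig(level=logging.DEBUG)
log_removal("1/foo/01", "succeeded")                   # logged at DEBUG
log_removal("1/bar/01", "running", "suicide trigger")  # WARNING, job orphaned
```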
6 changes: 3 additions & 3 deletions tests/functional/cylc-set/02-off-flow-out.t
@@ -31,14 +31,14 @@ reftest_run

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* task completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* completed'

purge
@@ -41,7 +41,7 @@ __FLOW__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

cylc play "${WORKFLOW_NAME}"
cylc play --debug "${WORKFLOW_NAME}"

poll_grep_workflow_log "INFO - DONE"

2 changes: 1 addition & 1 deletion tests/functional/hold-release/05-release.t
@@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
'1/dog1/01:succeeded.* task completed'
'1/dog1/01:succeeded.* completed'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
2 changes: 1 addition & 1 deletion tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
@@ -44,7 +44,7 @@
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/foo"
# Set bar outputs after it is gone from the pool.
cylc__job__poll_grep_workflow_log -E "1/bar.* task completed"
cylc__job__poll_grep_workflow_log -E "1/bar.* completed"
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/bar"
"""
[[qux, quw, fux, fuw]]
24 changes: 0 additions & 24 deletions tests/functional/triggering/15-suicide.t

This file was deleted.

23 changes: 0 additions & 23 deletions tests/functional/triggering/15-suicide/flow.cylc

This file was deleted.

5 changes: 0 additions & 5 deletions tests/functional/triggering/15-suicide/reference.log

This file was deleted.

34 changes: 0 additions & 34 deletions tests/functional/triggering/18-suicide-active.t

This file was deleted.

11 changes: 0 additions & 11 deletions tests/functional/triggering/18-suicide-active/flow.cylc

This file was deleted.

3 changes: 2 additions & 1 deletion tests/integration/test_task_events_mgr.py
@@ -17,6 +17,7 @@
from cylc.flow.task_events_mgr import TaskJobLogsRetrieveContext
from cylc.flow.scheduler import Scheduler

import logging
from typing import Any as Fixture


@@ -51,7 +52,7 @@ async def test__reset_job_timers(
process_execution_polling_intervals.
"""
schd = scheduler(flow(one_conf))
async with start(schd):
async with start(schd, level=logging.DEBUG):
itask = schd.pool.get_tasks()[0]
itask.state.status = 'running'
itask.platform['execution polling intervals'] = [25]