Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graduated force-triggering (prereqs, xtriggers, queues). #5585

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ ones in. -->

### Enhancements

[#5585](https://github.com/cylc/cylc-flow/pull/5585) - Make `cylc trigger`
act incrementally with each call: first runahead release and satisfy
prerequisites, then xtriggers, then queue release.

[#5405](https://github.com/cylc/cylc-flow/pull/5405) - Improve scan command
help, and add scheduler PID to the output.

Expand Down
9 changes: 4 additions & 5 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ async def configure(self):
self.workflow_db_mgr,
self.task_events_mgr,
self.data_store_mgr,
self.flow_mgr
self.flow_mgr,
self.xtrigger_mgr
)

self.data_store_mgr.initiate_data_model()
Expand Down Expand Up @@ -1604,18 +1605,16 @@ async def main_loop(self) -> None:
if self.xtrigger_mgr.check_xtriggers(
itask, self.workflow_db_mgr.put_xtriggers):
housekeep_xtriggers = True
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)
self.pool.queue_if_ready(itask)

# Check for satisfied ext_triggers, and queue if ready.
if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
and self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
and all(itask.is_ready_to_run())
):
self.pool.queue_task(itask)
self.pool.queue_if_ready(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
Expand Down
31 changes: 18 additions & 13 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@

"""cylc trigger [OPTIONS] ARGS

Force tasks to run despite unsatisfied prerequisites.
Force tasks to run despite the runahead limit, unsatisfied prerequisites,
xtriggers, and queue limiting.

* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).
- If a task has unsatisfied prerequisites, triggering will satisfy them
- Otherwise, it will satisfy any unsatisfied xtriggers
- Otherwise, it will submit the task to run despite queue limiting

Incomplete and active-waiting tasks in the n=0 window already belong to a flow.
Triggering them queues them to run (or rerun) in the same flow.
If a task has unsatisfied prerequisites and xtriggers, and belongs to a limited
queue, you may need to trigger it three times to make it run immediately.

Beyond n=0, triggered tasks get all current active flow numbers by default, or
specified flow numbers via the --flow option. Those flows - if/when they catch
up - will see tasks that ran after triggering event as having run already.
The runahead limit is ignored (i.e., you can trigger tasks beyond the limit).

Tasks in the n=0 window already belong to a flow. Others will be assigned all
current active flows by default (see --flow for other options).

Triggering an active task has no effect (it is already triggered).

Examples:
# trigger task foo in cycle 1234 in test
Expand Down Expand Up @@ -98,10 +102,11 @@ def get_option_parser() -> COP:

parser.add_option(
"--flow", action="append", dest="flow", metavar="FLOW",
help=f"Assign the triggered task to all active flows ({FLOW_ALL});"
f" no flow ({FLOW_NONE}); a new flow ({FLOW_NEW});"
f" or a specific flow (e.g. 2). The default is {FLOW_ALL}."
" Reuse the option to assign multiple specific flows."
help=f"If the target task does not already belong to a flow,"
f" assign it to all active flows ({FLOW_ALL});"
f" or to no flow ({FLOW_NONE}); or to a new flow ({FLOW_NEW});"
f" or to a specific flow (e.g. 2). The default is {FLOW_ALL}."
" Reuse the option to assign to multiple specific flows."
)

parser.add_option(
Expand Down
Loading
Loading