Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip Mode #6039

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6039.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a new mode task run mode "skip" which overrides workflow live mode task submission.
60 changes: 60 additions & 0 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
from cylc.flow.platforms import (
fail_if_platform_and_host_conflict, get_platform_deprecated_settings,
is_platform_definition_subshell)
from cylc.flow.run_modes import RunMode
from cylc.flow.task_events_mgr import EventData
from cylc.flow.run_modes import TASK_CONFIG_RUN_MODES


# Regex to check whether a string is a command
REC_COMMAND = re.compile(r'(`|\$\()\s*(.*)\s*([`)])$')
Expand Down Expand Up @@ -1334,6 +1337,27 @@ def get_script_common_text(this: str, example: Optional[str] = None):
"[platforms][<platform name>]submission retry delays"
)
)
Conf(
'run mode', VDR.V_STRING,
options=list(TASK_CONFIG_RUN_MODES) + [''],
default=RunMode.LIVE.value,
desc=f'''
For a workflow run in live mode run this task in skip
mode.

{RunMode.LIVE.value}:
{RunMode.LIVE.describe()}
{RunMode.SKIP.value}:
{RunMode.SKIP.describe()}


.. seealso::

:ref:`task-run-modes`

.. versionadded:: 8.4.0

''')
with Conf('meta', desc=r'''
Metadata for the task or task family.

Expand Down Expand Up @@ -1406,7 +1430,43 @@ def get_script_common_text(this: str, example: Optional[str] = None):
determine how an event handler responds to task failure
events.
''')
with Conf('skip', desc='''
Task configuration for task :ref:`task-run-modes.skip`.

For a full description of skip run mode see
:ref:`task-run-modes.skip`.

.. versionadded:: 8.4.0
'''):
Conf(
'outputs',
VDR.V_STRING_LIST,
desc='''
Outputs to be emitted by a task in skip mode.

* By default, all required outputs will be generated
plus succeeded if success is optional.
* If skip-mode outputs is specified and does not
include either succeeded or failed then succeeded
will be produced.
* The outputs submitted and started are always
produced and do not need to be defined in outputs.

.. versionadded:: 8.4.0
'''
)
Conf(
'disable task event handlers',
VDR.V_BOOLEAN,
default=True,
desc='''
Task event handlers are turned off by default for
skip mode tasks. Changing this setting to ``False``
will re-enable task event handlers.

.. versionadded:: 8.4.0
'''
)
with Conf('simulation', desc='''
Task configuration for workflow *simulation* and *dummy* run
modes.
Expand Down
10 changes: 6 additions & 4 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@
from cylc.flow.log_level import log_level_to_verbosity
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.run_modes import RunMode
from cylc.flow.task_id import TaskID
from cylc.flow.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
from cylc.flow.workflow_status import RunMode, StopMode
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED)
from cylc.flow.workflow_status import StopMode

from metomi.isodatetime.parsers import TimePointParser

Expand Down Expand Up @@ -247,7 +249,7 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Poll pollable tasks or a task or family if options are provided."""
validate.is_tasks(tasks)
yield
if schd.get_run_mode() == RunMode.SIMULATION:
if schd.get_run_mode() == RunMode.SIMULATION.value:
yield 0
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
schd.task_job_mgr.poll_task_jobs(schd.workflow, itasks)
Expand All @@ -260,7 +262,7 @@ async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
validate.is_tasks(tasks)
yield
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
if schd.get_run_mode() == RunMode.SIMULATION:
if schd.get_run_mode() == RunMode.SIMULATION.value:
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
Expand Down
19 changes: 8 additions & 11 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
)
from cylc.flow.print_tree import print_tree
from cylc.flow.task_qualifiers import ALT_QUALIFIERS
from cylc.flow.simulation import configure_sim_modes
from cylc.flow.run_modes.nonlive import run_mode_validate_checks
from cylc.flow.subprocctx import SubFuncContext
from cylc.flow.task_events_mgr import (
EventData,
Expand All @@ -99,6 +99,7 @@
get_trigger_completion_variable_maps,
trigger_to_completion_variable,
)
from cylc.flow.run_modes import RunMode
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
from cylc.flow.unicode_rules import (
Expand All @@ -114,7 +115,6 @@
WorkflowFiles,
check_deprecation,
)
from cylc.flow.workflow_status import RunMode
from cylc.flow.xtrigger_mgr import XtriggerCollator

if TYPE_CHECKING:
Expand Down Expand Up @@ -513,10 +513,6 @@ def __init__(

self.process_runahead_limit()

run_mode = self.run_mode()
if run_mode in {RunMode.SIMULATION, RunMode.DUMMY}:
configure_sim_modes(self.taskdefs.values(), run_mode)

self.configure_workflow_state_polling_tasks()

self._check_task_event_handlers()
Expand Down Expand Up @@ -567,6 +563,8 @@ def __init__(

self.mem_log("config.py: end init config")

run_mode_validate_checks(self.taskdefs)

@staticmethod
def _warn_if_queues_have_implicit_tasks(
config, taskdefs, max_warning_lines
Expand Down Expand Up @@ -1740,10 +1738,6 @@ def process_config_env(self):
]
)

def run_mode(self) -> str:
"""Return the run mode."""
return RunMode.get(self.options)

def _check_task_event_handlers(self):
"""Check custom event handler templates can be expanded.

Expand Down Expand Up @@ -2495,7 +2489,10 @@ def _get_taskdef(self, name: str) -> TaskDef:

# Get the taskdef object for generating the task proxy class
taskd = TaskDef(
name, rtcfg, self.run_mode(), self.start_point,
name,
rtcfg,
RunMode.get(self.options),
self.start_point,
self.initial_point)

# TODO - put all taskd.foo items in a single config dict
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message PbRuntime {
optional string environment = 16;
optional string outputs = 17;
optional string completion = 18;
optional string run_mode = 19;
}


Expand Down
100 changes: 50 additions & 50 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

Loading
Loading