Skip to content

Commit

Permalink
Remove unnecessary resolver methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 5, 2023
1 parent 647b2fd commit 1347df9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 134 deletions.
131 changes: 7 additions & 124 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing import (
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Expand Down Expand Up @@ -58,7 +57,6 @@
from graphql import ResolveInfo
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.scheduler import Scheduler
from cylc.flow.workflow_status import StopMode


class TaskMsg(NamedTuple):
Expand Down Expand Up @@ -678,8 +676,6 @@ async def mutator(
'response': (False, f'No matching workflow in {workflows}')}]
w_id = w_ids[0]
result = await self._mutation_mapper(command, kwargs, meta)
if result is None:
result = (True, 'Command queued')
return [{'id': w_id, 'response': result}]

def _log_command(self, command: str, user: str) -> None:
Expand All @@ -688,9 +684,9 @@ def _log_command(self, command: str, user: str) -> None:
if command == 'put_messages' and is_owner:
# Logging put_messages is overkill.
return
log_msg = f"[command] received: {command}"
log_msg = f"[command] issued: {command}"
if not is_owner:
log_msg += (f" (issued by {user})")
log_msg += (f" (by {user})")
LOG.info(log_msg)

async def _mutation_mapper(
Expand All @@ -705,8 +701,8 @@ async def _mutation_mapper(
method = getattr(self, command, None)
if method is not None:
return (
method(**kwargs),
"Command queued"
True,
method(**kwargs)
)

try:
Expand All @@ -715,12 +711,12 @@ async def _mutation_mapper(
raise ValueError(f"Command '{command}' not found")

return (
True,
self.schd.queue_command(
command,
{},
[],
kwargs
),
"Command queued"
)
)

def broadcast(
Expand Down Expand Up @@ -824,116 +820,3 @@ def set_graph_window_extent(self, n_edge_distance):
return (True, f'Maximum edge distance set to {n_edge_distance}')
else:
return (False, 'Edge distance cannot be negative')

def reset(
self,
tasks: Iterable[str],
outputs: Optional[Iterable[str]] = None,
prerequisites: Optional[Iterable[str]] = None,
flow: Optional[Iterable[str]] = None,
flow_wait: bool = False,
flow_descr: Optional[str] = None,
) -> Tuple[bool, str]:
"""Set task prequisites and outputs.
Args:
tasks: Identifiers or task globs.
outputs: Outputs to set complete.
prerequisites: Prerequisites to set satisfied.
flow: Flows that spawned tasks should belong to.
"""
return (
self.schd.queue_command(
"reset",
(tasks,),
{
"outputs": outputs,
"prerequisites": prerequisites,
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr,
}
),
"Command queued"
)

def stop(
self,
mode: Union[str, 'StopMode'],
cycle_point: Optional[str] = None,
clock_time: Optional[str] = None,
task: Optional[str] = None,
flow_num: Optional[int] = None,
) -> Tuple[bool, str]:
"""Stop the workflow or specific flow from spawning any further.
Args:
mode: Stop mode to set
cycle_point: Cycle point after which to stop.
clock_time: Wallclock time after which to stop.
task: Stop after this task succeeds.
flow_num: The flow to stop.
):
Returns:
outcome: True if command successfully queued.
message: Information about outcome.
"""
return (
self.schd.queue_command(
"stop",
(),
filter_none(
{
'mode': mode,
'cycle_point': cycle_point,
'clock_time': clock_time,
'task': task,
'flow_num': flow_num,
}
)
),
"Command queued"
)

def force_trigger_tasks(
self,
tasks: Iterable[str],
flow: Iterable[str],
flow_wait: bool,
flow_descr: Optional[str] = None,
):
"""Trigger submission of task jobs where possible.
Args:
tasks (list):
List of identifiers or task globs.
flow (list):
Flow ownership of triggered tasks.
flow_wait (bool):
Wait for flows before continuing
flow_descr (str):
Description of new flow.
Returns:
tuple: (outcome, message)
outcome (bool)
True if command successfully queued.
message (str)
Information about outcome.
"""
return (
self.schd.queue_command(
"force_trigger_tasks",
(tasks or [],),
{
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr
}
),
"Command queued"
)
return (True, 'Command queued')
32 changes: 24 additions & 8 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class Scheduler:
flow_mgr: FlowMgr

# queues
command_queue: 'Queue[Tuple[str, tuple, dict]]'
command_queue: 'Queue[Tuple[str, str, list, dict]]'
message_queue: 'Queue[TaskMsg]'
ext_trigger_queue: Queue

Expand Down Expand Up @@ -917,7 +917,7 @@ def queue_command(
f"{key}={value}" for key, value in kwargs.items()
)
sep = ', ' if kwargs_string and args_string else ''
uuid = uuid4()
uuid = str(uuid4())
LOG.info(
f"[command] queued {uuid}:\n"
f"{name}({args_string}{sep}{kwargs_string})"
Expand All @@ -930,7 +930,7 @@ def queue_command(
kwargs,
)
)
return str(uuid)
return uuid

async def process_command_queue(self) -> None:
"""Process queued commands."""
Expand All @@ -939,6 +939,10 @@ async def process_command_queue(self) -> None:
return
LOG.debug(f"Processing {qsize} queued command(s)")
while True:
uuid: str
name: str
args: list
kwargs: dict
try:
uuid, name, args, kwargs = self.command_queue.get(False)
except Empty:
Expand Down Expand Up @@ -1042,15 +1046,15 @@ def command_resume(self) -> None:
"""Resume paused workflow."""
self.resume_workflow()

def command_poll_tasks(self, tasks: List[str]) -> int:
def command_poll_tasks(self, tasks: Iterable[str]) -> int:
"""Poll pollable tasks or a task or family if options are provided."""
if self.config.run_mode('simulation'):
return 0
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
self.task_job_mgr.poll_task_jobs(self.workflow, itasks)
return len(bad_items)

def command_kill_tasks(self, tasks: List[str]) -> int:
def command_kill_tasks(self, tasks: Iterable[str]) -> int:
"""Kill all tasks or a task/family if options are provided."""
itasks, _, bad_items = self.pool.filter_task_proxies(tasks)
if self.config.run_mode('simulation'):
Expand Down Expand Up @@ -1090,7 +1094,7 @@ def command_set_verbosity(lvl: Union[int, str]) -> None:
raise CommandFailedError(exc)
cylc.flow.flags.verbosity = log_level_to_verbosity(lvl)

def command_remove_tasks(self, tasks) -> int:
def command_remove_tasks(self, tasks: Iterable[str]) -> int:
"""Remove tasks."""
return self.pool.remove_tasks(tasks)

Expand Down Expand Up @@ -2145,13 +2149,25 @@ def resume_workflow(self, quiet: bool = False) -> None:
self.workflow_db_mgr.put_workflow_paused(False)
self.update_data_store()

def command_force_trigger_tasks(self, tasks, flow, flow_wait, flow_descr):
def command_force_trigger_tasks(
self,
tasks: Iterable[str],
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None
):
"""Manual task trigger."""
return self.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr)

def command_reset(
self, tasks, outputs, prerequisites, flow, flow_wait, flow_descr
self,
tasks: List[str],
flow: List[str],
outputs: Optional[List[str]] = None,
prerequisites: Optional[List[str]] = None,
flow_wait: bool = False,
flow_descr: Optional[str] = None
):
"""Force spawn task successors.
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1581,8 +1581,8 @@ def spawn_task(
def reset(
self,
items: Iterable[str],
outputs: List[str],
prerequisites: List[str],
outputs: Optional[List[str]],
prerequisites: Optional[List[str]],
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None
Expand Down

0 comments on commit 1347df9

Please sign in to comment.