Skip to content

Commit

Permalink
Command UUID.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 5, 2023
1 parent a16b74c commit 647b2fd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 54 deletions.
92 changes: 52 additions & 40 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,14 @@ def _log_command(self, command: str, user: str) -> None:
if command == 'put_messages' and is_owner:
# Logging put_messages is overkill.
return
log_msg = f"[command] {command}"
log_msg = f"[command] received: {command}"
if not is_owner:
log_msg += (f" (issued by {user})")
LOG.info(log_msg)

async def _mutation_mapper(
self, command: str, kwargs: Dict[str, Any], meta: Dict[str, Any]
) -> Optional[Tuple[bool, str]]:
) -> Tuple[bool, str]:
"""Map between GraphQL resolvers and internal command interface."""

self._log_command(
Expand All @@ -704,19 +704,24 @@ async def _mutation_mapper(
)
method = getattr(self, command, None)
if method is not None:
return method(**kwargs)
return (
method(**kwargs),
"Command queued"
)

try:
self.schd.get_command_method(command)
except AttributeError:
raise ValueError(f"Command '{command}' not found")

self.schd.queue_command(
command,
{},
kwargs
return (
self.schd.queue_command(
command,
{},
kwargs
),
"Command queued"
)
return None

def broadcast(
self,
Expand Down Expand Up @@ -837,18 +842,20 @@ def reset(
prerequisites: Prerequisites to set satisfied.
flow: Flows that spawned tasks should belong to.
"""
self.schd.queue_command(
"reset",
(tasks,),
{
"outputs": outputs,
"prerequisites": prerequisites,
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr,
},
return (
self.schd.queue_command(
"reset",
(tasks,),
{
"outputs": outputs,
"prerequisites": prerequisites,
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr,
}
),
"Command queued"
)
return (True, 'Command queued')

def stop(
self,
Expand All @@ -873,20 +880,22 @@ def stop(
message: Information about outcome.
"""
self.schd.queue_command(
"stop",
(),
filter_none(
{
'mode': mode,
'cycle_point': cycle_point,
'clock_time': clock_time,
'task': task,
'flow_num': flow_num,
}
)
return (
self.schd.queue_command(
"stop",
(),
filter_none(
{
'mode': mode,
'cycle_point': cycle_point,
'clock_time': clock_time,
'task': task,
'flow_num': flow_num,
}
)
),
"Command queued"
)
return (True, 'Command queued')

def force_trigger_tasks(
self,
Expand Down Expand Up @@ -915,13 +924,16 @@ def force_trigger_tasks(
Information about outcome.
"""
self.schd.queue_command(
"force_trigger_tasks",
(tasks or [],),
{
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr
}
return (
self.schd.queue_command(
"force_trigger_tasks",
(tasks or [],),
{
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr
}
),
"Command queued"
)
return (True, 'Command queued')
37 changes: 23 additions & 14 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,15 +907,30 @@ def queue_command(
name: str,
args: list,
kwargs: dict
) -> None:
"""Queue a command for action by the scheduler."""
) -> str:
"""Queue a command for action by the scheduler.
Return a unique ID for the command.
"""
args_string = ', '.join(str(a) for a in args)
kwargs_string = ', '.join(
f"{key}={value}" for key, value in kwargs.items()
)
sep = ', ' if kwargs_string and args_string else ''
uuid = uuid4()
LOG.info(
f"[command] queued {uuid}:\n"
f"{name}({args_string}{sep}{kwargs_string})"
)
self.command_queue.put(
(
uuid,
name,
args,
kwargs
kwargs,
)
)
return str(uuid)

async def process_command_queue(self) -> None:
"""Process queued commands."""
Expand All @@ -925,15 +940,10 @@ async def process_command_queue(self) -> None:
LOG.debug(f"Processing {qsize} queued command(s)")
while True:
try:
name, args, kwargs = self.command_queue.get(False)
uuid, name, args, kwargs = self.command_queue.get(False)
except Empty:
break
args_string = ', '.join(str(a) for a in args)
kwargs_string = ', '.join(
f"{key}={value}" for key, value in kwargs.items()
)
sep = ', ' if kwargs_string and args_string else ''
cmdstr = f"{name}({args_string}{sep}{kwargs_string})"
msg = f"[command] actioned {uuid} ({name})"
try:
fcn = self.get_command_method(name)
n_warnings: Optional[int]
Expand All @@ -948,15 +958,14 @@ async def process_command_queue(self) -> None:
not isinstance(exc, CommandFailedError)
):
LOG.error(traceback.format_exc())
LOG.error(f"Command failed: {cmdstr}\n{exc}")
LOG.error(f"{msg} failed:\n{exc}")
else:
if n_warnings:
LOG.info(
f"Command actioned with {n_warnings} warning(s): "
f"{cmdstr}"
f"{msg} with {n_warnings} warnings"
)
else:
LOG.info(f"Command actioned: {cmdstr}")
LOG.info(f"{msg}")
self.is_updated = True
self.command_queue.task_done()

Expand Down

0 comments on commit 647b2fd

Please sign in to comment.