Skip to content

Commit

Permalink
launch_shell_job: Add the submit argument to run over the daemon (#…
Browse files Browse the repository at this point in the history
…18)

The `submit` argument takes a boolean and when set to `True` the shell
job is sent to the daemon instead of run by the current interpreter. If
the shell jobs are independent, this allows them to be run in parallel
by the daemon workers.
  • Loading branch information
sphuber authored Nov 6, 2022
1 parent f9699de commit 6f2435b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 8 deletions.
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,44 @@ results, node = launch_shell_job(
print(results['stdout'].get_content())
```
Here you can use `aiida.orm.load_computer` to load the `Computer` instance from its label, PK or UUID.

### Running many shell jobs in parallel
By default the shell command ran by `launch_shell_job` is run blockingly; meaning that the Python interpreter is blocked from doing anything else until the shell command finishes.
This becomes inefficient if you need to run many shell commands.
If the shell commands are independent and can be run in parallel, it is possible to submit the jobs to AiiDA's daemon by setting `submit=True`:
```python
from aiida.engine.daemon.client import get_daemon_client
from aiida_shell import launch_shell_job

# Make sure the daemon is running
get_daemon_client().start_daemon()

nodes = []

for string in ['string_one', 'string_two']:
node = launch_shell_job(
'echo',
arguments=[string],
submit=True,
)
nodes.append(node)
print(f'Submitted {node}')
```
Instead of returning a tuple of the results and the node, `launch_shell_job` now only returns the `node`.
The reason is because the function returns immediately after submitting the job to the daemon at which point it isn't necessarily finished yet.
To check on the status of the submitted jobs, you can use the `verdi process list` command of the CLI that ships with AiiDA.
Or you can do it programmatically:
```python
import time

while True:
if all(node.is_terminated for node in nodes):
break
time.sleep(1)

for node in nodes:
if node.is_finished_ok:
print(f'{node} finished successfully')
else:
print(f'{node} failed')
```
21 changes: 14 additions & 7 deletions src/aiida_shell/engine/launchers/shell_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@
import typing as t

from aiida.common import exceptions
from aiida.engine import run_get_node
from aiida.orm import Code, Computer, Data, SinglefileData, load_code, load_computer
from aiida.plugins import CalculationFactory
from aiida.engine import launch
from aiida.orm import Code, Computer, Data, ProcessNode, SinglefileData, load_code, load_computer

from aiida_shell.calculations.shell import ShellJob

__all__ = ('launch_shell_job',)

LOGGER = logging.getLogger('aiida_shell')


def launch_shell_job(
def launch_shell_job( # pylint: disable=too-many-arguments,too-many-locals
command: str,
nodes: dict[str, Data] | None = None,
filenames: dict[str, str] | None = None,
arguments: list[str] | None = None,
outputs: list[str] | None = None,
metadata: dict[str, t.Any] | None = None
): # pylint: disable=too-many-arguments,too-many-locals
metadata: dict[str, t.Any] | None = None,
submit: bool = False,
) -> tuple[dict[str, Data], ProcessNode] | ProcessNode:
"""Launch a :class:`aiida_shell.ShellJob` job for the given command.
:param command: The shell command to run. Should be the relative command name, e.g., ``date``.
Expand All @@ -33,8 +35,10 @@ def launch_shell_job(
:param arguments: Optional list of command line arguments optionally containing placeholders for input nodes.
:param outputs: Optional list of relative filenames that should be captured as outputs.
:param metadata: Optional dictionary of metadata inputs to be passed to the ``ShellJob``.
:param submit: Boolean, if ``True`` will submit the job to the daemon and return the ``ProcessNode``.
:raises TypeError: If the value specified for ``metadata.options.computer`` is not a ``Computer``.
:raises ValueError: If the absolute path of the command on the computer could not be determined.
:returns: The tuple of results dictionary and ``ProcessNode``, or just the ``ProcessNode`` if ``submit=True``.
"""
computer = prepare_computer((metadata or {}).get('options', {}).pop('computer', None))

Expand Down Expand Up @@ -66,7 +70,10 @@ def launch_shell_job(
'metadata': metadata or {},
}

results, node = run_get_node(CalculationFactory('core.shell'), **inputs) # type: ignore[arg-type]
if submit:
return launch.submit(ShellJob, **inputs)

results, node = launch.run_get_node(ShellJob, **inputs)

return {label: node for label, node in results.items() if isinstance(node, SinglefileData)}, node

Expand Down
9 changes: 9 additions & 0 deletions tests/calculations/test_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,12 @@ def value_raises(self):

with pytest.raises(ValueError, match=message):
generate_calc_job('core.shell', {'code': generate_code(), 'nodes': nodes})


def test_submit_to_daemon(generate_code, submit_and_await):
"""Test submitting a ``ShellJob`` to the daemon."""
builder = generate_code('echo').get_builder()
builder.arguments = ['testing']
node = submit_and_await(builder)
assert node.is_finished_ok, node.process_state
assert node.outputs.stdout.get_content().strip() == 'testing'
9 changes: 8 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,18 @@ def factory(label='localhost', hostname='localhost', scheduler_type='core.direct
def generate_code(generate_computer):
"""Return a :class:`aiida.orm.Code` instance, either already existing or created."""

def factory(computer_label='localhost', label=None, entry_point_name='core.shell', executable='/bin/true'):
def factory(command='/bin/true', computer_label='localhost', label=None, entry_point_name='core.shell'):
"""Return a :class:`aiida.orm.Code` instance, either already existing or created."""
label = label or str(uuid.uuid4())
computer = generate_computer(computer_label)

with computer.get_transport() as transport:
status, stdout, stderr = transport.exec_command_wait(f'which {command}')
executable = stdout.strip()

if status != 0:
raise ValueError(f'failed to determine the absolute path of the command on the computer: {stderr}')

try:
filters = {'label': label, 'attributes.input_plugin_name': entry_point_name}
return Code.collection.get(**filters)
Expand Down
9 changes: 9 additions & 0 deletions tests/engine/launchers/test_shell_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,12 @@ def test_arguments_files():

assert node.is_finished_ok
assert results['stdout'].get_content().strip() == content.split('\n', maxsplit=1)[0]


def test_submit(submit_and_await):
"""Test the ``submit`` argument."""
node = launch_shell_job('date', submit=True)
submit_and_await(node)
assert node.is_finished_ok
assert isinstance(node.outputs.stdout, SinglefileData)
assert node.outputs.stdout.get_content()

0 comments on commit 6f2435b

Please sign in to comment.