Skip to content

Commit

Permalink
ShellJob: Add support for custom parsing
Browse files Browse the repository at this point in the history
The `ShellJob` comes with the default `ShellParser` which will
automatically attach the `stdout` and `stderr` files as `SinglefileData`
nodes, as well as any additional files that have been specified by the
user in the `outputs` argument.

Although this makes for an easy interface to use since the user is not
required to implement the parser, it is also rather restrictive. One of
AiiDA's strengths is precisely that it allows storing data in JSON form
in the relational database, which makes it queryable. By storing data in
`SinglefileData` nodes exclusively, this functionality is not available.
In certain cases, it might be advantageous to parse the output produced
by a shell command into other specific `Data` nodes.

The input `parser` is added to the `ShellJob`. It takes a Python
function with the signature:

    def parser(self, dirpath: pathlib.Path) -> dict[str, Data]:
        """Parse files in the working directory of the shell job."""

The `dirpath` will be the path to the folder with the output files
generated by the shell command that have been retrieved by the engine.
If specified, this function is called by the `ShellParser` allowing it
to parse output files into any `Data` nodes which will be automatically
attached as outputs of the `ShellJob`.

To make sure that the specified parsing function can also be loaded by
the daemon, in the case that the `ShellJob` is submitted instead of run
in the interpreter where the function is defined, it is stored in a
`PickledData` instance. This data plugin pickles the function, which
allows the daemon worker to unpickle it and call the function.
  • Loading branch information
sphuber committed Mar 30, 2023
1 parent 3a40eef commit 32db384
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
33 changes: 33 additions & 0 deletions src/aiida_shell/calculations/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""Implementation of :class:`aiida.engine.CalcJob` to make it easy to run an arbitrary shell command on a computer."""
from __future__ import annotations

import inspect
import pathlib
import typing as t

Expand All @@ -10,6 +11,8 @@
from aiida.engine import CalcJob, CalcJobProcessSpec
from aiida.orm import Data, Dict, List, SinglefileData, to_aiida_type

from aiida_shell.data import PickledData

__all__ = ('ShellJob',)


Expand All @@ -34,6 +37,9 @@ def define(cls, spec: CalcJobProcessSpec): # type: ignore[override]
'arguments', valid_type=List, required=False, serializer=to_aiida_type, validator=cls.validate_arguments
)
spec.input('outputs', valid_type=List, required=False, serializer=to_aiida_type, validator=cls.validate_outputs)
spec.input(
'parser', valid_type=PickledData, required=False, serializer=PickledData, validator=cls.validate_parser
)
spec.input(
'metadata.options.filename_stdin',
valid_type=str,
Expand Down Expand Up @@ -64,6 +70,11 @@ def define(cls, spec: CalcJobProcessSpec): # type: ignore[override]
'ERROR_OUTPUT_FILEPATHS_MISSING',
message='One or more output files defined in the `outputs` input were not retrieved: {missing_filepaths}.'
)
spec.exit_code(
310,
'ERROR_PARSER_HOOK_EXCEPTED',
message='Callable specified in the `parser` input excepted: {exception}.'
)
spec.exit_code(
400, 'ERROR_COMMAND_FAILED', message='The command exited with a non-zero status: {status} {stderr}.'
)
Expand All @@ -73,6 +84,28 @@ def define(cls, spec: CalcJobProcessSpec): # type: ignore[override]
message='The command exited with a zero status but the stderr was not empty.'
)

@classmethod
def validate_parser(cls, value: t.Any, _) -> str | None:
"""Validate the ``parser`` input."""
if not value:
return None

try:
deserialized_parser = value.load()
except ValueError as exception:
return f'The parser specified in the `parser` could not be loaded: {exception}.'

try:
signature = inspect.signature(deserialized_parser)
except TypeError as exception:
return f'The `parser` is not a callable function: {exception}'

parameters = list(signature.parameters.keys())

if any(required_parameter not in parameters for required_parameter in ('self', 'dirpath')):
correct_signature = '(self, dirpath: pathlib.Path) -> dict[str, Data]:'
return f'The `parser` has an invalid function signature, it should be: {correct_signature}'

@classmethod
def validate_nodes(cls, value: t.Mapping[str, Data], _) -> str | None:
"""Validate the ``nodes`` input."""
Expand Down
19 changes: 18 additions & 1 deletion src/aiida_shell/parsers/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import re

from aiida.engine import ExitCode
from aiida.orm import FolderData, SinglefileData
from aiida.orm import Data, FolderData, SinglefileData
from aiida.parsers.parser import Parser

from aiida_shell.calculations.shell import ShellJob
Expand All @@ -32,6 +32,12 @@ def parse(self, **kwargs):
missing_filepaths = self.parse_custom_outputs(dirpath)
exit_code = self.parse_default_outputs(dirpath)

if 'parser' in self.node.inputs:
try:
self.call_parser_hook(dirpath)
except Exception as exception: # pylint: disable=broad-except
return self.exit_codes.ERROR_PARSER_HOOK_EXCEPTED.format(exception=exception)

if missing_filepaths:
return self.exit_codes.ERROR_OUTPUT_FILEPATHS_MISSING.format(missing_filepaths=', '.join(missing_filepaths))

Expand Down Expand Up @@ -113,3 +119,14 @@ def parse_custom_outputs(self, dirpath: pathlib.Path) -> list[str]:
self.out(self.format_link_label(filepath.name), FolderData(tree=filepath))

return missing_filepaths

def call_parser_hook(self, dirpath: pathlib.Path):
    """Execute the ``parser`` custom parser hook that was passed as input to the ``ShellJob``.

    The callable is loaded from the ``parser`` input of the node and called with this parser instance and
    ``dirpath``. Each entry of the returned dictionary is registered as an output through ``self.out``.

    :param dirpath: path that is forwarded to the custom parser as its ``dirpath`` argument.
    :raises TypeError: if the custom parser returns anything other than ``None`` or a dictionary whose values are
        all ``Data`` instances.
    """
    unpickled_parser = self.node.inputs.parser.load()
    results = unpickled_parser(self, dirpath)

    # Only the absence of a return value means "no outputs". Other falsy values such as ``[]`` or ``''`` are of the
    # wrong type and should be reported, rather than being silently replaced by an empty dictionary.
    if results is None:
        results = {}

    if not isinstance(results, dict) or not all(isinstance(value, Data) for value in results.values()):
        raise TypeError(f'{unpickled_parser} did not return a dictionary of `Data` nodes but: {results}')

    for key, value in results.items():
        self.out(key, value)
40 changes: 40 additions & 0 deletions tests/calculations/test_shell.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
# pylint: disable=redefined-outer-name
"""Tests for the :mod:`aiida_shell.calculations.shell` module."""
import io
import pathlib
Expand Down Expand Up @@ -242,3 +243,42 @@ def test_submit_to_daemon(generate_code, submit_and_await):
node = submit_and_await(builder)
assert node.is_finished_ok, node.process_state
assert node.outputs.stdout.get_content().strip() == 'testing'


def test_parser(generate_calc_job, generate_code):
    """Test that a callable with the required signature is accepted for the ``parser`` input."""

    def parser(self, dirpath):  # pylint: disable=unused-argument
        return None

    inputs = {'code': generate_code(), 'parser': parser}
    process = generate_calc_job('core.shell', inputs=inputs, return_process=True)

    # The function is passed as a plain callable; the stored input is expected to be a ``SinglefileData`` instance.
    assert isinstance(process.inputs.parser, SinglefileData)


def test_parser_invalid_not_callable(generate_calc_job, generate_code):
    """Test that validation of the ``parser`` input fails when the value is not a callable."""
    inputs = {'code': generate_code(), 'parser': 'not-callable'}
    expected_message = r'The `parser` is not a callable function: .* is not a callable object'

    with pytest.raises(ValueError, match=expected_message):
        generate_calc_job('core.shell', inputs=inputs)


def test_parser_invalid_signature(generate_calc_job, generate_code):
    """Test that validation of the ``parser`` input fails when the callable has an invalid signature."""
    # The lambda only takes ``x`` and so lacks the ``self`` and ``dirpath`` parameters.
    inputs = {'code': generate_code(), 'parser': lambda x: x}
    expected_message = r'The `parser` has an invalid function signature, it should be:.*'

    with pytest.raises(ValueError, match=expected_message):
        generate_calc_job('core.shell', inputs=inputs)


def test_parser_over_daemon(generate_code, submit_and_await):
    """Test that a ``ShellJob`` with a custom ``parser`` finishes successfully when submitted to the daemon."""
    expected = 'testing'

    def parser(self, dirpath):  # pylint: disable=unused-argument
        # NOTE(review): presumably imported locally so the pickled function does not depend on module-level names.
        from aiida.orm import Str  # pylint: disable=reimported
        content = (dirpath / 'stdout').read_text()
        return {'string': Str(content.strip())}

    builder = generate_code('/bin/echo').get_builder()
    builder.parser = parser
    builder.arguments = [expected]

    node = submit_and_await(builder)
    assert node.is_finished_ok, (node.exit_status, node.exit_message)
    assert node.outputs.string == expected

0 comments on commit 32db384

Please sign in to comment.