diff --git a/gaps/batch.py b/gaps/batch.py index 7e4534cf..ec28a209 100644 --- a/gaps/batch.py +++ b/gaps/batch.py @@ -91,11 +91,16 @@ def _make_job_dirs(self): logger.debug( "Batch jobs list: %s", sorted(table.index.values.tolist()) ) + logger.debug("Using the following batch sets: %s", self._sets) logger.info("Preparing batch job directories...") # walk through current directory getting everything to copy for source_dir, _, filenames in os.walk(self._base_dir): logger.debug("Processing files in : %s", source_dir) + logger.debug( + " - Is dupe dir: %s", + any(job_tag in source_dir for job_tag in self._sets), + ) # don't make additional copies of job sub directories. if any(job_tag in source_dir for job_tag in self._sets): @@ -112,6 +117,7 @@ def _make_job_dirs(self): / tag / source_dir.relative_to(self._base_dir) ) + logger.debug("Creating dir: %s", destination_dir) destination_dir.mkdir(parents=True, exist_ok=True) for name in filenames: diff --git a/gaps/cli/config.py b/gaps/cli/config.py index 89ea0499..0d86d04c 100644 --- a/gaps/cli/config.py +++ b/gaps/cli/config.py @@ -244,29 +244,12 @@ def kickoff_jobs(self): keys_to_run, lists_to_run = self._keys_and_lists_to_run() jobs = sorted(product(*lists_to_run)) - num_jobs_submit = len(jobs) - self._warn_about_excessive_au_usage(num_jobs_submit) + self._warn_about_excessive_au_usage(len(jobs)) extra_exec_args = self._extract_extra_exec_args_for_command() - for node_index, values in enumerate(jobs): - tag = _tag(node_index, num_jobs_submit) - self.ctx.obj["NAME"] = job_name = f"{self.job_name}{tag}" - node_specific_config = deepcopy(self.config) - node_specific_config.pop("execution_control", None) - node_specific_config.update( - { - "tag": tag, - "command_name": self.command_name, - "pipeline_step": self.pipeline_step, - "config_file": self.config_file.as_posix(), - "project_dir": self.project_dir.as_posix(), - "job_name": job_name, - "out_dir": self.project_dir.as_posix(), - "out_fpath": 
self._suggested_stem(job_name).as_posix(), - "run_method": getattr( - self.command_config, "run_method", None - ), - } - ) + + for tag, values, exec_kwargs in self._with_tagged_context(jobs): + + node_specific_config = self._compile_node_config(tag) node_specific_config.update(extra_exec_args) for key, val in zip(keys_to_run, values): @@ -275,21 +258,63 @@ def kickoff_jobs(self): else: node_specific_config.update(dict(zip(key, val))) - cmd = "; ".join(_CMD_LIST).format( - run_func_module=self.command_config.runner.__module__, - run_func_name=self.command_config.runner.__name__, - node_specific_config=as_script_str(node_specific_config), - project_dir=self.project_dir.as_posix(), - logging_options=as_script_str(self.logging_options), - exclude_from_status=as_script_str(self.exclude_from_status), - pipeline_step=self.pipeline_step, - job_name=job_name, - ) - cmd = f"python -c {cmd!r}" - kickoff_job(self.ctx, cmd, deepcopy(self.exec_kwargs)) + cmd = self._compile_run_command(node_specific_config) + kickoff_job(self.ctx, cmd, exec_kwargs) return self + def _with_tagged_context(self, jobs): + """Iterate over jobs and populate context with job name.""" + num_jobs_submit = len(jobs) + + exec_kwargs = deepcopy(self.exec_kwargs) + num_test_nodes = exec_kwargs.pop("num_test_nodes", None) + if num_test_nodes is None: + num_test_nodes = float("inf") + + for node_index, values in enumerate(jobs): + if node_index >= num_test_nodes: + return + + tag = _tag(node_index, num_jobs_submit) + self.ctx.obj["NAME"] = f"{self.job_name}{tag}" + yield tag, values, exec_kwargs + + def _compile_node_config(self, tag): + """Compile initial node-specific config.""" + job_name = self.ctx.obj["NAME"] + node_specific_config = deepcopy(self.config) + node_specific_config.pop("execution_control", None) + node_specific_config.update( + { + "tag": tag, + "command_name": self.command_name, + "pipeline_step": self.pipeline_step, + "config_file": self.config_file.as_posix(), + "project_dir": 
self.project_dir.as_posix(), + "job_name": job_name, + "out_dir": self.project_dir.as_posix(), + "out_fpath": self._suggested_stem(job_name).as_posix(), + "run_method": getattr(self.command_config, "run_method", None), + } + ) + return node_specific_config + + def _compile_run_command(self, node_specific_config): + """Create run command from config and job name.""" + job_name = self.ctx.obj["NAME"] + cmd = "; ".join(_CMD_LIST).format( + run_func_module=self.command_config.runner.__module__, + run_func_name=self.command_config.runner.__name__, + node_specific_config=as_script_str(node_specific_config), + project_dir=self.project_dir.as_posix(), + logging_options=as_script_str(self.logging_options), + exclude_from_status=as_script_str(self.exclude_from_status), + pipeline_step=self.pipeline_step, + job_name=job_name, + ) + return f"python -c {cmd!r}" + def _suggested_stem(self, job_name_with_tag): """Determine suggested filepath with filename stem.""" if self._include_tag_in_out_fpath: diff --git a/gaps/cli/documentation.py b/gaps/cli/documentation.py index 11d0ee64..1fac2ccd 100644 --- a/gaps/cli/documentation.py +++ b/gaps/cli/documentation.py @@ -26,6 +26,7 @@ "conda_env": None, "module": None, "sh_script": None, + "num_test_nodes": None, } EXTRA_EXEC_PARAMS = { @@ -338,6 +339,14 @@ Extra shell script to run before command call. By default, ``None``, which does not run any scripts. + :num_test_nodes: (int, optional) + Number of nodes to submit before terminating the + submission process. This can be used to test a + new submission configuration without submitting + all nodes (i.e. only running a handful to ensure + the inputs are specified correctly and the + outputs look reasonable). By default, ``None``, + which submits all node jobs. Only the `option` key is required for local execution. 
For execution on the HPC, the `allocation` and `walltime` keys are also diff --git a/gaps/cli/execution.py b/gaps/cli/execution.py index ab67f94d..efb30ae8 100644 --- a/gaps/cli/execution.py +++ b/gaps/cli/execution.py @@ -5,6 +5,7 @@ import logging import datetime as dt from pathlib import Path +from copy import deepcopy from warnings import warn from inspect import signature @@ -55,6 +56,7 @@ def kickoff_job(ctx, cmd, exec_kwargs): If `exec_kwargs` is missing some arguments required by the respective `submit` function. """ + exec_kwargs = deepcopy(exec_kwargs) hardware_option = HardwareOption(exec_kwargs.pop("option", "local")) if hardware_option.manager is None: _kickoff_local_job(ctx, cmd) diff --git a/tests/cli/test_cli_config.py b/tests/cli/test_cli_config.py index 3d1cf25f..49b07173 100644 --- a/tests/cli/test_cli_config.py +++ b/tests/cli/test_cli_config.py @@ -485,9 +485,8 @@ def test_run_multiple_nodes( "max_workers": 1, }, "input1": 1, - "input2": 7, "input3": 8, - "_z_0": ["unsorted", "strings"], + "_z_0": ["unsorted", "strings"], "project_points": [0, 1, 2, 4], } @@ -524,13 +523,13 @@ def test_run_multiple_nodes_correct_zfill( TestCommand, "run", name="run", - split_keys={"project_points", "_z_0"}, + split_keys={"project_points", "input3"}, ) else: command_config = CLICommandFromFunction( _testing_function, name="run", - split_keys={"project_points", "_z_0"}, + split_keys={"project_points", "input3"}, ) config = { @@ -542,9 +541,7 @@ def test_run_multiple_nodes_correct_zfill( "max_workers": 1, }, "input1": 1, - "input2": 7, - "input3": 8, - "_z_0": ["unsorted", "strings"], + "input3": ["unsorted", "strings"], "project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9], } @@ -561,6 +558,56 @@ def test_run_multiple_nodes_correct_zfill( assert any("j0" in job_name for job_name in job_names_cache) +@pytest.mark.parametrize("test_class", [False, True]) +@pytest.mark.parametrize( + "test_nodes", [(-1, 0), (0, 0), (1, 1), (10, 10), (20, 10)] +) +def 
test_run_multiple_nodes_num_test_nodes( + test_ctx, runnable_script, test_class, test_nodes, job_names_cache +): + """`run` function calls `_kickoff_hpc_job` for `num_test_nodes`.""" + + tmp_path = test_ctx.obj["TMP_PATH"] + num_test_nodes, expected_job_count = test_nodes + + if test_class: + command_config = CLICommandFromClass( + TestCommand, + "run", + name="run", + split_keys={"project_points", "input3"}, + ) + else: + command_config = CLICommandFromFunction( + _testing_function, + name="run", + split_keys={"project_points", "input3"}, + ) + + config = { + "execution_control": { + "option": "eagle", + "allocation": "test", + "walltime": 1, + "nodes": 5, + "max_workers": 1, + "num_test_nodes": num_test_nodes, + }, + "input1": 1, + "input3": ["unsorted", "strings"], + "project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9], + } + + config_fp = tmp_path / "config.json" + with open(config_fp, "w") as config_file: + json.dump(config, config_file) + + assert len(job_names_cache) == 0 + from_config(config_fp, command_config) + assert len(job_names_cache) == expected_job_count + assert len(set(job_names_cache)) == expected_job_count + + @pytest.mark.parametrize("test_class", [False, True]) def test_run_no_split_keys( test_ctx, runnable_script, test_class, job_names_cache