diff --git a/doc/changelog.d/3713.dependencies.md b/doc/changelog.d/3713.dependencies.md
new file mode 100644
index 0000000000..a8821480b2
--- /dev/null
+++ b/doc/changelog.d/3713.dependencies.md
@@ -0,0 +1 @@
+feat: add support for launching MAPDL on remote HPC clusters
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index b26916ebed..c8ba120098 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,6 +48,11 @@ classifiers = [
 ]
 
 [project.optional-dependencies]
+
+hpc = [
+    "paramiko==3.5.0",
+]
+
 jupyter = [
     "jupyterlab>=3",
     "ipywidgets",
@@ -59,6 +64,7 @@ tests = [
     "autopep8==2.3.2",
     "matplotlib==3.10.1",
     "pandas==2.2.3",
+    "paramiko==3.5.0",
     "pyansys-tools-report==0.8.2",
     "pyfakefs==5.7.4",
     "pyiges[full]==0.3.1",
diff --git a/src/ansys/mapdl/core/hpc.py b/src/ansys/mapdl/core/hpc.py
new file mode 100644
index 0000000000..9673acb25b
--- /dev/null
+++ b/src/ansys/mapdl/core/hpc.py
@@ -0,0 +1,447 @@
+# Copyright (C) 2016 - 2025 ANSYS, Inc. and/or its affiliates.
+# SPDX-License-Identifier: MIT
+#
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
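+
+"""Launch MAPDL on remote HPC clusters.
+
+This module provides :func:`launch_on_remote_hpc`, which opens an SSH
+connection to a remote SLURM cluster (through ``paramiko``), submits an
+MAPDL job with ``sbatch``, and returns a
+:class:`MapdlGrpc <ansys.mapdl.core.mapdl_grpc.MapdlGrpc>` instance
+connected to it.
+"""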
+
+from functools import wraps
+from typing import Any, Dict, Optional, Union
+
+import paramiko
+from paramiko.client import SSHClient
+
+from ansys.mapdl.core import LOG
+from ansys.mapdl.core.launcher import (
+    MAPDL_DEFAULT_PORT,
+    check_kwargs,
+    check_mapdl_launch_on_hpc,
+    generate_mapdl_launch_command,
+    generate_sbatch_command,
+    generate_start_parameters,
+    get_cpus,
+    get_job_info,
+    kill_job,
+    launch_grpc,
+    pack_arguments,
+    pre_check_args,
+    set_license_switch,
+    set_MPI_additional_switches,
+    update_env_vars,
+)
+from ansys.mapdl.core.mapdl_grpc import MapdlGrpc
+
+
+def launch_on_remote_hpc(
+    hostname: Optional[str] = None,
+    username: Optional[str] = None,
+    password: Optional[str] = None,
+    exec_file: Optional[str] = None,
+    run_location: Optional[str] = None,
+    jobname: str = "file",
+    *,
+    ssh_port: int = 22,
+    nproc: Optional[int] = None,
+    ram: Optional[Union[int, str]] = None,
+    override: bool = False,
+    loglevel: str = "ERROR",
+    additional_switches: str = "",
+    start_timeout: Optional[int] = None,
+    port: int = MAPDL_DEFAULT_PORT,
+    start_instance: Optional[bool] = None,
+    clear_on_connect: bool = True,
+    log_apdl: Optional[Union[bool, str]] = None,
+    remove_temp_dir_on_exit: bool = False,
+    license_type: Optional[str] = None,
+    print_com: bool = False,
+    add_env_vars: Optional[Dict[str, str]] = None,
+    replace_env_vars: Optional[Dict[str, str]] = None,
+    launch_on_hpc: bool = True,
+    mapdl_output: Optional[str] = None,
+    **kwargs: Any,
+) -> MapdlGrpc:
+    """Launch MAPDL on a remote HPC cluster.
+
+    Connect to a remote SLURM cluster over SSH, submit an MAPDL job
+    through ``sbatch``, and return a :class:`MapdlGrpc
+    <ansys.mapdl.core.mapdl_grpc.MapdlGrpc>` instance connected to it.
+
+    Parameters
+    ----------
+    hostname : str, optional
+        Hostname or IP address of the remote machine to connect to over SSH.
+
+    username : str, optional
+        Username for the SSH connection.
+
+    password : str, optional
+        Password for the SSH connection.
+
+    ssh_port : int, optional
+        Port of the SSH server on the remote machine. Defaults to ``22``.
+
+    exec_file : str, optional
+        The location of the MAPDL executable. Will use the cached
+        location when left at the default :class:`None` and no environment
+        variable is set.
+
+        The executable path can also be set through the environment variable
+        :envvar:`PYMAPDL_MAPDL_EXEC`. For example:
+
+        .. code:: console
+
+            export PYMAPDL_MAPDL_EXEC=/ansys_inc/v211/ansys/bin/mapdl
+
+    run_location : str, optional
+        MAPDL working directory. Defaults to
+        ``/home/<username>/pymapdl/simulations`` on the remote machine.
+        If the directory doesn't exist, it is created.
+
+    jobname : str, optional
+        MAPDL jobname. Defaults to ``'file'``.
+
+    nproc : int, optional
+        Number of processors. Defaults to ``2``.
+
+    ram : float, optional
+        Total size in megabytes of the workspace (memory) used for the
+        initial allocation. The default is :class:`None`, in which case
+        2 GB (2048 MB) is used. To force a fixed size throughout the run,
+        specify a negative number.
+
+    override : bool, optional
+        Attempts to delete the lock file at the ``run_location``.
+        Useful when a prior MAPDL session has exited prematurely and
+        the lock file has not been deleted.
+
+    loglevel : str, optional
+        Sets which messages are printed to the console. ``'INFO'``
+        prints out all ANSYS messages, ``'WARNING'`` prints only
+        messages containing ANSYS warnings, and ``'ERROR'`` logs only
+        error messages.
+
+    additional_switches : str, optional
+        Additional switches for MAPDL. For example, ``'aa_r'``, the
+        academic research license, would be added with:
+
+        - ``additional_switches="-aa_r"``
+
+        Avoid adding switches like ``-i``, ``-o`` or ``-b`` as these are
+        already included to start up the MAPDL server. See the notes
+        section for additional details.
+
+    start_timeout : float, optional
+        Maximum allowable time to connect to the MAPDL server. By default
+        it is 45 seconds; it is increased to 90 seconds when running
+        on HPC.
+
+    port : int
+        Port to launch MAPDL gRPC on.
The final port is the first
+        port available after (or including) this port. Defaults to
+        ``50052``. You can also provide this value through the environment
+        variable :envvar:`PYMAPDL_PORT`. For instance, ``PYMAPDL_PORT=50053``.
+        The argument (if specified) takes precedence over the environment
+        variable. If this environment variable is empty, it is treated as if
+        it were not set.
+
+    start_instance : bool, optional
+        When :class:`False`, connect to an existing MAPDL instance at ``ip``
+        and ``port``, which default to ip ``'127.0.0.1'`` at port ``50052``.
+        Otherwise, launch a local instance of MAPDL. You can also
+        provide this value through the environment variable
+        :envvar:`PYMAPDL_START_INSTANCE`.
+        The argument (if specified) takes precedence over the environment
+        variable. If this environment variable is empty, it is treated as if
+        it were not set.
+
+    clear_on_connect : bool, optional
+        Defaults to :class:`True`, giving you a fresh environment when
+        connecting to MAPDL. If ``start_instance`` is specified, it defaults
+        to :class:`False`.
+
+    log_apdl : str, optional
+        Enables logging every APDL command to the local disk. This
+        can be used to "record" all the commands that are sent to
+        MAPDL via PyMAPDL so a script can be run within MAPDL without
+        PyMAPDL. This argument is the path of the output file (for example,
+        ``log_apdl='pymapdl_log.txt'``). By default this is disabled.
+
+    license_type : str, optional
+        Enable license type selection. You can input a string for its
+        license name (for example ``'meba'`` or ``'ansys'``) or its
+        description ("enterprise solver" or "enterprise", respectively).
+        You can also use legacy licenses (for example ``'aa_t_a'``), but
+        doing so raises a warning. If it is not used (:class:`None`), no
+        specific license is requested, and it is up to the license server
+        to provide a specific license type. Default is :class:`None`.
+
+    print_com : bool, optional
+        Print the command ``/COM`` arguments to the standard output.
+        Default :class:`False`.
+
+    add_env_vars : dict, optional
+        The provided dictionary will be used to extend the MAPDL process
+        environment variables. If you want to control all of the environment
+        variables, use the argument ``replace_env_vars``.
+        Defaults to :class:`None`.
+
+    replace_env_vars : dict, optional
+        The provided dictionary will be used to replace all the MAPDL
+        process environment variables. It replaces the system environment
+        variables that would otherwise be used by the process.
+        To just add some environment variables to the MAPDL
+        process, use ``add_env_vars``. Defaults to :class:`None`.
+
+    kwargs : dict, optional
+        These keyword arguments are interface-specific or for
+        development purposes. For more information, see Notes.
+
+        scheduler_options : :class:`str`, :class:`dict`
+          Options passed to the scheduler run command. It can be a string
+          or a dictionary with arguments and their values (both as
+          strings). For more information, visit :ref:`ref_hpc_slurm`.
+
+        set_no_abort : :class:`bool`
+          *(Development use only)*
+          Sets MAPDL to not abort at the first error within /BATCH mode.
+          Defaults to :class:`True`.
+
+        force_intel : :class:`bool`
+          *(Development use only)*
+          Forces the use of the Intel message passing interface (MPI) in
+          versions between Ansys 2021R0 and 2022R2, where this MPI is
+          deactivated by default because of VPN issues.
+          See :ref:`vpn_issues_troubleshooting` for more information.
+          Defaults to :class:`False`.
+
+    Returns
+    -------
+    MapdlGrpc
+        An instance of Mapdl connected to the remote MAPDL instance
+        through gRPC.
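+
+    Examples
+    --------
+    A minimal usage sketch. The hostname, credentials, and executable path
+    are placeholders; use the values for your own cluster:
+
+    >>> from ansys.mapdl.core.hpc import launch_on_remote_hpc
+    >>> mapdl = launch_on_remote_hpc(
+    ...     hostname="myhost",
+    ...     username="myuser",
+    ...     password="mypass",
+    ...     exec_file="/ansys_inc/v211/ansys/bin/mapdl",
+    ...     nproc=4,
+    ... )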
+    """
+    ########################################
+    # Processing arguments
+    # --------------------
+    #
+    # packing arguments
+    args = pack_arguments(locals())  # packs args and kwargs
+
+    args["session_ssh"] = SshSession(
+        hostname=hostname,
+        username=username,
+        password=password,
+        port=ssh_port,
+    )
+
+    check_kwargs(args)  # check if passing wrong arguments
+
+    args["start_instance"] = True
+    args["version"] = None
+
+    if args.get("ip", None):
+        raise ValueError("Argument IP is not allowed for launching MAPDL on HPC.")
+
+    pre_check_args(args)
+
+    get_cpus(args)
+
+    ########################################
+    # Launch adjustments
+    # ------------------
+    #
+    # These adjustments apply because this function always starts a new
+    # MAPDL instance on the cluster (``start_instance`` is forced to True).
+    if not args.get("exec_file"):
+        raise ValueError("The 'exec_file' argument must be provided.")
+
+    args["additional_switches"] = set_license_switch(
+        args["license_type"], args["additional_switches"]
+    )
+
+    env_vars: Dict[str, str] = update_env_vars(
+        args["add_env_vars"], args["replace_env_vars"]
+    )
+
+    get_run_location_hpc(args)
+
+    # Check for a valid connection mode
+    args.setdefault("mode", "grpc")
+    if args["mode"] != "grpc":
+        raise ValueError(
+            "Only gRPC mode is allowed for launching MAPDL on an SLURM HPC."
+        )
+
+    ########################################
+    # Context specific launching adjustments
+    # --------------------------------------
+    #
+    # Set compatible MPI
+    args["additional_switches"] = set_MPI_additional_switches(
+        args["additional_switches"],
+        force_intel=args["force_intel"],
+        version=args["version"],
+    )
+
+    LOG.debug(f"Using additional switches {args['additional_switches']}.")
+
+    if args["launch_on_hpc"]:
+        env_vars.setdefault("ANS_MULTIPLE_NODES", "1")
+        env_vars.setdefault("HYDRA_BOOTSTRAP", "slurm")
+        env_vars.setdefault("I_MPI_SHM_LMT", "shm")  # ubuntu
+
+    start_parm = generate_start_parameters(args)
+
+    ########################################
+    # Launch MAPDL with gRPC
+    # ----------------------
+    #
+    cmd = generate_mapdl_launch_command(
+        exec_file=args["exec_file"],
+        jobname=args["jobname"],
+        nproc=args["nproc"],
+        ram=args["ram"],
+        port=args["port"],
+        additional_switches=args["additional_switches"],
+        launch_on_hpc=args["launch_on_hpc"],
+    )
+
+    cmd = generate_sbatch_command(cmd, scheduler_options=args.get("scheduler_options"))
+
+    try:
+        process = launch_grpc(
+            cmd=cmd,
+            run_location=args["run_location"],
+            env_vars=env_vars,
+            launch_on_hpc=args.get("launch_on_hpc"),
+            mapdl_output=args.get("mapdl_output"),
+            ssh_session=args["session_ssh"],
+        )
+
+        start_parm["jobid"] = check_mapdl_launch_on_hpc(process, start_parm)
+        get_job_info(
+            start_parm=start_parm,
+            timeout=args["start_timeout"],
+            ssh_session=args["session_ssh"],
+        )
+
+    except Exception as exception:
+        LOG.error("An error occurred when launching MAPDL.")
+
+        jobid = start_parm.get("jobid", "Not found")
+
+        if (
+            args["launch_on_hpc"]
+            and start_parm.get("finish_job_on_exit", True)
+            and jobid not in ["Not found", None]
+        ):
+            LOG.debug(f"Killing HPC job with id: {jobid}")
+            kill_job(jobid, ssh_session=args["session_ssh"])
+
+        raise exception
+
+    ########################################
+    # Connect to MAPDL using gRPC
+    # ---------------------------
+    #
+    try:
+        mapdl = MapdlGrpc(
+            cleanup_on_exit=False,
+            loglevel=args["loglevel"],
+            set_no_abort=args["set_no_abort"],
+            remove_temp_dir_on_exit=False,
+            log_apdl=args["log_apdl"],
+            # process=process,
+            use_vtk=args["use_vtk"],
+            **start_parm,
+        )
+        mapdl._ssh_session = args["session_ssh"]
args["session_ssh"] + + except Exception as exception: + LOG.error("An error occurred when connecting to MAPDL.") + raise exception + + return mapdl + + +class SshSession: + + def __init__( + self, + hostname: str, + username: str, + password: str, + port: int = 22, + allow_missing_host_key: bool = False, + ): + self.username = username + self.hostname = hostname + self.password = password + self.port = port + self.allow_missing_host_key = allow_missing_host_key + self._connected = False + + def __enter__(self): + self.session = SSHClient() + if self.allow_missing_host_key: + self.session.set_missing_host_key_policy( + paramiko.WarningPolicy() + ) # nosec B507 + else: + self.session.set_missing_host_key_policy(paramiko.RejectPolicy()) + + self.session.connect( + hostname=self.hostname, + username=self.username, + password=self.password, + port=self.port, + ) + self._connected = True + return self + + def __exit__(self, *args) -> None: + self.session.close() + self._connected = False + + @wraps(SSHClient.exec_command) + def exec_command(self, *args, **kwargs): + if not self._connected: + raise Exception("ssh session is not connected") + stdin, stdout, stderr = self.session.exec_command(*args, **kwargs) # nosec B601 + output = stdout.read().decode("utf-8") + error = stderr.read().decode("utf-8") + return stdin, output, error + + def run(self, cmd, environment=None): + if not self._connected: + raise Exception("ssh session is not connected") + if isinstance(cmd, list): + cmd = " ".join(cmd) + + LOG.debug(cmd) + _, stdout, stderr = self.exec_command( + command=cmd, environment=environment + ) # nosec B78 + + if stderr: + raise Exception(f"ERROR: {stderr}") + + return stdout, stderr + + def submit(self, cmd, cwd, environment): + try: + if cwd: + self.run(f"mkdir -p {cwd}") + cmd = f"cd {cwd};{cmd}" + + return self.run(cmd, environment=environment) + + except Exception as e: + raise Exception(f"Unexpected error occurred: {e}") + finally: + self.session.close() + + +def get_run_location_hpc(args: Dict[str, Any]) -> None: + if args["run_location"] is None: + args["run_location"] = ( + f"/home/{args['session_ssh'].username}/pymapdl/simulations" + ) + + LOG.debug( + f"Using default temporary directory for MAPDL run location: {args['run_location']}" + ) diff --git a/src/ansys/mapdl/core/launcher.py b/src/ansys/mapdl/core/launcher.py index 10486ea972..2266402c6d 100644 --- a/src/ansys/mapdl/core/launcher.py +++ b/src/ansys/mapdl/core/launcher.py @@ -106,7 +106,7 @@ def version_from_path(*args, **kwargs): "cleanup_on_exit", "clear_on_connect", "exec_file", - "force_intel" "ip", + "force_intel", "ip", "jobname", "launch_on_hpc", @@ -339,6 +339,7 @@ def generate_mapdl_launch_command( ram: Optional[int] = None, port: int = MAPDL_DEFAULT_PORT, additional_switches: str = "", + launch_on_hpc=False, ) -> list[str]: """Generate the command line to start MAPDL in gRPC mode. @@ -393,7 +394,7 @@ def generate_mapdl_launch_command( grpc_sw = "-grpc" # Windows will spawn a new window, special treatment - if os.name == "nt": + if os.name == "nt" and not launch_on_hpc: exec_file = f"{exec_file}" # must start in batch mode on windows to hide APDL window tmp_inp = ".__tmp__.inp" @@ -439,6 +440,7 @@ def launch_grpc( env_vars: Optional[Dict[str, str]] = None, launch_on_hpc: bool = False, mapdl_output: Optional[str] = None, + ssh_session=None, ) -> subprocess.Popen: """Start MAPDL locally in gRPC mode. 
@@ -520,7 +522,8 @@ def launch_grpc(
         stdout=stdout,
         stderr=stderr,
         env_vars=env_vars,
-    )
+        ssh_session=ssh_session,
+    )  # nosec B604
 
 
 def check_mapdl_launch(
@@ -1593,6 +1596,7 @@ def launch_mapdl(
         ram=args["ram"],
         port=args["port"],
         additional_switches=args["additional_switches"],
+        launch_on_hpc=args["launch_on_hpc"],
     )
 
     if args["launch_on_hpc"]:
@@ -2523,7 +2527,7 @@ def get_cpus(args: Dict[str, Any]):
     # Bypassing number of processors checks because VDI/VNC might have
     # different number of processors than the cluster compute nodes.
     # Also the CPUs are set in `get_slurm_options`
-    if args["running_on_hpc"]:
+    if args.get("running_on_hpc"):
         return
 
     # Setting number of processors
@@ -2632,7 +2636,7 @@ def launch_mapdl_on_cluster(
     )
 
 
-def get_hostname_host_cluster(job_id: int, timeout: int = 30) -> str:
+def get_hostname_host_cluster(job_id: int, timeout: int = 30, ssh_session=None) -> tuple[str, str]:
     options = f"show jobid -dd {job_id}"
 
     LOG.debug(f"Executing the command 'scontrol {options}'")
@@ -2640,9 +2644,12 @@
     time_start = time.time()
     counter = 0
     while not ready:
-        proc = send_scontrol(options)
+        proc = send_scontrol(options, ssh_session=ssh_session)
 
-        stdout = proc.stdout.read().decode()
+        if isinstance(proc, tuple):
+            stdout = proc[0]
+        else:
+            stdout = proc.stdout.read().decode()
 
         if "JobState=RUNNING" not in stdout:
             counter += 1
@@ -2788,9 +2795,16 @@ def check_mapdl_launch_on_hpc(
     MapdlDidNotStart
         The job submission failed.
     """
-    stdout = process.stdout.read().decode()
-    if "Submitted batch job" not in stdout:
+    if isinstance(process, tuple):
+        stdout, stderr = process
+    elif isinstance(process, str):
+        stdout = process
+        stderr = ""
+    else:
+        stdout = process.stdout.read().decode()
         stderr = process.stderr.read().decode()
+
+    if "Submitted batch job" not in stdout:
         raise MapdlDidNotStart(
             "PyMAPDL failed to submit the sbatch job:\n"
             f"stdout:\n{stdout}\nstderr:\n{stderr}"
@@ -2802,7 +2816,10 @@
 
 
 def get_job_info(
-    start_parm: Dict[str, str], jobid: Optional[int] = None, timeout: int = 30
+    start_parm: Dict[str, str],
+    jobid: Optional[int] = None,
+    timeout: int = 30,
+    ssh_session=None,
 ):
     """Get job info like BatchHost IP and hostname
@@ -2824,21 +2841,23 @@ def get_job_info(
     jobid = jobid or start_parm["jobid"]
 
-    batch_host, batch_ip = get_hostname_host_cluster(jobid, timeout=timeout)
+    batch_host, batch_ip = get_hostname_host_cluster(
+        jobid, timeout=timeout, ssh_session=ssh_session
+    )
 
     start_parm["ip"] = batch_ip
     start_parm["hostname"] = batch_host
     start_parm["jobid"] = jobid
 
 
-def kill_job(jobid: int):
+def kill_job(jobid: int, ssh_session=None):
     """Kill SLURM job"""
-    submitter(["scancel", str(jobid)])
+    submitter(["scancel", str(jobid)], ssh_session=ssh_session)
 
 
-def send_scontrol(args: str):
+def send_scontrol(args: str, ssh_session=None):
     cmd = f"scontrol {args}".split(" ")
-    return submitter(cmd)
+    return submitter(cmd, ssh_session=ssh_session)
 
 
 def submitter(
@@ -2851,6 +2870,7 @@ def submitter(
     stdout: subprocess.PIPE = None,
     stderr: subprocess.PIPE = None,
     env_vars: dict[str, str] = None,
+    ssh_session=None,
 ):
 
     if executable:
@@ -2868,15 +2888,20 @@ def submitter(
 
     # cmd is controlled by the library with generate_mapdl_launch_command.
     # Excluding bandit check.
-    return subprocess.Popen(
-        args=cmd,
-        shell=shell,  # sbatch does not work without shell.
-        cwd=cwd,
-        stdin=stdin,
-        stdout=stdout,
-        stderr=stderr,
-        env=env_vars,
-    )
+    if ssh_session:
+        with ssh_session as ssh:
+            return ssh.submit(cmd, cwd, env_vars)
+
+    else:
+        return subprocess.Popen(
+            args=cmd,
+            shell=shell,  # sbatch does not work without shell.
+            cwd=cwd,
+            stdin=stdin,
+            stdout=stdout,
+            stderr=stderr,
+            env=env_vars,
+        )
 
 
 def check_console_start_parameters(start_parm):
diff --git a/src/ansys/mapdl/core/mapdl_grpc.py b/src/ansys/mapdl/core/mapdl_grpc.py
index 3507869e78..9e89a127e3 100644
--- a/src/ansys/mapdl/core/mapdl_grpc.py
+++ b/src/ansys/mapdl/core/mapdl_grpc.py
@@ -3833,9 +3833,12 @@ def kill_job(self, jobid: int) -> None:
         jobid : int
             Job ID.
         """
+        # Imported here to avoid a circular import with the launcher module.
+        from ansys.mapdl.core.launcher import submitter
+
         cmd = ["scancel", f"{jobid}"]
         # to ensure the job is stopped properly, let's issue the scancel twice.
-        subprocess.Popen(cmd)  # nosec B603
+        submitter(cmd, ssh_session=getattr(self, "_ssh_session", None))  # nosec B603
 
     def __del__(self):
         """In case the object is deleted"""
diff --git a/tests/test_hpc.py b/tests/test_hpc.py
new file mode 100644
index 0000000000..ed68590948
--- /dev/null
+++ b/tests/test_hpc.py
@@ -0,0 +1,301 @@
+# Copyright (C) 2016 - 2025 ANSYS, Inc. and/or its affiliates.
+# SPDX-License-Identifier: MIT
+#
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
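+
+"""Tests for launching MAPDL on remote HPC clusters over SSH.
+
+The SSH and MAPDL interactions are mocked with :mod:`unittest.mock`, so no
+real cluster or MAPDL installation is required to run these tests.
+"""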
+
+from unittest.mock import patch
+
+import pytest
+
+from ansys.mapdl.core.hpc import SshSession, launch_on_remote_hpc
+from ansys.mapdl.core.mapdl_grpc import MapdlGrpc
+from conftest import TESTING_MINIMAL, has_dependency
+
+if not has_dependency("paramiko") or TESTING_MINIMAL:
+    pytest.skip(
+        reason="These tests require 'paramiko' and the full test suite.",
+        allow_module_level=True,
+    )
+
+
+def test_launch_on_remote_hpc():
+    import io
+
+    with (
+        patch("paramiko.client.SSHClient.connect") as mck_connect,
+        patch("paramiko.client.SSHClient.exec_command") as mck_exec_command,
+        patch("ansys.mapdl.core.launcher.get_hostname_host_cluster") as mock_ghhc,
+        patch("socket.gethostbyname") as mck_ghn,
+        patch("ansys.mapdl.core.mapdl_grpc.MapdlGrpc.__init__") as mock_mapdl,
+    ):
+
+        mck_connect.return_value = None
+        mock_ghhc.return_value = "myhost", "123.45.67.89"
+        mck_ghn.return_value = "123.45.67.89"
+        mock_mapdl.return_value = None
+        # Mocked (stdin, stdout, stderr) streams for each `exec_command` call.
+        str_0 = [io.BytesIO(i) for i in [b"stdin", b"Directory created", b""]]
+        str_1 = [io.BytesIO(i) for i in [b"stdin", b"Submitted batch job 1001", b""]]
+        mck_exec_command.side_effect = (str_0, str_1)
+
+        hostname = "myhost"
+        username = "myuser"
+        password = "mypass"  # nosec B105
+        exec_file = "my/path/to/ansys"
+        port = 50054
+
+        mapdl = launch_on_remote_hpc(
+            hostname=hostname,
+            username=username,
+            password=password,
+            exec_file=exec_file,
+            port=port,
+        )
+
+        assert isinstance(mapdl, MapdlGrpc)
+
+        mck_connect.assert_called()
+        kwargs = mck_connect.call_args_list[0].kwargs
+        assert kwargs["hostname"] == hostname
+        assert kwargs["username"] == username
+        assert kwargs["password"] == password
+        assert kwargs["port"] == 22
+
+        mck_exec_command.assert_called()
+        assert len(mck_exec_command.call_args_list) == 2
+        kwargs = mck_exec_command.call_args_list[1].kwargs
+        assert "sbatch" in kwargs["command"]
+        assert exec_file in kwargs["command"]
+        assert str(port) in kwargs["command"]
+
+        kwargs = mck_exec_command.call_args_list[0].kwargs
+        assert f"/home/{username}/pymapdl/simulations" in kwargs["command"]
+
+        kwargs = mock_mapdl.call_args_list[0].kwargs
+        assert kwargs["ip"] == "123.45.67.89"
+        assert kwargs["hostname"] == hostname
+        assert kwargs["jobid"] == 1001
+        assert kwargs["port"] == 50054
+
+
+@pytest.mark.parametrize(
+    "args, match",
+    [
+        (
+            {"exec_file": "path/to/exec", "ip": "123.45.67"},
+            "Argument IP is not allowed for launching MAPDL on HPC",
+        ),
+        (
+            {"exec_file": "path/to/exec", "mode": "console"},
+            "Only gRPC mode is allowed for launching MAPDL on an SLURM HPC",
+        ),
+        ({}, "The 'exec_file' argument must be provided."),
+    ],
+)
+def test_non_valid_args(args, match):
+    with pytest.raises(ValueError, match=match):
+        launch_on_remote_hpc(**args)
+
+
+def test_failed_to_launch_mapdl_no_jobid():
+    with (
+        patch("ansys.mapdl.core.hpc.launch_grpc") as mock_launch_grpc,
+        patch("ansys.mapdl.core.hpc.kill_job") as mock_kill_job,
+    ):
+        mock_launch_grpc.side_effect = RuntimeError("Failed to launch MAPDL on HPC")
+
+        with pytest.raises(RuntimeError, match="Failed to launch MAPDL on HPC"):
+            launch_on_remote_hpc(exec_file="path/to/exec")
+
+        mock_kill_job.assert_not_called()
+
+
+def test_failed_to_launch_mapdl_jobid():
+    import io
+
+    with (
+        patch("paramiko.client.SSHClient.connect") as mck_connect,
+        patch("paramiko.client.SSHClient.exec_command") as mck_exec_command,
+        patch("ansys.mapdl.core.launcher.get_hostname_host_cluster") as mock_ghhc,
+        patch("socket.gethostbyname") as mck_ghn,
+        patch("ansys.mapdl.core.mapdl_grpc.MapdlGrpc.__init__") as mock_mapdl,
+        patch("ansys.mapdl.core.hpc.get_job_info") as mock_get_job_info,
+        patch("ansys.mapdl.core.hpc.kill_job") as mock_kill_job,
+    ):
+
+        mck_connect.return_value = None
+        mock_ghhc.return_value = "myhost", "123.45.67.89"
+        mck_ghn.return_value = "123.45.67.89"
+        mock_mapdl.return_value = None
+        str_0 = [io.BytesIO(i) for i in [b"stdin", b"Directory created", b""]]
+        str_1 = [io.BytesIO(i) for i in [b"stdin", b"Submitted batch job 1001", b""]]
+        mck_exec_command.side_effect = (str_0, str_1)
+        mock_get_job_info.side_effect = RuntimeError("Failed to launch MAPDL on HPC")
+
+        hostname = "myhost"
+        username = "myuser"
+        password = "mypass"  # nosec B105
+        exec_file = "my/path/to/ansys"
+        port = 50054
+
+        with pytest.raises(RuntimeError, match="Failed to launch MAPDL on HPC"):
+            launch_on_remote_hpc(
+                hostname=hostname,
+                username=username,
+                password=password,
+                exec_file=exec_file,
+                port=port,
+            )
+
+        mock_kill_job.assert_called()
+        assert mock_kill_job.call_args_list[0].args[0] == 1001
+
+
+def test_launch_on_remote_hpc_failed():
+    import io
+
+    with (
+        patch("paramiko.client.SSHClient.connect") as mck_connect,
+        patch("paramiko.client.SSHClient.exec_command") as mck_exec_command,
+        patch("ansys.mapdl.core.launcher.get_hostname_host_cluster") as mock_ghhc,
+        patch("socket.gethostbyname") as mck_ghn,
+        patch("ansys.mapdl.core.mapdl_grpc.MapdlGrpc.__init__") as mock_mapdl,
+    ):
+
+        mck_connect.return_value = None
+        mock_ghhc.return_value = "myhost", "123.45.67.89"
+        mck_ghn.return_value = "123.45.67.89"
+        mock_mapdl.side_effect = RuntimeError("Failed to launch MAPDL on HPC")
+        str_0 = [io.BytesIO(i) for i in [b"stdin", b"Directory created", b""]]
+        str_1 = [io.BytesIO(i) for i in [b"stdin", b"Submitted batch job 1001", b""]]
+        mck_exec_command.side_effect = (str_0, str_1)
+
+        hostname = "myhost"
+        username = "myuser"
+        password = "mypass"  # nosec B105
+        exec_file = "my/path/to/ansys"
+        port = 50054
+
+        with pytest.raises(RuntimeError, match="Failed to launch MAPDL on HPC"):
+            launch_on_remote_hpc(
+                hostname=hostname,
+                username=username,
+                password=password,
+                exec_file=exec_file,
+                port=port,
+            )
+
+        mck_connect.assert_called()
+        kwargs = mck_connect.call_args_list[0].kwargs
+        assert kwargs["hostname"] == hostname
+        assert kwargs["username"] == username
+        assert kwargs["password"] == password
+        assert kwargs["port"] == 22
+
+        mck_exec_command.assert_called()
+        assert len(mck_exec_command.call_args_list) == 2
+        kwargs = mck_exec_command.call_args_list[1].kwargs
+        assert "sbatch" in kwargs["command"]
+        assert exec_file in kwargs["command"]
+        assert str(port) in kwargs["command"]
+
+        kwargs = mck_exec_command.call_args_list[0].kwargs
+        assert f"/home/{username}/pymapdl/simulations" in kwargs["command"]
+
+        kwargs = mock_mapdl.call_args_list[0].kwargs
+        assert kwargs["ip"] == "123.45.67.89"
+        assert kwargs["hostname"] == hostname
+        assert kwargs["jobid"] == 1001
+        assert kwargs["port"] == 50054
+
+
+class Test_SshSession:
+
+    @pytest.mark.parametrize("cmd", ["exec_command", "run"])
+    def test_failed_not_connected_after_started(self, cmd):
+        with patch("paramiko.client.SSHClient.connect") as mck_connect:
+            mck_connect.return_value = None
+            with pytest.raises(Exception, match="ssh session is not connected"):
+                with SshSession("myhost", "myuser", "mypass") as ssh:
+                    ssh._connected = False
+                    method = getattr(ssh, cmd)
+                    method("ls")
+
+    def test_failed_exec_command(self):
+        import io
+
+        str_0 = [io.BytesIO(i) for i in [b"", b"", b"We couldn't start MAPDL"]]
+        with (
+            patch("paramiko.client.SSHClient.connect") as mck_connect,
patch("paramiko.client.SSHClient.exec_command") as mck_exec_command, + ): + mck_exec_command.return_value = str_0 + mck_connect.return_value = None + + with pytest.raises(Exception, match=f"ERROR: We couldn't start MAPDL"): + with SshSession("myhost", "myuser", "mypass") as ssh: + ssh.run(["cmd1", "cmd2"]) + + mck_exec_command.assert_called() + kwargs = mck_exec_command.call_args_list[0].kwargs + assert kwargs["command"] == "cmd1 cmd2" + + def test_submit(self): + import io + + str_0 = [io.BytesIO(i) for i in [b"", b"", b"We couldn't start MAPDL"]] + with ( + patch("paramiko.client.SSHClient.connect") as mck_connect, + patch("paramiko.client.SSHClient.exec_command") as mck_exec_command, + patch("ansys.mapdl.core.hpc.SshSession.run") as mck_run, + ): + mck_exec_command.return_value = str_0 + mck_connect.return_value = None + + cmd = "cmd1" + cwd = "mydir" + env = {"MYVAR": "myval"} + + with SshSession( + "myhost", "myuser", "mypass", allow_missing_host_key=True + ) as ssh: + ssh.submit(cmd, cwd=cwd, environment=env) + + mck_run.assert_called() + + assert mck_run.call_args_list[0].args[0] == f"mkdir -p {cwd}" + assert mck_run.call_args_list[1].args[0] == f"cd {cwd};{cmd}" + assert mck_run.call_args_list[1].kwargs["environment"] == env + + def test_submit_fail(self): + with ( + patch("paramiko.client.SSHClient.connect") as mck_connect, + patch("paramiko.client.SSHClient.close") as mck_close, + patch("ansys.mapdl.core.hpc.SshSession.run") as mck_run, + ): + cmd = "cmd1" + cwd = "mydir" + env = {"MYVAR": "myval"} + error = "Failed to run command" + + mck_run.side_effect = Exception(error) + mck_connect.return_value = None + + with pytest.raises(Exception, match=f"Unexpected error occurred: {error}"): + with SshSession("myhost", "myuser", "mypass") as ssh: + ssh.submit(cmd, cwd=cwd, environment=env) + + mck_close.assert_called()