diff --git a/.github/workflows/Test.yml b/.github/workflows/Test.yml index 269a66c5..fc1305e4 100644 --- a/.github/workflows/Test.yml +++ b/.github/workflows/Test.yml @@ -12,7 +12,7 @@ jobs: timeout-minutes: 30 strategy: matrix: - python-version: ['3.10', '3.12'] + python-version: ['3.12'] os: ['ubuntu-latest', 'windows-latest'] omc-version: ['stable'] diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index d0178cf2..6567f0e2 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -32,24 +32,20 @@ CONDITIONS OF OSMC-PL. """ -import csv +import ast from dataclasses import dataclass -import importlib import logging import numbers import numpy as np import os import pathlib -import platform -import re import subprocess -import tempfile import textwrap from typing import Optional, Any import warnings import xml.etree.ElementTree as ET -from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal +from OMPython.OMCSession import OMCSessionException, OMCSessionZMQ, OMCProcessLocal, OMCPath, OMCSessionRunData # define logger using the current module name as ID logger = logging.getLogger(__name__) @@ -115,7 +111,14 @@ def __getitem__(self, index: int): class ModelicaSystemCmd: """A compiled model executable.""" - def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[float] = None) -> None: + def __init__( + self, + session: OMCSessionZMQ, + runpath: OMCPath, + modelname: str, + timeout: Optional[float] = None, + ) -> None: + self._session = session self._runpath = pathlib.Path(runpath).resolve().absolute() self._model_name = modelname self._timeout = timeout @@ -176,27 +179,12 @@ def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: for arg in args: self.arg_set(key=arg, val=args[arg]) - def get_exe(self) -> pathlib.Path: - """Get the path to the compiled model executable.""" - if platform.system() == "Windows": - path_exe = self._runpath / f"{self._model_name}.exe" - else: - path_exe = self._runpath / self._model_name - - if not path_exe.exists(): - raise ModelicaSystemError(f"Application file path not found: {path_exe}") - - return path_exe - - def get_cmd(self) -> list: - """Get a list with the path to the executable and all command line args. - - This can later be used as an argument for subprocess.run(). + def get_cmd_args(self) -> list: + """ + Get a list with the command arguments for the model executable. """ - path_exe = self.get_exe() - - cmdl = [path_exe.as_posix()] + cmdl = [] for key in self._args: if self._args[key] is None: cmdl.append(f"-{key}") @@ -205,40 +193,51 @@ def get_cmd(self) -> list: return cmdl - def run(self) -> int: - """Run the requested simulation. - - Returns - ------- - Subprocess return code (0 on success). + def run_def(self) -> OMCSessionRunData: + """ + Define all needed data to run the model executable. The data is stored in an OMCSessionRunData object. """ + # ensure that a result filename is provided + result_file = self.arg_get('r') + if not isinstance(result_file, str): + result_file = (self._runpath / f"{self._model_name}.mat").as_posix() + + omc_run_data = OMCSessionRunData( + cmd_path=self._runpath.as_posix(), + cmd_model_name=self._model_name, + cmd_args=self.get_cmd_args(), + cmd_result_path=result_file, + cmd_timeout=self._timeout, + ) - cmdl: list = self.get_cmd() + omc_run_data_updated = self._session.omc_run_data_update(omc_run_data, session=self._session) - logger.debug("Run OM command %s in %s", repr(cmdl), self._runpath.as_posix()) + return omc_run_data_updated - if platform.system() == "Windows": - path_dll = "" + @staticmethod + def run_cmd(cmd_run_data: OMCSessionRunData) -> int: + """ + Run the command defined in cmd_run_data. This class is defined as static method such that there is no need to + keep instances of over classes around. + """ - # set the process environment from the generated .bat file in windows which should have all the dependencies - path_bat = self._runpath / f"{self._model_name}.bat" - if not path_bat.exists(): - raise ModelicaSystemError("Batch file (*.bat) does not exist " + str(path_bat)) + my_env = os.environ.copy() + if isinstance(cmd_run_data.cmd_library_path, str): + my_env["PATH"] = cmd_run_data.cmd_library_path + os.pathsep + my_env["PATH"] - with open(file=path_bat, mode='r', encoding='utf-8') as fh: - for line in fh: - match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE) - if match: - path_dll = match.group(1).strip(';') # Remove any trailing semicolons - my_env = os.environ.copy() - my_env["PATH"] = path_dll + os.pathsep + my_env["PATH"] - else: - # TODO: how to handle path to resources of external libraries for any system not Windows? - my_env = None + cmdl = cmd_run_data.get_cmd() + logger.debug("Run OM command %s in %s", repr(cmdl), cmd_run_data.cmd_path) try: - cmdres = subprocess.run(cmdl, capture_output=True, text=True, env=my_env, cwd=self._runpath, - timeout=self._timeout, check=True) + cmdres = subprocess.run( + cmdl, + capture_output=True, + text=True, + env=my_env, + cwd=cmd_run_data.cmd_cwd_local, + timeout=cmd_run_data.cmd_timeout, + check=True, + ) stdout = cmdres.stdout.strip() stderr = cmdres.stderr.strip() returncode = cmdres.returncode @@ -254,6 +253,16 @@ def run(self) -> int: return returncode + def run(self) -> int: + """Run the requested simulation. + + Returns + ------- + Subprocess return code (0 on success). + """ + cmd_run_data = self.run_def() + return self.run_cmd(cmd_run_data=cmd_run_data) + @staticmethod def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]: """ @@ -368,8 +377,6 @@ def __init__( self._linearized_states: list[str] = [] # linearization states list if omc_process is not None: - if not isinstance(omc_process, OMCProcessLocal): - raise ModelicaSystemError("Invalid (local) omc process definition provided!") self._getconn = OMCSessionZMQ(omc_process=omc_process) else: self._getconn = OMCSessionZMQ(omhome=omhome) @@ -383,13 +390,11 @@ def __init__( if not isinstance(lmodel, list): raise ModelicaSystemError(f"Invalid input type for lmodel: {type(lmodel)} - list expected!") - self._xml_file = None self._lmodel = lmodel # may be needed if model is derived from other model self._model_name = modelName # Model class name - self._file_name = pathlib.Path(fileName).resolve() if fileName is not None else None # Model file/package name - self._has_inputs = False # for model with input quantity + self._file_name: Optional[OMCPath] = self._getconn.omcpath(fileName).resolve() if fileName is not None else None # Model file/package name self._simulated = False # True if the model has already been simulated - self._result_file: Optional[pathlib.Path] = None # for storing result file + self._result_file: Optional[OMCPath] = None # for storing result file self._variable_filter = variableFilter if self._file_name is not None and not self._file_name.is_file(): # if file does not exist @@ -401,7 +406,7 @@ def __init__( self.setCommandLineOptions("--linearizationDumpLanguage=python") self.setCommandLineOptions("--generateSymbolicLinearization") - self._tempdir = self.setTempDirectory(customBuildDirectory) + self._tempdir: OMCPath = self.setTempDirectory(customBuildDirectory) if self._file_name is not None: self._loadLibrary(lmodel=self._lmodel) @@ -421,7 +426,7 @@ def setCommandLineOptions(self, commandLineOptions: Optional[str] = None): exp = f'setCommandLineOptions("{commandLineOptions}")' self.sendExpression(exp) - def _loadFile(self, fileName: pathlib.Path): + def _loadFile(self, fileName: OMCPath): # load file self.sendExpression(f'loadFile("{fileName.as_posix()}")') @@ -449,14 +454,14 @@ def _loadLibrary(self, lmodel: list): '1)["Modelica"]\n' '2)[("Modelica","3.2.3"), "PowerSystems"]\n') - def setTempDirectory(self, customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None) -> pathlib.Path: + def setTempDirectory(self, customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None) -> OMCPath: # create a unique temp directory for each session and build the model in that directory if customBuildDirectory is not None: if not os.path.exists(customBuildDirectory): raise IOError(f"{customBuildDirectory} does not exist") - tempdir = pathlib.Path(customBuildDirectory).absolute() + tempdir = self._getconn.omcpath(customBuildDirectory).absolute() else: - tempdir = pathlib.Path(tempfile.mkdtemp()).absolute() + tempdir = self._getconn.omcpath_tempdir().absolute() if not tempdir.is_dir(): raise IOError(f"{tempdir} could not be created") @@ -466,7 +471,7 @@ def setTempDirectory(self, customBuildDirectory: Optional[str | os.PathLike | pa return tempdir - def getWorkDirectory(self) -> pathlib.Path: + def getWorkDirectory(self) -> OMCPath: return self._tempdir def buildModel(self, variableFilter: Optional[str] = None): @@ -481,8 +486,8 @@ def buildModel(self, variableFilter: Optional[str] = None): buildModelResult = self._requestApi("buildModel", self._model_name, properties=varFilter) logger.debug("OM model build result: %s", buildModelResult) - self._xml_file = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1] - self._xmlparse() + xml_file = self._getconn.omcpath(buildModelResult[0]).parent / buildModelResult[1] + self._xmlparse(xml_file=xml_file) def sendExpression(self, expr: str, parsed: bool = True): try: @@ -508,23 +513,34 @@ def _requestApi(self, apiName, entity=None, properties=None): # 2 return self.sendExpression(exp) - def _xmlparse(self): - if not self._xml_file.is_file(): - raise ModelicaSystemError(f"XML file not generated: {self._xml_file}") + def _xmlparse(self, xml_file: OMCPath): + if not xml_file.is_file(): + raise ModelicaSystemError(f"XML file not generated: {xml_file}") - tree = ET.parse(self._xml_file) + xml_content = xml_file.read_text() + tree = ET.ElementTree(ET.fromstring(xml_content)) rootCQ = tree.getroot() for attr in rootCQ.iter('DefaultExperiment'): for key in ("startTime", "stopTime", "stepSize", "tolerance", "solver", "outputFormat"): - self._simulate_options[key] = attr.get(key) + self._simulate_options[key] = str(attr.get(key)) for sv in rootCQ.iter('ScalarVariable'): - scalar = {} - for key in ("name", "description", "variability", "causality", "alias"): - scalar[key] = sv.get(key) - scalar["changeable"] = sv.get('isValueChangeable') - scalar["aliasvariable"] = sv.get('aliasVariable') + translations = { + "alias": "alias", + "aliasvariable": "aliasVariable", + "causality": "causality", + "changeable": "isValueChangeable", + "description": "description", + "name": "name", + "variability": "variability", + } + + scalar: dict[str, Any] = {} + for key_dst, key_src in translations.items(): + val = sv.get(key_src) + scalar[key_dst] = None if val is None else str(val) + ch = list(sv) for att in ch: scalar["start"] = att.get('start') @@ -532,6 +548,7 @@ def _xmlparse(self): scalar["max"] = att.get('max') scalar["unit"] = att.get('unit') + # save parameters in the corresponding class variables if scalar["variability"] == "parameter": if scalar["name"] in self._override_variables: self._params[scalar["name"]] = self._override_variables[scalar["name"]] @@ -916,7 +933,7 @@ def getOptimizationOptions(self, names: Optional[str | list[str]] = None) -> dic def simulate_cmd( self, - result_file: pathlib.Path, + result_file: OMCPath, simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, timeout: Optional[float] = None, @@ -943,7 +960,12 @@ def simulate_cmd( An instance if ModelicaSystemCmd to run the requested simulation. """ - om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) + om_cmd = ModelicaSystemCmd( + session=self._getconn, + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=timeout, + ) # always define the result file to use om_cmd.arg_set(key="r", val=result_file.as_posix()) @@ -955,33 +977,29 @@ def simulate_cmd( if simargs: om_cmd.args_set(args=simargs) - overrideFile = self._tempdir / f"{self._model_name}_override.txt" + overrideFile = self.getWorkDirectory() / f"{self._model_name}_override.txt" if self._override_variables or self._simulate_options_override: tmpdict = self._override_variables.copy() tmpdict.update(self._simulate_options_override) - # write to override file - with open(file=overrideFile, mode="w", encoding="utf-8") as fh: - for key, value in tmpdict.items(): - fh.write(f"{key}={value}\n") + override_content = "\n".join([f"{key}={value}" for key, value in tmpdict.items()]) + "\n" + overrideFile.write_text(override_content) om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix()) - if self._has_inputs: # if model has input quantities - # csvfile is based on name used for result file - csvfile = result_file.parent / f"{result_file.stem}.csv" - - for i in self._inputs: - val = self._inputs[i] + if self._inputs: # if model has input quantities + for key in self._inputs: + val = self._inputs[key] if val is None: val = [(float(self._simulate_options["startTime"]), 0.0), (float(self._simulate_options["stopTime"]), 0.0)] - self._inputs[i] = [(float(self._simulate_options["startTime"]), 0.0), - (float(self._simulate_options["stopTime"]), 0.0)] + self._inputs[key] = val if float(self._simulate_options["startTime"]) != val[0][0]: - raise ModelicaSystemError(f"startTime not matched for Input {i}!") + raise ModelicaSystemError(f"startTime not matched for Input {key}!") if float(self._simulate_options["stopTime"]) != val[-1][0]: - raise ModelicaSystemError(f"stopTime not matched for Input {i}!") + raise ModelicaSystemError(f"stopTime not matched for Input {key}!") + # csvfile is based on name used for result file + csvfile = result_file.parent / f"{result_file.stem}.csv" # write csv file and store the name csvfile = self._createCSVData(csvfile=csvfile) @@ -1016,11 +1034,14 @@ def simulate( if resultfile is None: # default result file generated by OM - self._result_file = self._tempdir / f"{self._model_name}_res.mat" + self._result_file = self.getWorkDirectory() / f"{self._model_name}_res.mat" elif os.path.exists(resultfile): - self._result_file = pathlib.Path(resultfile) + self._result_file = self._getconn.omcpath(resultfile) else: - self._result_file = self._tempdir / resultfile + self._result_file = self.getWorkDirectory() / resultfile + + if not isinstance(self._result_file, OMCPath): + raise ModelicaSystemError(f"Invalid result file path: {self._result_file} - must be an OMCPath object!") om_cmd = self.simulate_cmd( result_file=self._result_file, @@ -1039,7 +1060,7 @@ def simulate( # check for an empty (=> 0B) result file which indicates a crash of the model executable # see: https://github.com/OpenModelica/OMPython/issues/261 # https://github.com/OpenModelica/OpenModelica/issues/13829 - if self._result_file.stat().st_size == 0: + if self._result_file.size() == 0: self._result_file.unlink() raise ModelicaSystemError("Empty result file - this indicates a crash of the model executable!") @@ -1084,7 +1105,7 @@ def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Op raise ModelicaSystemError("No result file found. Run simulate() first.") result_file = self._result_file else: - result_file = pathlib.Path(resultfile) + result_file = self._getconn.omcpath(resultfile) # check for result file exits if not result_file.is_file(): @@ -1117,166 +1138,270 @@ def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Op return np_res @staticmethod - def _strip_space(name): - if isinstance(name, str): - return name.replace(" ", "") + def _prepare_input_data( + raw_input: str | list[str] | dict[str, Any], + ) -> dict[str, str]: + """ + Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. + """ + + def prepare_str(str_in: str) -> dict[str, str]: + str_in = str_in.replace(" ", "") + key_val_list: list[str] = str_in.split("=") + if len(key_val_list) != 2: + raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") + + input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} + + return input_data_from_str - if isinstance(name, list): - return [x.replace(" ", "") for x in name] + input_data: dict[str, str] = {} - raise ModelicaSystemError("Unhandled input for strip_space()") + if isinstance(raw_input, str): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + return prepare_str(raw_input) - def _setMethodHelper(self, args1, args2, args3, args4=None): - """Helper function for setters. + if isinstance(raw_input, list): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) - args1 - string or list of string given by user - args2 - dict() containing the values of different variables(eg:, parameter,continuous,simulation parameters) - args3 - function name (eg; continuous, parameter, simulation, linearization,optimization) - args4 - dict() which stores the new override variables list, + for item in raw_input: + input_data |= prepare_str(item) + + return input_data + + if isinstance(raw_input, dict): + for key, val in raw_input.items(): + # convert all values to strings to align it on one type: dict[str, str] + # spaces have to be removed as setInput() could take list of tuples as input and spaces would + str_val = str(val).replace(' ', '') + if ' ' in key or ' ' in str_val: + raise ModelicaSystemError(f"Spaces not allowed in key/value pairs: {repr(key)} = {repr(val)}!") + input_data[key] = str_val + + return input_data + + raise ModelicaSystemError(f"Invalid type of input: {type(raw_input)}") + + def _set_method_helper( + self, + inputdata: dict[str, str], + classdata: dict[str, Any], + datatype: str, + overwritedata: Optional[dict[str, str]] = None, + ) -> bool: """ - def apply_single(args1): - args1 = self._strip_space(args1) - value = args1.split("=") - if value[0] in args2: - if args3 == "parameter" and self.isParameterChangeable(value[0], value[1]): - args2[value[0]] = value[1] - if args4 is not None: - args4[value[0]] = value[1] - elif args3 != "parameter": - args2[value[0]] = value[1] - if args4 is not None: - args4[value[0]] = value[1] - - return True + Helper function for: + * setParameter() + * setContinuous() + * setSimulationOptions() + * setLinearizationOption() + * setOptimizationOption() + * setInputs() + Parameters + ---------- + inputdata + string or list of string given by user + classdata + dict() containing the values of different variables (eg: parameter, continuous, simulation parameters) + datatype + type identifier (eg; continuous, parameter, simulation, linearization, optimization) + overwritedata + dict() which stores the new override variables list, + """ + + inputdata_status: dict[str, bool] = {} + for key, val in inputdata.items(): + if key not in classdata: + raise ModelicaSystemError("Unhandled case in setMethodHelper.apply_single() - " + f"{repr(key)} is not a {repr(datatype)} variable") + + status = False + if datatype == "parameter" and not self.isParameterChangeable(key): + logger.debug(f"It is not possible to set the parameter {repr(key)}. It seems to be " + "structural, final, protected, evaluated or has a non-constant binding. " + "Use sendExpression(...) and rebuild the model using buildModel() API; example: " + "sendExpression(\"setParameterValue(" + f"{self._model_name}, {key}, {val if val is not None else ''}" + ")\") ") else: - raise ModelicaSystemError("Unhandled case in _setMethodHelper.apply_single() - " - f"{repr(value[0])} is not a {repr(args3)} variable") + classdata[key] = val + if overwritedata is not None: + overwritedata[key] = val + status = True - result = [] - if isinstance(args1, str): - result = [apply_single(args1)] + inputdata_status[key] = status - elif isinstance(args1, list): - result = [] - args1 = self._strip_space(args1) - for var in args1: - result.append(apply_single(var)) + return all(inputdata_status.values()) - return all(result) + def isParameterChangeable( + self, + name: str, + ) -> bool: + q = self.getQuantities(name) + if q[0]["changeable"] == "false": + return False + return True - def setContinuous(self, cvals): # 13 + def setContinuous( + self, + cvals: str | list[str] | dict[str, Any], + ) -> bool: """ This method is used to set continuous values. It can be called: with a sequence of continuous name and assigning corresponding values as arguments as show in the example below: usage - >>> setContinuous("Name=value") - >>> setContinuous(["Name1=value1","Name2=value2"]) + >>> setContinuous("Name=value") # depreciated + >>> setContinuous(["Name1=value1","Name2=value2"]) # depreciated + >>> setContinuous(cvals={"Name1": "value1", "Name2": "value2"}) """ - return self._setMethodHelper(cvals, self._continuous, "continuous", self._override_variables) + inputdata = self._prepare_input_data(raw_input=cvals) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._continuous, + datatype="continuous", + overwritedata=self._override_variables) - def setParameters(self, pvals): # 14 + def setParameters( + self, + pvals: str | list[str] | dict[str, Any], + ) -> bool: """ This method is used to set parameter values. It can be called: with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: usage - >>> setParameters("Name=value") - >>> setParameters(["Name1=value1","Name2=value2"]) + >>> setParameters("Name=value") # depreciated + >>> setParameters(["Name1=value1","Name2=value2"]) # depreciated + >>> setParameters(pvals={"Name1": "value1", "Name2": "value2"}) """ - return self._setMethodHelper(pvals, self._params, "parameter", self._override_variables) + inputdata = self._prepare_input_data(raw_input=pvals) - def isParameterChangeable(self, name, value): - q = self.getQuantities(name) - if q[0]["changeable"] == "false": - logger.debug(f"setParameters() failed : It is not possible to set the following signal {repr(name)}. " - "It seems to be structural, final, protected or evaluated or has a non-constant binding, " - f"use sendExpression(\"setParameterValue({self._model_name}, {name}, {value})\") " - "and rebuild the model using buildModel() API") - return False - return True + return self._set_method_helper( + inputdata=inputdata, + classdata=self._params, + datatype="parameter", + overwritedata=self._override_variables) - def setSimulationOptions(self, simOptions): # 16 + def setSimulationOptions( + self, + simOptions: str | list[str] | dict[str, Any], + ) -> bool: """ This method is used to set simulation options. It can be called: with a sequence of simulation options name and assigning corresponding values as arguments as show in the example below: usage - >>> setSimulationOptions("Name=value") - >>> setSimulationOptions(["Name1=value1","Name2=value2"]) + >>> setSimulationOptions("Name=value") # depreciated + >>> setSimulationOptions(["Name1=value1","Name2=value2"]) # depreciated + >>> setSimulationOptions(simOptions={"Name1": "value1", "Name2": "value2"}) """ - return self._setMethodHelper(simOptions, self._simulate_options, "simulation-option", self._simulate_options_override) + inputdata = self._prepare_input_data(raw_input=simOptions) - def setLinearizationOptions(self, linearizationOptions): # 18 + return self._set_method_helper( + inputdata=inputdata, + classdata=self._simulate_options, + datatype="simulation-option", + overwritedata=self._simulate_options_override) + + def setLinearizationOptions( + self, + linearizationOptions: str | list[str] | dict[str, Any], + ) -> bool: """ This method is used to set linearization options. It can be called: with a sequence of linearization options name and assigning corresponding value as arguments as show in the example below usage - >>> setLinearizationOptions("Name=value") - >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) + >>> setLinearizationOptions("Name=value") # depreciated + >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) # depreciated + >>> setLinearizationOptions(linearizationOtions={"Name1": "value1", "Name2": "value2"}) """ - return self._setMethodHelper(linearizationOptions, self._linearization_options, "Linearization-option", None) + inputdata = self._prepare_input_data(raw_input=linearizationOptions) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._linearization_options, + datatype="Linearization-option", + overwritedata=None) - def setOptimizationOptions(self, optimizationOptions): # 17 + def setOptimizationOptions( + self, + optimizationOptions: str | list[str] | dict[str, Any], + ) -> bool: """ This method is used to set optimization options. It can be called: with a sequence of optimization options name and assigning corresponding values as arguments as show in the example below: usage - >>> setOptimizationOptions("Name=value") - >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) + >>> setOptimizationOptions("Name=value") # depreciated + >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) # depreciated + >>> setOptimizationOptions(optimizationOptions={"Name1": "value1", "Name2": "value2"}) """ - return self._setMethodHelper(optimizationOptions, self._optimization_options, "optimization-option", None) + inputdata = self._prepare_input_data(raw_input=optimizationOptions) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self._optimization_options, + datatype="optimization-option", + overwritedata=None) - def setInputs(self, name): # 15 + def setInputs( + self, + name: str | list[str] | dict[str, Any], + ) -> bool: """ - This method is used to set input values. It can be called: - with a sequence of input name and assigning corresponding values as arguments as show in the example below: - usage - >>> setInputs("Name=value") - >>> setInputs(["Name1=value1","Name2=value2"]) + This method is used to set input values. It can be called with a sequence of input name and assigning + corresponding values as arguments as show in the example below. Compared to other set*() methods this is a + special case as value could be a list of tuples - these are converted to a string in _prepare_input_data() + and restored here via ast.literal_eval(). + + >>> setInputs("Name=value") # depreciated + >>> setInputs(["Name1=value1","Name2=value2"]) # depreciated + >>> setInputs(name={"Name1": "value1", "Name2": "value2"}) """ - if isinstance(name, str): - name = self._strip_space(name) - value = name.split("=") - if value[0] in self._inputs: - tmpvalue = eval(value[1]) - if isinstance(tmpvalue, (int, float)): - self._inputs[value[0]] = [(float(self._simulate_options["startTime"]), float(value[1])), - (float(self._simulate_options["stopTime"]), float(value[1]))] - elif isinstance(tmpvalue, list): - self._checkValidInputs(tmpvalue) - self._inputs[value[0]] = tmpvalue - self._has_inputs = True - else: - raise ModelicaSystemError(f"{value[0]} is not an input") - elif isinstance(name, list): - name = self._strip_space(name) - for var in name: - value = var.split("=") - if value[0] in self._inputs: - tmpvalue = eval(value[1]) - if isinstance(tmpvalue, (int, float)): - self._inputs[value[0]] = [(float(self._simulate_options["startTime"]), float(value[1])), - (float(self._simulate_options["stopTime"]), float(value[1]))] - elif isinstance(tmpvalue, list): - self._checkValidInputs(tmpvalue) - self._inputs[value[0]] = tmpvalue - self._has_inputs = True - else: - raise ModelicaSystemError(f"{value[0]} is not an input!") - - def _checkValidInputs(self, name): - if name != sorted(name, key=lambda x: x[0]): - raise ModelicaSystemError('Time value should be in increasing order') - for l in name: - if isinstance(l, tuple): - # if l[0] < float(self.simValuesList[0]): - if l[0] < float(self._simulate_options["startTime"]): - raise ModelicaSystemError('Input time value is less than simulation startTime') - if len(l) != 2: - raise ModelicaSystemError(f'Value for {l} is in incorrect format!') + inputdata = self._prepare_input_data(raw_input=name) + + for key, val in inputdata.items(): + if key not in self._inputs: + raise ModelicaSystemError(f"{key} is not an input") + + if not isinstance(val, str): + raise ModelicaSystemError(f"Invalid data in input for {repr(key)}: {repr(val)}") + + val_evaluated = ast.literal_eval(val) + + if isinstance(val_evaluated, (int, float)): + self._inputs[key] = [(float(self._simulate_options["startTime"]), float(val)), + (float(self._simulate_options["stopTime"]), float(val))] + elif isinstance(val_evaluated, list): + if not all([isinstance(item, tuple) for item in val_evaluated]): + raise ModelicaSystemError("Value for setInput() must be in tuple format; " + f"got {repr(val_evaluated)}") + if val_evaluated != sorted(val_evaluated, key=lambda x: x[0]): + raise ModelicaSystemError("Time value should be in increasing order; " + f"got {repr(val_evaluated)}") + + for item in val_evaluated: + if item[0] < float(self._simulate_options["startTime"]): + raise ModelicaSystemError(f"Time value in {repr(item)} of {repr(val_evaluated)} is less " + "than the simulation start time") + if len(item) != 2: + raise ModelicaSystemError(f"Value {repr(item)} of {repr(val_evaluated)} " + "is in incorrect format!") + + self._inputs[key] = val_evaluated else: - raise ModelicaSystemError('Error!!! Value must be in tuple format') + raise ModelicaSystemError(f"Data cannot be evaluated for {repr(key)}: {repr(val)}") - def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path: + return True + + def _createCSVData(self, csvfile: Optional[OMCPath] = None) -> OMCPath: """ Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument, this file is used; else a generic file name is created. @@ -1322,11 +1447,12 @@ def _createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path csv_rows.append(row) if csvfile is None: - csvfile = self._tempdir / f'{self._model_name}.csv' + csvfile = self.getWorkDirectory() / f'{self._model_name}.csv' + + # basic definition of a CSV file using csv_rows as input + csv_content = "\n".join([",".join(map(str, row)) for row in csv_rows]) + "\n" - with open(file=csvfile, mode="w", encoding="utf-8", newline="") as fh: - writer = csv.writer(fh) - writer.writerows(csv_rows) + csvfile.write_text(csv_content) return csvfile @@ -1438,39 +1564,36 @@ def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = N compatibility, because linearize() used to return `[A, B, C, D]`. """ - # replacement for depreciated importlib.load_module() - def load_module_from_path(module_name, file_path): - spec = importlib.util.spec_from_file_location(module_name, file_path) - module_def = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module_def) - - return module_def - - if self._xml_file is None: + if len(self._quantities) == 0: + # if self._quantities has no content, the xml file was not parsed; see self._xmlparse() raise ModelicaSystemError( "Linearization cannot be performed as the model is not build, " "use ModelicaSystem() to build the model first" ) - om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) - - overrideLinearFile = self._tempdir / f'{self._model_name}_override_linear.txt' + om_cmd = ModelicaSystemCmd( + session=self._getconn, + runpath=self.getWorkDirectory(), + modelname=self._model_name, + timeout=timeout, + ) - with open(file=overrideLinearFile, mode="w", encoding="utf-8") as fh: - for key, value in self._override_variables.items(): - fh.write(f"{key}={value}\n") - for key, value in self._linearization_options.items(): - fh.write(f"{key}={value}\n") + override_content = ( + "\n".join([f"{key}={value}" for key, value in self._override_variables.items()]) + + "\n".join([f"{key}={value}" for key, value in self._linearization_options.items()]) + + "\n" + ) + override_file = self.getWorkDirectory() / f'{self._model_name}_override_linear.txt' + override_file.write_text(override_content) - om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix()) + om_cmd.arg_set(key="overrideFile", val=override_file.as_posix()) - if self._has_inputs: - nameVal = self.getInputs() - for n in nameVal: - tupleList = nameVal.get(n) - if tupleList is not None: - for l in tupleList: - if l[0] < float(self._simulate_options["startTime"]): + if self._inputs: + for key in self._inputs: + data = self._inputs[key] + if data is not None: + for value in data: + if value[0] < float(self._simulate_options["startTime"]): raise ModelicaSystemError('Input time value is less than simulation startTime') csvfile = self._createCSVData() om_cmd.arg_set(key="csvInput", val=csvfile.as_posix()) @@ -1484,38 +1607,59 @@ def load_module_from_path(module_name, file_path): if simargs: om_cmd.args_set(args=simargs) + # the file create by the model executable which contains the matrix and linear inputs, outputs and states + linear_file = self.getWorkDirectory() / "linearized_model.py" + + linear_file.unlink(missing_ok=True) + returncode = om_cmd.run() if returncode != 0: raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") self._simulated = True - # code to get the matrix and linear inputs, outputs and states - linearFile = self._tempdir / "linearized_model.py" - - # support older openmodelica versions before OpenModelica v1.16.2 where linearize() generates "linear_model_name.mo" file - if not linearFile.exists(): - linearFile = pathlib.Path(f'linear_{self._model_name}.py') + if not linear_file.is_file(): + raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") - if not linearFile.exists(): - raise ModelicaSystemError(f"Linearization failed: {linearFile} not found!") - - # this function is called from the generated python code linearized_model.py at runtime, - # to improve the performance by directly reading the matrices A, B, C and D from the julia code and avoid building the linearized modelica model + # extract data from the python file with the linearized model using the ast module - this allows to get the + # needed information without executing the created code + linear_data = {} + linear_file_content = linear_file.read_text() try: - # do not add the linearfile directory to path, as multiple execution of linearization will always use the first added path, instead execute the file - # https://github.com/OpenModelica/OMPython/issues/196 - module = load_module_from_path(module_name="linearized_model", file_path=linearFile.as_posix()) - - result = module.linearized_model() - (n, m, p, x0, u0, A, B, C, D, stateVars, inputVars, outputVars) = result - self._linearized_inputs = inputVars - self._linearized_outputs = outputVars - self._linearized_states = stateVars - return LinearizationResult(n, m, p, A, B, C, D, x0, u0, stateVars, - inputVars, outputVars) - except ModuleNotFoundError as ex: - raise ModelicaSystemError("No module named 'linearized_model'") from ex + # ignore possible typing errors below (mypy) - these are caught by the try .. except .. block + linear_file_ast = ast.parse(linear_file_content) + for body_part in linear_file_ast.body[0].body: # type: ignore + if not isinstance(body_part, ast.Assign): + continue + + target = body_part.targets[0].id # type: ignore + value = ast.literal_eval(body_part.value) + + linear_data[target] = value + except (AttributeError, IndexError, ValueError, SyntaxError, TypeError) as ex: + raise ModelicaSystemError(f"Error parsing linearization file {linear_file}!") from ex + + # remove the file + linear_file.unlink() + + self._linearized_inputs = linear_data["inputVars"] + self._linearized_outputs = linear_data["outputVars"] + self._linearized_states = linear_data["stateVars"] + + return LinearizationResult( + n=linear_data["n"], + m=linear_data["m"], + p=linear_data["p"], + x0=linear_data["x0"], + u0=linear_data["u0"], + A=linear_data["A"], + B=linear_data["B"], + C=linear_data["C"], + D=linear_data["D"], + stateVars=linear_data["stateVars"], + inputVars=linear_data["inputVars"], + outputVars=linear_data["outputVars"], + ) def getLinearInputs(self) -> list[str]: """Get names of input variables of the linearized model.""" diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index ac99dc05..4b5992e4 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -34,11 +34,14 @@ CONDITIONS OF OSMC-PL. """ +import abc +import dataclasses import io import json import logging import os import pathlib +import platform import psutil import pyparsing import re @@ -268,6 +271,210 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F return self._ask(question='getClassNames', opt=opt) +class OMCPathReal(pathlib.PurePosixPath): + """ + Implementation of a basic Path object which uses OMC as backend. The connection to OMC is provided via a + OMCSessionZMQ session object. + """ + + def __init__(self, *path, session: OMCSessionZMQ): + super().__init__(*path) + self._session = session + + def with_segments(self, *pathsegments): + """ + Create a new OMCPath object with the given path segments. + + The original definition of Path is overridden to ensure session is set. + """ + return type(self)(*pathsegments, session=self._session) + + def is_file(self) -> bool: + """ + Check if the path is a regular file. + """ + return self._session.sendExpression(f'regularFileExists("{self.as_posix()}")') + + def is_dir(self) -> bool: + """ + Check if the path is a directory. + """ + return self._session.sendExpression(f'directoryExists("{self.as_posix()}")') + + def read_text(self, encoding=None, errors=None) -> str: + """ + Read the content of the file represented by this path as text. + + The additional arguments `encoding` and `errors` are only defined for compatibility with Path() definitions. + """ + return self._session.sendExpression(f'readFile("{self.as_posix()}")') + + def write_text(self, data: str, encoding=None, errors=None, newline=None) -> bool: + """ + Write text data to the file represented by this path. + + The additional arguments `encoding`, `errors`, and `newline` are only defined for compatibility with Path() + definitions. + """ + if not isinstance(data, str): + raise TypeError('data must be str, not %s' % + data.__class__.__name__) + + return self._session.sendExpression(f'writeFile("{self.as_posix()}", "{data}", false)') + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a directory at the path represented by this OMCPath object. + + The additional arguments `mode`, and `parents` are only defined for compatibility with Path() definitions. + """ + if self.is_dir() and not exist_ok: + raise FileExistsError(f"Directory {self.as_posix()} already exists!") + + return self._session.sendExpression(f'mkdir("{self.as_posix()}")') + + def cwd(self): + """ + Returns the current working directory as an OMCPath object. + """ + cwd_str = self._session.sendExpression('cd()') + return OMCPath(cwd_str, session=self._session) + + def unlink(self, missing_ok: bool = False) -> bool: + """ + Unlink (delete) the file or directory represented by this path. + """ + res = self._session.sendExpression(f'deleteFile("{self.as_posix()}")') + if not res and not missing_ok: + raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") + return res + + def resolve(self, strict: bool = False) -> OMCPath: + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + if strict and not (self.is_file() or self.is_dir()): + raise OMCSessionException(f"Path {self.as_posix()} does not exist!") + + if self.is_file(): + omcpath = self._omc_resolve(self.parent.as_posix()) / self.name + elif self.is_dir(): + omcpath = self._omc_resolve(self.as_posix()) + else: + raise OMCSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + + return omcpath + + def _omc_resolve(self, pathstr: str) -> OMCPath: + """ + Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd + within OMC. + """ + expression = ('omcpath_cwd := cd(); ' + f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring + 'cd(omcpath_cwd)') + + try: + result = self._session.sendExpression(command=expression, parsed=False) + result_parts = result.split('\n') + pathstr_resolved = result_parts[1] + pathstr_resolved = pathstr_resolved[1:-1] # remove quotes + + omcpath_resolved = self._session.omcpath(pathstr_resolved) + except OMCSessionException as ex: + raise OMCSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + + if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): + raise OMCSessionException(f"OMCPath resolve failed for {pathstr} - path does not exist!") + + return omcpath_resolved + + def absolute(self) -> OMCPath: + """ + Resolve the path to an absolute path. This is done by calling resolve() as it is the best we can do + using OMC functions. + """ + return self.resolve(strict=True) + + def exists(self) -> bool: + """ + Semi replacement for pathlib.Path.exists(). + """ + return self.is_file() or self.is_dir() + + def size(self) -> int: + """ + Get the size of the file in bytes - this is a extra function and the best we can do using OMC. + """ + if not self.is_file(): + raise OMCSessionException(f"Path {self.as_posix()} is not a file!") + + res = self._session.sendExpression(f'stat("{self.as_posix()}")') + if res[0]: + return int(res[1]) + + raise OMCSessionException(f"Error reading file size for path {self.as_posix()}!") + + +if sys.version_info < (3, 12): + warnings.warn( + message="Python < 3.12 - using a limited compatibility class as OMCPath replacement.", + category=DeprecationWarning, + stacklevel=1, + ) + + class OMCPathCompatibility(pathlib.PosixPath): + + def size(self) -> int: + return self.stat().st_size + + OMCPath = OMCPathCompatibility # noqa: F811 + +else: + OMCPath = OMCPathReal + + +@dataclasses.dataclass +class OMCSessionRunData: + """ + Data class to store the command line data for running a model executable in the OMC environment. + + All data should be defined for the environment, where OMC is running (local, docker or WSL) + """ + # cmd_path is the expected working directory + cmd_path: str + cmd_model_name: str + # command line arguments for the model executable + cmd_args: list[str] + # result file with the simulation output + cmd_result_path: str + + # command prefix data (as list of strings); needed for docker or WSL + cmd_prefix: Optional[list[str]] = None + # cmd_model_executable is build out of cmd_path and cmd_model_name; this is mainly needed on Windows (add *.exe) + cmd_model_executable: Optional[str] = None + # additional library search path; this is mainly needed if OMCProcessLocal is run on Windows + cmd_library_path: Optional[str] = None + # command timeout + cmd_timeout: Optional[float] = 10.0 + + # working directory to be used on the *local* system + cmd_cwd_local: Optional[str] = None + + def get_cmd(self) -> list[str]: + """ + Get the command line to run the model executable in the environment defined by the OMCProcess definition. + """ + + if self.cmd_model_executable is None: + raise OMCSessionException("No model file defined for the model executable!") + + cmdl = [] if self.cmd_prefix is None else self.cmd_prefix + cmdl += [self.cmd_model_executable] + self.cmd_args + + return cmdl + + class OMCSessionZMQ: def __init__( @@ -322,6 +529,50 @@ def __del__(self): self.omc_zmq = None + def omcpath(self, *path) -> OMCPath: + """ + Create an OMCPath object based on the given path segments and the current OMC session. + """ + + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + # noinspection PyArgumentList + return OMCPath(*path) + else: + return OMCPath(*path, session=self) + + def omcpath_tempdir(self) -> OMCPath: + """ + Get a temporary directory using OMC. + """ + names = [str(uuid.uuid4()) for _ in range(100)] + + tempdir_str = self.sendExpression("getTempDirectoryPath()") + tempdir_base = self.omcpath(tempdir_str) + tempdir: Optional[OMCPath] = None + for name in names: + # create a unique temporary directory name + tempdir = tempdir_base / name + + if tempdir.exists(): + continue + + tempdir.mkdir(parents=True, exist_ok=False) + break + + if tempdir is None or not tempdir.is_dir(): + raise OMCSessionException("Cannot create a temporary directory!") + + return tempdir + + def omc_run_data_update(self, omc_run_data: OMCSessionRunData, session: OMCSessionZMQ) -> OMCSessionRunData: + """ + Modify data based on the selected OMCProcess implementation. + + Needs to be implemented in the subclasses. + """ + return self.omc_process.omc_run_data_update(omc_run_data=omc_run_data, session=session) + def execute(self, command: str): warnings.warn("This function is depreciated and will be removed in future versions; " "please use sendExpression() instead", DeprecationWarning, stacklevel=2) @@ -329,8 +580,11 @@ def execute(self, command: str): return self.sendExpression(command, parsed=False) def sendExpression(self, command: str, parsed: bool = True) -> Any: + """ + Send an expression to the OMC server and return the result. + """ if self.omc_zmq is None: - raise OMCSessionException("No OMC running. Create a new instance of OMCSessionZMQ!") + raise OMCSessionException("No OMC running. Create a new instance of OMCProcess!") logger.debug("sendExpression(%r, parsed=%r)", command, parsed) @@ -425,7 +679,7 @@ def sendExpression(self, command: str, parsed: bool = True) -> Any: raise OMCSessionException("Cannot parse OMC result") from ex -class OMCProcess: +class OMCProcess(metaclass=abc.ABCMeta): def __init__( self, @@ -507,6 +761,15 @@ def _get_portfile_path(self) -> Optional[pathlib.Path]: return portfile_path + @abc.abstractmethod + def omc_run_data_update(self, omc_run_data: OMCSessionRunData, session: OMCSessionZMQ) -> OMCSessionRunData: + """ + Modify data based on the selected OMCProcess implementation. + + Needs to be implemented in the subclasses. + """ + raise NotImplementedError("This method must be implemented in subclasses!") + class OMCProcessPort(OMCProcess): @@ -517,6 +780,9 @@ def __init__( super().__init__() self._omc_port = omc_port + def omc_run_data_update(self, omc_run_data: OMCSessionRunData, session: OMCSessionZMQ) -> OMCSessionRunData: + raise OMCSessionException("OMCProcessPort does not support omc_run_data_update()!") + class OMCProcessLocal(OMCProcess): @@ -598,6 +864,41 @@ def _omc_port_get(self) -> str: return port + def omc_run_data_update(self, omc_run_data: OMCSessionRunData, session: OMCSessionZMQ) -> OMCSessionRunData: + omc_run_data_copy = dataclasses.replace(omc_run_data) + + cmd_path = session.omcpath(omc_run_data_copy.cmd_path) + + if platform.system() == "Windows": + path_dll = "" + + # set the process environment from the generated .bat file in windows which should have all the dependencies + path_bat = cmd_path / f"{omc_run_data.cmd_model_name}.bat" + if not path_bat.is_file(): + raise OMCSessionException("Batch file (*.bat) does not exist " + str(path_bat)) + + content = path_bat.read_text(encoding='utf-8') + for line in content.splitlines(): + match = re.match(r"^SET PATH=([^%]*)", line, re.IGNORECASE) + if match: + path_dll = match.group(1).strip(';') # Remove any trailing semicolons + my_env = os.environ.copy() + my_env["PATH"] = path_dll + os.pathsep + my_env["PATH"] + + omc_run_data_copy.cmd_library_path = path_dll + + cmd_model_executable = cmd_path / f"{omc_run_data_copy.cmd_model_name}.exe" + else: + cmd_model_executable = cmd_path / omc_run_data_copy.cmd_model_name + + if not cmd_model_executable.is_file(): + raise OMCSessionException(f"Application file path not found: {cmd_model_executable}") + omc_run_data_copy.cmd_model_executable = cmd_model_executable.as_posix() + + omc_run_data_copy.cmd_cwd_local = omc_run_data.cmd_path + + return omc_run_data_copy + class OMCProcessDockerHelper(OMCProcess): @@ -704,6 +1005,30 @@ def get_docker_container_id(self) -> str: return self._dockerCid + def omc_run_data_update(self, omc_run_data: OMCSessionRunData, session: OMCSessionZMQ) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + """ + omc_run_data_copy = dataclasses.replace(omc_run_data) + + omc_run_data_copy.cmd_prefix = ( + [ + "docker", "exec", + "--user", str(self._getuid()), + "--workdir", omc_run_data_copy.cmd_path, + ] + + self._dockerExtraArgs + + [self._dockerCid] + ) + + cmd_path = session.omcpath(omc_run_data_copy.cmd_path) + cmd_model_executable = cmd_path / omc_run_data_copy.cmd_model_name + if not cmd_model_executable.is_file(): + raise OMCSessionException(f"Application file path not found: {cmd_model_executable}") + omc_run_data_copy.cmd_model_executable = cmd_model_executable.as_posix() + + return omc_run_data_copy + class OMCProcessDocker(OMCProcessDockerHelper): @@ -947,25 +1272,32 @@ def __init__( super().__init__(timeout=timeout) - # get wsl base command - self._wsl_cmd = ['wsl'] - if isinstance(wsl_distribution, str): - self._wsl_cmd += ['--distribution', wsl_distribution] - if isinstance(wsl_user, str): - self._wsl_cmd += ['--user', wsl_user] - self._wsl_cmd += ['--'] - # where to find OpenModelica self._wsl_omc = wsl_omc + # store WSL distribution and user + self._wsl_distribution = wsl_distribution + self._wsl_user = wsl_user # start up omc executable, which is waiting for the ZMQ connection self._omc_process = self._omc_process_get() # connect to the running omc instance using ZMQ self._omc_port = self._omc_port_get() + def _wsl_cmd(self, wsl_cwd: Optional[str] = None) -> list[str]: # get wsl base command + wsl_cmd = ['wsl'] + if isinstance(self._wsl_distribution, str): + wsl_cmd += ['--distribution', self._wsl_distribution] + if isinstance(self._wsl_user, str): + wsl_cmd += ['--user', self._wsl_user] + if isinstance(wsl_cwd, str): + wsl_cmd += ['--cd', wsl_cwd] + wsl_cmd += ['--'] + + return wsl_cmd + def _omc_process_get(self) -> subprocess.Popen: my_env = os.environ.copy() - omc_command = self._wsl_cmd + [ + omc_command = self._wsl_cmd() + [ self._wsl_omc, "--locale=C", "--interactive=zmq", @@ -988,7 +1320,7 @@ def _omc_port_get(self) -> str: omc_portfile_path = self._get_portfile_path() if omc_portfile_path is not None: output = subprocess.check_output( - args=self._wsl_cmd + ["cat", omc_portfile_path.as_posix()], + args=self._wsl_cmd() + ["cat", omc_portfile_path.as_posix()], stderr=subprocess.DEVNULL, ) port = output.decode().strip() @@ -1009,3 +1341,19 @@ def _omc_port_get(self) -> str: f"pid={self._omc_process.pid if isinstance(self._omc_process, subprocess.Popen) else '?'}") return port + + def omc_run_data_update(self, omc_run_data: OMCSessionRunData, session: OMCSessionZMQ) -> OMCSessionRunData: + """ + Update the OMCSessionRunData object based on the selected OMCProcess implementation. + """ + omc_run_data_copy = dataclasses.replace(omc_run_data) + + omc_run_data_copy.cmd_prefix = self._wsl_cmd(wsl_cwd=omc_run_data.cmd_path) + + cmd_path = session.omcpath(omc_run_data_copy.cmd_path) + cmd_model_executable = cmd_path / omc_run_data_copy.cmd_model_name + if not cmd_model_executable.is_file(): + raise OMCSessionException(f"Application file path not found: {cmd_model_executable}") + omc_run_data_copy.cmd_model_executable = cmd_model_executable.as_posix() + + return omc_run_data_copy diff --git a/tests/test_FMIExport.py b/tests/test_FMIExport.py index f47b87ae..788f7311 100644 --- a/tests/test_FMIExport.py +++ b/tests/test_FMIExport.py @@ -1,12 +1,14 @@ import OMPython import shutil import os +import pathlib def test_CauerLowPassAnalog(): mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() + # TODO [OMCPath]: need to work using OMCPath + tmp = pathlib.Path(mod.getWorkDirectory()) try: fmu = mod.convertMo2Fmu(fileNamePrefix="CauerLowPassAnalog") assert os.path.exists(fmu) @@ -16,7 +18,8 @@ def test_CauerLowPassAnalog(): def test_DrumBoiler(): mod = OMPython.ModelicaSystem(modelName="Modelica.Fluid.Examples.DrumBoiler.DrumBoiler", lmodel=["Modelica"]) - tmp = mod.getWorkDirectory() + # TODO [OMCPath]: need to work using OMCPath + tmp = pathlib.Path(mod.getWorkDirectory()) try: fmu = mod.convertMo2Fmu(fileNamePrefix="DrumBoiler") assert os.path.exists(fmu) diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index a7a4b472..9fadf403 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -2,20 +2,31 @@ import os import pathlib import pytest +import sys import tempfile import numpy as np +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + @pytest.fixture -def model_firstorder(tmp_path): - mod = tmp_path / "M.mo" - mod.write_text("""model M +def model_firstorder_content(): + return ("""model M Real x(start = 1, fixed = true); parameter Real a = -1; equation der(x) = x*a; end M; """) + + +@pytest.fixture +def model_firstorder(tmp_path, model_firstorder_content): + mod = tmp_path / "M.mo" + mod.write_text(model_firstorder_content) return mod @@ -35,8 +46,8 @@ def test_setParameters(): mod = OMPython.ModelicaSystem(model_path + "BouncingBall.mo", "BouncingBall") # method 1 - mod.setParameters("e=1.234") - mod.setParameters("g=321.0") + mod.setParameters(pvals={"e": 1.234}) + mod.setParameters(pvals={"g": 321.0}) assert mod.getParameters("e") == ["1.234"] assert mod.getParameters("g") == ["321.0"] assert mod.getParameters() == { @@ -47,7 +58,7 @@ def test_setParameters(): mod.getParameters("thisParameterDoesNotExist") # method 2 - mod.setParameters(["e=21.3", "g=0.12"]) + mod.setParameters(pvals={"e": 21.3, "g": 0.12}) assert mod.getParameters() == { "e": "21.3", "g": "0.12", @@ -64,8 +75,8 @@ def test_setSimulationOptions(): mod = OMPython.ModelicaSystem(fileName=model_path + "BouncingBall.mo", modelName="BouncingBall") # method 1 - mod.setSimulationOptions("stopTime=1.234") - mod.setSimulationOptions("tolerance=1.1e-08") + mod.setSimulationOptions(simOptions={"stopTime": 1.234}) + mod.setSimulationOptions(simOptions={"tolerance": 1.1e-08}) assert mod.getSimulationOptions("stopTime") == ["1.234"] assert mod.getSimulationOptions("tolerance") == ["1.1e-08"] assert mod.getSimulationOptions(["tolerance", "stopTime"]) == ["1.1e-08", "1.234"] @@ -77,7 +88,7 @@ def test_setSimulationOptions(): mod.getSimulationOptions("thisOptionDoesNotExist") # method 2 - mod.setSimulationOptions(["stopTime=2.1", "tolerance=1.2e-08"]) + mod.setSimulationOptions(simOptions={"stopTime": 2.1, "tolerance": "1.2e-08"}) d = mod.getSimulationOptions() assert d["stopTime"] == "2.1" assert d["tolerance"] == "1.2e-08" @@ -105,21 +116,45 @@ def test_customBuildDirectory(tmp_path, model_firstorder): tmpdir = tmp_path / "tmpdir1" tmpdir.mkdir() m = OMPython.ModelicaSystem(filePath, "M", customBuildDirectory=tmpdir) - assert m.getWorkDirectory().resolve() == tmpdir.resolve() + # TODO [OMCPath]: need to work using OMCPath + assert pathlib.Path(m.getWorkDirectory()).resolve() == tmpdir.resolve() result_file = tmpdir / "a.mat" assert not result_file.exists() m.simulate(resultfile="a.mat") assert result_file.is_file() +@skip_on_windows +def test_getSolutions_docker(model_firstorder_content): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + omc = OMPython.OMCSessionZMQ(omc_process=omcp) + + modelpath = omc.omcpath_tempdir() / 'M.mo' + modelpath.write_text(model_firstorder_content) + + file_path = pathlib.Path(modelpath) + mod = OMPython.ModelicaSystem( + fileName=file_path, + modelName="M", + omc_process=omc.omc_process, + ) + + _run_getSolutions(mod) + + def test_getSolutions(model_firstorder): filePath = model_firstorder.as_posix() mod = OMPython.ModelicaSystem(filePath, "M") + + _run_getSolutions(mod) + + +def _run_getSolutions(mod): x0 = 1 a = -1 tau = -1 / a stopTime = 5*tau - mod.setSimulationOptions([f"stopTime={stopTime}", "stepSize=0.1", "tolerance=1e-8"]) + mod.setSimulationOptions(simOptions={"stopTime": stopTime, "stepSize": 0.1, "tolerance": 1e-8}) mod.simulate() x = mod.getSolutions("x") @@ -298,7 +333,7 @@ def test_getters(tmp_path): x0 = 1.0 x_analytical = -b/a + (x0 + b/a) * np.exp(a * stopTime) dx_analytical = (x0 + b/a) * a * np.exp(a * stopTime) - mod.setSimulationOptions(f"stopTime={stopTime}") + mod.setSimulationOptions(simOptions={"stopTime": stopTime}) mod.simulate() # getOutputs after simulate() @@ -327,7 +362,7 @@ def test_getters(tmp_path): mod.getContinuous("a") # a is a parameter with pytest.raises(OMPython.ModelicaSystemError): - mod.setSimulationOptions("thisOptionDoesNotExist=3") + mod.setSimulationOptions(simOptions={"thisOptionDoesNotExist": 3}) def test_simulate_inputs(tmp_path): @@ -345,7 +380,7 @@ def test_simulate_inputs(tmp_path): """) mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="M_input") - mod.setSimulationOptions("stopTime=1.0") + mod.setSimulationOptions(simOptions={"stopTime": 1.0}) # integrate zero (no setInputs call) - it should default to None -> 0 assert mod.getInputs() == { @@ -357,20 +392,24 @@ def test_simulate_inputs(tmp_path): assert np.isclose(y[-1], 0.0) # integrate a constant - mod.setInputs("u1=2.5") + mod.setInputs(name={"u1": 2.5}) assert mod.getInputs() == { "u1": [ (0.0, 2.5), (1.0, 2.5), ], - "u2": None, + # u2 is set due to the call to simulate() above + "u2": [ + (0.0, 0.0), + (1.0, 0.0), + ], } mod.simulate() y = mod.getSolutions("y")[0] assert np.isclose(y[-1], 2.5) # now let's integrate the sum of two ramps - mod.setInputs("u1=[(0.0, 0.0), (0.5, 2), (1.0, 0)]") + mod.setInputs(name={"u1": [(0.0, 0.0), (0.5, 2), (1.0, 0)]}) assert mod.getInputs("u1") == [[ (0.0, 0.0), (0.5, 2.0), @@ -383,19 +422,17 @@ def test_simulate_inputs(tmp_path): # let's try some edge cases # unmatched startTime with pytest.raises(OMPython.ModelicaSystemError): - mod.setInputs("u1=[(-0.5, 0.0), (1.0, 1)]") + mod.setInputs(name={"u1": [(-0.5, 0.0), (1.0, 1)]}) mod.simulate() # unmatched stopTime with pytest.raises(OMPython.ModelicaSystemError): - mod.setInputs("u1=[(0.0, 0.0), (0.5, 1)]") + mod.setInputs(name={"u1": [(0.0, 0.0), (0.5, 1)]}) mod.simulate() - # Let's use both inputs, but each one with different number of of + # Let's use both inputs, but each one with different number of # samples. This has an effect when generating the csv file. - mod.setInputs([ - "u1=[(0.0, 0), (1.0, 1)]", - "u2=[(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]", - ]) + mod.setInputs(name={"u1": [(0.0, 0), (1.0, 1)], + "u2": [(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]}) csv_file = mod._createCSVData() assert pathlib.Path(csv_file).read_text() == """time,u1,u2,end 0.0,0.0,0.0,0 diff --git a/tests/test_ModelicaSystemCmd.py b/tests/test_ModelicaSystemCmd.py index 3b28699c..dfb8da91 100644 --- a/tests/test_ModelicaSystemCmd.py +++ b/tests/test_ModelicaSystemCmd.py @@ -18,7 +18,11 @@ def model_firstorder(tmp_path): @pytest.fixture def mscmd_firstorder(model_firstorder): mod = OMPython.ModelicaSystem(fileName=model_firstorder.as_posix(), modelName="M") - mscmd = OMPython.ModelicaSystemCmd(runpath=mod.getWorkDirectory(), modelname=mod._model_name) + mscmd = OMPython.ModelicaSystemCmd( + session=mod._getconn, + runpath=mod.getWorkDirectory(), + modelname=mod._model_name, + ) return mscmd @@ -32,8 +36,7 @@ def test_simflags(mscmd_firstorder): with pytest.deprecated_call(): mscmd.args_set(args=mscmd.parse_simflags(simflags="-noEventEmit -noRestart -override=a=1,x=3")) - assert mscmd.get_cmd() == [ - mscmd.get_exe().as_posix(), + assert mscmd.get_cmd_args() == [ '-noEventEmit', '-override=b=2,a=1,x=3', '-noRestart', diff --git a/tests/test_OMCPath.py b/tests/test_OMCPath.py new file mode 100644 index 00000000..45689622 --- /dev/null +++ b/tests/test_OMCPath.py @@ -0,0 +1,65 @@ +import sys +import OMPython +import pytest + +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath only working for Python >= 3.12 (definition of pathlib.PurePath).", +) + + +@skip_on_windows +@skip_python_older_312 +def test_OMCPath_docker(): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + om = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0" + + _run_OMCPath_checks(om) + + del omcp + del om + + +@skip_python_older_312 +def test_OMCPath_local(): + om = OMPython.OMCSessionZMQ() + + _run_OMCPath_checks(om) + + del om + + +@pytest.mark.skip(reason="Not able to run WSL on github") +def test_OMCPath_WSL(): + omcp = OMPython.OMCProcessWSL( + wsl_omc='omc', + wsl_user='omc', + timeout=30.0, + ) + om = OMPython.OMCSessionZMQ(omc_process=omcp) + + _run_OMCPath_checks(om) + + del omcp + del om + + +def _run_OMCPath_checks(om: OMPython.OMCSessionZMQ): + p1 = om.omcpath_tempdir() + p2 = p1 / '..' / p1.name / 'test.txt' + assert p2.is_file() is False + assert p2.write_text('test') + assert p2.is_file() + p2 = p2.resolve().absolute() + assert str(p2) == f"{str(p1)}/test.txt" + assert p2.read_text() == "test" + assert p2.is_file() + assert p2.parent.is_dir() + assert p2.unlink() + assert p2.is_file() is False diff --git a/tests/test_linearization.py b/tests/test_linearization.py index 2c79190c..6af565c6 100644 --- a/tests/test_linearization.py +++ b/tests/test_linearization.py @@ -62,10 +62,10 @@ def test_getters(tmp_path): assert "startTime" in d assert "stopTime" in d assert mod.getLinearizationOptions(["stopTime", "startTime"]) == [d["stopTime"], d["startTime"]] - mod.setLinearizationOptions("stopTime=0.02") + mod.setLinearizationOptions(linearizationOptions={"stopTime": 0.02}) assert mod.getLinearizationOptions("stopTime") == ["0.02"] - mod.setInputs(["u1=10", "u2=0"]) + mod.setInputs(name={"u1": 10, "u2": 0}) [A, B, C, D] = mod.linearize() g = float(mod.getParameters("g")[0]) l = float(mod.getParameters("l")[0]) diff --git a/tests/test_optimization.py b/tests/test_optimization.py index aa74df79..b4164397 100644 --- a/tests/test_optimization.py +++ b/tests/test_optimization.py @@ -35,13 +35,15 @@ def test_optimization_example(tmp_path): mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="BangBang2021") - mod.setOptimizationOptions(["numberOfIntervals=16", "stopTime=1", - "stepSize=0.001", "tolerance=1e-8"]) + mod.setOptimizationOptions(optimizationOptions={"numberOfIntervals": 16, + "stopTime": 1, + "stepSize": 0.001, + "tolerance": 1e-8}) # test the getter assert mod.getOptimizationOptions()["stopTime"] == "1" assert mod.getOptimizationOptions("stopTime") == ["1"] - assert mod.getOptimizationOptions(["tolerance", "stopTime"]) == ["1e-8", "1"] + assert mod.getOptimizationOptions(["tolerance", "stopTime"]) == ["1e-08", "1"] r = mod.optimize() # it is necessary to specify resultfile, otherwise it wouldn't find it.