Skip to content

OMCPath #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
56 changes: 56 additions & 0 deletions OMPython/OMCSession.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,51 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F
return self._ask(question='getClassNames', opt=opt)


class OMCPath(pathlib.PurePosixPath):
"""
Implementation of a basic Path object which uses OMC as backend. The connection to OMC is provided via a
OMCSessionZMQ session object.
"""

# TODO: need to handle PurePosixPath and PureWindowsPath
# PureOMCPath => OMCPathPosix(PureOMCPath, PurePosixPath)
# => OMCPathWindows(PureOMCPath, PureWindowsPath)
# TODO: only working for Python 3.12+ (not working for 3.10!; 3.11?)

def __init__(self, *path, session: OMCSessionZMQ):
super().__init__(*path)
self._session = session

def with_segments(self, *pathsegments):
# overwrite this function of PurePosixPath to ensure session is set
return type(self)(*pathsegments, session=self._session)

def is_file(self) -> bool:
return self._session.sendExpression(f'regularFileExists("{self.as_posix()}")')

def is_dir(self) -> bool:
return self._session.sendExpression(f'directoryExists("{self.as_posix()}")')

def read_text(self) -> str:
return self._session.sendExpression(f'readFile("{self.as_posix()}")')

def write_text(self, data: str) -> bool:
return self._session.sendExpression(f'writeFile("{self.as_posix()}", "{data}", false)')

def unlink(self) -> bool:
return self._session.sendExpression(f'deleteFile("{self.as_posix()}")')

# TODO: implement needed methods from pathlib._abc.PathBase:
# OK - is_dir()
# OK - is_file()
# OK - read_text() + binary()?
# OK - write_text() + binary()?
# OK - unlink()
# resolve()
# ... more ...
# ??? test if local (write OMC => READ local and the other way) and use shortcuts ???


class OMCSessionZMQ:

def __init__(
Expand Down Expand Up @@ -322,6 +367,17 @@ def __del__(self):

self.omc_zmq = None

def omcpath(self, *path) -> OMCPath:
# TODO: need to handle PurePosixPath and PureWindowsPath
# define it here based on the backend (omc_process) used?

if sys.version_info < (3, 12):
raise OMCSessionException("OMCPath requires Python 3.12 or newer!")
# reason (3.11): TypeError: PurePath.__new__() got an unexpected keyword argument 'session'
# reason (3.10): TypeError: PurePath.__new__() got an unexpected keyword argument 'session'

return OMCPath(*path, session=self)

def execute(self, command: str):
warnings.warn("This function is depreciated and will be removed in future versions; "
"please use sendExpression() instead", DeprecationWarning, stacklevel=2)
Expand Down
64 changes: 64 additions & 0 deletions tests/test_OMCPath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import sys
import OMPython
import pytest

skip_on_windows = pytest.mark.skipif(
sys.platform.startswith("win"),
reason="OpenModelica Docker image is Linux-only; skipping on Windows.",
)

skip_python_older_312 = pytest.mark.skipif(
sys.version_info < (3, 12),
reason="OMCPath only working for Python >= 3.12 (definition of pathlib.PurePath).",
)


@skip_on_windows
@skip_python_older_312
def test_OMCPath_docker():
omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal")
om = OMPython.OMCSessionZMQ(omc_process=omcp)
assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0"

p1 = om.omcpath('/tmp')
assert str(p1) == "/tmp"
p2 = p1 / 'test.txt'
assert str(p2) == "/tmp/test.txt"
assert p2.write_text('test')
assert p2.read_text() == "test"
assert p2.is_file()
assert p2.parent.is_dir()
assert p2.unlink()
assert p2.is_file() is False

del omcp
del om


@skip_python_older_312
def test_OMCPath_local():
om = OMPython.OMCSessionZMQ()

if sys.platform.startswith("win"):
tempdir = 'C:/temp'
else:
tempdir = '/tmp'

p1 = om.omcpath(tempdir)
assert str(p1) == tempdir
p2 = p1 / 'test.txt'
assert str(p2) == f"{tempdir}/test.txt"
assert p2.write_text('test')
assert p2.read_text() == "test"
assert p2.is_file()
assert p2.parent.is_dir()
assert p2.unlink()
assert p2.is_file() is False

del om


if __name__ == '__main__':
test_OMCPath_docker()
test_OMCPath_local()
print('DONE')
Loading