Skip to content

Commit

Permalink
fix(MFFileMgmt): avoid IndexError in strip_model_relative_path (#1748)
Browse files Browse the repository at this point in the history
* fix strip_model_relative_path() to avoid indexing error
* test simulation load with filenames, abs paths & rel paths
* add tests for POSIX separator use on all platforms (ignored for now)
* MFPackage fixes: support PathLike, replace Windows pathsep in filename argument
  • Loading branch information
wpbonelli committed Apr 27, 2023
1 parent 6041835 commit 7db9263
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 23 deletions.
226 changes: 225 additions & 1 deletion autotest/test_mf6.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import os
import platform
from os.path import join
from pathlib import Path
from shutil import which
from shutil import copytree, which

import numpy as np
import pytest
from modflow_devtools.markers import requires_exe
from modflow_devtools.misc import set_dir

import flopy
from flopy.mf6 import (
Expand Down Expand Up @@ -238,6 +241,227 @@ def get_gwt_model(sim, gwtname, gwtpath, modelshape, sourcerecarray=None):
return gwt


def to_win_sep(s):
return s.replace("/", "\\")


def to_posix_sep(s):
return s.replace("\\", "/")


def to_os_sep(s):
return s.replace("\\", os.sep).replace("/", os.sep)


def test_load_and_run_sim_when_namefile_uses_filenames(
function_tmpdir, example_data_path
):
ws = function_tmpdir / "ws"
ml_name = "freyberg"
nam_name = "mfsim.nam"
nam_path = ws / nam_name
copytree(example_data_path / f"mf6-{ml_name}", ws)

sim = MFSimulation.load(nam_name, sim_ws=ws)
sim.check()
success, buff = sim.run_simulation(report=True)
assert success


def test_load_and_run_sim_when_namefile_uses_abs_paths(
function_tmpdir, example_data_path
):
ws = function_tmpdir / "ws"
ml_name = "freyberg"
nam_name = "mfsim.nam"
nam_path = ws / nam_name
copytree(example_data_path / f"mf6-{ml_name}", ws)

with set_dir(ws):
lines = open(nam_path).readlines()
with open(nam_path, "w") as f:
for l in lines:
pattern = f"{ml_name}."
if pattern in l:
l = l.replace(
pattern, str(ws.absolute()) + os.sep + pattern
)
f.write(l)

sim = MFSimulation.load(nam_name, sim_ws=ws)
sim.check()
success, buff = sim.run_simulation(report=True)
assert success


@pytest.mark.parametrize("sep", ["win", "posix"])
def test_load_sim_when_namefile_uses_rel_paths(
function_tmpdir, example_data_path, sep
):
ws = function_tmpdir / "ws"
ml_name = "freyberg"
nam_name = "mfsim.nam"
nam_path = ws / nam_name
copytree(example_data_path / f"mf6-{ml_name}", ws)

with set_dir(ws):
lines = open(nam_path).readlines()
with open(nam_path, "w") as f:
for l in lines:
pattern = f"{ml_name}."
if pattern in l:
if sep == "win":
l = to_win_sep(
l.replace(
pattern, "../" + ws.name + "/" + ml_name + "."
)
)
else:
l = to_posix_sep(
l.replace(
pattern, "../" + ws.name + "/" + ml_name + "."
)
)
f.write(l)

sim = MFSimulation.load(nam_name, sim_ws=ws)
sim.check()

# don't run simulation with Windows sep on Linux or Mac
if sep == "win" and platform.system() != "Windows":
return

success, buff = sim.run_simulation(report=True)
assert success


@pytest.mark.skip(reason="currently flopy uses OS-specific path separators")
@pytest.mark.parametrize("sep", ["win", "posix"])
def test_write_simulation_always_writes_posix_path_separators(
function_tmpdir, example_data_path, sep
):
ws = function_tmpdir / "ws"
ml_name = "freyberg"
nam_name = "mfsim.nam"
nam_path = ws / nam_name
copytree(example_data_path / f"mf6-{ml_name}", ws)

with set_dir(ws):
lines = open(nam_path).readlines()
with open(nam_path, "w") as f:
for l in lines:
pattern = f"{ml_name}."
if pattern in l:
if sep == "win":
l = to_win_sep(
l.replace(
pattern, "../" + ws.name + "/" + ml_name + "."
)
)
else:
l = to_posix_sep(
l.replace(
pattern, "../" + ws.name + "/" + ml_name + "."
)
)
f.write(l)

sim = MFSimulation.load(nam_name, sim_ws=ws)
sim.write_simulation()

lines = open(ws / "mfsim.nam").readlines()
assert all("\\" not in l for l in lines)


@requires_exe("mf6")
@pytest.mark.parametrize("filename", ["name", "rel", "rel_win"])
def test_basic_gwf(function_tmpdir, filename):
ws = function_tmpdir
name = "basic_gwf_prep"
sim = flopy.mf6.MFSimulation(sim_name=name, sim_ws=ws, exe_name="mf6")
pd = [(1.0, 1, 1.0), (1.0, 1, 1.0)]

innerdir = Path(function_tmpdir / "inner")
innerdir.mkdir()

# mfpackage filename can be path or string..
# if string, it can either be a file name or
# path relative to the simulation workspace.
tdis_name = f"{name}.tdis"
tdis_path = innerdir / tdis_name
tdis_path.touch()
tdis_relpath = tdis_path.relative_to(ws).as_posix()
tdis_relpath_win = str(tdis_relpath).replace("/", "\\")

if filename == "name":
# file named with no path will be created in simulation workspace
tdis = flopy.mf6.ModflowTdis(
sim, nper=len(pd), perioddata=pd, filename=tdis_name
)
assert tdis.filename == tdis_name
elif filename == "rel":
# filename may be a relative pathlib.Path
tdis = flopy.mf6.ModflowTdis(
sim, nper=len(pd), perioddata=pd, filename=tdis_relpath
)
assert tdis.filename == str(tdis_relpath)

# relative paths may also be provided as strings
tdis = flopy.mf6.ModflowTdis(
sim, nper=len(pd), perioddata=pd, filename=str(tdis_relpath)
)
assert tdis.filename == str(tdis_relpath)
elif filename == "rel_win":
# windows path backslash separator should be converted to forward slash
tdis = flopy.mf6.ModflowTdis(
sim, nper=len(pd), perioddata=pd, filename=tdis_relpath_win
)
assert tdis.filename == str(tdis_relpath)

# create other packages
ims = flopy.mf6.ModflowIms(sim)
gwf = flopy.mf6.ModflowGwf(sim, modelname=name, save_flows=True)
dis = flopy.mf6.ModflowGwfdis(gwf, nrow=10, ncol=10)
ic = flopy.mf6.ModflowGwfic(gwf)
npf = flopy.mf6.ModflowGwfnpf(
gwf, save_specific_discharge=True, save_saturation=True
)
spd = {
0: [[(0, 0, 0), 1.0, 1.0], [(0, 9, 9), 0.0, 0.0]],
1: [[(0, 0, 0), 0.0, 0.0], [(0, 9, 9), 1.0, 2.0]],
}
chd = flopy.mf6.ModflowGwfchd(
gwf, pname="CHD-1", stress_period_data=spd, auxiliary=["concentration"]
)
budget_file = f"{name}.bud"
head_file = f"{name}.hds"
oc = flopy.mf6.ModflowGwfoc(
gwf,
budget_filerecord=budget_file,
head_filerecord=head_file,
saverecord=[("HEAD", "ALL"), ("BUDGET", "ALL")],
)

# write the simulation
sim.write_simulation()

# check for input files
assert (ws / innerdir / tdis_name).is_file()
assert (ws / f"{name}.ims").is_file()
assert (ws / f"{name}.dis").is_file()
assert (ws / f"{name}.ic").is_file()
assert (ws / f"{name}.npf").is_file()
assert (ws / f"{name}.chd").is_file()
assert (ws / f"{name}.oc").is_file()

# run the simulation
sim.run_simulation()

# check for output files
assert (ws / budget_file).is_file()
assert (ws / head_file).is_file()


def test_subdir(function_tmpdir):
sim = MFSimulation(sim_ws=function_tmpdir)
assert sim.sim_path == function_tmpdir
Expand Down
41 changes: 23 additions & 18 deletions flopy/mf6/mfbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pathlib import Path
from shutil import copyfile
from typing import Union
from warnings import warn


# internal handled exceptions
Expand Down Expand Up @@ -273,26 +274,30 @@ def _build_relative_path(self, model_name):
current_abs_path = self.resolve_path("", model_name, False)
return os.path.relpath(old_abs_path, current_abs_path)

def strip_model_relative_path(self, model_name, path):
def strip_model_relative_path(self, model_name, path) -> str:
"""Strip out the model relative path part of `path`. For internal
FloPy use, not intended for end user."""
new_path = path
if model_name in self.model_relative_path:
model_rel_path = self.model_relative_path[model_name]
if (
model_rel_path is not None
and len(model_rel_path) > 0
and model_rel_path != "."
):
model_rel_path_lst = model_rel_path.split(os.path.sep)
path_lst = path.split(os.path.sep)
new_path = ""
for i, mrp in enumerate(model_rel_path_lst):
if i >= len(path_lst) or mrp != path_lst[i]:
new_path = os.path.join(new_path, path_lst[i])
for rp in path_lst[len(model_rel_path_lst) :]:
new_path = os.path.join(new_path, rp)
return new_path
if model_name not in self.model_relative_path:
return path

model_rel_path = Path(self.model_relative_path[model_name])
if (
model_rel_path is None
or model_rel_path.is_absolute()
or not any(str(model_rel_path))
or str(model_rel_path) == os.curdir
):
return path

try:
ret_path = Path(path).relative_to(model_rel_path)
except ValueError:
warnings.warn(
f"Could not strip model relative path from {path}: {traceback.format_exc()}"
)
ret_path = Path(path)

return str(ret_path.as_posix())

@staticmethod
def unique_file_name(file_name, lookup):
Expand Down
10 changes: 6 additions & 4 deletions flopy/mf6/mfpackage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1548,8 +1548,8 @@ class MFPackage(PackageContainer, PackageInterface):
The parent model, simulation, or package containing this package
package_type : str
String defining the package type
filename : str
Filename of file where this package is stored
filename : str or PathLike
Name or path of file where this package is stored
quoted_filename : str
Filename with quotes around it when there is a space in the name
pname : str
Expand Down Expand Up @@ -1674,7 +1674,7 @@ def __init__(
# filename uses model base name
self._filename = f"{self.model_or_sim.name}.{package_type}"
else:
if not isinstance(filename, str):
if not isinstance(filename, (str, os.PathLike)):
message = (
"Invalid fname parameter. Expecting type str. "
'Instead type "{}" was '
Expand All @@ -1694,7 +1694,9 @@ def __init__(
message,
self.model_or_sim.simulation_data.debug,
)
self._filename = datautil.clean_filename(filename)
self._filename = datautil.clean_filename(
str(filename).replace("\\", "/")
)
self.path, self.structure = self.model_or_sim.register_package(
self, not loading_package, pname is None, filename is None
)
Expand Down

0 comments on commit 7db9263

Please sign in to comment.