Skip to content

Commit

Permalink
246 fix error when invalid waterways geometry (#247)
Browse files Browse the repository at this point in the history
* Fix error when invalid geometries

* Update dem.py - Explicitly delete the DEM between save and load to encourage Python to release the file.

* Add the option of not clipping in the raw dem generation
      Set this as default for the waterways step.
      The thought is this may reduce processing load.
      Hopefully produce fewer dask / garbage collection errors in waterways.

* Turn on file compression and check in tests

* fixup: Format Python code with Black

* Update version

* updated benchmark

---------

Co-authored-by: github-actions <github-actions@github.com>
  • Loading branch information
rosepearson and github-actions committed Feb 23, 2024
1 parent 12f7244 commit 2e07142
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 125 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "geofabrics"
version = "1.1.11"
version = "1.1.12"
description = "A package for creating geofabrics for flood modelling."
readme = "README.md"
authors = [{ name = "Rose pearson", email = "rose.pearson@niwa.co.nz" }]
Expand Down
149 changes: 80 additions & 69 deletions src/geofabrics/dem.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,19 +309,77 @@ class DemBase(abc.ABC):
"no data": -1,
}

def __init__(
self,
catchment_geometry: geometry.CatchmentGeometry,
):
def __init__(self, catchment_geometry: geometry.CatchmentGeometry, chunk_size: int):
"""Setup base DEM to add future tiles too"""

self.catchment_geometry = catchment_geometry
self.chunk_size = chunk_size

@property
def dem(self) -> xarray.Dataset:
"""Return the DEM over the catchment region"""
raise NotImplementedError("dem must be instantiated in the child class")

def _load_dem(self, filename: pathlib.Path) -> xarray.Dataset:
"""Load in and replace the DEM with a previously cached version."""
dem = rioxarray.rioxarray.open_rasterio(
filename,
masked=True,
parse_coordinates=True,
chunks={"x": self.chunk_size, "y": self.chunk_size},
).squeeze(
"band", drop=True
) # remove band coordinate added by rasterio.open()
self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs)

if "data_source" in dem.keys():
dem["data_source"] = dem.data_source.astype(geometry.RASTER_TYPE)
if "lidar_source" in dem.keys():
dem["lidar_source"] = dem.lidar_source.astype(geometry.RASTER_TYPE)
if "z" in dem.keys():
dem["z"] = dem.z.astype(geometry.RASTER_TYPE)
if "zo" in dem.keys():
dem["zo"] = dem.zo.astype(geometry.RASTER_TYPE)

return dem

def save_dem(
self, filename: pathlib.Path, dem: xarray.Dataset, compression: dict = None
):
"""Save the DEM to a netCDF file and optionally reload it
:param filename: .nc file where to save the DEM
:param reload: reload DEM from the saved file
"""

assert not any(
array.rio.crs is None for array in dem.data_vars.values()
), "all DataArray variables of a xarray.Dataset must have a CRS"

try:
self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs)
if compression is not None:
compression["grid_mapping"] = dem.encoding["grid_mapping"]
encoding = {}
for key in dem.data_vars:
compression["dtype"] = dem[key].dtype
encoding[key] = compression
dem.to_netcdf(
filename, format="NETCDF4", engine="netcdf4", encoding=encoding
)
else:
dem.to_netcdf(filename, format="NETCDF4", engine="netcdf4")
dem.close()

except (Exception, KeyboardInterrupt) as caught_exception:
pathlib.Path(filename).unlink()
self.logger.info(
f"Caught error {caught_exception} and deleting"
"partially created netCDF output "
f"{filename} before re-raising error."
)
raise caught_exception

@staticmethod
def _ensure_positive_indexing(
dem: xarray.core.dataarray.DataArray,
Expand Down Expand Up @@ -439,22 +497,22 @@ def __init__(
catchment_geometry: geometry.CatchmentGeometry,
raw_dem_path: str | pathlib.Path,
interpolation_method: str,
chunk_size,
):
"""Load in the extents and dense DEM. Ensure the dense DEM is clipped within the
extents"""
super(HydrologicallyConditionedDem, self).__init__(
catchment_geometry=catchment_geometry,
chunk_size=chunk_size,
)
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

# Read in the dense DEM raster - and free up file by performing a deep copy.
raw_dem = rioxarray.rioxarray.open_rasterio(
pathlib.Path(raw_dem_path), masked=True, parse_coordinates=True, chunks=True
)

# Deep copy to ensure the opened file is properly unlocked; Squeeze as
# rasterio.open() adds band coordinate
raw_dem = raw_dem.squeeze("band", drop=True)
).squeeze(
"band", drop=True
) # remove band coordinate added by rasterio.open()
self._write_netcdf_conventions_in_place(raw_dem, catchment_geometry.crs)

# Clip to catchment and set the data_source layer to NaN where there is no data
Expand Down Expand Up @@ -948,10 +1006,10 @@ def __init__(

super(LidarBase, self).__init__(
catchment_geometry=catchment_geometry,
chunk_size=chunk_size,
)
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

self.chunk_size = chunk_size
self.elevation_range = elevation_range
assert elevation_range is None or (
type(elevation_range) == list and len(elevation_range) == 2
Expand Down Expand Up @@ -1202,58 +1260,6 @@ def _add_lidar_no_chunking(
"_add_lidar_no_chunking must be instantiated in the " "child class"
)

def _load_dem(self, filename: pathlib.Path) -> xarray.Dataset:
"""Load in and replace the DEM with a previously cached version."""
dem = rioxarray.rioxarray.open_rasterio(
filename,
masked=True,
parse_coordinates=True,
chunks={"x": self.chunk_size, "y": self.chunk_size},
)
dem = dem.squeeze("band", drop=True)
self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs)

if "data_source" in dem.keys():
dem["data_source"] = dem.data_source.astype(geometry.RASTER_TYPE)
if "lidar_source" in dem.keys():
dem["lidar_source"] = dem.lidar_source.astype(geometry.RASTER_TYPE)
if "z" in dem.keys():
dem["z"] = dem.z.astype(geometry.RASTER_TYPE)

return dem

def save_dem(
self, filename: pathlib.Path, dem: xarray.Dataset, encoding: dict = None
):
"""Save the DEM to a netCDF file and optionally reload it
:param filename: .nc file where to save the DEM
:param reload: reload DEM from the saved file
"""

assert not any(
arr.rio.crs is None for arr in dem.data_vars.values()
), "all DataArray variables of a xarray.Dataset must have a CRS"

try:
self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs)
if encoding is not None:
dem.to_netcdf(
filename, format="NETCDF4", engine="netcdf4", encoding=encoding
)
else:
dem.to_netcdf(filename, format="NETCDF4", engine="netcdf4")
dem.close()

except (Exception, KeyboardInterrupt) as caught_exception:
pathlib.Path(filename).unlink()
self.logger.info(
f"Caught error {caught_exception} and deleting"
"partially created netCDF output "
f"{filename} before re-raising error."
)
raise caught_exception

def save_and_load_dem(
self,
filename: pathlib.Path,
Expand All @@ -1265,10 +1271,9 @@ def save_and_load_dem(
f"{filename}"
)

dem = self._dem
self.save_dem(filename=filename, dem=dem)
dem = self._load_dem(filename=filename)
self._dem = dem
self.save_dem(filename=filename, dem=self._dem)
del self._dem
self._dem = self._load_dem(filename=filename)


class RawDem(LidarBase):
Expand Down Expand Up @@ -2056,14 +2061,15 @@ def __init__(
)
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

# Load hyrdological DEM. Squeeze as rasterio.open() adds band coordinate.
# Load hyrdological DEM.
hydrological_dem = rioxarray.rioxarray.open_rasterio(
pathlib.Path(hydrological_dem_path),
masked=True,
parse_coordinates=True,
chunks=True,
)
hydrological_dem = hydrological_dem.squeeze("band", drop=True)
).squeeze(
"band", drop=True
) # remove band coordinate added by rasterio.open()
self._write_netcdf_conventions_in_place(
hydrological_dem, catchment_geometry.crs
)
Expand All @@ -2082,6 +2088,10 @@ def __init__(
hydrological_dem = hydrological_dem.rio.clip_box(**catchment.bounds.iloc[0])
mask = clip_mask(hydrological_dem.z, catchment.geometry, self.chunk_size)
hydrological_dem = hydrological_dem.where(mask)
# Rerun as otherwise the no data as NaN seems to be lost for the data_source layer
self._write_netcdf_conventions_in_place(
hydrological_dem, catchment_geometry.crs
)

self.temp_folder = temp_folder
self.interpolation_method = interpolation_method
Expand Down Expand Up @@ -2206,6 +2216,7 @@ def add_lidar(
self._dem.z, self.catchment_geometry.catchment.geometry, self.chunk_size
)
self._dem = self._dem.where(mask)
self._write_netcdf_conventions_in_place(self._dem, self.catchment_geometry.crs)

def _add_tiled_lidar_chunked(
self,
Expand Down
83 changes: 31 additions & 52 deletions src/geofabrics/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def get_instruction_general(self, key: str, subkey: str = None):
"rivers": "bed_elevation_Rupp_and_Smart",
"ocean": None,
},
"ignore_clipping": False,
"filter_waterways_by_osm_ids": [],
}

Expand Down Expand Up @@ -925,16 +926,17 @@ def run(self):
self.raw_dem.save_and_load_dem(cached_file)

# Clip LiDAR - ensure within bounds/foreshore
self.raw_dem.clip_lidar()
if not self.get_instruction_general("ignore_clipping"):
self.raw_dem.clip_lidar()

# Save a cached copy of DEM to temporary memory cache
temp_file = temp_folder / "raw_lidar_clipped.nc"
self.logger.info(f"Save temp raw DEM to netCDF: {temp_file}")
self.raw_dem.save_and_load_dem(temp_file)
# Save a cached copy of DEM to temporary memory cache
temp_file = temp_folder / "raw_lidar_clipped.nc"
self.logger.info(f"Save temp raw DEM to netCDF: {temp_file}")
self.raw_dem.save_and_load_dem(temp_file)

# Remove previous cached file and replace with new one
cached_file.unlink()
cached_file = temp_file
# Remove previous cached file and replace with new one
cached_file.unlink()
cached_file = temp_file

# Add a coarse DEM if significant area without LiDAR and a coarse DEM
if self.check_vector_or_raster(key="coarse_dems", api_type="raster"):
Expand Down Expand Up @@ -968,25 +970,11 @@ def run(self):
"In processor.DemGenerator - write out the raw DEM to netCDF: "
f"{self.get_instruction_path('raw_dem')}"
)
encoding = {
"data_source": {
"zlib": True,
"complevel": 1,
},
"lidar_source": {
"zlib": True,
"complevel": 1,
},
"z": {
"zlib": True,
"complevel": 1,
},
}
encoding = None # Separately test compressing final netCDF outputs
compression = {"zlib": True, "complevel": 1}
self.raw_dem.save_dem(
self.get_instruction_path("raw_dem"),
dem=self.raw_dem.dem,
encoding=encoding,
compression=compression,
)
self.logger.info(f"Remove folder {temp_folder} for temporary files")
shutil.rmtree(temp_folder)
Expand Down Expand Up @@ -1171,6 +1159,7 @@ def run(self):
self.hydrologic_dem = dem.HydrologicallyConditionedDem(
catchment_geometry=self.catchment_geometry,
raw_dem_path=self.get_instruction_path("raw_dem"),
chunk_size=self.get_processing_instructions("chunk_size"),
interpolation_method=self.get_instruction_general(
key="interpolation", subkey="no_data"
),
Expand All @@ -1187,10 +1176,11 @@ def run(self):
"In processor.DemGenerator - write out the raw DEM to netCDF"
)
try:
self.hydrologic_dem.dem.to_netcdf(
compression = {"zlib": True, "complevel": 1}
self.hydrologic_dem.save_dem(
self.get_instruction_path("result_dem"),
format="NETCDF4",
engine="netcdf4",
dem=self.hydrologic_dem.dem,
compression=compression,
)
except (Exception, KeyboardInterrupt) as caught_exception:
pathlib.Path(self.get_instruction_path("raw_dem")).unlink()
Expand Down Expand Up @@ -1375,29 +1365,11 @@ def run(self):
"the raw DEM to netCDF: "
f"{self.get_instruction_path('result_geofabric')}"
)
encoding = {
"data_source": {
"zlib": True,
"complevel": 1,
},
"lidar_source": {
"zlib": True,
"complevel": 1,
},
"z": {
"zlib": True,
"complevel": 1,
},
"zo": {
"zlib": True,
"complevel": 1,
},
}
encoding = None # Separately test compressing final netCDF outputs
compression = {"zlib": True, "complevel": 1}
self.roughness_dem.save_dem(
filename=self.get_instruction_path("result_geofabric"),
dem=self.roughness_dem.dem,
encoding=encoding,
compression=compression,
)
self.logger.info(
"In processor.RoughnessLengthGenerator - clean folder for "
Expand Down Expand Up @@ -2806,9 +2778,9 @@ def estimate_open_elevations(
if polygon_file.is_file() and elevation_file.is_file():
self.logger.info("Open waterways already recorded. ")
return
# If not - estimate the elevations along the open waterways
# If not - estimate the elevations along the open waterways - drop any invalid geometries
open_waterways = waterways[numpy.logical_not(waterways["tunnel"])]

open_waterways = open_waterways[~open_waterways.geometry.isna()]
# sample the ends of the waterway - sample over a polygon at each end
polygons = open_waterways.interpolate(0).buffer(open_waterways["width"])
open_waterways["start_elevation"] = polygons.apply(
Expand Down Expand Up @@ -2946,15 +2918,22 @@ def create_dem(self, waterways: geopandas.GeoDataFrame) -> xarray.Dataset:
key="waterways_polygon"
)
dem_instruction_paths["raw_dem"] = self.get_result_file_name(key="raw_dem")
if "general" not in dem_instructions:
dem_instructions["general"] = {}
dem_instructions["general"]["ignore_clipping"] = True

# Create the ground DEM file if this has not be created yet!
self.logger.info("Generating waterway DEM.")
runner = RawLidarDemGenerator(self.instructions)
runner.run()
# Load in the DEM
dem = rioxarray.rioxarray.open_rasterio(dem_file, masked=True).squeeze(
"band", drop=True
)
chunk_size = self.get_processing_instructions("chunk_size")
dem = rioxarray.rioxarray.open_rasterio(
dem_file,
masked=True,
parse_coordinates=True,
chunks={"x": chunk_size, "y": chunk_size},
).squeeze("band", drop=True)
return dem

def download_osm_values(self) -> bool:
Expand Down
Loading

0 comments on commit 2e07142

Please sign in to comment.