From 2e07142d897477842085ef69b7ddd6e411c9bf20 Mon Sep 17 00:00:00 2001 From: Rose Pearson Date: Fri, 23 Feb 2024 15:59:32 +1300 Subject: [PATCH] 246 fix error when invalid waterways geometry (#247) * Fix error when invalid geometries * Update dem.py - Explicitly delete the DEM between save and load to encourage Python to release the file. * Add the option of not clipping in the raw dem generation Set this as default for the waterways step. The thought is this may reduce processing load. Hopefully produce fewer dask / garbage collection errors in waterways. * Turn on file compression and check in tests * fixup: Format Python code with Black * Update version * updated benchmark --------- Co-authored-by: github-actions --- pyproject.toml | 2 +- src/geofabrics/dem.py | 149 ++++++++++-------- src/geofabrics/processor.py | 83 ++++------ src/geofabrics/version.py | 2 +- .../data/benchmark_dem.nc | 4 +- 5 files changed, 115 insertions(+), 125 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 68364f28..b0b051e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "geofabrics" -version = "1.1.11" +version = "1.1.12" description = "A package for creating geofabrics for flood modelling." readme = "README.md" authors = [{ name = "Rose pearson", email = "rose.pearson@niwa.co.nz" }] diff --git a/src/geofabrics/dem.py b/src/geofabrics/dem.py index b3a91dcf..34ca6984 100644 --- a/src/geofabrics/dem.py +++ b/src/geofabrics/dem.py @@ -309,19 +309,77 @@ class DemBase(abc.ABC): "no data": -1, } - def __init__( - self, - catchment_geometry: geometry.CatchmentGeometry, - ): + def __init__(self, catchment_geometry: geometry.CatchmentGeometry, chunk_size: int): """Setup base DEM to add future tiles too""" self.catchment_geometry = catchment_geometry + self.chunk_size = chunk_size @property def dem(self) -> xarray.Dataset: """Return the DEM over the catchment region""" raise NotImplementedError("dem must be instantiated in the child class") + def _load_dem(self, filename: pathlib.Path) -> xarray.Dataset: + """Load in and replace the DEM with a previously cached version.""" + dem = rioxarray.rioxarray.open_rasterio( + filename, + masked=True, + parse_coordinates=True, + chunks={"x": self.chunk_size, "y": self.chunk_size}, + ).squeeze( + "band", drop=True + ) # remove band coordinate added by rasterio.open() + self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs) + + if "data_source" in dem.keys(): + dem["data_source"] = dem.data_source.astype(geometry.RASTER_TYPE) + if "lidar_source" in dem.keys(): + dem["lidar_source"] = dem.lidar_source.astype(geometry.RASTER_TYPE) + if "z" in dem.keys(): + dem["z"] = dem.z.astype(geometry.RASTER_TYPE) + if "zo" in dem.keys(): + dem["zo"] = dem.zo.astype(geometry.RASTER_TYPE) + + return dem + + def save_dem( + self, filename: pathlib.Path, dem: xarray.Dataset, compression: dict = None + ): + """Save the DEM to a netCDF file and optionally reload it + + :param filename: .nc file where to save the DEM + :param reload: reload DEM from the saved file + """ + + assert not any( + array.rio.crs is None for array in dem.data_vars.values() + ), "all DataArray variables of a xarray.Dataset must have a CRS" + + try: + self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs) + if compression is not None: + compression["grid_mapping"] = dem.encoding["grid_mapping"] + encoding = {} + for key in dem.data_vars: + compression["dtype"] = dem[key].dtype + encoding[key] = compression + dem.to_netcdf( + filename, format="NETCDF4", engine="netcdf4", encoding=encoding + ) + else: + dem.to_netcdf(filename, format="NETCDF4", engine="netcdf4") + dem.close() + + except (Exception, KeyboardInterrupt) as caught_exception: + pathlib.Path(filename).unlink() + self.logger.info( + f"Caught error {caught_exception} and deleting" + "partially created netCDF output " + f"{filename} before re-raising error." + ) + raise caught_exception + @staticmethod def _ensure_positive_indexing( dem: xarray.core.dataarray.DataArray, @@ -439,22 +497,22 @@ def __init__( catchment_geometry: geometry.CatchmentGeometry, raw_dem_path: str | pathlib.Path, interpolation_method: str, + chunk_size, ): """Load in the extents and dense DEM. Ensure the dense DEM is clipped within the extents""" super(HydrologicallyConditionedDem, self).__init__( catchment_geometry=catchment_geometry, + chunk_size=chunk_size, ) self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") # Read in the dense DEM raster - and free up file by performing a deep copy. raw_dem = rioxarray.rioxarray.open_rasterio( pathlib.Path(raw_dem_path), masked=True, parse_coordinates=True, chunks=True - ) - - # Deep copy to ensure the opened file is properly unlocked; Squeeze as - # rasterio.open() adds band coordinate - raw_dem = raw_dem.squeeze("band", drop=True) + ).squeeze( + "band", drop=True + ) # remove band coordinate added by rasterio.open() self._write_netcdf_conventions_in_place(raw_dem, catchment_geometry.crs) # Clip to catchment and set the data_source layer to NaN where there is no data @@ -948,10 +1006,10 @@ def __init__( super(LidarBase, self).__init__( catchment_geometry=catchment_geometry, + chunk_size=chunk_size, ) self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") - self.chunk_size = chunk_size self.elevation_range = elevation_range assert elevation_range is None or ( type(elevation_range) == list and len(elevation_range) == 2 @@ -1202,58 +1260,6 @@ def _add_lidar_no_chunking( "_add_lidar_no_chunking must be instantiated in the " "child class" ) - def _load_dem(self, filename: pathlib.Path) -> xarray.Dataset: - """Load in and replace the DEM with a previously cached version.""" - dem = rioxarray.rioxarray.open_rasterio( - filename, - masked=True, - parse_coordinates=True, - chunks={"x": self.chunk_size, "y": self.chunk_size}, - ) - dem = dem.squeeze("band", drop=True) - self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs) - - if "data_source" in dem.keys(): - dem["data_source"] = dem.data_source.astype(geometry.RASTER_TYPE) - if "lidar_source" in dem.keys(): - dem["lidar_source"] = dem.lidar_source.astype(geometry.RASTER_TYPE) - if "z" in dem.keys(): - dem["z"] = dem.z.astype(geometry.RASTER_TYPE) - - return dem - - def save_dem( - self, filename: pathlib.Path, dem: xarray.Dataset, encoding: dict = None - ): - """Save the DEM to a netCDF file and optionally reload it - - :param filename: .nc file where to save the DEM - :param reload: reload DEM from the saved file - """ - - assert not any( - arr.rio.crs is None for arr in dem.data_vars.values() - ), "all DataArray variables of a xarray.Dataset must have a CRS" - - try: - self._write_netcdf_conventions_in_place(dem, self.catchment_geometry.crs) - if encoding is not None: - dem.to_netcdf( - filename, format="NETCDF4", engine="netcdf4", encoding=encoding - ) - else: - dem.to_netcdf(filename, format="NETCDF4", engine="netcdf4") - dem.close() - - except (Exception, KeyboardInterrupt) as caught_exception: - pathlib.Path(filename).unlink() - self.logger.info( - f"Caught error {caught_exception} and deleting" - "partially created netCDF output " - f"{filename} before re-raising error." - ) - raise caught_exception - def save_and_load_dem( self, filename: pathlib.Path, @@ -1265,10 +1271,9 @@ def save_and_load_dem( f"{filename}" ) - dem = self._dem - self.save_dem(filename=filename, dem=dem) - dem = self._load_dem(filename=filename) - self._dem = dem + self.save_dem(filename=filename, dem=self._dem) + del self._dem + self._dem = self._load_dem(filename=filename) class RawDem(LidarBase): @@ -2056,14 +2061,15 @@ def __init__( ) self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") - # Load hyrdological DEM. Squeeze as rasterio.open() adds band coordinate. + # Load hyrdological DEM. hydrological_dem = rioxarray.rioxarray.open_rasterio( pathlib.Path(hydrological_dem_path), masked=True, parse_coordinates=True, chunks=True, - ) - hydrological_dem = hydrological_dem.squeeze("band", drop=True) + ).squeeze( + "band", drop=True + ) # remove band coordinate added by rasterio.open() self._write_netcdf_conventions_in_place( hydrological_dem, catchment_geometry.crs ) @@ -2082,6 +2088,10 @@ def __init__( hydrological_dem = hydrological_dem.rio.clip_box(**catchment.bounds.iloc[0]) mask = clip_mask(hydrological_dem.z, catchment.geometry, self.chunk_size) hydrological_dem = hydrological_dem.where(mask) + # Rerun as otherwise the no data as NaN seems to be lost for the data_source layer + self._write_netcdf_conventions_in_place( + hydrological_dem, catchment_geometry.crs + ) self.temp_folder = temp_folder self.interpolation_method = interpolation_method @@ -2206,6 +2216,7 @@ def add_lidar( self._dem.z, self.catchment_geometry.catchment.geometry, self.chunk_size ) self._dem = self._dem.where(mask) + self._write_netcdf_conventions_in_place(self._dem, self.catchment_geometry.crs) def _add_tiled_lidar_chunked( self, diff --git a/src/geofabrics/processor.py b/src/geofabrics/processor.py index d10bab50..2bb747f9 100644 --- a/src/geofabrics/processor.py +++ b/src/geofabrics/processor.py @@ -245,6 +245,7 @@ def get_instruction_general(self, key: str, subkey: str = None): "rivers": "bed_elevation_Rupp_and_Smart", "ocean": None, }, + "ignore_clipping": False, "filter_waterways_by_osm_ids": [], } @@ -925,16 +926,17 @@ def run(self): self.raw_dem.save_and_load_dem(cached_file) # Clip LiDAR - ensure within bounds/foreshore - self.raw_dem.clip_lidar() + if not self.get_instruction_general("ignore_clipping"): + self.raw_dem.clip_lidar() - # Save a cached copy of DEM to temporary memory cache - temp_file = temp_folder / "raw_lidar_clipped.nc" - self.logger.info(f"Save temp raw DEM to netCDF: {temp_file}") - self.raw_dem.save_and_load_dem(temp_file) + # Save a cached copy of DEM to temporary memory cache + temp_file = temp_folder / "raw_lidar_clipped.nc" + self.logger.info(f"Save temp raw DEM to netCDF: {temp_file}") + self.raw_dem.save_and_load_dem(temp_file) - # Remove previous cached file and replace with new one - cached_file.unlink() - cached_file = temp_file + # Remove previous cached file and replace with new one + cached_file.unlink() + cached_file = temp_file # Add a coarse DEM if significant area without LiDAR and a coarse DEM if self.check_vector_or_raster(key="coarse_dems", api_type="raster"): @@ -968,25 +970,11 @@ def run(self): "In processor.DemGenerator - write out the raw DEM to netCDF: " f"{self.get_instruction_path('raw_dem')}" ) - encoding = { - "data_source": { - "zlib": True, - "complevel": 1, - }, - "lidar_source": { - "zlib": True, - "complevel": 1, - }, - "z": { - "zlib": True, - "complevel": 1, - }, - } - encoding = None # Separately test compressing final netCDF outputs + compression = {"zlib": True, "complevel": 1} self.raw_dem.save_dem( self.get_instruction_path("raw_dem"), dem=self.raw_dem.dem, - encoding=encoding, + compression=compression, ) self.logger.info(f"Remove folder {temp_folder} for temporary files") shutil.rmtree(temp_folder) @@ -1171,6 +1159,7 @@ def run(self): self.hydrologic_dem = dem.HydrologicallyConditionedDem( catchment_geometry=self.catchment_geometry, raw_dem_path=self.get_instruction_path("raw_dem"), + chunk_size=self.get_processing_instructions("chunk_size"), interpolation_method=self.get_instruction_general( key="interpolation", subkey="no_data" ), @@ -1187,10 +1176,11 @@ def run(self): "In processor.DemGenerator - write out the raw DEM to netCDF" ) try: - self.hydrologic_dem.dem.to_netcdf( + compression = {"zlib": True, "complevel": 1} + self.hydrologic_dem.save_dem( self.get_instruction_path("result_dem"), - format="NETCDF4", - engine="netcdf4", + dem=self.hydrologic_dem.dem, + compression=compression, ) except (Exception, KeyboardInterrupt) as caught_exception: pathlib.Path(self.get_instruction_path("raw_dem")).unlink() @@ -1375,29 +1365,11 @@ def run(self): "the raw DEM to netCDF: " f"{self.get_instruction_path('result_geofabric')}" ) - encoding = { - "data_source": { - "zlib": True, - "complevel": 1, - }, - "lidar_source": { - "zlib": True, - "complevel": 1, - }, - "z": { - "zlib": True, - "complevel": 1, - }, - "zo": { - "zlib": True, - "complevel": 1, - }, - } - encoding = None # Separately test compressing final netCDF outputs + compression = {"zlib": True, "complevel": 1} self.roughness_dem.save_dem( filename=self.get_instruction_path("result_geofabric"), dem=self.roughness_dem.dem, - encoding=encoding, + compression=compression, ) self.logger.info( "In processor.RoughnessLengthGenerator - clean folder for " @@ -2806,9 +2778,9 @@ def estimate_open_elevations( if polygon_file.is_file() and elevation_file.is_file(): self.logger.info("Open waterways already recorded. ") return - # If not - estimate the elevations along the open waterways + # If not - estimate the elevations along the open waterways - drop any invalid geometries open_waterways = waterways[numpy.logical_not(waterways["tunnel"])] - + open_waterways = open_waterways[~open_waterways.geometry.isna()] # sample the ends of the waterway - sample over a polygon at each end polygons = open_waterways.interpolate(0).buffer(open_waterways["width"]) open_waterways["start_elevation"] = polygons.apply( @@ -2946,15 +2918,22 @@ def create_dem(self, waterways: geopandas.GeoDataFrame) -> xarray.Dataset: key="waterways_polygon" ) dem_instruction_paths["raw_dem"] = self.get_result_file_name(key="raw_dem") + if "general" not in dem_instructions: + dem_instructions["general"] = {} + dem_instructions["general"]["ignore_clipping"] = True # Create the ground DEM file if this has not be created yet! self.logger.info("Generating waterway DEM.") runner = RawLidarDemGenerator(self.instructions) runner.run() # Load in the DEM - dem = rioxarray.rioxarray.open_rasterio(dem_file, masked=True).squeeze( - "band", drop=True - ) + chunk_size = self.get_processing_instructions("chunk_size") + dem = rioxarray.rioxarray.open_rasterio( + dem_file, + masked=True, + parse_coordinates=True, + chunks={"x": chunk_size, "y": chunk_size}, + ).squeeze("band", drop=True) return dem def download_osm_values(self) -> bool: diff --git a/src/geofabrics/version.py b/src/geofabrics/version.py index ba432cfc..e15e75d8 100644 --- a/src/geofabrics/version.py +++ b/src/geofabrics/version.py @@ -3,4 +3,4 @@ Contains the package version information """ -__version__ = "1.1.11" +__version__ = "1.1.12" diff --git a/tests/test_dem_generation_local_1/data/benchmark_dem.nc b/tests/test_dem_generation_local_1/data/benchmark_dem.nc index 1d7b1d1c..dcb984a1 100644 --- a/tests/test_dem_generation_local_1/data/benchmark_dem.nc +++ b/tests/test_dem_generation_local_1/data/benchmark_dem.nc @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:f345c78987f5b8650b25f3e2139e8d66b1f2a03f3e630bcfd7fcff1b05134a3e -size 186019 +oid sha256:c0d05b79140efbd039366db0cef42962108220df61bd75585860f2562dcf5b9f +size 50917