Add default consolidated flag for to_zarr #855

Merged
merged 16 commits into from Dec 13, 2022
Changes from 7 commits
18 changes: 17 additions & 1 deletion echopype/convert/api.py
@@ -52,6 +52,11 @@ def to_file(
whether or not to use parallel processing. (Not yet implemented)
output_storage_options : dict
Additional keywords to pass to the filesystem class.
**kwargs : dict, optional
Extra arguments to either `xr.Dataset.to_netcdf`
or `xr.Dataset.to_zarr`; refer to each method's documentation
for a list of all possible arguments.

"""
if parallel:
raise NotImplementedError("Parallel conversion is not yet implemented.")
@@ -88,13 +93,14 @@ def to_file(
),
engine=engine,
compress=compress,
**kwargs,
)

# Link path to saved file with attribute as if from open_converted
echodata.converted_raw_path = output_file


def _save_groups_to_file(echodata, output_path, engine, compress=True):
def _save_groups_to_file(echodata, output_path, engine, compress=True, **kwargs):
"""Serialize all groups to file."""
# TODO: in terms of chunking, would using rechunker at the end be faster and more convenient?
# TODO: investigate chunking before we save Dataset to a file
@@ -106,6 +112,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
mode="w",
engine=engine,
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Environment group
@@ -116,6 +123,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group="Environment",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Platform group
@@ -126,6 +134,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group="Platform",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Platform/NMEA group: some sonar models do not produce NMEA data
@@ -137,6 +146,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group="Platform/NMEA",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Provenance group
@@ -147,6 +157,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
mode="a",
engine=engine,
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Sonar group
@@ -157,6 +168,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
mode="a",
engine=engine,
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# /Sonar/Beam_groupX group
@@ -169,6 +181,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group=f"Sonar/Beam_group{i}",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)
else:
io.save_file(
@@ -178,6 +191,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group=f"Sonar/{BEAM_SUBGROUP_DEFAULT}",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)
if echodata["Sonar/Beam_group2"] is not None:
# some sonar models do not produce Sonar/Beam_group2
@@ -188,6 +202,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group="Sonar/Beam_group2",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)

# Vendor_specific group
@@ -198,6 +213,7 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
engine=engine,
group="Vendor_specific",
compression_settings=COMPRESSION_SETTINGS[engine] if compress else None,
**kwargs,
)


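A minimal sketch of what the `**kwargs` pass-through enables (the raw file name and sonar model are illustrative assumptions, not part of this diff): any keyword that `to_file` does not consume itself now flows through `_save_groups_to_file` into `io.save_file` and on to `xr.Dataset.to_zarr` / `xr.Dataset.to_netcdf` for every group.

```python
# Sketch only -- input path and sonar model are hypothetical.
from echopype import open_raw
from echopype.convert.api import to_file

ed = open_raw("D20221213-T000000.raw", sonar_model="EK60")

# `consolidated` is not a to_file() parameter, so it rides along in **kwargs
# and reaches xr.Dataset.to_zarr for each serialized group.
to_file(ed, "zarr", save_path="converted.zarr", overwrite=True, consolidated=False)
```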
5 changes: 5 additions & 0 deletions echopype/echodata/combine.py
@@ -160,6 +160,7 @@ def combine_echodata(
overwrite: bool = False,
storage_options: Dict[str, Any] = {},
client: Optional[dask.distributed.Client] = None,
consolidated: bool = True,
) -> EchoData:
"""
Combines multiple ``EchoData`` objects into a single ``EchoData`` object.
@@ -181,6 +182,9 @@ def combine_echodata(
backend (ignored for local paths)
client: dask.distributed.Client, optional
An initialized Dask distributed client
consolidated: bool
Whether to consolidate zarr metadata after writing.
Defaults to ``True``.

Returns
-------
@@ -289,6 +293,7 @@ def combine_echodata(
storage_options=storage_options,
sonar_model=sonar_model,
echodata_filenames=echodata_filenames,
consolidated=consolidated,
)

if client_created:
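Assuming the usual `zarr_path` argument from earlier in the signature (not shown in this hunk), a sketch of the new flag in use, with hypothetical file paths:

```python
# Sketch: combine without consolidating, e.g. when the store will be
# modified again before any reader touches it.
from echopype import combine_echodata, open_converted

eds = [open_converted(p) for p in ("file1.zarr", "file2.zarr")]
combined = combine_echodata(
    eds,
    zarr_path="combined.zarr",  # assumed parameter from the full signature
    overwrite=True,
    consolidated=False,  # skip zarr.consolidate_metadata at the end
)
```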
53 changes: 49 additions & 4 deletions echopype/echodata/echodata.py
@@ -789,7 +789,15 @@ def _load_group(self, filepath: "PathHint", group: Optional[str] = None):
**self.open_kwargs,
)

def to_netcdf(self, save_path: Optional["PathHint"] = None, **kwargs):
def to_netcdf(
self,
save_path: Optional["PathHint"] = None,
compress: bool = True,
overwrite: bool = False,
parallel: bool = False,
output_storage_options: Dict[str, str] = {},
**kwargs,
):
"""Save content of EchoData to netCDF.

Parameters
@@ -806,12 +814,33 @@ def to_netcdf(self, save_path: Optional["PathHint"] = None, **kwargs):
whether or not to use parallel processing. (Not yet implemented)
output_storage_options : dict
Additional keywords to pass to the filesystem class.
**kwargs : dict, optional
Extra arguments to `xr.Dataset.to_netcdf`: refer to
xarray's documentation for a list of all possible arguments.
"""
from ..convert.api import to_file

return to_file(self, "netcdf4", save_path=save_path, **kwargs)
return to_file(
echodata=self,
engine="netcdf4",
save_path=save_path,
compress=compress,
overwrite=overwrite,
parallel=parallel,
output_storage_options=output_storage_options,
**kwargs,
)

def to_zarr(self, save_path: Optional["PathHint"] = None, **kwargs):
def to_zarr(
self,
save_path: Optional["PathHint"] = None,
compress: bool = True,
overwrite: bool = False,
parallel: bool = False,
output_storage_options: Dict[str, str] = {},
consolidated: bool = True,
**kwargs,
):
"""Save content of EchoData to zarr.

Parameters
@@ -828,10 +857,26 @@ def to_zarr(self, save_path: Optional["PathHint"] = None, **kwargs):
whether or not to use parallel processing. (Not yet implemented)
output_storage_options : dict
Additional keywords to pass to the filesystem class.
consolidated : bool
Whether to consolidate zarr metadata after writing.
Defaults to ``True``.
**kwargs : dict, optional
Extra arguments to `xr.Dataset.to_zarr`: refer to
xarray's documentation for a list of all possible arguments.
"""
from ..convert.api import to_file

return to_file(self, "zarr", save_path=save_path, **kwargs)
return to_file(
echodata=self,
engine="zarr",
save_path=save_path,
compress=compress,
overwrite=overwrite,
parallel=parallel,
output_storage_options=output_storage_options,
consolidated=consolidated,
**kwargs,
)

# TODO: Remove below in future versions. They are for supporting old API calls.
@property
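With the signature spelled out instead of a bare `**kwargs`, callers can see the defaults directly; a usage sketch (input and output paths are illustrative):

```python
# Sketch only -- paths and sonar model are hypothetical.
import echopype as ep

ed = ep.open_raw("D20221213-T000000.raw", sonar_model="EK60")

# consolidated=True (the default) writes .zmetadata so the store opens with
# a single metadata read; False leaves per-group metadata unconsolidated.
ed.to_zarr(save_path="out.zarr", overwrite=True, consolidated=False)

# Anything not in the explicit signature still reaches xr.Dataset.to_netcdf
# through **kwargs.
ed.to_netcdf(save_path="out.nc", overwrite=True)
```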
48 changes: 41 additions & 7 deletions echopype/echodata/zarr_combine.py
@@ -4,6 +4,7 @@

import dask
import dask.array
import fsspec
import numpy as np
import pandas as pd
import xarray as xr
@@ -627,6 +628,7 @@ def write_to_file(
"""

with Lock(lock_name):
# Writing to a region does not consolidate zarr metadata by default
ds_in.to_zarr(
zarr_path,
group=zarr_group,
@@ -684,9 +686,9 @@ def _append_ds_list_to_zarr(
compute=False,
group=zarr_group,
encoding=encodings,
consolidated=None,
storage_options=storage_options,
synchronizer=zarr.ThreadSynchronizer(),
consolidated=False,
)

# get all dimensions in ds that are append dimensions
@@ -729,7 +731,12 @@ def _append_ds_list_to_zarr(
# write the subset of each Dataset to a zarr file
delayed_to_zarr.append(
self.write_to_file(
ds_in, lock_name, zarr_path, zarr_group, region, storage_options
ds_in,
lock_name,
zarr_path,
zarr_group,
region,
storage_options,
)
)

@@ -781,7 +788,11 @@ def _append_const_to_zarr(
ds_list_ind = int(0)

ds_list[ds_list_ind][[var]].to_zarr(
zarr_path, group=zarr_group, mode="a", storage_options=storage_options
zarr_path,
group=zarr_group,
mode="a",
storage_options=storage_options,
consolidated=False,
)

def _write_append_dims(
@@ -831,6 +842,7 @@ def _write_append_dims(
compute=True,
storage_options=storage_options,
synchronizer=zarr.ThreadSynchronizer(),
consolidated=False,
)

def _append_provenance_attr_vars(
@@ -891,7 +903,7 @@ def _append_provenance_attr_vars(
group="Provenance",
mode="a",
storage_options=storage_options,
consolidated=True,
consolidated=False,
)

@staticmethod
@@ -931,6 +943,7 @@ def combine(
storage_options: Dict[str, Any] = {},
sonar_model: str = None,
echodata_filenames: List[str] = [],
consolidated: bool = True,
) -> EchoData:
"""
Combines all ``EchoData`` objects in ``eds`` by
Expand All @@ -950,6 +963,9 @@ def combine(
The sonar model used for all elements in ``eds``
echodata_filenames : list of str
The source files names for all elements in ``eds``
consolidated: bool
Whether to consolidate zarr metadata after writing.
Defaults to ``True``.

Returns
-------
@@ -1008,23 +1024,41 @@
)

self._append_const_to_zarr(
const_names, ds_list, zarr_path, grp_info["ep_group"], storage_options
const_names,
ds_list,
zarr_path,
grp_info["ep_group"],
storage_options,
)

self._write_append_dims(ds_list, zarr_path, grp_info["ep_group"], storage_options)
self._write_append_dims(
ds_list,
zarr_path,
grp_info["ep_group"],
storage_options,
)

# append all group attributes before combination to zarr store
self._append_provenance_attr_vars(zarr_path, storage_options=storage_options)
self._append_provenance_attr_vars(
zarr_path,
storage_options=storage_options,
)

# change filenames numbering to range(len(eds))
self._modify_prov_filenames(zarr_path, len_eds=len(eds), storage_options=storage_options)

if consolidated:
# consolidate metadata at the end if the consolidated flag is True
store = fsspec.get_mapper(zarr_path, **storage_options)
zarr.consolidate_metadata(store=store)

# open lazy loaded combined EchoData object
ed_combined = open_converted(
zarr_path,
chunks={},
synchronizer=zarr.ThreadSynchronizer(),
storage_options=storage_options,
backend_kwargs={"consolidated": consolidated},
) # TODO: is this appropriate for chunks?

return ed_combined
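The write path above follows the standard zarr idiom: every intermediate `to_zarr` call passes `consolidated=False`, and `.zmetadata` is built exactly once at the store root after all groups and region writes are finished. A self-contained sketch of that pattern, with an illustrative dataset and path:

```python
# Standalone sketch of the consolidate-once-at-the-end pattern.
import fsspec
import xarray as xr
import zarr

path = "demo.zarr"  # hypothetical store
ds = xr.Dataset({"backscatter": ("ping_time", [1.0, 2.0, 3.0])})

# 1) Write groups without consolidating, matching the consolidated=False
#    calls added throughout zarr_combine.py.
ds.to_zarr(path, group="Sonar", mode="w", consolidated=False)

# 2) Consolidate metadata once, after all writes have finished.
store = fsspec.get_mapper(path)
zarr.consolidate_metadata(store=store)

# 3) Readers opt in through the backend flag, as open_converted does here.
reopened = xr.open_dataset(
    path,
    group="Sonar",
    engine="zarr",
    backend_kwargs={"consolidated": True},
)
```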