Skip to content

Commit

Permalink
refactor(convert): refactor and cleanup parsed2zarr (OSOceanAcoustics…
Browse files Browse the repository at this point in the history
…#1070)

* Bump pypa/gh-action-pypi-publish from 1.6.4 to 1.8.1 (OSOceanAcoustics#999)

Bumps [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish) from 1.6.4 to 1.8.1.
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.6.4...v1.8.1)

---
updated-dependencies:
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions/cache from 3.2.5 to 3.3.1 (OSOceanAcoustics#982)

Bumps [actions/cache](https://github.com/actions/cache) from 3.2.5 to 3.3.1.
- [Release notes](https://github.com/actions/cache/releases)
- [Changelog](https://github.com/actions/cache/blob/main/RELEASES.md)
- [Commits](actions/cache@v3.2.5...v3.3.1)

---
updated-dependencies:
- dependency-name: actions/cache
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* [pre-commit.ci] pre-commit autoupdate (OSOceanAcoustics#1022)

updates:
- [github.com/psf/black: 23.1.0 → 23.3.0](psf/black@23.1.0...23.3.0)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* Bump pypa/gh-action-pypi-publish from 1.8.1 to 1.8.5 (OSOceanAcoustics#1021)

Bumps [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish) from 1.8.1 to 1.8.5.
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.8.1...v1.8.5)

---
updated-dependencies:
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions/setup-python from 4.5.0 to 4.6.0 (OSOceanAcoustics#1036)

Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4.5.0 to 4.6.0.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](actions/setup-python@v4.5.0...v4.6.0)

---
updated-dependencies:
- dependency-name: actions/setup-python
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump pypa/gh-action-pypi-publish from 1.8.5 to 1.8.6 (OSOceanAcoustics#1041)

Bumps [pypa/gh-action-pypi-publish](https://github.com/pypa/gh-action-pypi-publish) from 1.8.5 to 1.8.6.
- [Release notes](https://github.com/pypa/gh-action-pypi-publish/releases)
- [Commits](pypa/gh-action-pypi-publish@v1.8.5...v1.8.6)

---
updated-dependencies:
- dependency-name: pypa/gh-action-pypi-publish
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump mamba-org/provision-with-micromamba from 15 to 16 (OSOceanAcoustics#1048)

Bumps [mamba-org/provision-with-micromamba](https://github.com/mamba-org/provision-with-micromamba) from 15 to 16.
- [Release notes](https://github.com/mamba-org/provision-with-micromamba/releases)
- [Commits](mamba-org/provision-with-micromamba@v15...v16)

---
updated-dependencies:
- dependency-name: mamba-org/provision-with-micromamba
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump actions/setup-python from 4.6.0 to 4.6.1 (OSOceanAcoustics#1052)

Bumps [actions/setup-python](https://github.com/actions/setup-python) from 4.6.0 to 4.6.1.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](actions/setup-python@v4.6.0...v4.6.1)

---
updated-dependencies:
- dependency-name: actions/setup-python
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Extract out whether_write_to_zarr and remove use_swap arg

* Initial add of destination_path

* Start using destination path and storage options, modify cleanup

* Update docstrings, and add some checks

* Add sonar model attribute to parser object

* Pass sonar_model to parser and fix bug

* Move dataarrays creation to parsed2zarr

* Remove atexit registration

* Add default DEFAULT_ZARR_TEMP_DIR global var

* Initial test for parsed2zarr components

* Add explicit no swap for conversion test

* fix: add parsed2zarr_obj to self to fix bug

* fix: uncomment zarr store close to close the store

* fix: remove unneeded typing

* fix: no need to close fsmap zarr store anymore, removing code

* feat: write tx datagram to zarr in  during p2z

* feat: add  property for ek80 p2z

* feat: set  from swap array

* fix: missing transmit data when no swap

* fix: only delete col when it exists in p2z_ek80

* test(echodata): add simple P2Z object to utils mock

* fix: Import 'List' typehint

Added import for 'List' that is currently
in use but missing.

* fix: Remove elif for column removal

Changed the if elif to a for if so
it removes both 'power' and 'angle' columns
for RAW4 'tx_datagram_df' data.

* fix: Removed dependency to 'more-itertools'

Removed dependency to 'more-itertools' by using similar
method that uses 'numpy.array_split' instead to
evenly split data into desired chunks

* test: Set 'no_swap' for 'test_combine_echodata_combined_append'

Assign 'no_swap' to 'destination_path' during 'open_raw'
to ensure that everything is in memory since the new
parsed to zarr functionality is 'auto' by default.

* feat: Add 'auto' keyword to enable auto 'use_swap'

Changed the way that 'auto' determination of 'use_swap'
by specifying an 'auto' keyword, rather than by default.
Now defaulting back to 'no_swap' for empty 'destination_path'.

* revert: Removed 'no_swap' in 'test_combine_echodata_combined_append'

On OSOceanAcoustics@b6b79fa
'no_swap' was set, however because now the default is 'no_swap'
this shouldn't be needed!

* fix: Use convention yaml for 'backscatter_x'

Sets 'long_name' attributes for 'backscatter_r' and
'backscatter_i' from the convention yaml for p2z
outputs. Additionally, units changed from 'V' to 'dB' to
sync up with the "no_swap" counterpart.

Old tests for 'test_direct_to_zarr_integration' has been activated
again to ensure equivalency b/w the two methods.

* test: Added P2Z Arrays tests

Added testing for underlying methods that
gets called during a 'datagram_to_zarr'
call to ensure that zarr arrays actually gets
created.

Ref: OSOceanAcoustics#777

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored and ruxandra-valcu committed Sep 25, 2023
1 parent a949c5b commit d7095ec
Show file tree
Hide file tree
Showing 19 changed files with 905 additions and 392 deletions.
91 changes: 64 additions & 27 deletions echopype/convert/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ..utils.coding import COMPRESSION_SETTINGS
from ..utils.log import _init_logger
from ..utils.prov import add_processing_level
from .utils.ek import should_use_swap

BEAM_SUBGROUP_DEFAULT = "Beam_group1"

Expand Down Expand Up @@ -315,6 +316,8 @@ def open_raw(
convert_params: Optional[Dict[str, str]] = None,
storage_options: Optional[Dict[str, str]] = None,
use_swap: bool = False,
destination_path: Optional[str] = None,
destination_storage_options: Optional[Dict[str, str]] = None,
max_mb: int = 100,
) -> Optional[EchoData]:
"""Create an EchoData object containing parsed data from a single raw data file.
Expand Down Expand Up @@ -342,11 +345,21 @@ def open_raw(
convert_params : dict
parameters (metadata) that may not exist in the raw file
and need to be added to the converted file
storage_options : dict
storage_options : dict, optional
options for cloud storage
use_swap: bool
**DEPRECATED: This flag is ignored**
If True, variables with a large memory footprint will be
written to a temporary zarr store at ``~/.echopype/temp_output/parsed2zarr_temp_files``
destination_path: str
The path to a swap directory in a case of a large memory footprint.
This path can be a remote path like s3://bucket/swap_dir.
By default, it will create a temporary zarr store at
``~/.echopype/temp_output/parsed2zarr_temp_files`` if needed,
when set to "auto".
destination_storage_options: dict, optional
Options for remote storage for the swap directory ``destination_path``
argument.
max_mb : int
The maximum data chunk size in Megabytes (MB), when offloading
variables with a large memory footprint to a temporary zarr store
Expand All @@ -369,10 +382,23 @@ def open_raw(
Notes
-----
``use_swap=True`` is only available for the following
In a case of a large memory footprint, the program will determine if using
a temporary swap space is needed. If so, it will use that space
during conversion to prevent out of memory errors.
Users can override this behaviour by either passing ``"swap"`` or ``"no_swap"``
into the ``destination_path`` argument.
This feature is only available for the following
echosounders: EK60, ES70, EK80, ES80, EA640. Additionally, this feature
is currently in beta.
"""

# Initially set use_swap False
use_swap = False

# Set initial destination_path of "no_swap"
if destination_path is None:
destination_path = "no_swap"

if raw_file is None:
raise FileNotFoundError("The path to the raw data file must be specified.")

Expand Down Expand Up @@ -402,15 +428,6 @@ def open_raw(
# Check file extension and existence
file_chk, xml_chk = _check_file(raw_file, sonar_model, xml_path, storage_options)

# TODO: remove once 'auto' option is added
if not isinstance(use_swap, bool):
raise ValueError("use_swap must be of type bool.")

# Ensure use_swap is 'auto', if it is a string
# TODO: use the following when we allow for 'auto' option
# if isinstance(use_swap, str) and use_swap != "auto":
# raise ValueError("use_swap must be a bool or equal to 'auto'.")

# TODO: the if-else below only works for the AZFP vs EK contrast,
# but is brittle since it is abusing params by using it implicitly
if SONAR_MODELS[sonar_model]["xml"]:
Expand All @@ -423,29 +440,49 @@ def open_raw(

# Parse raw file and organize data into groups
parser = SONAR_MODELS[sonar_model]["parser"](
file_chk, params=params, storage_options=storage_options, dgram_zarr_vars=dgram_zarr_vars
file_chk,
params=params,
storage_options=storage_options,
dgram_zarr_vars=dgram_zarr_vars,
sonar_model=sonar_model,
)

parser.parse_raw()

# Direct offload to zarr and rectangularization only available for some sonar models
if sonar_model in ["EK60", "ES70", "EK80", "ES80", "EA640"]:
# Create sonar_model-specific p2z object
p2z = SONAR_MODELS[sonar_model]["parsed2zarr"](parser)

# Determines if writing to zarr is necessary and writes to zarr
p2z_flag = use_swap is True or (
use_swap == "auto" and p2z.whether_write_to_zarr(mem_mult=0.4)
)

if p2z_flag:
p2z.datagram_to_zarr(max_mb=max_mb)
# Rectangularize the transmit data
parser.rectangularize_transmit_ping_data(data_type="complex")
swap_map = {
"swap": True,
"no_swap": False,
}
if destination_path == "auto":
# Overwrite use_swap if it's True below
# Use local swap directory
use_swap = should_use_swap(parser.zarr_datagrams, dgram_zarr_vars, mem_mult=0.4)
elif destination_path in swap_map:
use_swap = swap_map[destination_path]
else:
# TODO: Add docstring about swap path
use_swap = True
if "://" in destination_path and destination_storage_options is None:
raise ValueError(
(
"Please provide storage options for remote destination. ",
"If access is already configured locally, ",
"simply put an empty dictionary.",
)
)

if use_swap:
# Create sonar_model-specific p2z object
p2z = SONAR_MODELS[sonar_model]["parsed2zarr"](parser)
p2z.datagram_to_zarr(
dest_path=destination_path,
dest_storage_options=destination_storage_options,
max_mb=max_mb,
)
else:
del p2z
# Create general p2z object
p2z = Parsed2Zarr(parser)
p2z = Parsed2Zarr(parser) # Create general p2z object
parser.rectangularize_data()

else:
Expand Down
4 changes: 2 additions & 2 deletions echopype/convert/parse_ad2cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ class NoMorePackets(Exception):


class ParseAd2cp(ParseBase):
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, storage_options)
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="AD2CP"):
super().__init__(file, storage_options, sonar_model)
self.config = None
self.packets: List[Ad2cpDataPacket] = []

Expand Down
4 changes: 2 additions & 2 deletions echopype/convert/parse_azfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class ParseAZFP(ParseBase):
HEADER_FORMAT = ">HHHHIHHHHHHHHHHHHHHHHHHHHHHHHHHHHHBBBBHBBBBBBBBHHHHHHHHHHHHHHHHHHHH"
FILE_TYPE = 64770

def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, storage_options)
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="AZFP"):
super().__init__(file, storage_options, sonar_model)
# Parent class attributes
# regex pattern used to grab datetime embedded in filename
self.timestamp_pattern = FILENAME_DATETIME_AZFP
Expand Down
23 changes: 16 additions & 7 deletions echopype/convert/parse_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
class ParseBase:
"""Parent class for all convert classes."""

def __init__(self, file, storage_options):
def __init__(self, file, storage_options, sonar_model):
self.source_file = file
self.timestamp_pattern = None # regex pattern used to grab datetime embedded in filename
self.ping_time = [] # list to store ping time
self.storage_options = storage_options
self.zarr_datagrams = [] # holds all parsed datagrams
self.zarr_tx_datagrams = [] # holds all parsed transmit datagrams
self.sonar_model = sonar_model

def _print_status(self):
"""Prints message to console giving information about the raw file being parsed."""
Expand All @@ -31,8 +33,8 @@ def _print_status(self):
class ParseEK(ParseBase):
"""Class for converting data from Simrad echosounders."""

def __init__(self, file, params, storage_options, dgram_zarr_vars):
super().__init__(file, storage_options)
def __init__(self, file, params, storage_options, dgram_zarr_vars, sonar_model):
super().__init__(file, storage_options, sonar_model)

# Parent class attributes
# regex pattern used to grab datetime embedded in filename
Expand Down Expand Up @@ -78,6 +80,10 @@ def rectangularize_data(self):
for dgram in self.zarr_datagrams:
self._append_channel_ping_data(dgram, zarr_vars=False)

# append zarr transmit datagrams to channel ping data
for dgram in self.zarr_tx_datagrams:
self._append_channel_ping_data(dgram, rx=False, zarr_vars=False)

# Rectangularize all data and convert to numpy array indexed by channel
for data_type in ["power", "angle", "complex"]:
# Receive data
Expand Down Expand Up @@ -350,7 +356,7 @@ def _read_datagrams(self, fid):
else:
logger.info("Unknown datagram type: " + str(new_datagram["type"]))

def _append_zarr_dgram(self, full_dgram: dict):
def _append_zarr_dgram(self, full_dgram: dict, rx: bool):
"""
Selects a subset of the datagram values that
need to be sent directly to a zarr file and
Expand Down Expand Up @@ -389,7 +395,10 @@ def _append_zarr_dgram(self, full_dgram: dict):
reduced_datagram["power"] = reduced_datagram["power"].astype("float32") * INDEX2POWER

if reduced_datagram:
self.zarr_datagrams.append(reduced_datagram)
if rx:
self.zarr_datagrams.append(reduced_datagram)
else:
self.zarr_tx_datagrams.append(reduced_datagram)

def _append_channel_ping_data(self, datagram, rx=True, zarr_vars=True):
"""
Expand All @@ -411,10 +420,10 @@ def _append_channel_ping_data(self, datagram, rx=True, zarr_vars=True):
ch_id = datagram["channel_id"] if "channel_id" in datagram else datagram["channel"]

# append zarr variables, if they exist
if zarr_vars and rx:
if zarr_vars:
common_vars = set(self.dgram_zarr_vars.keys()).intersection(set(datagram.keys()))
if common_vars:
self._append_zarr_dgram(datagram)
self._append_zarr_dgram(datagram, rx=rx)
for var in common_vars:
del datagram[var]

Expand Down
4 changes: 2 additions & 2 deletions echopype/convert/parse_ek60.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
class ParseEK60(ParseEK):
"""Class for converting data from Simrad EK60 echosounders."""

def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, params, storage_options, dgram_zarr_vars)
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="EK60"):
super().__init__(file, params, storage_options, dgram_zarr_vars, sonar_model)

def _select_datagrams(self, params):
# Translates user input into specific datagrams or ALL
Expand Down
4 changes: 2 additions & 2 deletions echopype/convert/parse_ek80.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
class ParseEK80(ParseEK):
"""Class for converting data from Simrad EK80 echosounders."""

def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, params, storage_options, dgram_zarr_vars)
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}, sonar_model="EK80"):
super().__init__(file, params, storage_options, dgram_zarr_vars, sonar_model)
self.environment = {} # dictionary to store environment data

def _select_datagrams(self, params):
Expand Down
Loading

0 comments on commit d7095ec

Please sign in to comment.