diff --git a/.github/workflows/python_ci.yaml b/.github/workflows/python_ci.yaml index a240294d..dee4a54f 100644 --- a/.github/workflows/python_ci.yaml +++ b/.github/workflows/python_ci.yaml @@ -27,7 +27,7 @@ jobs: - name: Pip install run: | python -m pip install --upgrade pip - pip install '.[development,openssl,tdms,rosbags]' + pip install '.[development,openssl,tdms,rosbags,hdf5]' - name: Lint run: | ruff check diff --git a/python/examples/data_import/hdf5/main.py b/python/examples/data_import/hdf5/main.py new file mode 100644 index 00000000..81998300 --- /dev/null +++ b/python/examples/data_import/hdf5/main.py @@ -0,0 +1,72 @@ +import os + +import h5py +from dotenv import load_dotenv +from sift_py.data_import.config import Hdf5Config +from sift_py.data_import.hdf5 import Hdf5UploadService +from sift_py.rest import SiftRestConfig + +if __name__ == "__main__": + """ + Example of uploading an HDF5 file into Sift. + """ + + load_dotenv() + + sift_uri = os.getenv("SIFT_API_URI") + assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set" + + apikey = os.getenv("SIFT_API_KEY") + assert apikey, "expected 'SIFT_API_KEY' environment variable to be set" + + asset_name = os.getenv("ASSET_NAME") + assert asset_name, "expected 'ASSET_NAME' environment variable to be set" + + # Create an HDF5 config to define the data to be ingested + hdf5_config_dict = { + "asset_name": asset_name, + "time": { + "format": "TIME_FORMAT_ABSOLUTE_DATETIME", + }, + "data": [], + } + + # For this example, each HDF5 dataset uses the common '/timestamp' dataset + # Each is of type double and contains its channel name in the 'Name' attribute + with h5py.File("sample_data.h5", "r") as f: + for dset in f.values(): + # Skip adding the timestamp dataset + if dset.name == "/timestamp": + continue + + hdf5_config_dict["data"].append( + { + "name": dset.attrs["Name"], + "time_dataset": "/timestamp", + "value_dataset": dset.name, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + } + ) + + hdf5_config = Hdf5Config(hdf5_config_dict) + + rest_config: SiftRestConfig = { + "uri": sift_uri, + "apikey": apikey, + } + + hdf5_upload_service = Hdf5UploadService(rest_config) + import_services = hdf5_upload_service.upload( + "sample_data.h5", + hdf5_config, + ) + + # Wait until the data import is completed. + # The HDF5 upload service may split the upload into multiple parts + data_imports = [import_svc.wait_until_complete() for import_svc in import_services] + + # Print the data import details and final status.
+ for data_import in data_imports: + print(data_import.model_dump_json(indent=1)) + + print("Upload example complete!") diff --git a/python/examples/data_import/hdf5/requirements.txt b/python/examples/data_import/hdf5/requirements.txt new file mode 100644 index 00000000..04a1ef53 --- /dev/null +++ b/python/examples/data_import/hdf5/requirements.txt @@ -0,0 +1,2 @@ +python-dotenv +sift-stack-py[hdf5] \ No newline at end of file diff --git a/python/examples/data_import/hdf5/sample_data.h5 b/python/examples/data_import/hdf5/sample_data.h5 new file mode 100644 index 00000000..24a268a7 Binary files /dev/null and b/python/examples/data_import/hdf5/sample_data.h5 differ diff --git a/python/lib/sift_py/data_import/_config.py b/python/lib/sift_py/data_import/_config.py index 354cb99b..bd19c5fe 100644 --- a/python/lib/sift_py/data_import/_config.py +++ b/python/lib/sift_py/data_import/_config.py @@ -20,94 +20,9 @@ class ConfigBaseModel(BaseModel): model_config = ConfigDict(extra="forbid") -class CsvConfigImpl(ConfigBaseModel): - """ - Defines the CSV config spec. - """ - - asset_name: str - run_name: str = "" - run_id: str = "" - first_data_row: int - time_column: TimeColumn - data_columns: Dict[int, DataColumn] - - @model_validator(mode="after") - def validate_config(self) -> Self: - if not self.data_columns: - raise PydanticCustomError("invalid_config_error", "Empty 'data_columns'") - - if self.run_name and self.run_id: - raise PydanticCustomError( - "invalid_config_error", "Only specify run_name or run_id, not both." - ) - - return self - - -class EnumType(ConfigBaseModel, ChannelEnumType): +class ConfigDataModel(ConfigBaseModel): """ - Defines an enum entry in the CSV config. - """ - - -class BitFieldElement(ConfigBaseModel, ChannelBitFieldElement): - """ - Defines a bit field element entry in the CSV config. - """ - - -class TimeColumn(ConfigBaseModel): - """ - Defines a time column entry in the CSV config. - """ - - format: Union[str, TimeFormatType] - column_number: int - relative_start_time: Optional[str] = None - - @field_validator("format", mode="before") - @classmethod - def convert_format(cls, raw: Union[str, TimeFormatType]) -> str: - """ - Converts the provided format value to a string. - """ - if isinstance(raw, TimeFormatType): - return raw.as_human_str() - elif isinstance(raw, str): - value = TimeFormatType.from_str(raw) - if value is not None: - return value.as_human_str() - - raise PydanticCustomError("invalid_config_error", f"Invalid time format: {raw}.") - - @model_validator(mode="after") - def validate_time(self) -> Self: - """ - Validates the provided time format. - """ - format = TimeFormatType.from_str(self.format) # type: ignore - if format is None: - raise PydanticCustomError( - "invalid_config_error", f"Invalid time format: {self.format}." - ) - - if format.is_relative(): - if self.relative_start_time is None: - raise PydanticCustomError("invalid_config_error", "Missing 'relative_start_time'") - else: - if self.relative_start_time is not None: - raise PydanticCustomError( - "invalid_config_error", - "'relative_start_time' specified for non relative time format.", - ) - - return self - - -class DataColumn(ConfigBaseModel): - """ - Defines a data column entry in the CSV config. 
+ Base DataModel with common functionality """ name: str @@ -185,3 +100,142 @@ def validate_bit_fields(self) -> Self: ) return self + + +class ConfigTimeModel(ConfigBaseModel): + """ + Base TimeModel with common functionality + """ + + format: Union[str, TimeFormatType] + relative_start_time: Optional[str] = None + + @field_validator("format", mode="before") + @classmethod + def convert_format(cls, raw: Union[str, TimeFormatType]) -> str: + """ + Converts the provided format value to a string. + """ + if isinstance(raw, TimeFormatType): + return raw.as_human_str() + elif isinstance(raw, str): + value = TimeFormatType.from_str(raw) + if value is not None: + return value.as_human_str() + + raise PydanticCustomError("invalid_config_error", f"Invalid time format: {raw}.") + + @model_validator(mode="after") + def validate_time(self) -> Self: + """ + Validates the provided time format. + """ + format = TimeFormatType.from_str(self.format) # type: ignore + if format is None: + raise PydanticCustomError( + "invalid_config_error", f"Invalid time format: {self.format}." + ) + + if format.is_relative(): + if self.relative_start_time is None: + raise PydanticCustomError("invalid_config_error", "Missing 'relative_start_time'") + else: + if self.relative_start_time is not None: + raise PydanticCustomError( + "invalid_config_error", + "'relative_start_time' specified for non relative time format.", + ) + + return self + + +class CsvConfigImpl(ConfigBaseModel): + """ + Defines the CSV config spec. + """ + + asset_name: str + run_name: str = "" + run_id: str = "" + first_data_row: int + time_column: TimeColumn + data_columns: Dict[int, DataColumn] + + @model_validator(mode="after") + def validate_config(self) -> Self: + if not self.data_columns: + raise PydanticCustomError("invalid_config_error", "Empty 'data_columns'") + + if self.run_name and self.run_id: + raise PydanticCustomError( + "invalid_config_error", "Only specify run_name or run_id, not both." + ) + + return self + + +class Hdf5ConfigImpl(ConfigBaseModel): + """ + Defines the HDF5 config spec + """ + + asset_name: str + run_name: str = "" + run_id: str = "" + time: TimeCfg + data: List[Hdf5DataCfg] + + @model_validator(mode="after") + def validate_config(self) -> Self: + if not self.data: + raise PydanticCustomError("invalid_config_error", "Empty 'data'") + + if self.run_name and self.run_id: + raise PydanticCustomError( + "invalid_config_error", "Only specify run_name or run_id, not both." + ) + + return self + + +class EnumType(ConfigBaseModel, ChannelEnumType): + """ + Defines an enum entry in the CSV config. + """ + + +class BitFieldElement(ConfigBaseModel, ChannelBitFieldElement): + """ + Defines a bit field element entry in the CSV config. + """ + + +class TimeColumn(ConfigTimeModel): + """ + Defines a time column entry in the CSV config. + """ + + column_number: int + + +class DataColumn(ConfigDataModel): + """ + Defines a data column entry in the CSV config. + """ + + +class TimeCfg(ConfigTimeModel): + """ + Defines a time entry in the generic file config. + """ + + +class Hdf5DataCfg(ConfigDataModel): + """ + Defines a data entry in the HDF5 config. 
+ """ + + time_dataset: str + time_column: int = 1 + value_dataset: str + value_column: int = 1 diff --git a/python/lib/sift_py/data_import/_config_test.py b/python/lib/sift_py/data_import/_config_test.py index be6c360e..b0f20d9d 100644 --- a/python/lib/sift_py/data_import/_config_test.py +++ b/python/lib/sift_py/data_import/_config_test.py @@ -1,6 +1,8 @@ +import pydantic_core import pytest -from sift_py.data_import.config import CsvConfig +from sift_py.data_import._config import ConfigDataModel, ConfigTimeModel +from sift_py.data_import.config import CsvConfig, Hdf5Config from sift_py.data_import.time_format import TimeFormatType from sift_py.error import SiftAPIDeprecationWarning from sift_py.ingestion.channel import ChannelDataType @@ -24,12 +26,38 @@ def csv_config_data(): } +@pytest.fixture +def hdf5_config_data(): + return { + "asset_name": "test_asset", + "time": { + "format": "TIME_FORMAT_ABSOLUTE_DATETIME", + }, + "data": [ + { + "name": "channel1", + "time_dataset": "/channel1", + "value_dataset": "/channel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + + def test_empty_data_columns(csv_config_data: dict): csv_config_data["data_columns"] = {} with pytest.raises(Exception, match="Empty 'data_columns'"): CsvConfig(csv_config_data) +def test_empty_data_columns_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [] + with pytest.raises(Exception, match="Empty 'data'"): + Hdf5Config(hdf5_config_data) + + def test_run_name_and_run_id(csv_config_data: dict): csv_config_data["run_name"] = "Run Title" csv_config_data["run_id"] = "1c5546b4-ee53-460b-9205-4dc3980c200f" @@ -37,6 +65,13 @@ def test_run_name_and_run_id(csv_config_data: dict): CsvConfig(csv_config_data) +def test_run_name_and_run_id_hdf5(hdf5_config_data: dict): + hdf5_config_data["run_name"] = "Run Title" + hdf5_config_data["run_id"] = "1c5546b4-ee53-460b-9205-4dc3980c200f" + with pytest.raises(Exception, match="Only specify run_name or run_id, not both"): + Hdf5Config(hdf5_config_data) + + def test_data_column_validation(csv_config_data: dict): csv_config_data["data_columns"] = { 1: { @@ -68,6 +103,59 @@ def test_data_column_validation(csv_config_data: dict): assert cfg._csv_config.data_columns[1].name == "component.channel" +def test_data_column_validation_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "INVALID_DATA_TYPE", + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + + with pytest.raises(Exception, match="Invalid data_type:"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": complex, + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="Invalid data_type:"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel_bool", + "data_type": ChannelDataType.BOOL, + "time_dataset": "channel", + "value_dataset": "channel", + }, + { + "name": "channel_double", + "data_type": ChannelDataType.DOUBLE, + "time_dataset": "channel", + "value_dataset": "channel", + }, + { + "name": "channel_int", + "data_type": ChannelDataType.INT_64, + "time_dataset": "channel", + "value_dataset": "channel", + }, + { + "name": "channel_str", + "data_type": ChannelDataType.STRING, + "time_dataset": "channel", + "value_dataset": "channel", + }, + ] + Hdf5Config(hdf5_config_data) + + def test_enums(csv_config_data: dict): csv_config_data["data_columns"] = { 1: { @@ -108,6 +196,52 @@ 
def test_enums(csv_config_data: dict): CsvConfig(csv_config_data) +def test_enums_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_INT_32", + "enum_types": [ + {"key": 1, "name": "value_1"}, + {"key": 2, "name": "value_2"}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="Enums can only be specified"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "value_1", "extra_key": "value"}, + {"key": 2, "name": "value_2"}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="validation error"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "value_1"}, + {"key": 2, "name": "value_2"}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + Hdf5Config(hdf5_config_data) + + def test_bit_field(csv_config_data: dict): csv_config_data["data_columns"] = { 1: { @@ -150,6 +284,54 @@ def test_bit_field(csv_config_data: dict): CsvConfig(csv_config_data) +def test_bit_field_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_INT_32", + "bit_field_elements": [ + {"index": 1, "name": "bit_field_name_1", "bit_count": 4}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="Bit fields can only be specified"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_INT_32", + "bit_field_elements": [ + { + "index": 1, + "name": "bit_field_name_1", + "bit_count": 4, + "extra_key": "value", + }, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="validation error"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_BIT_FIELD", + "bit_field_elements": [ + {"index": 1, "name": "bit_field_name_1", "bit_count": 4}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + Hdf5Config(hdf5_config_data) + + def test_time_column(csv_config_data: dict): csv_config_data["time_column"] = { "format": "INVALID_TIME_FORMAT", @@ -180,3 +362,53 @@ def test_time_column(csv_config_data: dict): "column_number": 1, } CsvConfig(csv_config_data) + + +def test_time_column_hdf5(hdf5_config_data: dict): + hdf5_config_data["time"] = { + "format": "INVALID_TIME_FORMAT", + } + with pytest.raises(Exception, match="Invalid time format"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["time"] = { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + } + with pytest.raises(Exception, match="Missing 'relative_start_time'"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["time"] = { + "format": "TIME_FORMAT_ABSOLUTE_UNIX_SECONDS", + "relative_start_time": "100", + } + with pytest.raises( + Exception, match="'relative_start_time' specified for non relative time format." 
+ ): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["time"] = { + "format": TimeFormatType.ABSOLUTE_DATETIME, + } + Hdf5Config(hdf5_config_data) + + +def test_config_time_model_extra_field(): + time_cfg = { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": 123456789, + "extra_field": 0, + } + + with pytest.raises( + pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted" + ): + ConfigTimeModel(**time_cfg) + + +def test_config_data_model_extra_field(): + data_cfg = {"name": "testname", "data_type": float, "extra_field": 0} + + with pytest.raises( + pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted" + ): + ConfigDataModel(**data_cfg) diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py new file mode 100644 index 00000000..17557704 --- /dev/null +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -0,0 +1,857 @@ +from typing import Dict + +import h5py # type: ignore +import numpy as np +import polars as pl # type: ignore +import pytest +from pytest_mock import MockFixture + +from sift_py.data_import.config import Hdf5Config +from sift_py.data_import.hdf5 import ( + Hdf5UploadService, + _convert_hdf5_to_dataframes, + _create_csv_config, + _extract_hdf5_data_to_dataframe, + _merge_ts_dataframes, + _split_hdf5_configs, +) + + +class MockHdf5File: + def __init__(self, data_dict: Dict): + self.data_dict = data_dict + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __getitem__(self, key): + return MockHdf5Dataset(self.data_dict[key]) + + def __contains__(self, key): + return key in self.data_dict + + +class MockHdf5Dataset: + def __init__(self, data): + self.data = data + + def __getitem__(self, key): + return self.data[key] + + +@pytest.fixture +def rest_config(): + return { + "uri": "some_uri.com", + "apikey": "123456789", + } + + +@pytest.fixture +def hdf5_config(): + return Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannel", + "value_dataset": "/DoubleChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + { + "name": "DoubleChannelInGroup", + "time_dataset": "/testgrp/DoubleChannelInGroup", + "value_dataset": "/testgrp/DoubleChannelInGroup", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + { + "name": "StringChannel1", + "time_dataset": "/StringChannel1", + "value_dataset": "/StringChannel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + { + "name": "BinaryStringChannel2", + "time_dataset": "/BinaryStringChannel2", + "value_dataset": "/BinaryStringChannel2", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + { + "name": "EnumChannel", + "time_dataset": "/EnumChannel", + "value_dataset": "/EnumChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "On"}, + {"key": 0, "name": "Off"}, + ], + }, + { + "name": "BitFieldChannel", + "time_dataset": "/BitFieldChannel", + "value_dataset": "/BitFieldChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_BIT_FIELD", + "bit_field_elements": [ + {"index": 0, "name": "flag1", "bit_count": 4}, + {"index": 4, "name": 
"flag2", "bit_count": 4}, + ], + }, + { + "name": "BoolChannel", + "time_dataset": "/BoolChannel", + "value_dataset": "/BoolChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_BOOL", + }, + { + "name": "FloatChannel", + "time_dataset": "/FloatChannel", + "value_dataset": "/FloatChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_FLOAT", + }, + { + "name": "Int32Channel", + "time_dataset": "/Int32Channel", + "value_dataset": "/Int32Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_INT_32", + }, + { + "name": "Int64Channel", + "time_dataset": "/Int64Channel", + "value_dataset": "/Int64Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_INT_64", + }, + { + "name": "UInt32Channel", + "time_dataset": "/UInt32Channel", + "value_dataset": "/UInt32Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_UINT_32", + }, + { + "name": "UInt64Channel", + "time_dataset": "/UInt64Channel", + "value_dataset": "/UInt64Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_UINT_64", + }, + ], + } + ) + + +@pytest.fixture +def hdf5_data_dict(): + return { + "/DoubleChannel": np.array( + list(zip([0, 1, 2], [1.0, 2.0, 3.0])), dtype=[("time", np.int64), ("value", np.float64)] + ), + "/testgrp/DoubleChannelInGroup": np.array( + list(zip([4, 5, 6], [-1.0, -2.0, -3.0])), + dtype=[("time", np.int64), ("value", np.float64)], + ), + "/StringChannel1": np.array( + list(zip([0, 1, 2], ["a", "b", "c"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("utf-8"))], + ), + "/BinaryStringChannel2": np.array( + list(zip([0, 1, 2], [b"a", b"b", b"c"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("ascii"))], + ), + "/EnumChannel": np.array( + list(zip([0, 1, 2], [1, 0, 1])), dtype=[("time", np.int64), ("value", np.int32)] + ), + "/BitFieldChannel": np.array( + list(zip([0, 1, 2], [15, 240, 15])), dtype=[("time", np.int64), ("value", np.int32)] + ), + "/BoolChannel": np.array( + list(zip([0, 1, 2], [True, False, True])), + dtype=[("time", np.int64), ("value", np.bool_)], + ), + "/FloatChannel": np.array( + list(zip([0, 1, 2], [1.1, 2.2, 3.3])), dtype=[("time", np.int64), ("value", np.float32)] + ), + "/Int32Channel": np.array( + list(zip([0, 1, 2], [10, 20, 30])), dtype=[("time", np.int64), ("value", np.int32)] + ), + "/Int64Channel": np.array( + list(zip([0, 1, 2], [10000000000, 20000000000, 30000000000])), + dtype=[("time", np.int64), ("value", np.int64)], + ), + "/UInt32Channel": np.array( + list(zip([0, 1, 2], [1000, 2000, 3000])), + dtype=[("time", np.int64), ("value", np.uint32)], + ), + "/UInt64Channel": np.array( + list(zip([0, 1, 2], [1000000000000, 2000000000000, 3000000000000])), + dtype=[("time", np.int64), ("value", np.uint64)], + ), + } + + +def test_hdf5_upload_service_valid_path(mocker: MockFixture, rest_config, hdf5_config): + mock_path_is_file = mocker.patch("pathlib.Path.is_file") + mock_path_is_file.return_value = False + + with pytest.raises(Exception, match="does not point to a regular file"): + svc = Hdf5UploadService(rest_config) + svc.upload(path="badpath.h5", hdf5_config=hdf5_config) + + +def test_split_hdf5_configs_splits_strings(hdf5_config): + configs = _split_hdf5_configs(hdf5_config) + # Should split into 1 non-string and 2 string configs (StringChannel1 and StringChannel2) + string_configs = [ + cfg for cfg in configs if cfg._hdf5_config.data[0].data_type == 
"CHANNEL_DATA_TYPE_STRING" + ] + non_string_configs = [ + cfg for cfg in configs if cfg._hdf5_config.data[0].data_type != "CHANNEL_DATA_TYPE_STRING" + ] + assert len(configs) == 3 + assert len(string_configs) == 2 + assert len(non_string_configs) == 1 + + +def test_create_csv_config(mocker: MockFixture, hdf5_config): + # Use a reverse list to make sure the order has changed + data_cols = [d_cfg.name for d_cfg in hdf5_config._hdf5_config.data][::-1] + columns = ["timestamp"] + data_cols + merged_df = pl.DataFrame({col: [] for col in columns}) + + csv_cfg = _create_csv_config(hdf5_config, merged_df) + csv_cfg_dict = csv_cfg.to_dict() + assert "time_column" in csv_cfg_dict + assert "data_columns" in csv_cfg_dict + assert len(csv_cfg_dict["data_columns"]) == 12 + + for csv_col, df_col in zip(csv_cfg_dict["data_columns"].values(), merged_df.columns[1:]): + assert csv_col["name"] == df_col + + +def test_convert_hdf5_to_dataframes(mocker: MockFixture, hdf5_config, hdf5_data_dict): + mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) + + expected_col_count = len(hdf5_data_dict) + 1 + time_stamps = [] + for data in hdf5_data_dict.values(): + for row in data: + time_stamps.append(row[0]) + expected_row_count = len(set(time_stamps)) + + df = _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + # Dataframe should have cols == parameter count + 1 (timestamps) and rows == unique timestamps + assert df.shape == (expected_row_count, expected_col_count) + + +def test_two_dataset_extraction(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "Channel1", + "time_dataset": "/Channel1_Time", + "value_dataset": "/Channel1_Value", + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/Channel1_Time": np.array([0, 1, 2], dtype=np.int64), + "/Channel1_Value": np.array([1.0, 2.0, 3.0], dtype=np.float64), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) + assert df.shape == (3, 2) + assert df.columns[1] == data_cfg.name + assert (np.array(df[df.columns[0]]) == data_dict["/Channel1_Time"]).all() + assert (np.array(df[df.columns[1]]) == data_dict["/Channel1_Value"]).all() + + +def test_multi_col_dataset_extraction(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "Channel1", + "time_dataset": "/Channel1", + "value_dataset": "/Channel1", + "time_column": 4, + "value_column": 3, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/Channel1": [ + np.array([9, 9, 9], dtype=np.int64), + np.array([9, 9, 9], dtype=np.int64), + np.array([1.0, 2.0, 3.0], dtype=np.float64), + np.array([0, 1, 2], dtype=np.int64), + ], + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) + assert df.shape == (3, 2) + assert df.columns[1] == data_cfg.name + assert (np.array(df[df.columns[0]]) == data_dict["/Channel1"][3]).all() + assert (np.array(df[df.columns[1]]) == data_dict["/Channel1"][2]).all() + + +def test_string_conversion(): + hdf5_config = Hdf5Config( + { + 
"asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "StringChannel1", + "time_dataset": "/StringChannel1", + "value_dataset": "/StringChannel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + { + "name": "BinaryStringChannel2", + "time_dataset": "/BinaryStringChannel2", + "value_dataset": "/BinaryStringChannel2", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + ], + } + ) + + data_dict = { + "/StringChannel1": np.array( + list(zip([0, 1, 2], ["a", "b", "cat"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("utf-8"))], + ), + "/BinaryStringChannel2": np.array( + list(zip([0, 1, 2], [b"a", b"b", b"cat"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("ascii"))], + ), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) + assert (np.array(df[data_cfg.name]) == np.array(["a", "b", "cat"])).all() + + +def test_bitfield_conversion(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "bitfield1", + "time_dataset": "/bitChannel1", + "value_dataset": "/bitChannel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_BIT_FIELD", + "bit_field_elements": [ + {"index": 0, "name": "flag1", "bit_count": 4}, + {"index": 4, "name": "flag2", "bit_count": 4}, + ], + } + ], + } + ) + + data_dict = { + "/bitChannel1": np.array( + list(zip([0, 1, 2], [0, 2_147_483_647, 15])), + dtype=[("time", np.int64), ("value", np.int32)], + ), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) + assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() + assert (np.array(df[data_cfg.name]) == np.array([0, 2_147_483_647, 15])).all() + + +def test_enum_conversion(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "EnumChannel", + "time_dataset": "/EnumChannel", + "value_dataset": "/EnumChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "On"}, + {"key": 0, "name": "Off"}, + {"key": 2_147_483_647, "name": "Invalid"}, + ], + }, + ], + } + ) + + data_dict = { + "/EnumChannel": np.array( + list(zip([0, 1, 2], [1, 0, 2_147_483_647])), + dtype=[("time", np.int64), ("value", np.int32)], + ), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) + assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() + assert (np.array(df[data_cfg.name]) == np.array([1, 0, 2_147_483_647])).all() + + +def test_time_value_len_diff(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/time", + 
"value_dataset": "/data", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/time": np.array([0, 1, 2], dtype=np.int64), + "/data": np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + with pytest.raises(Exception, match="time and value columns have different lengths"): + _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) + + +def test_hdf5_to_dataframe_conversion(mocker: MockFixture, hdf5_config, hdf5_data_dict): + mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) + name_dataframe_map = {data.name: data.value_dataset for data in hdf5_config._hdf5_config.data} + + df: pl.DataFrame = _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + for name, value_dataset in name_dataframe_map.items(): + assert name in df.columns + + # Remove nulls since they won't be in original data + data = df[name].filter(df[name].is_not_null()) + assert len(data) == len(hdf5_data_dict[value_dataset]) + + +def test_bad_time_col(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannel", + "value_dataset": "/DoubleChannel", + "time_column": 2, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="time_column=2 out of range"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_bad_val_col(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannel", + "value_dataset": "/DoubleChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="value_column=2 out of range"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_missing_time_data(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannelTime", + "value_dataset": "/DoubleChannelValue", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannelValue": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="HDF5 file does not contain dataset"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_missing_value_data(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": 
"DoubleChannel", + "time_dataset": "/DoubleChannelTime", + "value_dataset": "/DoubleChannelValue", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannelTime": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="HDF5 file does not contain dataset"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_hdf5_upload(mocker: MockFixture, hdf5_config, hdf5_data_dict, rest_config): + mock_path_is_file = mocker.patch("pathlib.Path.is_file") + mock_path_is_file.return_value = True + + mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) + + mock_csv_upload = mocker.patch("sift_py.data_import.csv.CsvUploadService.upload") + + svc = Hdf5UploadService(rest_config) + import_services = svc.upload( + "mock.h5", + hdf5_config, + ) + + mock_csv_upload.assert_called() + assert len(import_services) == 3 + + +def test_hdf5_upload_string_timestamps(mocker: MockFixture, hdf5_config, rest_config): + mock_path_is_file = mocker.patch("pathlib.Path.is_file") + mock_path_is_file.return_value = True + + data_dict = { + "/timestamps": np.array( + [ + b"2024-10-07 17:00:09.982126", + b"2024-10-07 17:00:10.022126", + b"2024-10-07 17:00:10.062126", + ] + ), + "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), + } + + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_ABSOLUTE_DATETIME", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/timestamps", + "value_dataset": "/DoubleChannel", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + mock_csv_upload = mocker.patch("sift_py.data_import.csv.CsvUploadService.upload") + + svc = Hdf5UploadService(rest_config) + svc.upload( + "mock.h5", + hdf5_config, + ) + + mock_csv_upload.assert_called() + + +def test_merge_ts_dataframes_no_duplicates(): + """Test merging dataframes with no duplicate channels""" + df1 = pl.DataFrame({"timestamp": [0, 1, 2], "channel1": [1.0, 2.0, 3.0]}) + df2 = pl.DataFrame({"timestamp": [1, 2, 3], "channel2": [4.0, 5.0, 6.0]}) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 3) + assert "timestamp" in result.columns + assert "channel1" in result.columns + assert "channel2" in result.columns + result = result.sort("timestamp") + assert result["timestamp"].to_list() == [0, 1, 2, 3] + assert result["channel1"].to_list() == [1.0, 2.0, 3.0, None] + assert result["channel2"].to_list() == [None, 4.0, 5.0, 6.0] + + +def test_merge_ts_dataframes_with_duplicates(): + """Test merging dataframes with duplicate channel names""" + df1 = pl.DataFrame( + {"timestamp": [0, 1, 2], "channel1": [1.0, 2.0, 3.0], "common_channel": [10.0, 20.0, 30.0]} + ) + df2 = pl.DataFrame( + {"timestamp": [1, 2, 3], "channel2": [4.0, 5.0, 6.0], "common_channel": [40.0, 50.0, 60.0]} + ) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 4) + assert "timestamp" in result.columns + assert "channel1" in result.columns + assert "channel2" in result.columns + assert "common_channel" in result.columns + + result = result.sort("timestamp") + + # Check that values are coalesced properly + common_values = result["common_channel"].to_list() + assert common_values == [10.0, 20.0, 30.0, 60.0] + + +def test_merge_ts_dataframes_with_nulls(): + """Test merging 
dataframes where one has null values""" + df1 = pl.DataFrame( + {"timestamp": [0, 1, 2], "channel1": [1.0, None, 3.0], "common_channel": [10.0, None, 30.0]} + ) + df2 = pl.DataFrame( + {"timestamp": [1, 2, 3], "channel2": [4.0, 5.0, 6.0], "common_channel": [40.0, 50.0, 60.0]} + ) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 4) + + timestamps = result["timestamp"].to_list() + common_values = result["common_channel"].to_list() + + # At timestamp 1: df1 has null, so should use df2 value (40.0) + assert common_values[timestamps.index(1)] == 40.0 + # At timestamp 2: df1 has 30.0, so should use df1 value + assert common_values[timestamps.index(2)] == 30.0 + + +def test_merge_ts_dataframes_empty_dataframes(): + """Test merging empty dataframes""" + df1 = pl.DataFrame({"timestamp": [], "channel1": []}) + df2 = pl.DataFrame({"timestamp": [], "channel2": []}) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (0, 3) + assert "timestamp" in result.columns + assert "channel1" in result.columns + assert "channel2" in result.columns + + +def test_merge_ts_dataframes_multiple_duplicates(): + """Test merging dataframes with multiple duplicate channel names""" + df1 = pl.DataFrame( + { + "timestamp": [0, 1, 2], + "channel1": [1.0, 2.0, 3.0], + "dup1": [10.0, 20.0, 30.0], + "dup2": [100.0, 200.0, 300.0], + } + ) + df2 = pl.DataFrame( + { + "timestamp": [1, 2, 3], + "channel2": [4.0, 5.0, 6.0], + "dup1": [40.0, 50.0, 60.0], + "dup2": [400.0, 500.0, 600.0], + } + ) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 5) + expected_columns = {"timestamp", "channel1", "channel2", "dup1", "dup2"} + assert set(result.columns) == expected_columns + + # At timestamp 0: should have df1 values only + assert result.filter(pl.col("timestamp") == 0)["dup1"].item() == 10.0 + assert result.filter(pl.col("timestamp") == 0)["dup2"].item() == 100.0 + + # At timestamp 3: should have df2 values only + assert result.filter(pl.col("timestamp") == 3)["dup1"].item() == 60.0 + assert result.filter(pl.col("timestamp") == 3)["dup2"].item() == 600.0 + + +def test_merge_ts_dataframes_different_dtypes(): + """Test merging dataframes with different data types""" + df1 = pl.DataFrame( + {"timestamp": [0, 1, 2], "int_channel": [1, 2, 3], "common_channel": [10.0, 20.0, 30.0]} + ) + df2 = pl.DataFrame( + { + "timestamp": [1, 2, 3], + "string_channel": ["a", "b", "c"], + "common_channel": [40.0, 50.0, 60.0], + } + ) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 4) + assert "int_channel" in result.columns + assert "string_channel" in result.columns + assert "common_channel" in result.columns + result = result.sort("timestamp") + assert result["string_channel"].to_list() == [None, "a", "b", "c"] + assert result["common_channel"].to_list() == [10.0, 20.0, 30.0, 60.0] diff --git a/python/lib/sift_py/data_import/config.py b/python/lib/sift_py/data_import/config.py index d41354c7..1ffc55d2 100644 --- a/python/lib/sift_py/data_import/config.py +++ b/python/lib/sift_py/data_import/config.py @@ -1,6 +1,6 @@ from typing import Any, Dict -from sift_py.data_import._config import CsvConfigImpl +from sift_py.data_import._config import CsvConfigImpl, Hdf5ConfigImpl class CsvConfig: @@ -17,3 +17,19 @@ def to_json(self) -> str: def to_dict(self) -> Dict[str, Any]: return self._csv_config.model_dump() + + +class Hdf5Config: + """ + Defines the HDF5 config for data imports. 
+ """ + + def __init__(self, config_info: Dict[str, Any]): + self._config_info = config_info + self._hdf5_config = Hdf5ConfigImpl(**self._config_info) + + def to_json(self) -> str: + return self._hdf5_config.model_dump_json() + + def to_dict(self) -> Dict[str, Any]: + return self._hdf5_config.model_dump() diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py new file mode 100644 index 00000000..d9dae3df --- /dev/null +++ b/python/lib/sift_py/data_import/hdf5.py @@ -0,0 +1,419 @@ +import json +import uuid +from contextlib import ExitStack +from pathlib import Path +from typing import Dict, List, TextIO, Tuple, Union, cast +from urllib.parse import urljoin + +try: + import h5py # type: ignore +except ImportError as e: + raise RuntimeError( + "The h5py package is required to use the HDF5 upload service. " + "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`." + ) from e + +try: + import polars as pl # type: ignore +except ImportError as e: + raise RuntimeError( + "The polars package is required to use the HDF5 upload service. " + "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`." + ) from e + +from sift_py.data_import._config import Hdf5DataCfg +from sift_py.data_import.config import CsvConfig, Hdf5Config +from sift_py.data_import.csv import CsvUploadService +from sift_py.data_import.status import DataImportService +from sift_py.data_import.tempfile import NamedTemporaryFile +from sift_py.rest import SiftRestConfig + + +class Hdf5UploadService: + """ + Service to upload HDF5 files. + """ + + _csv_upload_service: CsvUploadService + + def __init__(self, rest_conf: SiftRestConfig): + self.RUN_PATH = "/api/v2/runs" + self._csv_upload_service = CsvUploadService(rest_conf) + self._prev_run_id: str = "" + + def upload( + self, + path: Union[str, Path], + hdf5_config: Hdf5Config, + show_progress: bool = True, + ) -> List[DataImportService]: + """ + Uploads the HDF5 file pointed to by `path` using a custom HDF5 config. + + Args: + path: The path to the HDF5 file. + hdf5_config: The HDF5 config. + show_progress: Whether to show the status bar or not. + + Returns: + A list of DataImportServices used to get the status of the import. + """ + + posix_path = Path(path) if isinstance(path, str) else path + + if not posix_path.is_file(): + raise Exception(f"Provided path, '{path}', does not point to a regular file.") + + # Prefer to combine data into a single CSV for upload + # Empty data points for the String data type, however, will be ingested as empty strings + # This necessitates separate files for each string dataframe + # Split hdf5_config into separate configs: each string channel gets its own config, while all other data shares a single config + split_configs = _split_hdf5_configs(hdf5_config) + + # NamedTemporaryFiles are deleted upon exiting the with block + # ExitStack ensures all temp files stay open through the upload, then closes them upon exiting the block or if the program exits early + with ExitStack() as stack: + # First convert each config to a CSV file + csv_items: List[Tuple[str, CsvConfig]] = [] + for config in split_configs: + temp_file = stack.enter_context(NamedTemporaryFile(mode="w", suffix=".csv")) + csv_config = _convert_to_csv_file( + path, + temp_file, + config, + ) + csv_items.append((temp_file.name, csv_config)) + + # If a config defines a run_name and is split up, multiple runs will be created.
+ # Instead, generate a run_id now, and use that instead of a run_name + # Do this now, rather than before the config split, to avoid creating a run if any problems arise before we are ready to upload + # The active run_id is copied to _prev_run_id for user reference + if hdf5_config._hdf5_config.run_name != "": + run_id = self._create_run(hdf5_config._hdf5_config.run_name) + for _, csv_config in csv_items: + csv_config._csv_config.run_name = "" + csv_config._csv_config.run_id = run_id + + self._prev_run_id = run_id + elif hdf5_config._hdf5_config.run_id != "": + self._prev_run_id = hdf5_config._hdf5_config.run_id + else: + self._prev_run_id = "" + + # Upload each file + import_services = [] + for filename, csv_config in csv_items: + import_services.append( + self._csv_upload_service.upload( + filename, csv_config, show_progress=show_progress + ) + ) + + return import_services + + def get_previous_upload_run_id(self) -> str: + """Return the run_id used in the previous upload""" + return self._prev_run_id + + def _create_run(self, run_name: str) -> str: + """Create a new run using the REST service, and return a run_id""" + run_uri = urljoin(self._csv_upload_service._base_uri, self.RUN_PATH) + + # Since CsvUploadService is already a RestService, we can reuse its session + response = self._csv_upload_service._session.post( + url=run_uri, + headers={ + "Content-Encoding": "application/json", + }, + data=json.dumps( + { + "name": run_name, + "description": "", + } + ), + ) + if response.status_code != 200: + raise Exception( + f"Run creation failed with status code {response.status_code}. {response.text}" + ) + + try: + run_info = response.json() + except (json.decoder.JSONDecodeError, KeyError): + raise Exception(f"Invalid response: {response.text}") + + if "run" not in run_info: + raise Exception("Response missing key: run") + if "runId" not in run_info["run"]: + raise Exception("Response missing key: runId") + + return run_info["run"]["runId"] + + +def _convert_to_csv_file( + src_path: Union[str, Path], + dst_file: TextIO, + hdf5_config: Hdf5Config, +) -> CsvConfig: + """Converts the HDF5 file to a temporary CSV on disk that we will upload. + + Args: + src_path: The source path to the HDF5 file. + dst_file: The output CSV file. + hdf5_config: The HDF5 config. + + Returns: + The CSV config for the import. + """ + + merged_df = _convert_hdf5_to_dataframes(src_path, hdf5_config) + csv_cfg = _create_csv_config(hdf5_config, merged_df) + merged_df.write_csv(dst_file) + + return csv_cfg + + +def _convert_hdf5_to_dataframes( + src_path: Union[str, Path], hdf5_config: Hdf5Config +) -> pl.DataFrame: + """Convert the HDF5 file to a polars DataFrame. + + Args: + src_path: The source path to the HDF5 file. + hdf5_config: The HDF5 config. + + Returns: + A polars DataFrame containing the data.
+ """ + # Group data configs by matching time arrays to optimize downstream data processing + data_cfg_ts_map: Dict[Tuple[str, int], List[Hdf5DataCfg]] = {} + for data_cfg in hdf5_config._hdf5_config.data: + map_tuple = (data_cfg.time_dataset, data_cfg.time_column) + if map_tuple not in data_cfg_ts_map: + data_cfg_ts_map[map_tuple] = [] + data_cfg_ts_map[map_tuple].append(data_cfg) + + data_frames = [] + # Using swmr=True allows opening of HDF5 files written in SWMR mode which may have not been properly closed, but may be otherwise valid + with h5py.File(src_path, "r", libver="latest", swmr=True) as h5f: + for (time_path, time_col), data_cfgs in data_cfg_ts_map.items(): + df = _extract_hdf5_data_to_dataframe(h5f, time_path, time_col, data_cfgs) + data_frames.append(df) + + # Merge polars dataframes by joining pairs, then merging those pairs until one dataframe remains + # More optimized than joining one by one + # pl.concat(data_frames, how="align") in practice can lead to a fatal crash with larger files + # https://github.com/pola-rs/polars/issues/14591 + while len(data_frames) > 1: + next_round = [] + for i in range(0, len(data_frames), 2): + if i + 1 < len(data_frames): + df1 = data_frames[i] + df2 = data_frames[i + 1] + merged = _merge_ts_dataframes(df1, df2) + next_round.append(merged) + else: + next_round.append(data_frames[i]) + data_frames = next_round + merged_df = data_frames[0].sort("timestamp") + return merged_df + + +def _merge_ts_dataframes(df1: pl.DataFrame, df2: pl.DataFrame) -> pl.DataFrame: + """Merge two dataframes together. Handles duplicate channels""" + + df1_channels = [col for col in df1.columns if col != "timestamp"] + df2_channels = [col for col in df2.columns if col != "timestamp"] + dup_channels = set(df1_channels) & set(df2_channels) + + if dup_channels: + # Create a unique id to mark duplicate channels + uid = uuid.uuid4() + + df2_renamed = df2.clone() + for col in dup_channels: + df2_renamed = df2_renamed.rename({col: f"{col}_{uid}"}) + + merged_df = df1.join(df2_renamed, on="timestamp", how="full", coalesce=True) + + # Merge duplicate column data + for col in dup_channels: + temp_col_name = f"{col}_{uid}" + merged_df = merged_df.with_columns( + pl.coalesce([pl.col(col), pl.col(temp_col_name)]).alias(col) + ).drop(temp_col_name) + + else: + merged_df = df1.join(df2, on="timestamp", how="full", coalesce=True) + + return merged_df + + +def _extract_hdf5_data_to_dataframe( + hdf5_file: h5py.File, + time_path: str, + time_col: int, + hdf5_data_configs: List[Hdf5DataCfg], +) -> pl.DataFrame: + """Extract data from an hdf5_file to a polars DataFrame. 
+ + Args: + hdf5_file: HDF5 File + time_path: HDF5 time array path + time_col: HDF5 time array col (1-indexed) + hdf5_data_config: The HDF5 Data Config + + Returns: + A multi-column polars DataFrame containing the timestamps and associated channels + """ + + if not time_path in hdf5_file: + raise Exception(f"HDF5 file does not contain dataset {time_path}") + time_dataset = cast(h5py.Dataset, hdf5_file[time_path]) + df_time = pl.DataFrame(time_dataset[:]) + time_idx = time_col - 1 + + if df_time.shape[1] <= time_idx: + raise Exception(f"{time_path}: time_column={time_col} out of range") + time_series = df_time[df_time.columns[time_idx]] + + # HDF5 string data may come in as binary, so convert + if time_series.dtype == pl.Binary: + time_series = time_series.cast(pl.String) + + data_frame = pl.DataFrame(data={"timestamp": time_series}) + + for hdf5_data_config in hdf5_data_configs: + if not hdf5_data_config.value_dataset in hdf5_file: + raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}") + if time_path != hdf5_data_config.time_dataset: + raise Exception( + f"Working time dataset {time_path} does not match data cfg defined dataset {hdf5_data_config.time_dataset}" + ) + if time_col != hdf5_data_config.time_column: + raise Exception( + f"Working time col {time_col} does not match data cfg defined col {hdf5_data_config.time_column}" + ) + + value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset]) + + # Convert the full value dataset to a dataframe + # This will make it easier to work with any nested columns from a numpy structured array + df_value = pl.DataFrame(value_dataset[:]) + val_idx = hdf5_data_config.value_column - 1 + + if df_value.shape[1] <= val_idx: + raise Exception( + f"{hdf5_data_config.name}: value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}" + ) + value_series = df_value[df_value.columns[val_idx]] + + if len(time_series) != len(value_series): + raise Exception( + f"{hdf5_data_config.name}: time and value columns have different lengths ({len(time_series)} vs {len(value_series)})" + ) + + # HDF5 string data may come in as binary, so convert + if value_series.dtype == pl.Binary: + value_series = value_series.cast(pl.String) + + data_frame = data_frame.with_columns(value_series.alias(hdf5_data_config.name)) + + return data_frame + + +def _create_csv_config(hdf5_config: Hdf5Config, merged_df: pl.DataFrame) -> CsvConfig: + """Construct a CsvConfig from a Hdf5Config + + Args: + hdf5_config: The HDF5 config + merged_df: The merged dataFrame of data + + Returns: + The CSV config. + """ + + csv_config_dict = { + "asset_name": hdf5_config._hdf5_config.asset_name, + "run_name": hdf5_config._hdf5_config.run_name, + "run_id": hdf5_config._hdf5_config.run_id, + "first_data_row": 2, # Row 1 is headers + "time_column": { + "format": hdf5_config._hdf5_config.time.format, + "column_number": 1, + "relative_start_time": hdf5_config._hdf5_config.time.relative_start_time, + }, + } + + # Map each data config to its channel name + config_map = {d_cfg.name: d_cfg for d_cfg in hdf5_config._hdf5_config.data} + + if merged_df.columns[0] != "timestamp": + raise Exception( + f"Unexpected merged DataFrame layout. 
Expected first column to be timestamp, not {merged_df.columns[0]}" + ) + + data_columns = {} + for idx, channel_name in enumerate(merged_df.columns[1:]): + data_cfg = config_map[channel_name] + col_num = idx + 2 # 1-indexed and col 1 is time col + data_columns[col_num] = { + "name": data_cfg.name, + "data_type": data_cfg.data_type, + "units": data_cfg.units, + "description": data_cfg.description, + "enum_types": data_cfg.enum_types, + "bit_field_elements": data_cfg.bit_field_elements, + } + + csv_config_dict["data_columns"] = data_columns + + return CsvConfig(csv_config_dict) + + +def _split_hdf5_configs(hdf5_config: Hdf5Config) -> List[Hdf5Config]: + """ + Split up hdf5_config into separate configs used to generate each CSV file + Needed as string channels cannot be merged without creating empty string data points in the app + + Args: + hdf5_config: The HDF5 config. + + Returns: + List of HDF5Configs for later CSV conversion + """ + + # Combined config for non string types + non_string_config_dict = { + "asset_name": hdf5_config._hdf5_config.asset_name, + "run_name": hdf5_config._hdf5_config.run_name, + "run_id": hdf5_config._hdf5_config.run_id, + "time": hdf5_config._hdf5_config.time, + "data": [ + data_cfg + for data_cfg in hdf5_config._hdf5_config.data + if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING" + ], + } + + filtered_hdf5_configs = [] + + # Avoid adding combined config if no non-string data present + if non_string_config_dict["data"]: + filtered_hdf5_configs.append(Hdf5Config(non_string_config_dict)) + + for data_cfg in hdf5_config._hdf5_config.data: + if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING": + continue + string_config = Hdf5Config( + { + "asset_name": hdf5_config._hdf5_config.asset_name, + "run_name": hdf5_config._hdf5_config.run_name, + "run_id": hdf5_config._hdf5_config.run_id, + "time": hdf5_config._hdf5_config.time, + "data": [data_cfg], + } + ) + filtered_hdf5_configs.append(string_config) + + return filtered_hdf5_configs diff --git a/python/pyproject.toml b/python/pyproject.toml index 921a7a85..5bd6b731 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -58,6 +58,7 @@ build = ["pdoc==14.5.0", "build==1.2.1"] openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] +hdf5 = ["h5py~=3.11", "polars~=1.8"] [build-system] requires = ["setuptools"]
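
For reference, a minimal usage sketch (not part of the diff) showing how the APIs introduced above — Hdf5Config, Hdf5UploadService.upload, and get_previous_upload_run_id — can share one run across two separate HDF5 uploads. The asset, channel, dataset, and file names below are hypothetical, and the datasets are assumed to follow the two-column (time, value) layout used in the example and tests:

import os

from sift_py.data_import.config import Hdf5Config
from sift_py.data_import.hdf5 import Hdf5UploadService
from sift_py.rest import SiftRestConfig

rest_config: SiftRestConfig = {
    "uri": os.environ["SIFT_API_URI"],
    "apikey": os.environ["SIFT_API_KEY"],
}
upload_service = Hdf5UploadService(rest_config)

# First upload: providing run_name makes the service create the run and remember its id.
first_config = Hdf5Config(
    {
        "asset_name": "ExampleAsset",  # hypothetical asset
        "run_name": "Example Run",
        "time": {"format": "TIME_FORMAT_ABSOLUTE_DATETIME"},
        "data": [
            {
                "name": "channel_a",  # hypothetical channel backed by a (time, value) dataset
                "time_dataset": "/channel_a",
                "value_dataset": "/channel_a",
                "time_column": 1,
                "value_column": 2,
                "data_type": "CHANNEL_DATA_TYPE_DOUBLE",
            }
        ],
    }
)
for import_svc in upload_service.upload("first.h5", first_config):
    import_svc.wait_until_complete()

# Second upload: pass run_id instead of run_name so no additional run is created.
run_id = upload_service.get_previous_upload_run_id()
second_config = Hdf5Config(
    {
        "asset_name": "ExampleAsset",
        "run_id": run_id,
        "time": {"format": "TIME_FORMAT_ABSOLUTE_DATETIME"},
        "data": [
            {
                "name": "channel_b",
                "time_dataset": "/channel_b",
                "value_dataset": "/channel_b",
                "time_column": 1,
                "value_column": 2,
                "data_type": "CHANNEL_DATA_TYPE_DOUBLE",
            }
        ],
    }
)
upload_service.upload("second.h5", second_config)

Because upload() calls _create_run whenever a run_name is set, reusing the returned run_id on later calls is what keeps both files in a single run.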