Skip to content

python(feat): Add HDF5 upload service #261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Pip install
run: |
python -m pip install --upgrade pip
pip install '.[development,openssl,tdms,rosbags]'
pip install '.[development,openssl,tdms,rosbags,hdf5]'
- name: Lint
run: |
ruff check
Expand Down
72 changes: 72 additions & 0 deletions python/examples/data_import/hdf5/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os

import h5py
from dotenv import load_dotenv
from sift_py.data_import.config import Hdf5Config
from sift_py.data_import.hdf5 import Hdf5UploadService
from sift_py.rest import SiftRestConfig

def _require_env(name: str) -> str:
    """
    Return the value of the environment variable `name`.

    Raises:
        RuntimeError: If the variable is unset or empty.
    """
    value = os.getenv(name)
    if not value:
        # Raise explicitly rather than `assert`: assertions are removed when
        # Python runs with -O, which would let a missing value slip through.
        raise RuntimeError(f"expected '{name}' environment variable to be set")
    return value


def main() -> None:
    """
    Example of uploading an hdf5 into Sift.

    Reads the connection settings from the environment (after `load_dotenv()`),
    builds an HDF5 config from the datasets found in the sample file, uploads
    the file and prints the final status of every resulting data import.

    Raises:
        RuntimeError: If a required environment variable is unset or empty.
    """
    load_dotenv()

    sift_uri = _require_env("SIFT_API_URI")
    apikey = _require_env("SIFT_API_KEY")
    asset_name = _require_env("ASSET_NAME")

    # Single definition of the sample file so the config scan and the upload
    # cannot drift apart. The relative path is resolved against the current
    # working directory.
    hdf5_path = "sample_data.h5"

    # For this example, each HDF5 dataset uses the common '/timestamp' dataset.
    # Each is of type double and contains its channel name in the 'Name' attribute.
    with h5py.File(hdf5_path, "r") as f:
        data_entries = [
            {
                "name": dset.attrs["Name"],
                "time_dataset": "/timestamp",
                "value_dataset": dset.name,
                "data_type": "CHANNEL_DATA_TYPE_DOUBLE",
            }
            for dset in f.values()
            # Skip adding the timestamp dataset itself as a channel.
            if dset.name != "/timestamp"
        ]

    # Create an HDF5 configuration to define the data to be ingested.
    hdf5_config_dict = {
        "asset_name": asset_name,
        "time": {
            "format": "TIME_FORMAT_ABSOLUTE_DATETIME",
        },
        "data": data_entries,
    }
    hdf5_config = Hdf5Config(hdf5_config_dict)

    rest_config: SiftRestConfig = {
        "uri": sift_uri,
        "apikey": apikey,
    }

    hdf5_upload_service = Hdf5UploadService(rest_config)
    import_services = hdf5_upload_service.upload(
        hdf5_path,
        hdf5_config,
    )

    # Wait until the data import is completed.
    # The hdf5 upload service may split the upload into multiple parts.
    data_imports = [import_svc.wait_until_complete() for import_svc in import_services]

    # Print the data import details and final status.
    for data_import in data_imports:
        print(data_import.model_dump_json(indent=1))

    print("Upload example complete!")


if __name__ == "__main__":
    main()
2 changes: 2 additions & 0 deletions python/examples/data_import/hdf5/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
python-dotenv
sift-stack-py[hdf5]
Binary file added python/examples/data_import/hdf5/sample_data.h5
Binary file not shown.
228 changes: 141 additions & 87 deletions python/lib/sift_py/data_import/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,94 +20,9 @@ class ConfigBaseModel(BaseModel):
model_config = ConfigDict(extra="forbid")


class CsvConfigImpl(ConfigBaseModel):
"""
Defines the CSV config spec.
"""

asset_name: str
run_name: str = ""
run_id: str = ""
first_data_row: int
time_column: TimeColumn
data_columns: Dict[int, DataColumn]

@model_validator(mode="after")
def validate_config(self) -> Self:
if not self.data_columns:
raise PydanticCustomError("invalid_config_error", "Empty 'data_columns'")

if self.run_name and self.run_id:
raise PydanticCustomError(
"invalid_config_error", "Only specify run_name or run_id, not both."
)

return self


class EnumType(ConfigBaseModel, ChannelEnumType):
class ConfigDataModel(ConfigBaseModel):
"""
Defines an enum entry in the CSV config.
"""


class BitFieldElement(ConfigBaseModel, ChannelBitFieldElement):
"""
Defines a bit field element entry in the CSV config.
"""


class TimeColumn(ConfigBaseModel):
"""
Defines a time column entry in the CSV config.
"""

format: Union[str, TimeFormatType]
column_number: int
relative_start_time: Optional[str] = None

@field_validator("format", mode="before")
@classmethod
def convert_format(cls, raw: Union[str, TimeFormatType]) -> str:
"""
Converts the provided format value to a string.
"""
if isinstance(raw, TimeFormatType):
return raw.as_human_str()
elif isinstance(raw, str):
value = TimeFormatType.from_str(raw)
if value is not None:
return value.as_human_str()

raise PydanticCustomError("invalid_config_error", f"Invalid time format: {raw}.")

@model_validator(mode="after")
def validate_time(self) -> Self:
"""
Validates the provided time format.
"""
format = TimeFormatType.from_str(self.format) # type: ignore
if format is None:
raise PydanticCustomError(
"invalid_config_error", f"Invalid time format: {self.format}."
)

if format.is_relative():
if self.relative_start_time is None:
raise PydanticCustomError("invalid_config_error", "Missing 'relative_start_time'")
else:
if self.relative_start_time is not None:
raise PydanticCustomError(
"invalid_config_error",
"'relative_start_time' specified for non relative time format.",
)

return self


class DataColumn(ConfigBaseModel):
"""
Defines a data column entry in the CSV config.
Base DataModel with common functionality
"""

name: str
Expand Down Expand Up @@ -185,3 +100,142 @@ def validate_bit_fields(self) -> Self:
)

return self


class ConfigTimeModel(ConfigBaseModel):
    """
    Shared base for time entries in data import configs.

    Normalizes `format` to its human-readable string form and enforces that
    `relative_start_time` is present exactly when the format is relative.
    """

    # Accepts a string or a TimeFormatType; convert_format turns either into a string.
    format: Union[str, TimeFormatType]
    # Required for relative formats, rejected for all others (see validate_time).
    relative_start_time: Optional[str] = None

    @field_validator("format", mode="before")
    @classmethod
    def convert_format(cls, raw: Union[str, TimeFormatType]) -> str:
        """
        Normalize the supplied format to its human-readable string.

        Raises a `PydanticCustomError` when `raw` is neither a TimeFormatType
        nor a string that TimeFormatType.from_str recognizes.
        """
        if isinstance(raw, TimeFormatType):
            return raw.as_human_str()

        parsed = TimeFormatType.from_str(raw) if isinstance(raw, str) else None
        if parsed is None:
            raise PydanticCustomError("invalid_config_error", f"Invalid time format: {raw}.")

        return parsed.as_human_str()

    @model_validator(mode="after")
    def validate_time(self) -> Self:
        """
        Check that `relative_start_time` is consistent with the time format.
        """
        time_format = TimeFormatType.from_str(self.format)  # type: ignore
        if time_format is None:
            raise PydanticCustomError(
                "invalid_config_error", f"Invalid time format: {self.format}."
            )

        is_relative = time_format.is_relative()
        start_given = self.relative_start_time is not None

        if is_relative and not start_given:
            raise PydanticCustomError("invalid_config_error", "Missing 'relative_start_time'")

        if start_given and not is_relative:
            raise PydanticCustomError(
                "invalid_config_error",
                "'relative_start_time' specified for non relative time format.",
            )

        return self


class CsvConfigImpl(ConfigBaseModel):
    """
    Schema of the CSV import config.

    Validation rejects a config with no data columns and a config that sets
    both `run_name` and `run_id`.
    """

    asset_name: str
    # At most one of run_name / run_id may be non-empty (see validate_config).
    run_name: str = ""
    run_id: str = ""
    first_data_row: int
    time_column: TimeColumn
    # Integer-keyed; presumably the key is the CSV column number — confirm.
    data_columns: Dict[int, DataColumn]

    @model_validator(mode="after")
    def validate_config(self) -> Self:
        """
        Run the cross-field checks once all individual fields have validated.
        """
        has_columns = bool(self.data_columns)
        if not has_columns:
            raise PydanticCustomError("invalid_config_error", "Empty 'data_columns'")

        run_is_ambiguous = bool(self.run_name) and bool(self.run_id)
        if run_is_ambiguous:
            raise PydanticCustomError(
                "invalid_config_error", "Only specify run_name or run_id, not both."
            )

        return self


class Hdf5ConfigImpl(ConfigBaseModel):
    """
    Schema of the HDF5 import config.

    Validation rejects a config with an empty `data` list and a config that
    sets both `run_name` and `run_id`.
    """

    asset_name: str
    # At most one of run_name / run_id may be non-empty (see validate_config).
    run_name: str = ""
    run_id: str = ""
    # Time settings that apply to the whole config.
    time: TimeCfg
    # One entry per channel to import.
    data: List[Hdf5DataCfg]

    @model_validator(mode="after")
    def validate_config(self) -> Self:
        """
        Run the cross-field checks once all individual fields have validated.
        """
        has_data = bool(self.data)
        if not has_data:
            raise PydanticCustomError("invalid_config_error", "Empty 'data'")

        run_is_ambiguous = bool(self.run_name) and bool(self.run_id)
        if run_is_ambiguous:
            raise PydanticCustomError(
                "invalid_config_error", "Only specify run_name or run_id, not both."
            )

        return self


class EnumType(ConfigBaseModel, ChannelEnumType):
    """
    Defines an enum entry in a data import config.

    Combines ConfigBaseModel with ChannelEnumType and declares no fields of its own.
    """

    # NOTE(review): originally documented as CSV-only; presumably also used by
    # the HDF5 config through the shared data model — confirm.


class BitFieldElement(ConfigBaseModel, ChannelBitFieldElement):
    """
    Defines a bit field element entry in a data import config.

    Combines ConfigBaseModel with ChannelBitFieldElement and declares no fields
    of its own.
    """

    # NOTE(review): originally documented as CSV-only; presumably also used by
    # the HDF5 config through the shared data model — confirm.


class TimeColumn(ConfigTimeModel):
    """
    Time column entry of the CSV config.

    Adds the column position to the `format` / `relative_start_time` handling
    inherited from ConfigTimeModel.
    """

    # Which CSV column holds the timestamps.
    # NOTE(review): indexing base (0 or 1) is not visible here — confirm.
    column_number: int


class DataColumn(ConfigDataModel):
    """
    Data column entry of the CSV config.

    Inherits everything from ConfigDataModel and declares no fields of its own.
    """


class TimeCfg(ConfigTimeModel):
    """
    Time entry for configs that need no fields beyond those of ConfigTimeModel.

    Used as the type of `Hdf5ConfigImpl.time`.
    """


class Hdf5DataCfg(ConfigDataModel):
    """
    Data entry of the HDF5 config.

    Extends ConfigDataModel with the location of the entry's time and value
    data inside the HDF5 file.
    """

    # Name of the dataset holding the timestamps for this entry.
    time_dataset: str
    # Column within the time dataset; defaults to 1.
    # NOTE(review): presumably a 1-based column index — confirm.
    time_column: int = 1
    # Name of the dataset holding the values for this entry.
    value_dataset: str
    # Column within the value dataset; defaults to 1.
    # NOTE(review): presumably a 1-based column index — confirm.
    value_column: int = 1
Loading
Loading