From 904465a6ab0d155394f801fee0d87979857cebd6 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Tue, 1 Jul 2025 13:54:06 -0700 Subject: [PATCH 01/11] hdf5 upload service --- python/examples/data_import/hdf5/main.py | 72 +++ .../data_import/hdf5/requirements.txt | 2 + .../examples/data_import/hdf5/sample_data.h5 | Bin 0 -> 15088 bytes python/lib/sift_py/data_import/_config.py | 157 +++++ .../lib/sift_py/data_import/_config_test.py | 210 +++++- python/lib/sift_py/data_import/_hdf5_test.py | 608 ++++++++++++++++++ python/lib/sift_py/data_import/config.py | 18 +- python/lib/sift_py/data_import/hdf5.py | 281 ++++++++ python/pyproject.toml | 1 + 9 files changed, 1347 insertions(+), 2 deletions(-) create mode 100644 python/examples/data_import/hdf5/main.py create mode 100644 python/examples/data_import/hdf5/requirements.txt create mode 100644 python/examples/data_import/hdf5/sample_data.h5 create mode 100644 python/lib/sift_py/data_import/_hdf5_test.py create mode 100644 python/lib/sift_py/data_import/hdf5.py diff --git a/python/examples/data_import/hdf5/main.py b/python/examples/data_import/hdf5/main.py new file mode 100644 index 00000000..81998300 --- /dev/null +++ b/python/examples/data_import/hdf5/main.py @@ -0,0 +1,72 @@ +import os + +import h5py +from dotenv import load_dotenv +from sift_py.data_import.config import Hdf5Config +from sift_py.data_import.hdf5 import Hdf5UploadService +from sift_py.rest import SiftRestConfig + +if __name__ == "__main__": + """ + Example of uploading an hdf5 into Sift. + """ + + load_dotenv() + + sift_uri = os.getenv("SIFT_API_URI") + assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set" + + apikey = os.getenv("SIFT_API_KEY") + assert apikey, "expected 'SIFT_API_KEY' environment variable to be set" + + asset_name = os.getenv("ASSET_NAME") + assert asset_name, "expected 'ASSET_NAME' environment variable to be set" + + # Create an HDF5 configuration file to define the data to be ingested + hdf5_config_dict = { + "asset_name": asset_name, + "time": { + "format": "TIME_FORMAT_ABSOLUTE_DATETIME", + }, + "data": [], + } + + # For this example, each HDF5 dataset uses the common '/timestamp' dataset + # Each is of type double and contains its channel name in the 'Name' attribute + with h5py.File("sample_data.h5", "r") as f: + for dset in f.values(): + # Skip adding the timestamp dataset + if dset.name == "/timestamp": + continue + + hdf5_config_dict["data"].append( + { + "name": dset.attrs["Name"], + "time_dataset": "/timestamp", + "value_dataset": dset.name, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + } + ) + + hdf5_config = Hdf5Config(hdf5_config_dict) + + rest_config: SiftRestConfig = { + "uri": sift_uri, + "apikey": apikey, + } + + hdf5_upload_service = Hdf5UploadService(rest_config) + import_services = hdf5_upload_service.upload( + "sample_data.h5", + hdf5_config, + ) + + # Wait until the data import is completed. + # The hdf5 upload service may split the upload into multiple parts + data_imports = [import_svc.wait_until_complete() for import_svc in import_services] + + # Print the data import details and final status. 
+ for data_import in data_imports: + print(data_import.model_dump_json(indent=1)) + + print("Upload example complete!") diff --git a/python/examples/data_import/hdf5/requirements.txt b/python/examples/data_import/hdf5/requirements.txt new file mode 100644 index 00000000..04a1ef53 --- /dev/null +++ b/python/examples/data_import/hdf5/requirements.txt @@ -0,0 +1,2 @@ +python-dotenv +sift-stack-py[hdf5] \ No newline at end of file diff --git a/python/examples/data_import/hdf5/sample_data.h5 b/python/examples/data_import/hdf5/sample_data.h5 new file mode 100644 index 0000000000000000000000000000000000000000..24a268a76d33d17cbcd45d00c791246ba5c3cb8a GIT binary patch literal 15088 zcmeHN30O_r+doYtQ-q38-B6NF0}X~>hHx{5C~`@PBxIJc5S1yEAxV-ZQi#e>X;4x+ zmFBrq=QN&^S&_c8owL8YpY#7Z_q)&E-*dn3@p_!Ke{22rTI+rHyVic+z4Dwhb5>uO zp)xc=q@`(+G}-T$!rTpu%K4|3aE1{3`!nTVRgqG;=~h8-b(63ADXL|Iex|Mw9yP zzx*E>ui$&Km>7Q|x`N@!Qi9h`v?%i0iIqhB=QH`9Dn*n2j>X0K)zX6gB*b_P-$hx1 zUy{EbXf_!knm0$#Y(7#n@1Nc(M(e?M z0D(XMZ~bb3G6ardLeuX})9(^^%b$KF{=LUP{YrTl-xl72k@%<&&E)$9;opOJJuX>X6Th7cYrgf)dQaWD0LVj)m5;RsYb zTnH0e5w)Gza;R8Y2&)KTRU!PF5LOeyql7TA=BVQlAr%!9OP`8~5Q&P3t(l66EtraX z3Sns>OssV(p9m7EnAirWxQ`I-D}?(A;r>FHcvn*U8z6)S3Sl`RJV*!+7Q#b>u)Gi^ z-f7hF6?u5}jD_>Q?|sDNctY-j1dkA1x~{&K4qc0GpsZtHOs5;`(8uc=>gwoDH_fNhcEEe6r{-1pEbuF_0iGT9R*S1JLMPI_F2oc4fNYR(@ zIYLD8Df$vVNr*^3MPGhm^A~)IzJgbq=>94C5&riU+Gf%7X*tU``4xP zF9@bY^C|Wu-m!#;nvWjEo@6>8l25TGnHGrTQ}k8-KAXRo4@F<%9Z!hJ{wewrF#sW= z`1%xmi5P(pk$j52L<~WQNIpehBE}#@B%h)$5u*_zimyYN4-vx=B9c#;4-w-LB9c#; z4-o?rB9c$BKM^AmB8tyj6aW3bCSpiJMDi*65-~3!qWE;m{E3*D5RrV!`~{!6MDleg z^Cx0-LPYjYnLiQ36C#pNd0s?LK!`{_Wj^G0k#6~Pqwz)*efc>hjYhub*E+1(yxHDq zH91dyj`gqYeV5b3Zw*aX0PcxRbhR zC&!)KO*=WRpqXY)26|-0ukx1#ep%p`1%6rJZ&~2O9rNI%vKmq5T`q&=)|G>(6_UHMC+M{gLWrII%OBaHpkuT@@58x!ZOQwIU57ZR-Ea(g9(e@ zT|Ye0yptt>UuX;2MX ztko;m>2uK0VJ_{F$;JBlUB~aOYe2Ppk>`a8T)2n0hZ>5vAj#WKTr=k${3cfn)(mUF zF(;n|zeK6`c>0sW-%qtK_42|J{Ph zurj2%$&IvVt;OU|=Z<{Y_8L~RLiP3fCSpti!*lGSYTP#3aeTPZa~M|CyxrH-0;}I; zMkMO;`dzfqYgfMl)V*Hsy5vb7Tz6hurs7!&T0vIi)Irzb&DLI;f{@8BJLO3{8GS_*}b?OCnU49hC zIBVZ%;5OsQWB;8Wg9|Xu`Rk~&gVS+uR~1`4x|R14e+%c-x`%i@W6X|(+ppk6U$iYN zvKUqtw>#diVM5aHM81?=J`Nv8sb;5r1$I zHeK)9c*6o5JYw(RPv_$2eYZCeq#{55yFEX4Gx=NlLU-h!XAg!T)%x-F;K!=}*X&^< zNDY6yJ@~QtkFiI0e)CVQ|9Sr0CP>wP{PW}IPCw@P{XBcP3O+6Tczf`3#s6#eFcGBC zKi(evoczbw*J=rVtEY=6pZv=_|-zz-V&!a zr$fVCsh`>MdQ7~vXxJJl29zCRP9C#u!pvy7_a8HA@L4JRrS6f3*eI)aT4r4}l*e1M z>}v18Gre?}wPiIL<_@_gRTqs_@xzDqk;}v0uFn1m&r2|LpZ}fBUkb2%XsUIQRSo>b zJVrl_ZNa&+VOtDvlU`S_E=kC5*E+FZSqg`@E?-VQW2B0i>NrTY}2R7>ym`;scu zB^=JyxRVTjwK}=5eht{abDgI_cr`i)II7=wV#BFx^RB6#S(xPV?8yGH9PBzW!*N$v zDd;s@Lp2tbL*2H|(X$g0v1-Vyw=P-5pd0kcoa*%wKF;4Z44<41>GMTb+L9Y#Wi-Lh zvo#wpZe4PEdxeA3qY}eY26J&``}|Bkx0_%stSbq>Y(imxq~khm;@h%fs;b#$QjV)Z^W~>qcmDKFG)qu&F z>At;kGGJ0MHgkD)JQR)HO82TW5iC<}V`h*6HRZ~F2GbkxN$cpMfQgNuS-tgBDLoF> zzT~Fo++v(tXW6*Fi}#$?u=~bJXQKbhkD86Q8xi5Fw$L|`2|b3Hb11VBzE`un;v?&! 
z|Ht85Av>Gl7#^H$`lS@wkKYtrJ6?|)k9^0um$Tr#Da_~K`*=*;n0;pK*m`(oypU|# z*o@Blf@AvvtHE}b8{Fm70F!gY`#j4ophL@iR*iB!7#78UBx)BSvSZP)++Iw4e5kd% zQ-gzJ(@tyqrE+n*vQ??!Oet)Z2j$E?7K54ko5ioM;zB*B>*%igMTplP@#@l+R>;n= znE&bNP3)9AbS7H77OJxz#9e%l4?EQ}QX{A3JBeo9eZ8zf%t%0 zNWITeXD}Mz^RUj)vHjrU(76eZ5|ZPg*=b#7md3T?$r&; zIZ)!9pT>Q~#kg{H(|%8eU$>h)tAP%Yb_9 zGVj3mRq!~Fj%>4-!9J@3HnaPEP1RSoFl&@v+7IvX>NL|RR{&xM0yqx7qs z3fK+2FYV0?Tc#^77)6 z6g|r!l_*nK9}(+b1J^jmfOnFQF!X-S_(eBas5z*Za4(VzvqqI9@13Q%CDYt1RE)O| zo;7nltJ#9Xz3Z|jjL%1PTby@JU@aV!%$SC;Ow{h)7?5r)(9%wx@n_^@pyANkI z=0dG&4QtNZEEt(T)t@X;jb+@qu^Ew#_||!`XV3CtuvA~~XKycr{5|o$jbCeUY)GCZ zCnz8K=N3yj20VwNzme53xoSkERpshf*W*juxPUcfweX&B(c_B>7k2($wodUcVR1b{ z_RaAce0@ycl*G%|1CO3dmCFjko+&rptnB*~`uB&YXuapa)pxB1(>V_Ls!oR5XRA>e zZo;-(Sda2(F^L(5Rj5u`CU#A`63g}vIHYp78CQPuySel<2R12dv@?fZz?MI3J3owV zfm~AQsX8tjRsqE;F4l8l73flQb0rfU(+_Sv+*}TNNkl?r&P!-tS<$0qU@OeeHIA-~ zEW+Q76Zn4*+LLA_xY0&gbPo=KW@YkVv>W;!K|P&35bcKNA*eSY4??@)T@uuLkOySl z@WKT3pUDHIZg|pydf~s;?S}rB)juzG%iOKUj5|ih3Bqq=2NX*YEX*3d(=WpcShmn5tcXa)k z+@I%fHUiW9cz@%cg_`_Kv;7+Dmj!-VfNFt}^tU+)yu4}PsE3}e6|fq&&2-b|XiS?v zDs|%i7Hsm>_6*oji;P240-ws1U|o5&T0Dyl6R%*ilSVi3KKSd$k&W4y)z*|_Ud4r> zrM<$?#l?8{cBEc1FTS)m+-2;M$v|(n=+9^Do1uRqzrS&>G(^j0CV9I)!4u=~UNg=$ zgT778*1?Sfwo|;pwf+rw?R92YD+xY9nR;?%&vk3T9 z`Urj2GWuA*&xg`Z>$_W8a$tL^HZ`6%p2nU|`uQw2`V9Pfd#y?l)Jx+wX-2$)>XesP zW%?Jup<#5tt$}$MD5lbvMX$lk@Y)uO1HALMdE0XXv+H2qbudCJ_BAh_Iwr}PFM)Ag zp~(pm`7paW^kEq<4_T$L)yL>;879YYcDCoHfuUU)(lwfQZZ&PxQ?<`2IDPc&eSJq3 zd=~BgqWPc#AqlO=FD~U`f7$pn1&wO7FD@C{IE?q5X}sZP^ydrgG%v4fD0Vz(#|1dyM3z5=h_dH(O6T3FR6_nx?mp!&n&x0246;W*YlQPH&+du~nq z%}+i7V;>uKP7f?Y+nfUr%{DUO+Oc|Bj3fh5BSYstE^ELg$=bOg!9HTjosUwdEbqcR=*62D1?s2NXCOn&A9OO#+|6kx$wHz+!->N3EkaVpH3`d zK(_5jd}$7M~Jo&QK zV08)n9tH2|b*}(}r=&-gRxmOA=wqYtsnw90v_~oPQax-Yw^}o&3*H?u5sv-B{esIcaFWJb8C12 zFYi=uV?7&HbPsomt=hR^g*a|G;pUYH4(}W|sOS7A$tbZ7`gp*p8m; Self: return self +class Hdf5ConfigImpl(ConfigBaseModel): + """ + Defines the HDF5 config spec + """ + + asset_name: str + run_name: str = "" + run_id: str = "" + time: TimeCfg + data: List[Hdf5DataCfg] + + @model_validator(mode="after") + def validate_config(self) -> Self: + if not self.data: + raise PydanticCustomError("invalid_config_error", "Empty 'data'") + + if self.run_name and self.run_id: + raise PydanticCustomError( + "invalid_config_error", "Only specify run_name or run_id, not both." + ) + + return self + + class EnumType(ConfigBaseModel, ChannelEnumType): """ Defines an enum entry in the CSV config. @@ -185,3 +209,136 @@ def validate_bit_fields(self) -> Self: ) return self + + +class Hdf5DataCfg(ConfigBaseModel): + """ + Defines a data entry in the HDF5 config. + """ + + name: str + time_dataset: str + time_column: int = 1 + value_dataset: str + value_column: int = 1 + data_type: Union[str, ChannelDataType, Type] = "" + units: str = "" + description: str = "" + # Only valid if data_type is "CHANNEL_DATA_TYPE_ENUM". + enum_types: List[EnumType] = [] + # Only valid if data_type is "CHANNEL_DATA_TYPE_BIT_FIELD" + bit_field_elements: List[BitFieldElement] = [] + + @field_validator("data_type", mode="before") + @classmethod + def convert_data_type(cls, raw: Union[str, ChannelDataType, Type]) -> str: + """ + Converts the provided data_type value to a string. 
+ """ + if isinstance(raw, type): + if raw == int: + return ChannelDataType.INT_64.as_human_str(api_format=True) + elif raw == float: + return ChannelDataType.DOUBLE.as_human_str(api_format=True) + elif raw == str: + return ChannelDataType.STRING.as_human_str(api_format=True) + elif raw == bool: + return ChannelDataType.BOOL.as_human_str(api_format=True) + elif isinstance(raw, ChannelDataType): + return raw.as_human_str(api_format=True) + elif isinstance(raw, str): + value = ChannelDataType.from_str(raw) + if value is not None: + return value.as_human_str(api_format=True) + + raise PydanticCustomError("invalid_config_error", f"Invalid data_type: {raw}.") + + @model_validator(mode="before") + @classmethod + def concatenate_component_and_name(cls, data: Any) -> Any: + """ + Concatenates Component and Name. If Component is not an empty string, raises a deprecation warning. + """ + if isinstance(data, dict): + if "component" in data.keys() and "name" in data.keys(): + _component_deprecation_warning() + data["name"] = channel_fqn(name=data["name"], component=data["component"]) + data.pop("component") + return data + + @model_validator(mode="after") + def validate_enums(self) -> Self: + """ + Validates the enum configuration. + """ + data_type = ChannelDataType.from_str(self.data_type) # type: ignore + if self.enum_types: + if data_type != ChannelDataType.ENUM: + raise PydanticCustomError( + "invalid_config_error", + f"Enums can only be specified with the CHANNEL_DATA_TYPE_ENUM data type. {self.name} is {self.data_type}", + ) + + return self + + @model_validator(mode="after") + def validate_bit_fields(self) -> Self: + """ + Validates the bit field configuration. + """ + data_type = ChannelDataType.from_str(self.data_type) # type: ignore + if self.bit_field_elements: + if data_type != ChannelDataType.BIT_FIELD: + raise PydanticCustomError( + "invalid_config_error", + f"Bit fields can only be specified with the CHANNEL_DATA_TYPE_BIT_FIELD data type. {self.name} is {self.data_type}", + ) + + return self + + +class TimeCfg(ConfigBaseModel): + """ + Defines a time entry in the generic file config. + """ + + format: Union[str, TimeFormatType] + relative_start_time: Optional[str] = None + + @field_validator("format", mode="before") + @classmethod + def convert_format(cls, raw: Union[str, TimeFormatType]) -> str: + """ + Converts the provided format value to a string. + """ + if isinstance(raw, TimeFormatType): + return raw.as_human_str() + elif isinstance(raw, str): + value = TimeFormatType.from_str(raw) + if value is not None: + return value.as_human_str() + + raise PydanticCustomError("invalid_config_error", f"Invalid time format: {raw}.") + + @model_validator(mode="after") + def validate_time(self) -> Self: + """ + Validates the provided time format. + """ + format = TimeFormatType.from_str(self.format) # type: ignore + if format is None: + raise PydanticCustomError( + "invalid_config_error", f"Invalid time format: {self.format}." 
+ ) + + if format.is_relative(): + if self.relative_start_time is None: + raise PydanticCustomError("invalid_config_error", "Missing 'relative_start_time'") + else: + if self.relative_start_time is not None: + raise PydanticCustomError( + "invalid_config_error", + "'relative_start_time' specified for non relative time format.", + ) + + return self diff --git a/python/lib/sift_py/data_import/_config_test.py b/python/lib/sift_py/data_import/_config_test.py index be6c360e..df75d70f 100644 --- a/python/lib/sift_py/data_import/_config_test.py +++ b/python/lib/sift_py/data_import/_config_test.py @@ -1,6 +1,6 @@ import pytest -from sift_py.data_import.config import CsvConfig +from sift_py.data_import.config import CsvConfig, Hdf5Config from sift_py.data_import.time_format import TimeFormatType from sift_py.error import SiftAPIDeprecationWarning from sift_py.ingestion.channel import ChannelDataType @@ -24,12 +24,38 @@ def csv_config_data(): } +@pytest.fixture +def hdf5_config_data(): + return { + "asset_name": "test_asset", + "time": { + "format": "TIME_FORMAT_ABSOLUTE_DATETIME", + }, + "data": [ + { + "name": "channel1", + "time_dataset": "/channel1", + "value_dataset": "/channel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + + def test_empty_data_columns(csv_config_data: dict): csv_config_data["data_columns"] = {} with pytest.raises(Exception, match="Empty 'data_columns'"): CsvConfig(csv_config_data) +def test_empty_data_columns_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [] + with pytest.raises(Exception, match="Empty 'data'"): + Hdf5Config(hdf5_config_data) + + def test_run_name_and_run_id(csv_config_data: dict): csv_config_data["run_name"] = "Run Title" csv_config_data["run_id"] = "1c5546b4-ee53-460b-9205-4dc3980c200f" @@ -37,6 +63,13 @@ def test_run_name_and_run_id(csv_config_data: dict): CsvConfig(csv_config_data) +def test_run_name_and_run_id_hdf5(hdf5_config_data: dict): + hdf5_config_data["run_name"] = "Run Title" + hdf5_config_data["run_id"] = "1c5546b4-ee53-460b-9205-4dc3980c200f" + with pytest.raises(Exception, match="Only specify run_name or run_id, not both"): + Hdf5Config(hdf5_config_data) + + def test_data_column_validation(csv_config_data: dict): csv_config_data["data_columns"] = { 1: { @@ -68,6 +101,59 @@ def test_data_column_validation(csv_config_data: dict): assert cfg._csv_config.data_columns[1].name == "component.channel" +def test_data_column_validation_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "INVALID_DATA_TYPE", + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + + with pytest.raises(Exception, match="Invalid data_type:"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": complex, + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="Invalid data_type:"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel_bool", + "data_type": ChannelDataType.BOOL, + "time_dataset": "channel", + "value_dataset": "channel", + }, + { + "name": "channel_double", + "data_type": ChannelDataType.DOUBLE, + "time_dataset": "channel", + "value_dataset": "channel", + }, + { + "name": "channel_int", + "data_type": ChannelDataType.INT_64, + "time_dataset": "channel", + "value_dataset": "channel", + }, + { + "name": "channel_str", + "data_type": ChannelDataType.STRING, + "time_dataset": "channel", + "value_dataset": 
"channel", + }, + ] + Hdf5Config(hdf5_config_data) + + def test_enums(csv_config_data: dict): csv_config_data["data_columns"] = { 1: { @@ -108,6 +194,52 @@ def test_enums(csv_config_data: dict): CsvConfig(csv_config_data) +def test_enums_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_INT_32", + "enum_types": [ + {"key": 1, "name": "value_1"}, + {"key": 2, "name": "value_2"}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="Enums can only be specified"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "value_1", "extra_key": "value"}, + {"key": 2, "name": "value_2"}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="validation error"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "value_1"}, + {"key": 2, "name": "value_2"}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + Hdf5Config(hdf5_config_data) + + def test_bit_field(csv_config_data: dict): csv_config_data["data_columns"] = { 1: { @@ -150,6 +282,54 @@ def test_bit_field(csv_config_data: dict): CsvConfig(csv_config_data) +def test_bit_field_hdf5(hdf5_config_data: dict): + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_INT_32", + "bit_field_elements": [ + {"index": 1, "name": "bit_field_name_1", "bit_count": 4}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="Bit fields can only be specified"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_INT_32", + "bit_field_elements": [ + { + "index": 1, + "name": "bit_field_name_1", + "bit_count": 4, + "extra_key": "value", + }, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + with pytest.raises(Exception, match="validation error"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["data"] = [ + { + "name": "channel", + "data_type": "CHANNEL_DATA_TYPE_BIT_FIELD", + "bit_field_elements": [ + {"index": 1, "name": "bit_field_name_1", "bit_count": 4}, + ], + "time_dataset": "channel", + "value_dataset": "channel", + } + ] + Hdf5Config(hdf5_config_data) + + def test_time_column(csv_config_data: dict): csv_config_data["time_column"] = { "format": "INVALID_TIME_FORMAT", @@ -180,3 +360,31 @@ def test_time_column(csv_config_data: dict): "column_number": 1, } CsvConfig(csv_config_data) + + +def test_time_column_hdf5(hdf5_config_data: dict): + hdf5_config_data["time"] = { + "format": "INVALID_TIME_FORMAT", + } + with pytest.raises(Exception, match="Invalid time format"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["time"] = { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + } + with pytest.raises(Exception, match="Missing 'relative_start_time'"): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["time"] = { + "format": "TIME_FORMAT_ABSOLUTE_UNIX_SECONDS", + "relative_start_time": "100", + } + with pytest.raises( + Exception, match="'relative_start_time' specified for non relative time format." 
+ ): + Hdf5Config(hdf5_config_data) + + hdf5_config_data["time"] = { + "format": TimeFormatType.ABSOLUTE_DATETIME, + } + Hdf5Config(hdf5_config_data) diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py new file mode 100644 index 00000000..3f7e6cb0 --- /dev/null +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -0,0 +1,608 @@ +from typing import Dict + +import h5py # type: ignore +import numpy as np +import polars as pl +import pytest +from pytest_mock import MockFixture + +from sift_py.data_import._config import Hdf5DataCfg +from sift_py.data_import.config import Hdf5Config +from sift_py.data_import.hdf5 import ( + Hdf5UploadService, + _convert_hdf5_to_dataframes, + _create_csv_config, + _extract_hdf5_data_to_dataframe, + _parse_hdf5_data_cfg, + _split_hdf5_configs, +) + + +class MockHdf5File: + def __init__(self, data_dict: Dict): + self.data_dict = data_dict + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __getitem__(self, key): + return MockHdf5Dataset(self.data_dict[key]) + + def __contains__(self, key): + return key in self.data_dict + + +class MockHdf5Dataset: + def __init__(self, data): + self.data = data + + def __getitem__(self, key): + return self.data[key] + + +@pytest.fixture +def rest_config(): + return { + "uri": "some_uri.com", + "apikey": "123456789", + } + + +@pytest.fixture +def hdf5_config(): + return Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannel", + "value_dataset": "/DoubleChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + { + "name": "DoubleChannelInGroup", + "time_dataset": "/testgrp/DoubleChannelInGroup", + "value_dataset": "/testgrp/DoubleChannelInGroup", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + { + "name": "StringChannel1", + "time_dataset": "/StringChannel1", + "value_dataset": "/StringChannel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + { + "name": "BinaryStringChannel2", + "time_dataset": "/BinaryStringChannel2", + "value_dataset": "/BinaryStringChannel2", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + { + "name": "EnumChannel", + "time_dataset": "/EnumChannel", + "value_dataset": "/EnumChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "On"}, + {"key": 0, "name": "Off"}, + ], + }, + { + "name": "BitFieldChannel", + "time_dataset": "/BitFieldChannel", + "value_dataset": "/BitFieldChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_BIT_FIELD", + "bit_field_elements": [ + {"index": 0, "name": "flag1", "bit_count": 4}, + {"index": 4, "name": "flag2", "bit_count": 4}, + ], + }, + { + "name": "BoolChannel", + "time_dataset": "/BoolChannel", + "value_dataset": "/BoolChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_BOOL", + }, + { + "name": "FloatChannel", + "time_dataset": "/FloatChannel", + "value_dataset": "/FloatChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_FLOAT", + }, + { + "name": "Int32Channel", + "time_dataset": "/Int32Channel", + "value_dataset": "/Int32Channel", + "time_column": 1, + 
"value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_INT_32", + }, + { + "name": "Int64Channel", + "time_dataset": "/Int64Channel", + "value_dataset": "/Int64Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_INT_64", + }, + { + "name": "UInt32Channel", + "time_dataset": "/UInt32Channel", + "value_dataset": "/UInt32Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_UINT_32", + }, + { + "name": "UInt64Channel", + "time_dataset": "/UInt64Channel", + "value_dataset": "/UInt64Channel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_UINT_64", + }, + ], + } + ) + + +@pytest.fixture +def hdf5_data_dict(): + return { + "/DoubleChannel": np.array( + list(zip([0, 1, 2], [1.0, 2.0, 3.0])), dtype=[("time", np.int64), ("value", np.float64)] + ), + "/testgrp/DoubleChannelInGroup": np.array( + list(zip([4, 5, 6], [-1.0, -2.0, -3.0])), + dtype=[("time", np.int64), ("value", np.float64)], + ), + "/StringChannel1": np.array( + list(zip([0, 1, 2], ["a", "b", "c"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("utf-8"))], + ), + "/BinaryStringChannel2": np.array( + list(zip([0, 1, 2], [b"a", b"b", b"c"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("ascii"))], + ), + "/EnumChannel": np.array( + list(zip([0, 1, 2], [1, 0, 1])), dtype=[("time", np.int64), ("value", np.int32)] + ), + "/BitFieldChannel": np.array( + list(zip([0, 1, 2], [15, 240, 15])), dtype=[("time", np.int64), ("value", np.int32)] + ), + "/BoolChannel": np.array( + list(zip([0, 1, 2], [True, False, True])), + dtype=[("time", np.int64), ("value", np.bool_)], + ), + "/FloatChannel": np.array( + list(zip([0, 1, 2], [1.1, 2.2, 3.3])), dtype=[("time", np.int64), ("value", np.float32)] + ), + "/Int32Channel": np.array( + list(zip([0, 1, 2], [10, 20, 30])), dtype=[("time", np.int64), ("value", np.int32)] + ), + "/Int64Channel": np.array( + list(zip([0, 1, 2], [10000000000, 20000000000, 30000000000])), + dtype=[("time", np.int64), ("value", np.int64)], + ), + "/UInt32Channel": np.array( + list(zip([0, 1, 2], [1000, 2000, 3000])), + dtype=[("time", np.int64), ("value", np.uint32)], + ), + "/UInt64Channel": np.array( + list(zip([0, 1, 2], [1000000000000, 2000000000000, 3000000000000])), + dtype=[("time", np.int64), ("value", np.uint64)], + ), + } + + +def test_hdf5_upload_service_valid_path(mocker: MockFixture, rest_config, hdf5_config): + mock_path_is_file = mocker.patch("pathlib.Path.is_file") + mock_path_is_file.return_value = False + + with pytest.raises(Exception, match="does not point to a regular file"): + svc = Hdf5UploadService(rest_config) + svc.upload(path="badpath.h5", hdf5_config=hdf5_config) + + +def test_split_hdf5_configs_splits_strings(hdf5_config): + configs = _split_hdf5_configs(hdf5_config) + # Should split into 1 non-string and 2 string configs (StringChannel1 and StringChannel2) + string_configs = [ + cfg for cfg in configs if cfg._hdf5_config.data[0].data_type == "CHANNEL_DATA_TYPE_STRING" + ] + non_string_configs = [ + cfg for cfg in configs if cfg._hdf5_config.data[0].data_type != "CHANNEL_DATA_TYPE_STRING" + ] + assert len(configs) == 3 + assert len(string_configs) == 2 + assert len(non_string_configs) == 1 + + +def test_create_csv_config(hdf5_config): + csv_cfg = _create_csv_config(hdf5_config) + csv_cfg_dict = csv_cfg.to_dict() + assert "time_column" in csv_cfg_dict + assert "data_columns" in csv_cfg_dict + assert len(csv_cfg_dict["data_columns"]) == 12 + + +def test_parse_hdf5_data_cfg(): + data_cfg = 
Hdf5DataCfg( + name="TestChannel", + time_dataset="/TestChannel", + value_dataset="/TestChannel", + data_type="CHANNEL_DATA_TYPE_DOUBLE", + units="m/s", + description="Test channel", + enum_types=[], + bit_field_elements=[], + ) + parsed_cfg = _parse_hdf5_data_cfg(data_cfg) + assert parsed_cfg["name"] == "TestChannel" + assert parsed_cfg["data_type"] == "CHANNEL_DATA_TYPE_DOUBLE" + assert parsed_cfg["units"] == "m/s" + assert parsed_cfg["description"] == "Test channel" + assert not parsed_cfg["enum_types"] + assert not parsed_cfg["bit_field_elements"] + + +def test_convert_hdf5_to_dataframes(mocker: MockFixture, hdf5_config, hdf5_data_dict): + mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) + + expected_col_count = len(hdf5_data_dict) + 1 + time_stamps = [] + for data in hdf5_data_dict.values(): + for row in data: + time_stamps.append(row[0]) + expected_row_count = len(set(time_stamps)) + + df = _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + # Dataframe should have cols == parameter count + 1 (timestamps) and rows == unique timestamps + assert df.shape == (expected_row_count, expected_col_count) + + +def test_two_dataset_extraction(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "Channel1", + "time_dataset": "/Channel1_Time", + "value_dataset": "/Channel1_Value", + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/Channel1_Time": np.array([0, 1, 2], dtype=np.int64), + "/Channel1_Value": np.array([1.0, 2.0, 3.0], dtype=np.float64), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + assert df.shape == (3, 2) + assert df.columns[1] == data_cfg.name + assert (np.array(df[df.columns[0]]) == data_dict["/Channel1_Time"]).all() + assert (np.array(df[df.columns[1]]) == data_dict["/Channel1_Value"]).all() + + +def test_multi_col_dataset_extraction(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "Channel1", + "time_dataset": "/Channel1", + "value_dataset": "/Channel1", + "time_column": 4, + "value_column": 3, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/Channel1": [ + np.array([9, 9, 9], dtype=np.int64), + np.array([9, 9, 9], dtype=np.int64), + np.array([1.0, 2.0, 3.0], dtype=np.float64), + np.array([0, 1, 2], dtype=np.int64), + ], + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + assert df.shape == (3, 2) + assert df.columns[1] == data_cfg.name + assert (np.array(df[df.columns[0]]) == data_dict["/Channel1"][3]).all() + assert (np.array(df[df.columns[1]]) == data_dict["/Channel1"][2]).all() + + +def test_string_conversion(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "StringChannel1", + "time_dataset": "/StringChannel1", + "value_dataset": "/StringChannel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + { + "name": "BinaryStringChannel2", + "time_dataset": "/BinaryStringChannel2", + "value_dataset": 
"/BinaryStringChannel2", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_STRING", + }, + ], + } + ) + + data_dict = { + "/StringChannel1": np.array( + list(zip([0, 1, 2], ["a", "b", "cat"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("utf-8"))], + ), + "/BinaryStringChannel2": np.array( + list(zip([0, 1, 2], [b"a", b"b", b"cat"])), + dtype=[("time", np.int64), ("value", h5py.string_dtype("ascii"))], + ), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + assert (np.array(df[data_cfg.name]) == np.array(["a", "b", "cat"])).all() + + +def test_hdf5_to_dataframe_conversion(mocker: MockFixture, hdf5_config, hdf5_data_dict): + mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) + name_dataframe_map = {data.name: data.value_dataset for data in hdf5_config._hdf5_config.data} + + df: pl.DataFrame = _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + for name, value_dataset in name_dataframe_map.items(): + assert name in df.columns + + # Remove nulls since they won't be in original data + data = df[name].filter(df[name].is_not_null()) + assert len(data) == len(hdf5_data_dict[value_dataset]) + + +def test_bad_time_col(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannel", + "value_dataset": "/DoubleChannel", + "time_column": 2, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="time_column=2 out of range"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_bad_val_col(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannel", + "value_dataset": "/DoubleChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="value_column=2 out of range"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_missing_time_data(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannelTime", + "value_dataset": "/DoubleChannelValue", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannelValue": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="HDF5 file does not contain dataset"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_missing_value_data(mocker: MockFixture): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + 
"format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/DoubleChannelTime", + "value_dataset": "/DoubleChannelValue", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/DoubleChannelTime": np.array([0, 1, 2], dtype=np.int64), + } + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + with pytest.raises(Exception, match="HDF5 file does not contain dataset"): + _convert_hdf5_to_dataframes("mock.h5", hdf5_config) + + +def test_hdf5_upload(mocker: MockFixture, hdf5_config, hdf5_data_dict, rest_config): + mock_path_is_file = mocker.patch("pathlib.Path.is_file") + mock_path_is_file.return_value = True + + mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) + + mock_csv_upload = mocker.patch("sift_py.data_import.csv.CsvUploadService.upload") + + svc = Hdf5UploadService(rest_config) + import_services = svc.upload( + "mock.h5", + hdf5_config, + ) + + mock_csv_upload.assert_called() + assert len(import_services) == 3 + + +def test_hdf5_upload_string_timestamps(mocker: MockFixture, hdf5_config, rest_config): + mock_path_is_file = mocker.patch("pathlib.Path.is_file") + mock_path_is_file.return_value = True + + data_dict = { + "/timestamps": np.array([ + b"2024-10-07 17:00:09.982126", + b"2024-10-07 17:00:10.022126", + b"2024-10-07 17:00:10.062126" + ]), + "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), + } + + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_ABSOLUTE_DATETIME", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/timestamps", + "value_dataset": "/DoubleChannel", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + mocker.patch("h5py.File", return_value=MockHdf5File(data_dict)) + + mock_csv_upload = mocker.patch("sift_py.data_import.csv.CsvUploadService.upload") + + svc = Hdf5UploadService(rest_config) + svc.upload( + "mock.h5", + hdf5_config, + ) + + mock_csv_upload.assert_called() diff --git a/python/lib/sift_py/data_import/config.py b/python/lib/sift_py/data_import/config.py index d41354c7..1ffc55d2 100644 --- a/python/lib/sift_py/data_import/config.py +++ b/python/lib/sift_py/data_import/config.py @@ -1,6 +1,6 @@ from typing import Any, Dict -from sift_py.data_import._config import CsvConfigImpl +from sift_py.data_import._config import CsvConfigImpl, Hdf5ConfigImpl class CsvConfig: @@ -17,3 +17,19 @@ def to_json(self) -> str: def to_dict(self) -> Dict[str, Any]: return self._csv_config.model_dump() + + +class Hdf5Config: + """ + Defines the HDF5 config for data imports. + """ + + def __init__(self, config_info: Dict[str, Any]): + self._config_info = config_info + self._hdf5_config = Hdf5ConfigImpl(**self._config_info) + + def to_json(self) -> str: + return self._hdf5_config.model_dump_json() + + def to_dict(self) -> Dict[str, Any]: + return self._hdf5_config.model_dump() diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py new file mode 100644 index 00000000..1b92739f --- /dev/null +++ b/python/lib/sift_py/data_import/hdf5.py @@ -0,0 +1,281 @@ +from pathlib import Path +from typing import Any, Dict, List, TextIO, Union, cast + +try: + import h5py # type: ignore +except ImportError as e: + raise RuntimeError( + "The h5py package is required to use the HDF5 upload service. 
" + "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`." + ) from e + +try: + import polars as pl +except ImportError as e: + raise RuntimeError( + "The polars package is required to use the HDF5 upload service. " + "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`." + ) from e + +from sift_py.data_import._config import Hdf5DataCfg +from sift_py.data_import.config import CsvConfig, Hdf5Config +from sift_py.data_import.csv import CsvUploadService +from sift_py.data_import.status import DataImportService +from sift_py.data_import.tempfile import NamedTemporaryFile +from sift_py.rest import SiftRestConfig + + +class Hdf5UploadService: + """ + Service to upload HDF5 files. + """ + + _csv_upload_service: CsvUploadService + + def __init__(self, rest_conf: SiftRestConfig): + self._csv_upload_service = CsvUploadService(rest_conf) + + def upload( + self, + path: Union[str, Path], + hdf5_config: Hdf5Config, + show_progress: bool = True, + ) -> List[DataImportService]: + """ + Uploads the HDF5 file pointed to by `path` using a custom HDF5 config. + + Args: + path: The path to the HDF5 file. + hdf5_config: The HDF5 config. + show_progress: Whether to show the status bar or not. + + Returns: + A list of DataImportServices used to get the status of the import. + """ + + posix_path = Path(path) if isinstance(path, str) else path + + if not posix_path.is_file(): + raise Exception(f"Provided path, '{path}', does not point to a regular file.") + + # Prefer to combine data into a single CSV for upload + # Empty data points for the String data type however will be ingested as empty strings + # This necessitates separate files for each string dataframe + # Split up hdf5_config into separate configs. String data is split into separate configs. All other data is a single config + split_configs = _split_hdf5_configs(hdf5_config) + + # For each hdf5_split_config, convert to a csv file and upload + import_services = [] + for config in split_configs: + with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file: + csv_config = _convert_to_csv_file( + path, + temp_file, + config, + ) + import_services.append( + self._csv_upload_service.upload( + temp_file.name, csv_config, show_progress=show_progress + ) + ) + + return import_services + + +def _convert_to_csv_file( + src_path: Union[str, Path], + dst_file: TextIO, + hdf5_config: Hdf5Config, +) -> CsvConfig: + """Converts the HDF5 file to a temporary CSV on disk that we will upload. + + Args: + src_path: The source path to the HDF5 file. + dst_file: The output CSV file. + hdf5_config: The HDF5 config. + + Returns: + The CSV config for the import. + """ + + csv_cfg = _create_csv_config(hdf5_config) + merged_df = _convert_hdf5_to_dataframes(src_path, hdf5_config) + merged_df.write_csv(dst_file.name) + + return csv_cfg + + +def _convert_hdf5_to_dataframes( + src_path: Union[str, Path], hdf5_config: Hdf5Config +) -> pl.DataFrame: + """Convert the HDF5 file to a polars DataFrame. + + Args: + src_path: The source path to the HDF5 file. + hdf5_config: The HDF5 config. + + Returns: + A polars DataFrame containing the data. 
+ """ + data_frames = [] + # Using swmr=True allows opening of HDF5 files written in SWMR mode which may have not been properly closed, but may be otherwise valid + with h5py.File(src_path, "r", libver="latest", swmr=True) as h5f: + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe(h5f, data_cfg) + data_frames.append(df) + + # Merge polars dataframes, sort by timestamp, then write to a temp file + # Could write csv without headers, but keeping for debugging purposes + # If header removed, need to update 'first_data_row' in _create_csv_config to 1 + merged_df = data_frames[0] + if len(merged_df) > 1: + for df in data_frames[1:]: + merged_df = merged_df.join(df, on="timestamp", how="full", coalesce=True) + merged_df = merged_df.sort("timestamp") + return merged_df + + +def _extract_hdf5_data_to_dataframe( + hdf5_file: h5py.File, + hdf5_data_config: Hdf5DataCfg, +) -> pl.DataFrame: + """Extract data from an hdf5_file to a polars DataFrame. + + Args: + hdf5_file: HDF5 File + hdf5_data_config: The HDF5 Data Config + + Returns: + A two-column polars DataFrame containing the timestamps and values + """ + + if not hdf5_data_config.time_dataset in hdf5_file: + raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.time_dataset}") + time_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.time_dataset]) + if not hdf5_data_config.value_dataset in hdf5_file: + raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}") + value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset]) + + # Convert the full time and value dataset to a dataframe + # This will make it easier to work with any nested columns from a numpy structured array + df_time = pl.DataFrame(time_dataset[:]) + df_value = pl.DataFrame(value_dataset[:]) + time_col = hdf5_data_config.time_column - 1 + val_col = hdf5_data_config.value_column - 1 + + if df_time.shape[1] <= time_col: + raise Exception( + f"time_column={hdf5_data_config.time_column} out of range for {hdf5_data_config.time_dataset}" + ) + if df_value.shape[1] <= val_col: + raise Exception( + f"value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}" + ) + + time_series = df_time[df_time.columns[time_col]] + value_series = df_value[df_value.columns[val_col]] + + # HDF5 string data may come in as binary, so convert + if time_series.dtype == pl.Binary: + time_series = time_series.cast(pl.String) + if value_series.dtype == pl.Binary: + value_series = value_series.cast(pl.String) + + return pl.DataFrame(data={"timestamp": time_series, hdf5_data_config.name: value_series}) + + +def _create_csv_config(hdf5_config: Hdf5Config) -> CsvConfig: + """Construct a CsvConfig from a Hdf5Config + + Args: + hdf5_path: Path to the HDF5 file + hdf5_config: The HDF5 config + + Returns: + The CSV config. 
+ """ + + csv_config_dict = { + "asset_name": hdf5_config._hdf5_config.asset_name, + "run_name": hdf5_config._hdf5_config.run_name, + "run_id": hdf5_config._hdf5_config.run_id, + "first_data_row": 2, # Row 1 is headers + "time_column": { + "format": hdf5_config._hdf5_config.time.format, + "column_number": 1, + "relative_start_time": hdf5_config._hdf5_config.time.relative_start_time, + }, + } + + data_columns = {} + for idx, data_cfg in enumerate(hdf5_config._hdf5_config.data): + col_num = idx + 2 # 1-indexed and col 1 is time col + data_columns[col_num] = _parse_hdf5_data_cfg(data_cfg) + + csv_config_dict["data_columns"] = data_columns + + return CsvConfig(csv_config_dict) + + +def _parse_hdf5_data_cfg(data_cfg: Hdf5DataCfg) -> Dict[str, Any]: + """ + Parse a HDF5 Data Config to a dict for later conversion to CsvConfig + """ + + csv_dict: Dict[str, Any] = {} + csv_dict["name"] = data_cfg.name + csv_dict["data_type"] = data_cfg.data_type + csv_dict["units"] = data_cfg.units + csv_dict["description"] = data_cfg.description + csv_dict["enum_types"] = data_cfg.enum_types + csv_dict["bit_field_elements"] = data_cfg.bit_field_elements + + return csv_dict + + +def _split_hdf5_configs(hdf5_config: Hdf5Config) -> List[Hdf5Config]: + """ + Split up hdf5_config into separate configs used to generate each CSV file + Needed as string channels cannot be merged without creating empty string data points in the app + + Args: + hdf5_config: The HDF5 config. + + Returns: + List of HDF5Configs for later CSV conversion + """ + + # Combined config for non string types + non_string_config_dict = { + "asset_name": hdf5_config._hdf5_config.asset_name, + "run_name": hdf5_config._hdf5_config.run_name, + "run_id": hdf5_config._hdf5_config.run_id, + "time": hdf5_config._hdf5_config.time, + "data": [ + data_cfg + for data_cfg in hdf5_config._hdf5_config.data + if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING" + ], + } + + filtered_hdf5_configs = [] + + # Avoid adding combined config if no non-string data present + if non_string_config_dict["data"]: + filtered_hdf5_configs.append(Hdf5Config(non_string_config_dict)) + + for data_cfg in hdf5_config._hdf5_config.data: + if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING": + continue + string_config = Hdf5Config( + { + "asset_name": hdf5_config._hdf5_config.asset_name, + "run_name": hdf5_config._hdf5_config.run_name, + "run_id": hdf5_config._hdf5_config.run_id, + "time": hdf5_config._hdf5_config.time, + "data": [data_cfg], + } + ) + filtered_hdf5_configs.append(string_config) + + return filtered_hdf5_configs diff --git a/python/pyproject.toml b/python/pyproject.toml index 921a7a85..5bd6b731 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -58,6 +58,7 @@ build = ["pdoc==14.5.0", "build==1.2.1"] openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] +hdf5 = ["h5py~=3.11", "polars~=1.8"] [build-system] requires = ["setuptools"] From 35f23e1192180518d6a1e8214c6d5d03887ae2c4 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Tue, 1 Jul 2025 14:13:56 -0700 Subject: [PATCH 02/11] ruff linting --- python/lib/sift_py/data_import/_hdf5_test.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py index 3f7e6cb0..a79700d2 100644 --- a/python/lib/sift_py/data_import/_hdf5_test.py +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -568,11 +568,13 @@ def 
test_hdf5_upload_string_timestamps(mocker: MockFixture, hdf5_config, rest_co mock_path_is_file.return_value = True data_dict = { - "/timestamps": np.array([ - b"2024-10-07 17:00:09.982126", - b"2024-10-07 17:00:10.022126", - b"2024-10-07 17:00:10.062126" - ]), + "/timestamps": np.array( + [ + b"2024-10-07 17:00:09.982126", + b"2024-10-07 17:00:10.022126", + b"2024-10-07 17:00:10.062126", + ] + ), "/DoubleChannel": np.array([0, 1, 2], dtype=np.int64), } From 1060b0c63c9e38e37ef384a655b390b11fb8f000 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Tue, 1 Jul 2025 14:19:20 -0700 Subject: [PATCH 03/11] fix mypy typing --- python/lib/sift_py/data_import/_hdf5_test.py | 2 +- python/lib/sift_py/data_import/hdf5.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py index a79700d2..e4f6981f 100644 --- a/python/lib/sift_py/data_import/_hdf5_test.py +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -2,7 +2,7 @@ import h5py # type: ignore import numpy as np -import polars as pl +import polars as pl # type: ignore import pytest from pytest_mock import MockFixture diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index 1b92739f..823b4b99 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -10,7 +10,7 @@ ) from e try: - import polars as pl + import polars as pl # type: ignore except ImportError as e: raise RuntimeError( "The polars package is required to use the HDF5 upload service. " From a79a4ea969100e42b1a027c74aa0076364d8ae1c Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Tue, 1 Jul 2025 14:26:15 -0700 Subject: [PATCH 04/11] Add optional hdf5 to CI deps --- .github/workflows/python_ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python_ci.yaml b/.github/workflows/python_ci.yaml index a240294d..dee4a54f 100644 --- a/.github/workflows/python_ci.yaml +++ b/.github/workflows/python_ci.yaml @@ -27,7 +27,7 @@ jobs: - name: Pip install run: | python -m pip install --upgrade pip - pip install '.[development,openssl,tdms,rosbags]' + pip install '.[development,openssl,tdms,rosbags,hdf5]' - name: Lint run: | ruff check From 2cf4763f9f5791df240cf1f3f5aadb7e3fdb9a56 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Wed, 9 Jul 2025 12:18:23 -0700 Subject: [PATCH 05/11] Added base ConfigTimeModel and ConfigDataModel --- python/lib/sift_py/data_import/_config.py | 294 ++++++------------ .../lib/sift_py/data_import/_config_test.py | 24 ++ 2 files changed, 119 insertions(+), 199 deletions(-) diff --git a/python/lib/sift_py/data_import/_config.py b/python/lib/sift_py/data_import/_config.py index 781c2f34..934f93b6 100644 --- a/python/lib/sift_py/data_import/_config.py +++ b/python/lib/sift_py/data_import/_config.py @@ -20,118 +20,9 @@ class ConfigBaseModel(BaseModel): model_config = ConfigDict(extra="forbid") -class CsvConfigImpl(ConfigBaseModel): - """ - Defines the CSV config spec. - """ - - asset_name: str - run_name: str = "" - run_id: str = "" - first_data_row: int - time_column: TimeColumn - data_columns: Dict[int, DataColumn] - - @model_validator(mode="after") - def validate_config(self) -> Self: - if not self.data_columns: - raise PydanticCustomError("invalid_config_error", "Empty 'data_columns'") - - if self.run_name and self.run_id: - raise PydanticCustomError( - "invalid_config_error", "Only specify run_name or run_id, not both." 
- ) - - return self - - -class Hdf5ConfigImpl(ConfigBaseModel): +class ConfigDataModel(ConfigBaseModel): """ - Defines the HDF5 config spec - """ - - asset_name: str - run_name: str = "" - run_id: str = "" - time: TimeCfg - data: List[Hdf5DataCfg] - - @model_validator(mode="after") - def validate_config(self) -> Self: - if not self.data: - raise PydanticCustomError("invalid_config_error", "Empty 'data'") - - if self.run_name and self.run_id: - raise PydanticCustomError( - "invalid_config_error", "Only specify run_name or run_id, not both." - ) - - return self - - -class EnumType(ConfigBaseModel, ChannelEnumType): - """ - Defines an enum entry in the CSV config. - """ - - -class BitFieldElement(ConfigBaseModel, ChannelBitFieldElement): - """ - Defines a bit field element entry in the CSV config. - """ - - -class TimeColumn(ConfigBaseModel): - """ - Defines a time column entry in the CSV config. - """ - - format: Union[str, TimeFormatType] - column_number: int - relative_start_time: Optional[str] = None - - @field_validator("format", mode="before") - @classmethod - def convert_format(cls, raw: Union[str, TimeFormatType]) -> str: - """ - Converts the provided format value to a string. - """ - if isinstance(raw, TimeFormatType): - return raw.as_human_str() - elif isinstance(raw, str): - value = TimeFormatType.from_str(raw) - if value is not None: - return value.as_human_str() - - raise PydanticCustomError("invalid_config_error", f"Invalid time format: {raw}.") - - @model_validator(mode="after") - def validate_time(self) -> Self: - """ - Validates the provided time format. - """ - format = TimeFormatType.from_str(self.format) # type: ignore - if format is None: - raise PydanticCustomError( - "invalid_config_error", f"Invalid time format: {self.format}." - ) - - if format.is_relative(): - if self.relative_start_time is None: - raise PydanticCustomError("invalid_config_error", "Missing 'relative_start_time'") - else: - if self.relative_start_time is not None: - raise PydanticCustomError( - "invalid_config_error", - "'relative_start_time' specified for non relative time format.", - ) - - return self - - -class DataColumn(ConfigBaseModel): - """ - Defines a data column entry in the CSV config. + Base DataModel with common functionality """ name: str @@ -211,95 +102,9 @@ def validate_bit_fields(self) -> Self: return self -class Hdf5DataCfg(ConfigBaseModel): +class ConfigTimeModel(ConfigBaseModel): """ - Defines a data entry in the HDF5 config. - """ - - name: str - time_dataset: str - time_column: int = 1 - value_dataset: str - value_column: int = 1 - data_type: Union[str, ChannelDataType, Type] = "" - units: str = "" - description: str = "" - # Only valid if data_type is "CHANNEL_DATA_TYPE_ENUM". - enum_types: List[EnumType] = [] - # Only valid if data_type is "CHANNEL_DATA_TYPE_BIT_FIELD" - bit_field_elements: List[BitFieldElement] = [] - - @field_validator("data_type", mode="before") - @classmethod - def convert_data_type(cls, raw: Union[str, ChannelDataType, Type]) -> str: - """ - Converts the provided data_type value to a string. 
- """ - if isinstance(raw, type): - if raw == int: - return ChannelDataType.INT_64.as_human_str(api_format=True) - elif raw == float: - return ChannelDataType.DOUBLE.as_human_str(api_format=True) - elif raw == str: - return ChannelDataType.STRING.as_human_str(api_format=True) - elif raw == bool: - return ChannelDataType.BOOL.as_human_str(api_format=True) - elif isinstance(raw, ChannelDataType): - return raw.as_human_str(api_format=True) - elif isinstance(raw, str): - value = ChannelDataType.from_str(raw) - if value is not None: - return value.as_human_str(api_format=True) - - raise PydanticCustomError("invalid_config_error", f"Invalid data_type: {raw}.") - - @model_validator(mode="before") - @classmethod - def concatenate_component_and_name(cls, data: Any) -> Any: - """ - Concatenates Component and Name. If Component is not an empty string, raises a deprecation warning. - """ - if isinstance(data, dict): - if "component" in data.keys() and "name" in data.keys(): - _component_deprecation_warning() - data["name"] = channel_fqn(name=data["name"], component=data["component"]) - data.pop("component") - return data - - @model_validator(mode="after") - def validate_enums(self) -> Self: - """ - Validates the enum configuration. - """ - data_type = ChannelDataType.from_str(self.data_type) # type: ignore - if self.enum_types: - if data_type != ChannelDataType.ENUM: - raise PydanticCustomError( - "invalid_config_error", - f"Enums can only be specified with the CHANNEL_DATA_TYPE_ENUM data type. {self.name} is {self.data_type}", - ) - - return self - - @model_validator(mode="after") - def validate_bit_fields(self) -> Self: - """ - Validates the bit field configuration. - """ - data_type = ChannelDataType.from_str(self.data_type) # type: ignore - if self.bit_field_elements: - if data_type != ChannelDataType.BIT_FIELD: - raise PydanticCustomError( - "invalid_config_error", - f"Bit fields can only be specified with the CHANNEL_DATA_TYPE_BIT_FIELD data type. {self.name} is {self.data_type}", - ) - - return self - - -class TimeCfg(ConfigBaseModel): - """ - Defines a time entry in the generic file config. + Base TimeModel with common functionality """ format: Union[str, TimeFormatType] @@ -342,3 +147,94 @@ def validate_time(self) -> Self: ) return self + + +class CsvConfigImpl(ConfigBaseModel): + """ + Defines the CSV config spec. + """ + + asset_name: str + run_name: str = "" + run_id: str = "" + first_data_row: int + time_column: TimeColumn + data_columns: Dict[int, DataColumn] + + @model_validator(mode="after") + def validate_config(self) -> Self: + if not self.data_columns: + raise PydanticCustomError("invalid_config_error", "Empty 'data_columns'") + + if self.run_name and self.run_id: + raise PydanticCustomError( + "invalid_config_error", "Only specify run_name or run_id, not both." + ) + + return self + + +class Hdf5ConfigImpl(ConfigBaseModel): + """ + Defines the HDF5 config spec + """ + + asset_name: str + run_name: str = "" + run_id: str = "" + time: TimeCfg + data: List[Hdf5DataCfg] + + @model_validator(mode="after") + def validate_config(self) -> Self: + if not self.data: + raise PydanticCustomError("invalid_config_error", "Empty 'data'") + + if self.run_name and self.run_id: + raise PydanticCustomError( + "invalid_config_error", "Only specify run_name or run_id, not both." + ) + + return self + + +class EnumType(ConfigBaseModel, ChannelEnumType): + """ + Defines an enum entry in the CSV config. 
+ """ + + +class BitFieldElement(ConfigBaseModel, ChannelBitFieldElement): + """ + Defines a bit field element entry in the CSV config. + """ + + +class TimeColumn(ConfigTimeModel): + """ + Defines a time column entry in the CSV config. + """ + column_number: int + + +class DataColumn(ConfigDataModel): + """ + Defines a data column entry in the CSV config. + """ + + +class TimeCfg(ConfigTimeModel): + """ + Defines a time entry in the generic file config. + """ + + +class Hdf5DataCfg(ConfigDataModel): + """ + Defines a data entry in the HDF5 config. + """ + + time_dataset: str + time_column: int = 1 + value_dataset: str + value_column: int = 1 diff --git a/python/lib/sift_py/data_import/_config_test.py b/python/lib/sift_py/data_import/_config_test.py index df75d70f..fbf561de 100644 --- a/python/lib/sift_py/data_import/_config_test.py +++ b/python/lib/sift_py/data_import/_config_test.py @@ -1,5 +1,7 @@ +import pydantic_core import pytest +from sift_py.data_import._config import ConfigDataModel, ConfigTimeModel from sift_py.data_import.config import CsvConfig, Hdf5Config from sift_py.data_import.time_format import TimeFormatType from sift_py.error import SiftAPIDeprecationWarning @@ -388,3 +390,25 @@ def test_time_column_hdf5(hdf5_config_data: dict): "format": TimeFormatType.ABSOLUTE_DATETIME, } Hdf5Config(hdf5_config_data) + +def test_config_time_model_extra_field(): + time_cfg = { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": 123456789, + "extra_field": 0 + } + + with pytest.raises(pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted"): + ConfigTimeModel(**time_cfg) + +def test_config_data_model_extra_field(): + data_cfg = { + "name": "testname", + "data_type": float, + "extra_field": 0 + } + + with pytest.raises(pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted"): + ConfigDataModel(**data_cfg) + + From 2277768adc8fbc52a35a58e4d7aa946a94c2ef22 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Wed, 9 Jul 2025 23:18:06 -0600 Subject: [PATCH 06/11] Address PR comments --- python/lib/sift_py/data_import/_config.py | 1 + .../lib/sift_py/data_import/_config_test.py | 20 +-- python/lib/sift_py/data_import/_hdf5_test.py | 136 +++++++++++++++--- python/lib/sift_py/data_import/hdf5.py | 103 +++++++++---- 4 files changed, 202 insertions(+), 58 deletions(-) diff --git a/python/lib/sift_py/data_import/_config.py b/python/lib/sift_py/data_import/_config.py index 934f93b6..bd19c5fe 100644 --- a/python/lib/sift_py/data_import/_config.py +++ b/python/lib/sift_py/data_import/_config.py @@ -214,6 +214,7 @@ class TimeColumn(ConfigTimeModel): """ Defines a time column entry in the CSV config. 
""" + column_number: int diff --git a/python/lib/sift_py/data_import/_config_test.py b/python/lib/sift_py/data_import/_config_test.py index fbf561de..b0f20d9d 100644 --- a/python/lib/sift_py/data_import/_config_test.py +++ b/python/lib/sift_py/data_import/_config_test.py @@ -391,24 +391,24 @@ def test_time_column_hdf5(hdf5_config_data: dict): } Hdf5Config(hdf5_config_data) + def test_config_time_model_extra_field(): time_cfg = { "format": "TIME_FORMAT_RELATIVE_SECONDS", "relative_start_time": 123456789, - "extra_field": 0 + "extra_field": 0, } - with pytest.raises(pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted"): + with pytest.raises( + pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted" + ): ConfigTimeModel(**time_cfg) + def test_config_data_model_extra_field(): - data_cfg = { - "name": "testname", - "data_type": float, - "extra_field": 0 - } + data_cfg = {"name": "testname", "data_type": float, "extra_field": 0} - with pytest.raises(pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted"): + with pytest.raises( + pydantic_core._pydantic_core.ValidationError, match="Extra inputs are not permitted" + ): ConfigDataModel(**data_cfg) - - diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py index e4f6981f..87525ed8 100644 --- a/python/lib/sift_py/data_import/_hdf5_test.py +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -6,14 +6,12 @@ import pytest from pytest_mock import MockFixture -from sift_py.data_import._config import Hdf5DataCfg from sift_py.data_import.config import Hdf5Config from sift_py.data_import.hdf5 import ( Hdf5UploadService, _convert_hdf5_to_dataframes, _create_csv_config, _extract_hdf5_data_to_dataframe, - _parse_hdf5_data_cfg, _split_hdf5_configs, ) @@ -250,26 +248,6 @@ def test_create_csv_config(hdf5_config): assert len(csv_cfg_dict["data_columns"]) == 12 -def test_parse_hdf5_data_cfg(): - data_cfg = Hdf5DataCfg( - name="TestChannel", - time_dataset="/TestChannel", - value_dataset="/TestChannel", - data_type="CHANNEL_DATA_TYPE_DOUBLE", - units="m/s", - description="Test channel", - enum_types=[], - bit_field_elements=[], - ) - parsed_cfg = _parse_hdf5_data_cfg(data_cfg) - assert parsed_cfg["name"] == "TestChannel" - assert parsed_cfg["data_type"] == "CHANNEL_DATA_TYPE_DOUBLE" - assert parsed_cfg["units"] == "m/s" - assert parsed_cfg["description"] == "Test channel" - assert not parsed_cfg["enum_types"] - assert not parsed_cfg["bit_field_elements"] - - def test_convert_hdf5_to_dataframes(mocker: MockFixture, hdf5_config, hdf5_data_dict): mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) @@ -407,6 +385,120 @@ def test_string_conversion(): assert (np.array(df[data_cfg.name]) == np.array(["a", "b", "cat"])).all() +def test_bitfield_conversion(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "bitfield1", + "time_dataset": "/bitChannel1", + "value_dataset": "/bitChannel1", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_BIT_FIELD", + "bit_field_elements": [ + {"index": 0, "name": "flag1", "bit_count": 4}, + {"index": 4, "name": "flag2", "bit_count": 4}, + ], + } + ], + } + ) + + data_dict = { + "/bitChannel1": np.array( + list(zip([0, 1, 2], [0, 2_147_483_647, 15])), + dtype=[("time", np.int64), ("value", np.int32)], + ), + } + + 
mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() + assert (np.array(df[data_cfg.name]) == np.array([0, 2_147_483_647, 15])).all() + + +def test_enum_conversion(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "EnumChannel", + "time_dataset": "/EnumChannel", + "value_dataset": "/EnumChannel", + "time_column": 1, + "value_column": 2, + "data_type": "CHANNEL_DATA_TYPE_ENUM", + "enum_types": [ + {"key": 1, "name": "On"}, + {"key": 0, "name": "Off"}, + {"key": 2_147_483_647, "name": "Invalid"}, + ], + }, + ], + } + ) + + data_dict = { + "/EnumChannel": np.array( + list(zip([0, 1, 2], [1, 0, 2_147_483_647])), + dtype=[("time", np.int64), ("value", np.int32)], + ), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() + assert (np.array(df[data_cfg.name]) == np.array([1, 0, 2_147_483_647])).all() + + +def test_time_value_len_diff(): + hdf5_config = Hdf5Config( + { + "asset_name": "TestAsset", + "time": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2025-01-01T01:00:00Z", + }, + "data": [ + { + "name": "DoubleChannel", + "time_dataset": "/time", + "value_dataset": "/data", + "time_column": 1, + "value_column": 1, + "data_type": "CHANNEL_DATA_TYPE_DOUBLE", + }, + ], + } + ) + + data_dict = { + "/time": np.array([0, 1, 2], dtype=np.int64), + "/data": np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64), + } + + mock_file = MockHdf5File(data_dict) + + for data_cfg in hdf5_config._hdf5_config.data: + with pytest.raises(Exception, match="time and value columns have different lengths"): + _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + + def test_hdf5_to_dataframe_conversion(mocker: MockFixture, hdf5_config, hdf5_data_dict): mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) name_dataframe_map = {data.name: data.value_dataset for data in hdf5_config._hdf5_config.data} diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index 823b4b99..ab1a33a1 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -1,5 +1,8 @@ +import json +from contextlib import ExitStack from pathlib import Path -from typing import Any, Dict, List, TextIO, Union, cast +from typing import List, TextIO, Tuple, Union, cast +from urllib.parse import urljoin try: import h5py # type: ignore @@ -33,6 +36,7 @@ class Hdf5UploadService: _csv_upload_service: CsvUploadService def __init__(self, rest_conf: SiftRestConfig): + self.RUN_PATH = "/api/v2/runs" self._csv_upload_service = CsvUploadService(rest_conf) def upload( @@ -64,23 +68,73 @@ def upload( # Split up hdf5_config into separate configs. String data is split into separate configs. 
All other data is a single config split_configs = _split_hdf5_configs(hdf5_config) - # For each hdf5_split_config, convert to a csv file and upload - import_services = [] - for config in split_configs: - with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file: + # Ensures all temp files opened under stack.enter_context() will have __exit__ called as with a standard with statement + with ExitStack() as stack: + # First convert each csv file + csv_items: List[Tuple[str, CsvConfig]] = [] + for config in split_configs: + temp_file = stack.enter_context(NamedTemporaryFile(mode="wt", suffix=".csv")) csv_config = _convert_to_csv_file( path, temp_file, config, ) + csv_items.append((temp_file.name, csv_config)) + + # If a config defines a run_name and is split up, multiple runs will be created. + # Instead, generate a run_id now, and use that instead of a run_name + # Perform now instead of before the config split to avoid creating a run any problems arise before ready to upload + if hdf5_config._hdf5_config.run_name != "": + run_id = self._create_run(hdf5_config._hdf5_config.run_name) + for _, csv_config in csv_items: + csv_config._csv_config.run_name = "" + csv_config._csv_config.run_id = run_id + + # Upload each file + import_services = [] + for filename, csv_config in csv_items: import_services.append( self._csv_upload_service.upload( - temp_file.name, csv_config, show_progress=show_progress + filename, csv_config, show_progress=show_progress ) ) return import_services + def _create_run(self, run_name: str) -> str: + """Create a new run using the REST service, and return a run_id""" + run_uri = urljoin(self._csv_upload_service._base_uri, self.RUN_PATH) + + # Since CSVUploadService is already a RestService, we can utilize that + response = self._csv_upload_service._session.post( + url=run_uri, + headers={ + "Content-Encoding": "application/json", + }, + data=json.dumps( + { + "name": run_name, + "description": "", + } + ), + ) + if response.status_code != 200: + raise Exception( + f"Run creation failed with status code {response.status_code}. 
{response.text}" + ) + + try: + run_info = response.json() + except (json.decoder.JSONDecodeError, KeyError): + raise Exception(f"Invalid response: {response.text}") + + if "run" not in run_info: + raise Exception("Response missing key: run") + if "runId" not in run_info["run"]: + raise Exception("Response missing key: runId") + + return run_info["run"]["runId"] + def _convert_to_csv_file( src_path: Union[str, Path], @@ -100,7 +154,7 @@ def _convert_to_csv_file( csv_cfg = _create_csv_config(hdf5_config) merged_df = _convert_hdf5_to_dataframes(src_path, hdf5_config) - merged_df.write_csv(dst_file.name) + merged_df.write_csv(dst_file) return csv_cfg @@ -127,6 +181,7 @@ def _convert_hdf5_to_dataframes( # Merge polars dataframes, sort by timestamp, then write to a temp file # Could write csv without headers, but keeping for debugging purposes # If header removed, need to update 'first_data_row' in _create_csv_config to 1 + # Using join instead of concat to avoid issue if all data columns share a name merged_df = data_frames[0] if len(merged_df) > 1: for df in data_frames[1:]: @@ -165,16 +220,21 @@ def _extract_hdf5_data_to_dataframe( if df_time.shape[1] <= time_col: raise Exception( - f"time_column={hdf5_data_config.time_column} out of range for {hdf5_data_config.time_dataset}" + f"{hdf5_data_config.name}: time_column={hdf5_data_config.time_column} out of range for {hdf5_data_config.time_dataset}" ) if df_value.shape[1] <= val_col: raise Exception( - f"value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}" + f"{hdf5_data_config.name}: value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}" ) time_series = df_time[df_time.columns[time_col]] value_series = df_value[df_value.columns[val_col]] + if len(time_series) != len(value_series): + raise Exception( + f"{hdf5_data_config.name}: time and value columns have different lengths ({len(time_series)} vs {len(value_series)})" + ) + # HDF5 string data may come in as binary, so convert if time_series.dtype == pl.Binary: time_series = time_series.cast(pl.String) @@ -210,29 +270,20 @@ def _create_csv_config(hdf5_config: Hdf5Config) -> CsvConfig: data_columns = {} for idx, data_cfg in enumerate(hdf5_config._hdf5_config.data): col_num = idx + 2 # 1-indexed and col 1 is time col - data_columns[col_num] = _parse_hdf5_data_cfg(data_cfg) + data_columns[col_num] = { + "name": data_cfg.name, + "data_type": data_cfg.data_type, + "units": data_cfg.units, + "description": data_cfg.description, + "enum_types": data_cfg.enum_types, + "bit_field_elements": data_cfg.bit_field_elements, + } csv_config_dict["data_columns"] = data_columns return CsvConfig(csv_config_dict) -def _parse_hdf5_data_cfg(data_cfg: Hdf5DataCfg) -> Dict[str, Any]: - """ - Parse a HDF5 Data Config to a dict for later conversion to CsvConfig - """ - - csv_dict: Dict[str, Any] = {} - csv_dict["name"] = data_cfg.name - csv_dict["data_type"] = data_cfg.data_type - csv_dict["units"] = data_cfg.units - csv_dict["description"] = data_cfg.description - csv_dict["enum_types"] = data_cfg.enum_types - csv_dict["bit_field_elements"] = data_cfg.bit_field_elements - - return csv_dict - - def _split_hdf5_configs(hdf5_config: Hdf5Config) -> List[Hdf5Config]: """ Split up hdf5_config into separate configs used to generate each CSV file From febb50eadc48517d5d5e0be44f0d48d9d053a5fe Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Wed, 16 Jul 2025 22:47:09 -0700 Subject: [PATCH 07/11] Remove redundant flag in mode --- 
python/lib/sift_py/data_import/hdf5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index ab1a33a1..8d6660c4 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -73,7 +73,7 @@ def upload( # First convert each csv file csv_items: List[Tuple[str, CsvConfig]] = [] for config in split_configs: - temp_file = stack.enter_context(NamedTemporaryFile(mode="wt", suffix=".csv")) + temp_file = stack.enter_context(NamedTemporaryFile(mode="w", suffix=".csv")) csv_config = _convert_to_csv_file( path, temp_file, From 20152267ef58b1ea355d2635e603df5e647d32e8 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Fri, 18 Jul 2025 16:13:31 -0700 Subject: [PATCH 08/11] Perf improvements --- python/lib/sift_py/data_import/_hdf5_test.py | 169 ++++++++++++++++- python/lib/sift_py/data_import/hdf5.py | 179 ++++++++++++++----- 2 files changed, 292 insertions(+), 56 deletions(-) diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py index 87525ed8..f10a2bc1 100644 --- a/python/lib/sift_py/data_import/_hdf5_test.py +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -12,6 +12,7 @@ _convert_hdf5_to_dataframes, _create_csv_config, _extract_hdf5_data_to_dataframe, + _merge_ts_dataframes, _split_hdf5_configs, ) @@ -240,13 +241,21 @@ def test_split_hdf5_configs_splits_strings(hdf5_config): assert len(non_string_configs) == 1 -def test_create_csv_config(hdf5_config): - csv_cfg = _create_csv_config(hdf5_config) +def test_create_csv_config(mocker: MockFixture, hdf5_config): + # Use a reverse list to make sure the order has changed + data_cols = [d_cfg.name for d_cfg in hdf5_config._hdf5_config.data][::-1] + columns = ["timestamp"] + data_cols + merged_df = pl.DataFrame({col: [] for col in columns}) + + csv_cfg = _create_csv_config(hdf5_config, merged_df) csv_cfg_dict = csv_cfg.to_dict() assert "time_column" in csv_cfg_dict assert "data_columns" in csv_cfg_dict assert len(csv_cfg_dict["data_columns"]) == 12 + for csv_col, df_col in zip(csv_cfg_dict["data_columns"].values(), merged_df.columns[1:]): + assert(csv_col["name"] == df_col) + def test_convert_hdf5_to_dataframes(mocker: MockFixture, hdf5_config, hdf5_data_dict): mocker.patch("h5py.File", return_value=MockHdf5File(hdf5_data_dict)) @@ -291,7 +300,7 @@ def test_two_dataset_extraction(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) assert df.shape == (3, 2) assert df.columns[1] == data_cfg.name assert (np.array(df[df.columns[0]]) == data_dict["/Channel1_Time"]).all() @@ -331,7 +340,7 @@ def test_multi_col_dataset_extraction(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) assert df.shape == (3, 2) assert df.columns[1] == data_cfg.name assert (np.array(df[df.columns[0]]) == data_dict["/Channel1"][3]).all() @@ -381,7 +390,7 @@ def test_string_conversion(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, 
data_cfg.time_column, [data_cfg]) assert (np.array(df[data_cfg.name]) == np.array(["a", "b", "cat"])).all() @@ -420,7 +429,7 @@ def test_bitfield_conversion(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() assert (np.array(df[data_cfg.name]) == np.array([0, 2_147_483_647, 15])).all() @@ -461,7 +470,7 @@ def test_enum_conversion(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() assert (np.array(df[data_cfg.name]) == np.array([1, 0, 2_147_483_647])).all() @@ -496,7 +505,7 @@ def test_time_value_len_diff(): for data_cfg in hdf5_config._hdf5_config.data: with pytest.raises(Exception, match="time and value columns have different lengths"): - _extract_hdf5_data_to_dataframe(mock_file, data_cfg) + _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) def test_hdf5_to_dataframe_conversion(mocker: MockFixture, hdf5_config, hdf5_data_dict): @@ -700,3 +709,147 @@ def test_hdf5_upload_string_timestamps(mocker: MockFixture, hdf5_config, rest_co ) mock_csv_upload.assert_called() + + +def test_merge_ts_dataframes_no_duplicates(): + """Test merging dataframes with no duplicate channels""" + df1 = pl.DataFrame({ + "timestamp": [0, 1, 2], + "channel1": [1.0, 2.0, 3.0] + }) + df2 = pl.DataFrame({ + "timestamp": [1, 2, 3], + "channel2": [4.0, 5.0, 6.0] + }) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 3) + assert "timestamp" in result.columns + assert "channel1" in result.columns + assert "channel2" in result.columns + result = result.sort("timestamp") + assert result["timestamp"].to_list() == [0, 1, 2, 3] + assert result["channel1"].to_list() == [1.0, 2.0, 3.0, None] + assert result["channel2"].to_list() == [None, 4.0, 5.0, 6.0] + + +def test_merge_ts_dataframes_with_duplicates(): + """Test merging dataframes with duplicate channel names""" + df1 = pl.DataFrame({ + "timestamp": [0, 1, 2], + "channel1": [1.0, 2.0, 3.0], + "common_channel": [10.0, 20.0, 30.0] + }) + df2 = pl.DataFrame({ + "timestamp": [1, 2, 3], + "channel2": [4.0, 5.0, 6.0], + "common_channel": [40.0, 50.0, 60.0] + }) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 4) + assert "timestamp" in result.columns + assert "channel1" in result.columns + assert "channel2" in result.columns + assert "common_channel" in result.columns + + result = result.sort("timestamp") + + # Check that values are coalesced properly + common_values = result["common_channel"].to_list() + assert common_values == [10.0, 20.0, 30.0, 60.0] + + +def test_merge_ts_dataframes_with_nulls(): + """Test merging dataframes where one has null values""" + df1 = pl.DataFrame({ + "timestamp": [0, 1, 2], + "channel1": [1.0, None, 3.0], + "common_channel": [10.0, None, 30.0] + }) + df2 = pl.DataFrame({ + "timestamp": [1, 2, 3], + "channel2": [4.0, 5.0, 6.0], + "common_channel": [40.0, 50.0, 60.0] + }) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 4) + + timestamps = result["timestamp"].to_list() + common_values = 
result["common_channel"].to_list() + + # At timestamp 1: df1 has null, so should use df2 value (40.0) + assert common_values[timestamps.index(1)] == 40.0 + # At timestamp 2: df1 has 30.0, so should use df1 value + assert common_values[timestamps.index(2)] == 30.0 + + +def test_merge_ts_dataframes_empty_dataframes(): + """Test merging empty dataframes""" + df1 = pl.DataFrame({"timestamp": [], "channel1": []}) + df2 = pl.DataFrame({"timestamp": [], "channel2": []}) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (0, 3) + assert "timestamp" in result.columns + assert "channel1" in result.columns + assert "channel2" in result.columns + + +def test_merge_ts_dataframes_multiple_duplicates(): + """Test merging dataframes with multiple duplicate channel names""" + df1 = pl.DataFrame({ + "timestamp": [0, 1, 2], + "channel1": [1.0, 2.0, 3.0], + "dup1": [10.0, 20.0, 30.0], + "dup2": [100.0, 200.0, 300.0] + }) + df2 = pl.DataFrame({ + "timestamp": [1, 2, 3], + "channel2": [4.0, 5.0, 6.0], + "dup1": [40.0, 50.0, 60.0], + "dup2": [400.0, 500.0, 600.0] + }) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 5) + expected_columns = {"timestamp", "channel1", "channel2", "dup1", "dup2"} + assert set(result.columns) == expected_columns + + # At timestamp 0: should have df1 values only + assert result.filter(pl.col("timestamp") == 0)["dup1"].item() == 10.0 + assert result.filter(pl.col("timestamp") == 0)["dup2"].item() == 100.0 + + # At timestamp 3: should have df2 values only + assert result.filter(pl.col("timestamp") == 3)["dup1"].item() == 60.0 + assert result.filter(pl.col("timestamp") == 3)["dup2"].item() == 600.0 + + +def test_merge_ts_dataframes_different_dtypes(): + """Test merging dataframes with different data types""" + df1 = pl.DataFrame({ + "timestamp": [0, 1, 2], + "int_channel": [1, 2, 3], + "common_channel": [10.0, 20.0, 30.0] + }) + df2 = pl.DataFrame({ + "timestamp": [1, 2, 3], + "string_channel": ["a", "b", "c"], + "common_channel": [40.0, 50.0, 60.0] + }) + + result = _merge_ts_dataframes(df1, df2) + + assert result.shape == (4, 4) + assert "int_channel" in result.columns + assert "string_channel" in result.columns + assert "common_channel" in result.columns + result = result.sort("timestamp") + assert result["string_channel"].to_list() == [None, "a", "b", "c"] + assert result["common_channel"].to_list() == [10.0, 20.0, 30.0, 60.0] diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index 8d6660c4..862ba31e 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -1,7 +1,8 @@ import json +import uuid from contextlib import ExitStack from pathlib import Path -from typing import List, TextIO, Tuple, Union, cast +from typing import Dict, List, TextIO, Tuple, Union, cast from urllib.parse import urljoin try: @@ -38,6 +39,7 @@ class Hdf5UploadService: def __init__(self, rest_conf: SiftRestConfig): self.RUN_PATH = "/api/v2/runs" self._csv_upload_service = CsvUploadService(rest_conf) + self._prev_run_id: str = "" def upload( self, @@ -84,12 +86,19 @@ def upload( # If a config defines a run_name and is split up, multiple runs will be created. 
# Instead, generate a run_id now, and use that instead of a run_name # Perform now instead of before the config split to avoid creating a run any problems arise before ready to upload + # Active run_id copied to _prev_run_id for user reference if hdf5_config._hdf5_config.run_name != "": run_id = self._create_run(hdf5_config._hdf5_config.run_name) for _, csv_config in csv_items: csv_config._csv_config.run_name = "" csv_config._csv_config.run_id = run_id + self._prev_run_id = run_id + elif hdf5_config._hdf5_config.run_id != "": + self._prev_run_id = hdf5_config._hdf5_config.run_id + else: + self._prev_run_id = "" + # Upload each file import_services = [] for filename, csv_config in csv_items: @@ -101,6 +110,10 @@ def upload( return import_services + def get_previous_upload_run_id(self) -> str | None: + """Return the run_id used in the previous upload""" + return self._prev_run_id + def _create_run(self, run_name: str) -> str: """Create a new run using the REST service, and return a run_id""" run_uri = urljoin(self._csv_upload_service._base_uri, self.RUN_PATH) @@ -152,13 +165,12 @@ def _convert_to_csv_file( The CSV config for the import. """ - csv_cfg = _create_csv_config(hdf5_config) merged_df = _convert_hdf5_to_dataframes(src_path, hdf5_config) + csv_cfg = _create_csv_config(hdf5_config, merged_df) merged_df.write_csv(dst_file) return csv_cfg - def _convert_hdf5_to_dataframes( src_path: Union[str, Path], hdf5_config: Hdf5Config ) -> pl.DataFrame: @@ -171,85 +183,149 @@ def _convert_hdf5_to_dataframes( Returns: A polars DataFrame containing the data. """ + # Group data configs by matching time arrays to optimize downstream data processing + data_cfg_ts_map: Dict[Tuple[str, int], List[Hdf5DataCfg]] = {} + for data_cfg in hdf5_config._hdf5_config.data: + map_tuple = (data_cfg.time_dataset, data_cfg.time_column) + if map_tuple not in data_cfg_ts_map: + data_cfg_ts_map[map_tuple] = [] + data_cfg_ts_map[map_tuple].append(data_cfg) + data_frames = [] # Using swmr=True allows opening of HDF5 files written in SWMR mode which may have not been properly closed, but may be otherwise valid with h5py.File(src_path, "r", libver="latest", swmr=True) as h5f: - for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(h5f, data_cfg) + for (time_path, time_col), data_cfgs in data_cfg_ts_map.items(): + df = _extract_hdf5_data_to_dataframe(h5f, time_path, time_col, data_cfgs) data_frames.append(df) - # Merge polars dataframes, sort by timestamp, then write to a temp file - # Could write csv without headers, but keeping for debugging purposes - # If header removed, need to update 'first_data_row' in _create_csv_config to 1 - # Using join instead of concat to avoid issue if all data columns share a name - merged_df = data_frames[0] - if len(merged_df) > 1: - for df in data_frames[1:]: - merged_df = merged_df.join(df, on="timestamp", how="full", coalesce=True) - merged_df = merged_df.sort("timestamp") + # Merge polars dataframes by joining pairs, then merging those pairs until one dataframe remains + # More optimized than joining one by one + # pl.concat(data_frames, how="align") in practice can lead to a fatal crash with larger files + # https://github.com/pola-rs/polars/issues/14591 + while len(data_frames) > 1: + next_round = [] + for i in range(0, len(data_frames), 2): + if i + 1 < len(data_frames): + df1 = data_frames[i] + df2 = data_frames[i + 1] + merged = _merge_ts_dataframes(df1, df2) + next_round.append(merged) + else: + next_round.append(data_frames[i]) + data_frames = 
next_round + merged_df = data_frames[0].sort("timestamp") + return merged_df + + +def _merge_ts_dataframes(df1: pl.DataFrame, df2: pl.DataFrame) -> pl.DataFrame: + """Merge two dataframes together. Handles duplicate channels""" + + df1_channels = [col for col in df1.columns if col != "timestamp"] + df2_channels = [col for col in df2.columns if col != "timestamp"] + dup_channels = set(df1_channels) & set(df2_channels) + + if dup_channels: + # Create a unique id to mark duplicate channels + uid = uuid.uuid4() + + df2_renamed = df2.clone() + for col in dup_channels: + df2_renamed = df2_renamed.rename({col: f"{col}_{uid}"}) + + merged_df = df1.join(df2_renamed, on="timestamp", how="full", coalesce=True) + + # Merge duplicate column data + for col in dup_channels: + temp_col_name = f"{col}_{uid}" + merged_df = merged_df.with_columns( + pl.coalesce([pl.col(col), pl.col(temp_col_name)]).alias(col) + ).drop(temp_col_name) + + else: + merged_df = df1.join(df2, on="timestamp", how="full", coalesce=True) + return merged_df def _extract_hdf5_data_to_dataframe( hdf5_file: h5py.File, - hdf5_data_config: Hdf5DataCfg, + time_path: str, + time_col: int, + hdf5_data_configs: List[Hdf5DataCfg], ) -> pl.DataFrame: """Extract data from an hdf5_file to a polars DataFrame. Args: hdf5_file: HDF5 File + time_path: HDF5 time array path + time_col: HDF5 time array col (1-indexed) hdf5_data_config: The HDF5 Data Config Returns: - A two-column polars DataFrame containing the timestamps and values + A multi-column polars DataFrame containing the timestamps and associated channels """ - if not hdf5_data_config.time_dataset in hdf5_file: - raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.time_dataset}") - time_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.time_dataset]) - if not hdf5_data_config.value_dataset in hdf5_file: - raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}") - value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset]) - - # Convert the full time and value dataset to a dataframe - # This will make it easier to work with any nested columns from a numpy structured array + if not time_path in hdf5_file: + raise Exception(f"HDF5 file does not contain dataset {time_path}") + time_dataset = cast(h5py.Dataset, hdf5_file[time_path]) df_time = pl.DataFrame(time_dataset[:]) - df_value = pl.DataFrame(value_dataset[:]) - time_col = hdf5_data_config.time_column - 1 - val_col = hdf5_data_config.value_column - 1 + time_idx = time_col - 1 - if df_time.shape[1] <= time_col: - raise Exception( - f"{hdf5_data_config.name}: time_column={hdf5_data_config.time_column} out of range for {hdf5_data_config.time_dataset}" - ) - if df_value.shape[1] <= val_col: + if df_time.shape[1] <= time_idx: raise Exception( - f"{hdf5_data_config.name}: value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}" - ) - - time_series = df_time[df_time.columns[time_col]] - value_series = df_value[df_value.columns[val_col]] - - if len(time_series) != len(value_series): - raise Exception( - f"{hdf5_data_config.name}: time and value columns have different lengths ({len(time_series)} vs {len(value_series)})" + f"{time_path}: time_column={time_col} out of range" ) + time_series = df_time[df_time.columns[time_idx]] # HDF5 string data may come in as binary, so convert if time_series.dtype == pl.Binary: time_series = time_series.cast(pl.String) - if value_series.dtype == pl.Binary: - value_series = value_series.cast(pl.String) - 
return pl.DataFrame(data={"timestamp": time_series, hdf5_data_config.name: value_series}) + data_frame = pl.DataFrame(data={"timestamp": time_series}) + + for hdf5_data_config in hdf5_data_configs: + if not hdf5_data_config.value_dataset in hdf5_file: + raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}") + if time_path != hdf5_data_config.time_dataset: + raise Exception(f"Working time dataset {time_path} does not match data cfg defined dataset {hdf5_data_config.time_dataset}") + if time_col != hdf5_data_config.time_column: + raise Exception(f"Working time col {time_col} does not match data cfg defined col {hdf5_data_config.time_column}") + + value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset]) + + # Convert the full value dataset to a dataframe + # This will make it easier to work with any nested columns from a numpy structured array + df_value = pl.DataFrame(value_dataset[:]) + val_idx = hdf5_data_config.value_column - 1 + + if df_value.shape[1] <= val_idx: + raise Exception( + f"{hdf5_data_config.name}: value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}" + ) + value_series = df_value[df_value.columns[val_idx]] + + if len(time_series) != len(value_series): + raise Exception( + f"{hdf5_data_config.name}: time and value columns have different lengths ({len(time_series)} vs {len(value_series)})" + ) + # HDF5 string data may come in as binary, so convert + if value_series.dtype == pl.Binary: + value_series = value_series.cast(pl.String) + + data_frame = data_frame.with_columns( + value_series.alias(hdf5_data_config.name) + ) -def _create_csv_config(hdf5_config: Hdf5Config) -> CsvConfig: + return data_frame + + +def _create_csv_config(hdf5_config: Hdf5Config, merged_df: pl.DataFrame) -> CsvConfig: """Construct a CsvConfig from a Hdf5Config Args: - hdf5_path: Path to the HDF5 file hdf5_config: The HDF5 config + merged_df: The merged dataFrame of data Returns: The CSV config. @@ -267,8 +343,15 @@ def _create_csv_config(hdf5_config: Hdf5Config) -> CsvConfig: }, } + # Map each data config to its channel name + config_map = {d_cfg.name: d_cfg for d_cfg in hdf5_config._hdf5_config.data} + + if merged_df.columns[0] != "timestamp": + raise Exception(f"Unexpected merged DataFrame layout. Expected first column to be timestamp, not {merged_df.columns[0]}") + data_columns = {} - for idx, data_cfg in enumerate(hdf5_config._hdf5_config.data): + for idx, channel_name in enumerate(merged_df.columns[1:]): + data_cfg = config_map[channel_name] col_num = idx + 2 # 1-indexed and col 1 is time col data_columns[col_num] = { "name": data_cfg.name, From 0dfe68346071feaec20a78f8665edfe7423084b4 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Fri, 18 Jul 2025 16:27:32 -0700 Subject: [PATCH 09/11] Comment on exit stack --- python/lib/sift_py/data_import/hdf5.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index 862ba31e..0e7cb0d8 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -70,7 +70,8 @@ def upload( # Split up hdf5_config into separate configs. String data is split into separate configs. 
All other data is a single config split_configs = _split_hdf5_configs(hdf5_config) - # Ensures all temp files opened under stack.enter_context() will have __exit__ called as with a standard with statement + # NamedTemporaryFiles will delete upon exiting with block + # ExitStack used to ensures all temp files stay open through upload, than are closed upon existing block or if program exits early with ExitStack() as stack: # First convert each csv file csv_items: List[Tuple[str, CsvConfig]] = [] From 6fa9a700456e6792da88c5784b2f931b8d79428a Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Fri, 18 Jul 2025 16:29:50 -0700 Subject: [PATCH 10/11] ruff fixes --- python/lib/sift_py/data_import/_hdf5_test.py | 116 ++++++++++--------- python/lib/sift_py/data_import/hdf5.py | 21 ++-- 2 files changed, 71 insertions(+), 66 deletions(-) diff --git a/python/lib/sift_py/data_import/_hdf5_test.py b/python/lib/sift_py/data_import/_hdf5_test.py index f10a2bc1..17557704 100644 --- a/python/lib/sift_py/data_import/_hdf5_test.py +++ b/python/lib/sift_py/data_import/_hdf5_test.py @@ -254,7 +254,7 @@ def test_create_csv_config(mocker: MockFixture, hdf5_config): assert len(csv_cfg_dict["data_columns"]) == 12 for csv_col, df_col in zip(csv_cfg_dict["data_columns"].values(), merged_df.columns[1:]): - assert(csv_col["name"] == df_col) + assert csv_col["name"] == df_col def test_convert_hdf5_to_dataframes(mocker: MockFixture, hdf5_config, hdf5_data_dict): @@ -300,7 +300,9 @@ def test_two_dataset_extraction(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) assert df.shape == (3, 2) assert df.columns[1] == data_cfg.name assert (np.array(df[df.columns[0]]) == data_dict["/Channel1_Time"]).all() @@ -340,7 +342,9 @@ def test_multi_col_dataset_extraction(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) assert df.shape == (3, 2) assert df.columns[1] == data_cfg.name assert (np.array(df[df.columns[0]]) == data_dict["/Channel1"][3]).all() @@ -390,7 +394,9 @@ def test_string_conversion(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) assert (np.array(df[data_cfg.name]) == np.array(["a", "b", "cat"])).all() @@ -429,7 +435,9 @@ def test_bitfield_conversion(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() assert (np.array(df[data_cfg.name]) == np.array([0, 2_147_483_647, 15])).all() @@ -470,7 +478,9 @@ def test_enum_conversion(): mock_file = MockHdf5File(data_dict) for data_cfg in hdf5_config._hdf5_config.data: - df = 
_extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) + df = _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) assert (np.array(df["timestamp"]) == np.array([0, 1, 2])).all() assert (np.array(df[data_cfg.name]) == np.array([1, 0, 2_147_483_647])).all() @@ -505,7 +515,9 @@ def test_time_value_len_diff(): for data_cfg in hdf5_config._hdf5_config.data: with pytest.raises(Exception, match="time and value columns have different lengths"): - _extract_hdf5_data_to_dataframe(mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg]) + _extract_hdf5_data_to_dataframe( + mock_file, data_cfg.time_dataset, data_cfg.time_column, [data_cfg] + ) def test_hdf5_to_dataframe_conversion(mocker: MockFixture, hdf5_config, hdf5_data_dict): @@ -713,14 +725,8 @@ def test_hdf5_upload_string_timestamps(mocker: MockFixture, hdf5_config, rest_co def test_merge_ts_dataframes_no_duplicates(): """Test merging dataframes with no duplicate channels""" - df1 = pl.DataFrame({ - "timestamp": [0, 1, 2], - "channel1": [1.0, 2.0, 3.0] - }) - df2 = pl.DataFrame({ - "timestamp": [1, 2, 3], - "channel2": [4.0, 5.0, 6.0] - }) + df1 = pl.DataFrame({"timestamp": [0, 1, 2], "channel1": [1.0, 2.0, 3.0]}) + df2 = pl.DataFrame({"timestamp": [1, 2, 3], "channel2": [4.0, 5.0, 6.0]}) result = _merge_ts_dataframes(df1, df2) @@ -736,16 +742,12 @@ def test_merge_ts_dataframes_no_duplicates(): def test_merge_ts_dataframes_with_duplicates(): """Test merging dataframes with duplicate channel names""" - df1 = pl.DataFrame({ - "timestamp": [0, 1, 2], - "channel1": [1.0, 2.0, 3.0], - "common_channel": [10.0, 20.0, 30.0] - }) - df2 = pl.DataFrame({ - "timestamp": [1, 2, 3], - "channel2": [4.0, 5.0, 6.0], - "common_channel": [40.0, 50.0, 60.0] - }) + df1 = pl.DataFrame( + {"timestamp": [0, 1, 2], "channel1": [1.0, 2.0, 3.0], "common_channel": [10.0, 20.0, 30.0]} + ) + df2 = pl.DataFrame( + {"timestamp": [1, 2, 3], "channel2": [4.0, 5.0, 6.0], "common_channel": [40.0, 50.0, 60.0]} + ) result = _merge_ts_dataframes(df1, df2) @@ -764,16 +766,12 @@ def test_merge_ts_dataframes_with_duplicates(): def test_merge_ts_dataframes_with_nulls(): """Test merging dataframes where one has null values""" - df1 = pl.DataFrame({ - "timestamp": [0, 1, 2], - "channel1": [1.0, None, 3.0], - "common_channel": [10.0, None, 30.0] - }) - df2 = pl.DataFrame({ - "timestamp": [1, 2, 3], - "channel2": [4.0, 5.0, 6.0], - "common_channel": [40.0, 50.0, 60.0] - }) + df1 = pl.DataFrame( + {"timestamp": [0, 1, 2], "channel1": [1.0, None, 3.0], "common_channel": [10.0, None, 30.0]} + ) + df2 = pl.DataFrame( + {"timestamp": [1, 2, 3], "channel2": [4.0, 5.0, 6.0], "common_channel": [40.0, 50.0, 60.0]} + ) result = _merge_ts_dataframes(df1, df2) @@ -803,18 +801,22 @@ def test_merge_ts_dataframes_empty_dataframes(): def test_merge_ts_dataframes_multiple_duplicates(): """Test merging dataframes with multiple duplicate channel names""" - df1 = pl.DataFrame({ - "timestamp": [0, 1, 2], - "channel1": [1.0, 2.0, 3.0], - "dup1": [10.0, 20.0, 30.0], - "dup2": [100.0, 200.0, 300.0] - }) - df2 = pl.DataFrame({ - "timestamp": [1, 2, 3], - "channel2": [4.0, 5.0, 6.0], - "dup1": [40.0, 50.0, 60.0], - "dup2": [400.0, 500.0, 600.0] - }) + df1 = pl.DataFrame( + { + "timestamp": [0, 1, 2], + "channel1": [1.0, 2.0, 3.0], + "dup1": [10.0, 20.0, 30.0], + "dup2": [100.0, 200.0, 300.0], + } + ) + df2 = pl.DataFrame( + { + "timestamp": [1, 2, 3], + "channel2": [4.0, 5.0, 6.0], + "dup1": [40.0, 50.0, 
60.0], + "dup2": [400.0, 500.0, 600.0], + } + ) result = _merge_ts_dataframes(df1, df2) @@ -833,16 +835,16 @@ def test_merge_ts_dataframes_multiple_duplicates(): def test_merge_ts_dataframes_different_dtypes(): """Test merging dataframes with different data types""" - df1 = pl.DataFrame({ - "timestamp": [0, 1, 2], - "int_channel": [1, 2, 3], - "common_channel": [10.0, 20.0, 30.0] - }) - df2 = pl.DataFrame({ - "timestamp": [1, 2, 3], - "string_channel": ["a", "b", "c"], - "common_channel": [40.0, 50.0, 60.0] - }) + df1 = pl.DataFrame( + {"timestamp": [0, 1, 2], "int_channel": [1, 2, 3], "common_channel": [10.0, 20.0, 30.0]} + ) + df2 = pl.DataFrame( + { + "timestamp": [1, 2, 3], + "string_channel": ["a", "b", "c"], + "common_channel": [40.0, 50.0, 60.0], + } + ) result = _merge_ts_dataframes(df1, df2) diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index 0e7cb0d8..205280a4 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -172,6 +172,7 @@ def _convert_to_csv_file( return csv_cfg + def _convert_hdf5_to_dataframes( src_path: Union[str, Path], hdf5_config: Hdf5Config ) -> pl.DataFrame: @@ -273,9 +274,7 @@ def _extract_hdf5_data_to_dataframe( time_idx = time_col - 1 if df_time.shape[1] <= time_idx: - raise Exception( - f"{time_path}: time_column={time_col} out of range" - ) + raise Exception(f"{time_path}: time_column={time_col} out of range") time_series = df_time[df_time.columns[time_idx]] # HDF5 string data may come in as binary, so convert @@ -288,9 +287,13 @@ def _extract_hdf5_data_to_dataframe( if not hdf5_data_config.value_dataset in hdf5_file: raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}") if time_path != hdf5_data_config.time_dataset: - raise Exception(f"Working time dataset {time_path} does not match data cfg defined dataset {hdf5_data_config.time_dataset}") + raise Exception( + f"Working time dataset {time_path} does not match data cfg defined dataset {hdf5_data_config.time_dataset}" + ) if time_col != hdf5_data_config.time_column: - raise Exception(f"Working time col {time_col} does not match data cfg defined col {hdf5_data_config.time_column}") + raise Exception( + f"Working time col {time_col} does not match data cfg defined col {hdf5_data_config.time_column}" + ) value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset]) @@ -314,9 +317,7 @@ def _extract_hdf5_data_to_dataframe( if value_series.dtype == pl.Binary: value_series = value_series.cast(pl.String) - data_frame = data_frame.with_columns( - value_series.alias(hdf5_data_config.name) - ) + data_frame = data_frame.with_columns(value_series.alias(hdf5_data_config.name)) return data_frame @@ -348,7 +349,9 @@ def _create_csv_config(hdf5_config: Hdf5Config, merged_df: pl.DataFrame) -> CsvC config_map = {d_cfg.name: d_cfg for d_cfg in hdf5_config._hdf5_config.data} if merged_df.columns[0] != "timestamp": - raise Exception(f"Unexpected merged DataFrame layout. Expected first column to be timestamp, not {merged_df.columns[0]}") + raise Exception( + f"Unexpected merged DataFrame layout. 
Expected first column to be timestamp, not {merged_df.columns[0]}" + ) data_columns = {} for idx, channel_name in enumerate(merged_df.columns[1:]): From 456858a7eac6dc111fb88843e4caba6eb11ec27f Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Fri, 18 Jul 2025 16:32:42 -0700 Subject: [PATCH 11/11] type fix --- python/lib/sift_py/data_import/hdf5.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index 205280a4..d9dae3df 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -111,7 +111,7 @@ def upload( return import_services - def get_previous_upload_run_id(self) -> str | None: + def get_previous_upload_run_id(self) -> str: """Return the run_id used in the previous upload""" return self._prev_run_id
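
For reference, a minimal usage sketch of the run-reuse behavior added by this series, assuming the same SiftRestConfig conventions as python/examples/data_import/hdf5/main.py; the asset name, channel name, dataset paths, and file name below are illustrative only, not part of the patch:

import os

from sift_py.data_import.config import Hdf5Config
from sift_py.data_import.hdf5 import Hdf5UploadService
from sift_py.rest import SiftRestConfig

rest_config: SiftRestConfig = {
    "uri": os.environ["SIFT_API_URI"],
    "apikey": os.environ["SIFT_API_KEY"],
}

# Illustrative config: one double channel whose time and value live in
# separate datasets, timestamped as relative seconds from a start time.
hdf5_config = Hdf5Config(
    {
        "asset_name": "TestAsset",
        "run_name": "example-run",
        "time": {
            "format": "TIME_FORMAT_RELATIVE_SECONDS",
            "relative_start_time": "2025-01-01T01:00:00Z",
        },
        "data": [
            {
                "name": "DoubleChannel",
                "time_dataset": "/time",
                "value_dataset": "/data",
                "data_type": "CHANNEL_DATA_TYPE_DOUBLE",
            }
        ],
    }
)

upload_service = Hdf5UploadService(rest_config)

# The upload may be split into several CSV imports; wait on each one.
import_services = upload_service.upload("sample_data.h5", hdf5_config)
data_imports = [svc.wait_until_complete() for svc in import_services]

# Because run_name was set, the service created a single run up front and
# reused its run_id across every split upload; the id is available afterwards.
run_id = upload_service.get_previous_upload_run_id()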