Neat 371 relationship loader #554

Merged · 24 commits · Jul 22, 2024
2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
 .PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up
-version="0.87.4"
+version="0.87.5"
 run-explorer:
 	@echo "Running explorer API server..."
 	# open "http://localhost:8000/static/index.html" || true
2 changes: 1 addition & 1 deletion cognite/neat/_version.py
@@ -1 +1 @@
-__version__ = "0.87.4"
+__version__ = "0.87.5"
29 changes: 17 additions & 12 deletions cognite/neat/graph/loaders/_base.py
@@ -48,18 +48,23 @@ def _repr_html_(cls) -> str:
 class CDFLoader(BaseLoader[T_Output]):
     _UPLOAD_BATCH_SIZE: ClassVar[int] = 1000
 
-    def load_into_cdf(self, client: CogniteClient, dry_run: bool = False) -> UploadResultList:
-        return UploadResultList(self.load_into_cdf_iterable(client, dry_run))
-
-    def load_into_cdf_iterable(self, client: CogniteClient, dry_run: bool = False) -> Iterable[UploadResult]:
-        missing_capabilities = client.iam.verify_capabilities(self._get_required_capabilities())
-        if missing_capabilities:
-            upload_result = UploadResult[Hashable](name=type(self).__name__)
-            upload_result.issues.append(
-                FailedAuthorizationError(action="Upload to CDF", reason=str(missing_capabilities))
-            )
-            yield upload_result
-            return
+    def load_into_cdf(
+        self, client: CogniteClient, dry_run: bool = False, check_client: bool = True
+    ) -> UploadResultList:
+        return UploadResultList(self.load_into_cdf_iterable(client, dry_run, check_client))
+
+    def load_into_cdf_iterable(
+        self, client: CogniteClient, dry_run: bool = False, check_client: bool = True
+    ) -> Iterable[UploadResult]:
+        if check_client:
+            missing_capabilities = client.iam.verify_capabilities(self._get_required_capabilities())
+            if missing_capabilities:
+                upload_result = UploadResult[Hashable](name=type(self).__name__)
+                upload_result.issues.append(
+                    FailedAuthorizationError(action="Upload to CDF", reason=str(missing_capabilities))
+                )
+                yield upload_result
+                return
 
         issues = NeatIssueList[NeatIssue]()
         items: list[T_Output] = []
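The new `check_client` flag makes the capability check optional, which is useful when several loaders run back-to-back against a client that has already been verified. A minimal sketch of that pattern, assuming the loaders are constructed elsewhere and all target the same data set with the same capabilities; the `upload_all` helper below is illustrative and not part of this PR:

```python
from collections.abc import Iterable

from cognite.client import CogniteClient

from cognite.neat.graph.loaders._base import CDFLoader


def upload_all(loaders: Iterable[CDFLoader], client: CogniteClient, dry_run: bool = False) -> None:
    """Run several loaders against one client, verifying its capabilities only once."""
    for i, loader in enumerate(loaders):
        # Only the first loader pays for the IAM lookup; the rest reuse the
        # already-verified client by passing check_client=False.
        results = loader.load_into_cdf(client, dry_run=dry_run, check_client=(i == 0))
        for result in results:
            print(result.name, len(result.upserted), "upserted")
```

With `check_client=False`, a genuinely missing capability would surface later as an API error from the upload call rather than as an up-front `FailedAuthorizationError` issue.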
203 changes: 179 additions & 24 deletions cognite/neat/graph/loaders/_rdf2asset.py
@@ -2,12 +2,16 @@
 from collections.abc import Iterable, Sequence
 from dataclasses import dataclass, fields
 from pathlib import Path
-from typing import cast
+from typing import Any, cast
 
 import yaml
 from cognite.client import CogniteClient
-from cognite.client.data_classes import AssetWrite
-from cognite.client.data_classes.capabilities import AssetsAcl, Capability
+from cognite.client.data_classes import AssetWrite, RelationshipWrite
+from cognite.client.data_classes.capabilities import (
+    AssetsAcl,
+    Capability,
+    RelationshipsAcl,
+)
 from cognite.client.exceptions import CogniteAPIError
 
 from cognite.neat.graph._tracking.base import Tracker
@@ -17,6 +21,8 @@
 from cognite.neat.issues import NeatIssue, NeatIssueList
 from cognite.neat.rules.analysis._asset import AssetAnalysis
 from cognite.neat.rules.models import AssetRules
+from cognite.neat.rules.models.entities import ClassEntity, EntityTypes
+from cognite.neat.utils.auxiliary import create_sha256_hash
 from cognite.neat.utils.upload import UploadResult
 
 from ._base import _END_OF_CLASS, CDFLoader
@@ -50,7 +56,7 @@ def as_aliases(self) -> dict[str, str]:
 
 
 class AssetLoader(CDFLoader[AssetWrite]):
-    """Load Assets from NeatGraph to Cognite Data Fusions.
+    """Load Assets and their relationships from NeatGraph to Cognite Data Fusion.
 
     Args:
         graph_store (NeatGraphStore): The graph store to load the data into.
@@ -59,8 +65,7 @@ class AssetLoader(CDFLoader[AssetWrite]):
         use_orphanage (bool): Whether to use an orphanage for assets that are not part
             of the hierarchy. Defaults to False.
         use_labels (bool): Whether to use labels for assets. Defaults to False.
-        asset_external_id_prefix (str | None): The prefix to use for the external id of the assets.
-            Defaults to None.
+        external_id_prefix (str | None): The prefix to use for the external ids. Defaults to None.
         metadata_keys (AssetLoaderMetadataKeys | None): Mapping between NEAT metadata key names and
             their desired names in CDF Asset metadata. Defaults to None.
         create_issues (Sequence[NeatIssue] | None): A list of issues that occurred during reading. Defaults to None.
@@ -74,7 +79,7 @@ def __init__(
         data_set_id: int,
         use_orphanage: bool = False,
         use_labels: bool = False,
-        asset_external_id_prefix: str | None = None,
+        external_id_prefix: str | None = None,
         metadata_keys: AssetLoaderMetadataKeys | None = None,
         create_issues: Sequence[NeatIssue] | None = None,
         tracker: type[Tracker] | None = None,
@@ -89,9 +94,7 @@ def __init__(
                 AssetWrite.load(
                     {
                         "dataSetId": self.data_set_id,
-                        "externalId": (
-                            f"{asset_external_id_prefix or ''}orphanage-{data_set_id}" if use_orphanage else None
-                        ),
+                        "externalId": (f"{external_id_prefix or ''}orphanage-{data_set_id}" if use_orphanage else None),
                         "name": "Orphanage",
                         "description": "Orphanage for assets whose parents do not exist",
                     }
@@ -100,9 +103,10 @@ def __init__(
             else None
         )
 
-        self.asset_external_id_prefix = asset_external_id_prefix
+        self.external_id_prefix = external_id_prefix
         self.metadata_keys = metadata_keys or AssetLoaderMetadataKeys()
 
+        self.processed_assets: set[str] = set()
         self._issues = NeatIssueList[NeatIssue](create_issues or [])
         self._tracker: type[Tracker] = tracker or LogTracker
 
@@ -124,27 +128,39 @@ def _load(self, stop_on_exception: bool = False) -> Iterable[AssetWrite | NeatIs
                 "classes",
             )
 
-        processed_instances = set()
-
         if self.orphanage:
             yield self.orphanage
-            processed_instances.add(self.orphanage.external_id)
+            self.processed_assets.add(cast(str, self.orphanage.external_id))
 
+        yield from self._create_assets(ordered_classes, tracker, stop_on_exception)
+        yield from self._create_relationship(ordered_classes, tracker, stop_on_exception)
+
+    def _create_assets(
+        self,
+        ordered_classes: list[ClassEntity],
+        tracker: Tracker,
+        stop_on_exception: bool,
+    ) -> Iterable[Any]:
         for class_ in ordered_classes:
             tracker.start(repr(class_.id))
 
-            property_renaming_config = AssetAnalysis(self.rules).define_property_renaming_config(class_)
+            property_renaming_config = AssetAnalysis(self.rules).define_asset_property_renaming_config(class_)
 
             for identifier, properties in self.graph_store.read(class_.suffix):
-                fields = _process_properties(properties, property_renaming_config)
+                identifier = f"{self.external_id_prefix or ''}{identifier}"
+
+                fields = _process_asset_properties(properties, property_renaming_config)
                 # set data set id and external id
                 fields["dataSetId"] = self.data_set_id
                 fields["externalId"] = identifier
 
+                if parent_external_id := fields.get("parentExternalId", None):
+                    fields["parentExternalId"] = f"{self.external_id_prefix or ''}{parent_external_id}"
+
                 # check on parent
-                if "parentExternalId" in fields and fields["parentExternalId"] not in processed_instances:
+                if "parentExternalId" in fields and fields["parentExternalId"] not in self.processed_assets:
                     error = loader_issues.InvalidInstanceError(
-                        type_="asset",
+                        type_=EntityTypes.asset,
                         identifier=identifier,
                         reason=(
                             f"Parent asset {fields['parentExternalId']} does not exist or failed creation"
@@ -169,16 +185,98 @@
 
                 try:
                     yield AssetWrite.load(fields)
-                    processed_instances.add(identifier)
+                    self.processed_assets.add(identifier)
                 except KeyError as e:
-                    error = loader_issues.InvalidInstanceError(type_="asset", identifier=identifier, reason=str(e))
+                    error = loader_issues.InvalidInstanceError(
+                        type_=EntityTypes.asset, identifier=identifier, reason=str(e)
+                    )
                     tracker.issue(error)
                     if stop_on_exception:
                         raise error.as_exception() from e
                     yield error
 
             yield _END_OF_CLASS
 
+    def _create_relationship(
+        self,
+        ordered_classes: list[ClassEntity],
+        tracker: Tracker,
+        stop_on_exception: bool,
+    ) -> Iterable[Any]:
+        for class_ in ordered_classes:
+            tracker.start(repr(class_.id))
+
+            property_renaming_config = AssetAnalysis(self.rules).define_relationship_property_renaming_config(class_)
+
+            # class does not have any relationship properties
+            if not property_renaming_config:
+                continue
+
+            for source_external_id, properties in self.graph_store.read(class_.suffix):
+                relationships = _process_relationship_properties(properties, property_renaming_config)
+
+                source_external_id = f"{self.external_id_prefix or ''}{source_external_id}"
+
+                # check if source asset exists
+                if source_external_id not in self.processed_assets:
+                    error = loader_issues.InvalidInstanceError(
+                        type_=EntityTypes.relationship,
+                        identifier=source_external_id,
+                        reason=(
+                            f"Asset {source_external_id} does not exist! "
+                            "Aborting creation of relationships which use this asset as the source."
+                        ),
+                    )
+                    tracker.issue(error)
+                    if stop_on_exception:
+                        raise error.as_exception()
+                    yield error
+                    continue
+
+                for _, target_external_ids in relationships.items():
+                    # we can have 1-many relationships
+                    for target_external_id in target_external_ids:
+                        target_external_id = f"{self.external_id_prefix or ''}{target_external_id}"
+                        # check if target asset exists
+                        if target_external_id not in self.processed_assets:
+                            error = loader_issues.InvalidInstanceError(
+                                type_=EntityTypes.relationship,
+                                identifier=target_external_id,
+                                reason=(
+                                    f"Asset {target_external_id} does not exist! "
+                                    f"Cannot create relationship between {source_external_id}"
+                                    f" and {target_external_id}. "
+                                ),
+                            )
+                            tracker.issue(error)
+                            if stop_on_exception:
+                                raise error.as_exception()
+                            yield error
+                            continue
+
+                        external_id = "relationship_" + create_sha256_hash(f"{source_external_id}_{target_external_id}")
+                        try:
+                            yield RelationshipWrite(
+                                external_id=external_id,
+                                source_external_id=source_external_id,
+                                target_external_id=target_external_id,
+                                source_type="asset",
+                                target_type="asset",
+                                data_set_id=self.data_set_id,
+                            )
+                        except KeyError as e:
+                            error = loader_issues.InvalidInstanceError(
+                                type_=EntityTypes.relationship,
+                                identifier=external_id,
+                                reason=str(e),
+                            )
+                            tracker.issue(error)
+                            if stop_on_exception:
+                                raise error.as_exception() from e
+                            yield error
+
+            yield _END_OF_CLASS
+
     def _get_required_capabilities(self) -> list[Capability]:
         return [
             AssetsAcl(
@@ -187,10 +285,33 @@
                     AssetsAcl.Action.Read,
                 ],
                 scope=AssetsAcl.Scope.DataSet([self.data_set_id]),
-            )
+            ),
+            RelationshipsAcl(
+                actions=[
+                    RelationshipsAcl.Action.Write,
+                    RelationshipsAcl.Action.Read,
+                ],
+                scope=RelationshipsAcl.Scope.DataSet([self.data_set_id]),
+            ),
         ]
 
+    def _upload_to_cdf(
+        self,
+        client: CogniteClient,
+        items: list[AssetWrite] | list[RelationshipWrite],
+        dry_run: bool,
+        read_issues: NeatIssueList,
+    ) -> Iterable[UploadResult]:
+        if isinstance(items[0], AssetWrite) and all(isinstance(item, AssetWrite) for item in items):
+            yield from self._upload_assets_to_cdf(client, cast(list[AssetWrite], items), dry_run, read_issues)
+        elif isinstance(items[0], RelationshipWrite) and all(isinstance(item, RelationshipWrite) for item in items):
+            yield from self._upload_relationships_to_cdf(
+                client, cast(list[RelationshipWrite], items), dry_run, read_issues
+            )
+        else:
+            raise ValueError(f"Item {items[0]} is not supported. This is a bug in neat, please report it.")
+
-    def _upload_to_cdf(
+    def _upload_assets_to_cdf(
         self,
         client: CogniteClient,
         items: list[AssetWrite],
@@ -207,17 +328,39 @@ def _upload_to_cdf(
                 yield result
         else:
             for asset in upserted:
-                result = UploadResult[str](name="asset", issues=read_issues)
+                result = UploadResult[str](name="Asset", issues=read_issues)
                 result.upserted.add(cast(str, asset.external_id))
                 yield result
 
+    def _upload_relationships_to_cdf(
+        self,
+        client: CogniteClient,
+        items: list[RelationshipWrite],
+        dry_run: bool,
+        read_issues: NeatIssueList,
+    ) -> Iterable[UploadResult]:
+        try:
+            upserted = client.relationships.upsert(items, mode="replace")
+        except CogniteAPIError as e:
+            result = UploadResult[str](name="Relationship", issues=read_issues)
+            result.error_messages.append(str(e))
+            result.failed_upserted.update(item.as_id() for item in e.failed + e.unknown)
+            result.upserted.update(item.as_id() for item in e.successful)
+            yield result
+        else:
+            for relationship in upserted:
+                result = UploadResult[str](name="relationship", issues=read_issues)
+                result.upserted.add(cast(str, relationship.external_id))
+                yield result
+
     def write_to_file(self, filepath: Path) -> None:
         if filepath.suffix not in [".json", ".yaml", ".yml"]:
             raise ValueError(f"File format {filepath.suffix} is not supported")
-        dumped: dict[str, list] = {"assets": []}
+        dumped: dict[str, list] = {"assets": [], "relationship": []}
         for item in self.load(stop_on_exception=False):
             key = {
                 AssetWrite: "assets",
+                RelationshipWrite: "relationship",
                 NeatIssue: "issues",
                 _END_OF_CLASS: "end_of_class",
             }.get(type(item))
@@ -234,7 +377,7 @@
             yaml.safe_dump(dumped, f, sort_keys=False)
 
 
-def _process_properties(properties: dict[str, list[str]], property_renaming_config: dict[str, str]) -> dict:
+def _process_asset_properties(properties: dict[str, list[str]], property_renaming_config: dict[str, str]) -> dict:
     metadata: dict[str, str] = {}
     fields: dict[str, str | dict] = {}
 
@@ -251,3 +394,15 @@ def _process_properties(properties: dict[str, list[str]], property_renaming_conf
         fields["metadata"] = metadata
 
     return fields
+
+
+def _process_relationship_properties(
+    properties: dict[str, list[str]], property_renaming_config: dict[str, str]
+) -> dict:
+    relationships: dict[str, list[str]] = {}
+
+    for original_property, values in properties.items():
+        if renamed_property := property_renaming_config.get(original_property, None):
+            relationships[renamed_property] = values
+
+    return relationships
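Relationship external ids are derived by hashing the source/target pair, so re-running the loader regenerates identical ids and the `mode="replace"` upsert stays idempotent. A sketch of the idea, assuming `create_sha256_hash` is essentially a SHA-256 hex digest of its input (the real helper lives in `cognite.neat.utils.auxiliary`; the asset ids below are made up):

```python
import hashlib


def create_sha256_hash(value: str) -> str:
    # Stand-in for cognite.neat.utils.auxiliary.create_sha256_hash (assumed behaviour).
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


source_external_id = "prefix-pump-42"  # hypothetical prefixed asset external ids
target_external_id = "prefix-site-01"

external_id = "relationship_" + create_sha256_hash(f"{source_external_id}_{target_external_id}")
print(external_id)
```

The same (source, target) pair always maps to the same external id, and the id length stays fixed no matter how long the prefixed asset ids get, which keeps the generated ids within CDF's external id length limit.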
2 changes: 1 addition & 1 deletion cognite/neat/graph/loaders/_rdf2dms.py
@@ -34,7 +34,7 @@ class DMSLoader(CDFLoader[dm.InstanceApply]):
         data_model (dm.DataModel[dm.View] | None): The data model to load.
         instance_space (str): The instance space to load the data into.
         class_by_view_id (dict[ViewId, str] | None): A mapping from view id to class name. Defaults to None.
-        creat_issues (Sequence[NeatIssue] | None): A list of issues that occurred during reading. Defaults to None.
+        create_issues (Sequence[NeatIssue] | None): A list of issues that occurred during reading. Defaults to None.
         tracker (type[Tracker] | None): The tracker to use. Defaults to None.
     """
