Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating dedicated entity types and new rules #510

Merged
merged 17 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cognite/neat/rules/models/asset/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ._rules import AssetClass, AssetMetadata, AssetProperty, AssetRules
from ._rules_input import AssetRulesInput

__all__ = [
"AssetRules",
"AssetMetadata",
"AssetClass",
"AssetProperty",
"AssetRulesInput",
]
4 changes: 4 additions & 0 deletions cognite/neat/rules/models/asset/_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from cognite.neat.rules.models.information._converter import _InformationRulesConverter


class _AssetRulesConverter(_InformationRulesConverter): ...
155 changes: 155 additions & 0 deletions cognite/neat/rules/models/asset/_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import sys
from typing import TYPE_CHECKING, Any, Literal, cast

from pydantic import Field, field_validator, model_validator
from pydantic.main import IncEx
from rdflib import Namespace

from cognite.neat.constants import PREFIXES
from cognite.neat.issues import MultiValueError
from cognite.neat.rules import issues
from cognite.neat.rules.models._base import BaseRules, SheetList
from cognite.neat.rules.models.domain import DomainRules
from cognite.neat.rules.models.entities import (
CdfResourceEntityList,
ClassEntity,
MultiValueTypeInfo,
ParentClassEntity,
Undefined,
)
from cognite.neat.rules.models.information import (
InformationClass,
InformationMetadata,
InformationProperty,
InformationRules,
)

if TYPE_CHECKING:
from cognite.neat.rules.models.dms._rules import DMSRules


if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self


class AssetMetadata(InformationMetadata): ...
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a new role type?

Suggested change
class AssetMetadata(InformationMetadata): ...
class AssetMetadata(InformationMetadata):
role: ClassVar[RoleTypes] = RoleTypes.asset

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point! Missed it yesterday !



class AssetClass(InformationClass): ...


class AssetProperty(InformationProperty):
"""
A property is a characteristic of a class. It is a named attribute of a class that describes a range of values
or a relationship to another class.

Args:
class_: Class ID to which property belongs
property_: Property ID of the property
name: Property name.
value_type: Type of value property will hold (data or link to another class)
min_count: Minimum count of the property values. Defaults to 0
max_count: Maximum count of the property values. Defaults to None
default: Default value of the property
reference: Reference to the source of the information, HTTP URI
match_type: The match type of the resource being described and the source entity.
transformation: Actual rule for the transformation from source to target representation of
knowledge graph. Defaults to None (no transformation)
implementation: Details on how given class-property is implemented in the classic CDF
"""

implementation: CdfResourceEntityList | None = Field(alias="Implementation", default=None)


class AssetRules(BaseRules):
metadata: AssetMetadata = Field(alias="Metadata")
properties: SheetList[AssetProperty] = Field(alias="Properties")
classes: SheetList[AssetClass] = Field(alias="Classes")
prefixes: dict[str, Namespace] = Field(default_factory=lambda: PREFIXES.copy())
last: "AssetRules | None" = Field(None, alias="Last")
reference: "AssetRules | None" = Field(None, alias="Reference")

@field_validator("prefixes", mode="before")
def parse_str(cls, values: Any) -> Any:
if isinstance(values, dict):
return {key: Namespace(value) if isinstance(value, str) else value for key, value in values.items()}
return values

@model_validator(mode="after")
def update_entities_prefix(self) -> Self:
# update expected_value_types
for property_ in self.properties:
if isinstance(property_.value_type, ClassEntity) and property_.value_type.prefix is Undefined:
property_.value_type.prefix = self.metadata.prefix

if isinstance(property_.value_type, MultiValueTypeInfo):
property_.value_type.set_default_prefix(self.metadata.prefix)

if property_.class_.prefix is Undefined:
property_.class_.prefix = self.metadata.prefix

# update parent classes
for class_ in self.classes:
if class_.parent:
for parent in cast(list[ParentClassEntity], class_.parent):
if not isinstance(parent.prefix, str):
parent.prefix = self.metadata.prefix
if class_.class_.prefix is Undefined:
class_.class_.prefix = self.metadata.prefix

return self

@model_validator(mode="after")
def post_validation(self) -> "AssetRules":
from ._validation import AssetPostValidation

issue_list = AssetPostValidation(cast(InformationRules, self)).validate()
if issue_list.warnings:
issue_list.trigger_warnings()
if issue_list.has_errors:
raise MultiValueError([error for error in issue_list if isinstance(error, issues.NeatValidationError)])
return self

def dump(
self,
mode: Literal["python", "json"] = "python",
by_alias: bool = False,
exclude: IncEx = None,
exclude_none: bool = False,
exclude_unset: bool = False,
exclude_defaults: bool = False,
as_reference: bool = False,
) -> dict[str, Any]:
from ._serializer import _AssetRulesSerializer

dumped = self.model_dump(
mode=mode,
by_alias=by_alias,
exclude=exclude,
exclude_none=exclude_none,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
)
prefix = self.metadata.prefix
serializer = _AssetRulesSerializer(by_alias, prefix)
cleaned = serializer.clean(dumped, as_reference)
last = "Last" if by_alias else "last"
if last_dump := cleaned.get(last):
cleaned[last] = serializer.clean(last_dump, False)
reference = "Reference" if by_alias else "reference"
if self.reference and (ref_dump := cleaned.get(reference)):
prefix = self.reference.metadata.prefix
cleaned[reference] = _AssetRulesSerializer(by_alias, prefix).clean(ref_dump, True)
return cleaned

def as_domain_rules(self) -> DomainRules:
from ._converter import _AssetRulesConverter

return _AssetRulesConverter(cast(InformationRules, self)).as_domain_rules()

def as_dms_architect_rules(self) -> "DMSRules":
from ._converter import _AssetRulesConverter

return _AssetRulesConverter(cast(InformationRules, self)).as_dms_architect_rules()
171 changes: 171 additions & 0 deletions cognite/neat/rules/models/asset/_rules_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, cast, overload

from cognite.neat.rules.models._base import _add_alias
from cognite.neat.rules.models.data_types import DataType
from cognite.neat.rules.models.entities import (
ClassEntity,
MultiValueTypeInfo,
Unknown,
UnknownEntity,
)
from cognite.neat.rules.models.information._rules_input import InformationClassInput, InformationMetadataInput

from ._rules import AssetProperty, AssetRules


@dataclass
class AssetMetadataInput(InformationMetadataInput): ...


@dataclass
class AssetPropertyInput:
class_: str
property_: str
value_type: str
name: str | None = None
description: str | None = None
comment: str | None = None
min_count: int | None = None
max_count: int | float | None = None
default: Any | None = None
reference: str | None = None
match_type: str | None = None
transformation: str | None = None
implementation: str | None = None

@classmethod
@overload
def load(cls, data: None) -> None: ...

@classmethod
@overload
def load(cls, data: dict[str, Any]) -> "AssetPropertyInput": ...

@classmethod
@overload
def load(cls, data: list[dict[str, Any]]) -> list["AssetPropertyInput"]: ...

@classmethod
def load(
cls, data: dict[str, Any] | list[dict[str, Any]] | None
) -> "AssetPropertyInput | list[AssetPropertyInput] | None":
if data is None:
return None
if isinstance(data, list) or (isinstance(data, dict) and isinstance(data.get("data"), list)):
items = cast(list[dict[str, Any]], data.get("data") if isinstance(data, dict) else data)
return [loaded for item in items if (loaded := cls.load(item)) is not None]

_add_alias(data, AssetProperty)
return cls(
class_=data.get("class_"), # type: ignore[arg-type]
property_=data.get("property_"), # type: ignore[arg-type]
name=data.get("name", None),
description=data.get("description", None),
comment=data.get("comment", None),
value_type=data.get("value_type"), # type: ignore[arg-type]
min_count=data.get("min_count", None),
max_count=data.get("max_count", None),
default=data.get("default", None),
reference=data.get("reference", None),
match_type=data.get("match_type", None),
transformation=data.get("transformation", None),
implementation=data.get("implementation", None),
)

def dump(self, default_prefix: str) -> dict[str, Any]:
value_type: MultiValueTypeInfo | DataType | ClassEntity | UnknownEntity

# property holding xsd data type
# check if it is multi value type
if "|" in self.value_type:
value_type = MultiValueTypeInfo.load(self.value_type)
value_type.set_default_prefix(default_prefix)

elif DataType.is_data_type(self.value_type):
value_type = DataType.load(self.value_type)

# unknown value type
elif self.value_type == str(Unknown):
value_type = UnknownEntity()

# property holding link to class
else:
value_type = ClassEntity.load(self.value_type, prefix=default_prefix)

return {
"Class": ClassEntity.load(self.class_, prefix=default_prefix),
"Property": self.property_,
"Name": self.name,
"Description": self.description,
"Comment": self.comment,
"Value Type": value_type,
"Min Count": self.min_count,
"Max Count": self.max_count,
"Default": self.default,
"Reference": self.reference,
"Match Type": self.match_type,
"Transformation": self.transformation,
"Implementation": self.implementation,
}


class AssetClassInput(InformationClassInput): ...


@dataclass
class AssetRulesInput:
metadata: AssetMetadataInput
properties: Sequence[AssetPropertyInput]
classes: Sequence[AssetClassInput]
last: "AssetRulesInput | AssetRules | None" = None
reference: "AssetRulesInput | AssetRules | None" = None

@classmethod
@overload
def load(cls, data: dict[str, Any]) -> "AssetRulesInput": ...

@classmethod
@overload
def load(cls, data: None) -> None: ...

@classmethod
def load(cls, data: dict | None) -> "AssetRulesInput | None":
if data is None:
return None
_add_alias(data, AssetRules)

return cls(
metadata=AssetMetadataInput.load(data.get("metadata")), # type: ignore[arg-type]
properties=AssetPropertyInput.load(data.get("properties")), # type: ignore[arg-type]
classes=InformationClassInput.load(data.get("classes")), # type: ignore[arg-type]
last=AssetRulesInput.load(data.get("last")),
reference=AssetRulesInput.load(data.get("reference")),
)

def as_rules(self) -> AssetRules:
return AssetRules.model_validate(self.dump())

def dump(self) -> dict[str, Any]:
default_prefix = self.metadata.prefix
reference: dict[str, Any] | None = None
if isinstance(self.reference, AssetRulesInput):
reference = self.reference.dump()
elif isinstance(self.reference, AssetRules):
# We need to load through the AssetRulesInput to set the correct default space and version
reference = AssetRulesInput.load(self.reference.model_dump()).dump()
last: dict[str, Any] | None = None
if isinstance(self.last, AssetRulesInput):
last = self.last.dump()
elif isinstance(self.last, AssetRules):
# We need to load through the AssetRulesInput to set the correct default space and version
last = AssetRulesInput.load(self.last.model_dump()).dump()

return dict(
Metadata=self.metadata.dump(),
Properties=[prop.dump(default_prefix) for prop in self.properties],
Classes=[class_.dump(default_prefix) for class_ in self.classes],
Last=last,
Reference=reference,
)
Loading
Loading