Skip to content

Commit

Permalink
fix: recursive references across multiple schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-schoonjans committed Mar 6, 2024
1 parent f0c00b1 commit 021b739
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 41 deletions.
96 changes: 59 additions & 37 deletions jsf/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ def __parse_primitive(self, name: str, path: str, schema: Dict[str, Any]) -> Pri
}
)

def __parse_object(self, name: str, path: str, schema: Dict[str, Any]) -> Object:
def __parse_object(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> Object:
_, is_nullable = self.__is_field_nullable(schema)
model = Object.from_dict(
{
Expand All @@ -136,21 +138,25 @@ def __parse_object(self, name: str, path: str, schema: Dict[str, Any]) -> Object
**schema,
}
)
self.root = model if not self.root else self.root
root = model if root is None else root
props = []
for _name, definition in schema.get("properties", {}).items():
props.append(self.__parse_definition(_name, path=f"{path}/{_name}", schema=definition))
props.append(
self.__parse_definition(_name, path=f"{path}/{_name}", schema=definition, root=root)
)
model.properties = props
pattern_props = []
for _name, definition in schema.get("patternProperties", {}).items():
pattern_props.append(
self.__parse_definition(_name, path=f"{path}/{_name}", schema=definition)
self.__parse_definition(_name, path=f"{path}/{_name}", schema=definition, root=root)
)
model.patternProperties = pattern_props

return model

def __parse_array(self, name: str, path: str, schema: Dict[str, Any]) -> Array:
def __parse_array(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> Array:
_, is_nullable = self.__is_field_nullable(schema)
arr = Array.from_dict(
{
Expand All @@ -162,11 +168,13 @@ def __parse_array(self, name: str, path: str, schema: Dict[str, Any]) -> Array:
**schema,
}
)
self.root = arr if not self.root else self.root
arr.items = self.__parse_definition(name, f"{path}/items", schema["items"])
root = arr if root is None else root
arr.items = self.__parse_definition(name, f"{path}/items", schema["items"], root=root)
return arr

def __parse_tuple(self, name: str, path: str, schema: Dict[str, Any]) -> JSFTuple:
def __parse_tuple(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> JSFTuple:
_, is_nullable = self.__is_field_nullable(schema)
arr = JSFTuple.from_dict(
{
Expand All @@ -178,10 +186,12 @@ def __parse_tuple(self, name: str, path: str, schema: Dict[str, Any]) -> JSFTupl
**schema,
}
)
self.root = arr if not self.root else self.root
root = arr if root is None else root
arr.items = []
for i, item in enumerate(schema["items"]):
arr.items.append(self.__parse_definition(name, path=f"{path}/{name}[{i}]", schema=item))
arr.items.append(
self.__parse_definition(name, path=f"{path}/{name}[{i}]", schema=item, root=root)
)
return arr

def __is_field_nullable(self, schema: Dict[str, Any]) -> Tuple[str, bool]:
Expand All @@ -196,46 +206,55 @@ def __is_field_nullable(self, schema: Dict[str, Any]) -> Tuple[str, bool]:
return random.choice(item_type_deep_copy), False
return item_type, False

def __parse_anyOf(self, name: str, path: str, schema: Dict[str, Any]) -> AnyOf:
def __parse_anyOf(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> AnyOf:
model = AnyOf(name=name, path=path, max_recursive_depth=self.max_recursive_depth, **schema)
self.root = model if not self.root else self.root
root = model if root is None else root
schemas = []
for d in schema["anyOf"]:
schemas.append(self.__parse_definition(name, path, d))
schemas.append(self.__parse_definition(name, path, d, root=root))
model.schemas = schemas
return model

def __parse_allOf(self, name: str, path: str, schema: Dict[str, Any]) -> AllOf:
def __parse_allOf(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> AllOf:
combined_schema = dict(ChainMap(*schema["allOf"]))
model = AllOf(name=name, path=path, max_recursive_depth=self.max_recursive_depth, **schema)
self.root = model if not self.root else self.root
model.combined_schema = self.__parse_definition(name, path, combined_schema)
root = model if root is None else root
model.combined_schema = self.__parse_definition(name, path, combined_schema, root=root)
return model

def __parse_oneOf(self, name: str, path: str, schema: Dict[str, Any]) -> OneOf:
def __parse_oneOf(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> OneOf:
model = OneOf(name=name, path=path, max_recursive_depth=self.max_recursive_depth, **schema)
self.root = model if not self.root else self.root
root = model if root is None else root
schemas = []
for d in schema["oneOf"]:
schemas.append(self.__parse_definition(name, path, d))
schemas.append(self.__parse_definition(name, path, d, root=root))
model.schemas = schemas
return model

def __parse_named_definition(self, path: str, def_name: str) -> AllTypes:
def __parse_named_definition(self, path: str, def_name: str, root) -> AllTypes:
schema = self.root_schema
parsed_definition = None
for def_tag in ("definitions", "$defs"):
if path.startswith(f"#/{def_tag}/{def_name}"):
self.root.is_recursive = True
return self.root
elif definition := schema.get(def_tag, {}).get(def_name):
root.is_recursive = True
return root
definition = schema.get(def_tag, {}).get(def_name)
if definition is not None:
parsed_definition = self.__parse_definition(
def_name, path=f"{path}/#/{def_tag}/{def_name}", schema=definition
def_name, path=f"{path}/#/{def_tag}/{def_name}", schema=definition, root=root
)
self.definitions[f"#/{def_tag}/{def_name}"] = parsed_definition
return parsed_definition

def __parse_definition(self, name: str, path: str, schema: Dict[str, Any]) -> AllTypes:
def __parse_definition(
self, name: str, path: str, schema: Dict[str, Any], root: Optional[AllTypes] = None
) -> AllTypes:
self.base_state["__all_json_paths__"].append(path)
item_type, is_nullable = self.__is_field_nullable(schema)
if "const" in schema:
Expand All @@ -259,20 +278,20 @@ def __parse_definition(self, name: str, path: str, schema: Dict[str, Any]) -> Al
)
elif "type" in schema:
if item_type == "object" and "properties" in schema:
return self.__parse_object(name, path, schema)
return self.__parse_object(name, path, schema, root)
elif item_type == "object" and "anyOf" in schema:
return self.__parse_anyOf(name, path, schema)
return self.__parse_anyOf(name, path, schema, root)
elif item_type == "object" and "allOf" in schema:
return self.__parse_allOf(name, path, schema)
return self.__parse_allOf(name, path, schema, root)

Check warning on line 285 in jsf/parser.py

View check run for this annotation

Codecov / codecov/patch

jsf/parser.py#L285

Added line #L285 was not covered by tests
elif item_type == "object" and "oneOf" in schema:
return self.__parse_oneOf(name, path, schema)
return self.__parse_oneOf(name, path, schema, root)
elif item_type == "array":
if (schema.get("contains") is not None) or isinstance(schema.get("items"), dict):
return self.__parse_array(name, path, schema)
return self.__parse_array(name, path, schema, root)
if isinstance(schema.get("items"), list) and all(
isinstance(x, dict) for x in schema.get("items", [])
):
return self.__parse_tuple(name, path, schema)
return self.__parse_tuple(name, path, schema, root)
else:
return self.__parse_primitive(name, path, schema)
elif "$ref" in schema:
Expand All @@ -283,20 +302,23 @@ def __parse_definition(self, name: str, path: str, schema: Dict[str, Any]) -> Al
else:
# parse referenced definition
ref_name = frag.split("/")[-1]
cls = self.__parse_named_definition(path, ref_name)
cls = self.__parse_named_definition(path, ref_name, root)
else:
with s_open(ext, "r") as f:
external_jsf = JSF(json.load(f))
cls = deepcopy(external_jsf.definitions.get(f"#{frag}"))
cls.name = name
cls.path = path
if path != "#" and cls == root:
cls.name = name
elif path != "#":
cls.name = name
cls.path = path
return cls
elif "anyOf" in schema:
return self.__parse_anyOf(name, path, schema)
return self.__parse_anyOf(name, path, schema, root)
elif "allOf" in schema:
return self.__parse_allOf(name, path, schema)
return self.__parse_allOf(name, path, schema, root)
elif "oneOf" in schema:
return self.__parse_oneOf(name, path, schema)
return self.__parse_oneOf(name, path, schema, root)
else:
raise ValueError(f"Cannot parse schema {repr(schema)}") # pragma: no cover

Expand Down
7 changes: 3 additions & 4 deletions jsf/tests/test_default_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,6 @@ def test_fake_anyof_recursive(TestData):

fake_data = [p.generate() for _ in range(10)]
for d in fake_data:
for item in d:
assert isinstance(item, str) or isinstance(item, dict)
if isinstance(item, dict):
assert "value" in item
assert isinstance(d, str) or isinstance(d, dict)
if isinstance(d, dict):
assert "value" in d

0 comments on commit 021b739

Please sign in to comment.