Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3688] Fix some edge cases when parsing JSON Schemas #3937

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 44 additions & 25 deletions src/openforms/registrations/contrib/objects_api/json_schema.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import itertools
from dataclasses import dataclass, field, replace
from typing import Iterator, Literal, overload

Expand Down Expand Up @@ -76,31 +77,37 @@ def iter_json_schema_paths(
def _iter_json_schema(
json_schema: ObjectSchema, parent_json_path: JsonSchemaPath
) -> Iterator[tuple[JsonSchemaPath, ObjectSchema | InvalidReference]]:
assert json_schema.get("type") == "object"

yield parent_json_path, json_schema

required = json_schema.get("required", [])

k: str
for k, v in json_schema["properties"].items():
json_path = parent_json_path / k
json_path.required = k in required

match v:
case {"type": "object"}:
yield from _iter_json_schema(v, json_path)
case {"$ref": str(uri)}:
try:
resolved = resolver.lookup(uri)
except Unresolvable as exc:
if fail_fast:
raise
yield json_path, InvalidReference(uri, exc)
else:
yield from _iter_json_schema(resolved.contents, json_path)
case {}:
yield json_path, v
if "properties" in json_schema:
required = json_schema.get("required", [])

k: str
for k, v in json_schema["properties"].items():
json_path = parent_json_path / k
json_path.required = k in required

match v:
case {"properties": dict()}:
# `type` is not required. But if provided, we want to make sure
# it is 'object' (or a list of allowed types where 'object' is allowed).
type_ = v.get("type", "object")
assert isinstance(type_, (str, list))
assert type_ == "object" or "object" in type_

yield from _iter_json_schema(v, json_path)
case {"$ref": str(uri)}:
try:
resolved = resolver.lookup(uri)
except Unresolvable as exc:
if fail_fast:
raise
yield json_path, InvalidReference(uri, exc)
else:
yield from _iter_json_schema(resolved.contents, json_path)
case {}:
yield json_path, v

yield from _iter_json_schema(json_schema, parent_json_path)

Expand Down Expand Up @@ -131,9 +138,11 @@ def get_missing_required_paths(
"""
missing_paths: list[list[str]] = []

for r_path, _ in iter_json_schema_paths(json_schema):
if not r_path.required:
continue
required_paths = [
r_path for r_path, _ in iter_json_schema_paths(json_schema) if r_path.required
]

for r_path in required_paths:

# If a child key is provided (e.g. "a.b"), any required parent key is dismissed (e.g. "a").
if any(JsonSchemaPath(path).startswith(r_path) for path in paths):
Expand All @@ -146,6 +155,16 @@ def get_missing_required_paths(
if any(r_path.startswith(path) for path in paths):
continue

# If the required path is "a.b.c", the two previous checks tell us "a.b.c.x" and "a"/"a.b" wasn't provided.
# However, we need to check if *all* the sub segments (i.e. "a.b" and "a") are required, as we can't
# flag "a.b.c" as missing if for instance "a" is not required *and* not provided.
if not all(
JsonSchemaPath(subsegments, required=True) in required_paths
# fancy way of iterating over [["a"], ["a", "b"]]:
for subsegments in itertools.accumulate(map(lambda s: [s], r_path.segments))
):
continue

missing_paths.append(r_path.segments)

return missing_paths
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,24 @@
"properties": {
"first.name": {"type": "string"},
"last.name": {"type": "string"},
"two_types": {
"type": ["string", "number"],
},
},
},
},
}

JSON_SCHEMA_NO_TYPE = {
"$id": "noise-complaint.schema",
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": False,
"properties": {
"complaintDescription": {"type": "string"},
},
}


JSON_SCHEMA_REFS = {
"$id": "noise-complaint.schema",
"$schema": "http://json-schema.org/draft-07/schema#",
Expand All @@ -30,6 +43,7 @@
"properties": {
"complainant": {"$ref": "#/definitions/person"},
"noisyAddress": {"$ref": "#/definitions/address"},
"phoneNumber": {"$ref": "#/definitions/phoneNumber"},
},
"definitions": {
"person": {
Expand All @@ -45,6 +59,9 @@
"street": {"type": "string"},
},
},
"phoneNumber": {
"type": "number",
},
},
}

Expand Down Expand Up @@ -118,6 +135,22 @@
"required": ["a", "b"],
}

# "a" not required, but "a.b" is:
JSON_SCHEMA_DEEP_REQUIRED = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"a": {
"type": "object",
"required": ["b"],
"properties": {
"b": {"type": "string"},
"c": {"type": "string"},
},
}
},
}


class IterJsonSchemaTests(SimpleTestCase):
"""Test cases to assert the JSON Schemas are correctly iterated over.
Expand All @@ -143,12 +176,27 @@ def test_iter_json_schema_no_refs(self):
"properties": {
"first.name": {"type": "string"},
"last.name": {"type": "string"},
"two_types": {"type": ["string", "number"]},
},
"type": "object",
},
),
(["complainant", "first.name"], {"type": "string"}),
(["complainant", "last.name"], {"type": "string"}),
(["complainant", "two_types"], {"type": ["string", "number"]}),
],
)

def test_iter_json_schema_no_type(self):
paths_list = [
(path.segments, schema)
for path, schema in iter_json_schema_paths(JSON_SCHEMA_NO_TYPE)
]

self.assertEqual(
paths_list[1:],
[
(["complaintDescription"], {"type": "string"}),
],
)

Expand Down Expand Up @@ -183,6 +231,7 @@ def test_iter_json_schema_refs(self):
},
),
(["noisyAddress", "street"], {"type": "string"}),
(["phoneNumber"], {"type": "number"}),
],
)

Expand Down Expand Up @@ -309,3 +358,9 @@ def test_missing_required_paths(self):
)

self.assertEqual(missing_paths, [["b", "d", "e"]])

def test_required_path_deep(self):
"""Test that "a.b" is not marked as required if "a" is not provided."""

missing_paths = get_missing_required_paths(JSON_SCHEMA_DEEP_REQUIRED, [])
self.assertEqual(missing_paths, [])
Loading