
Commit 8973458

hsheth2 and treff7es authored
feat(ingest): add urn modification helper (#7440)
Co-authored-by: Tamas Nemeth <[email protected]>
1 parent 145cbd4 commit 8973458

11 files changed: +442 -88 lines changed


metadata-ingestion/scripts/avro_codegen.py

+120 -40
@@ -1,8 +1,7 @@
 import json
-import types
-import unittest.mock
+import re
 from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Dict, Iterable, List, Optional, Tuple, Union

 import avro.schema
 import click
@@ -66,29 +65,89 @@ def load_schemas(schemas_path: str) -> Dict[str, dict]:
     return schemas


-def merge_schemas(schemas_obj: List[Any]) -> str:
-    # Combine schemas.
-    merged = ["null"] + schemas_obj
+def patch_schemas(schemas: Dict[str, dict], pdl_path: Path) -> Dict[str, dict]:
+    # We can easily find normal urn types using the generated avro schema,
+    # but for arrays of urns there's nothing in the avro schema and hence
+    # we have to look in the PDL files instead.
+    urn_arrays: Dict[
+        str, List[Tuple[str, str]]
+    ] = {}  # schema name -> list of (field name, type)

-    # Patch add_name method to NOT complain about duplicate names
-    def add_name(self, name_attr, space_attr, new_schema):
-        to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
+    # First, we need to load the PDL files and find all urn arrays.
+    for pdl_file in Path(pdl_path).glob("**/*.pdl"):
+        pdl_text = pdl_file.read_text()

-        self.names[to_add.fullname] = new_schema
-        return to_add
+        # TRICKY: We assume that all urn types end with "Urn".
+        arrays = re.findall(
+            r"^\s*(\w+)\s*:\s*(?:optional\s+)?array\[(\w*Urn)\]",
+            pdl_text,
+            re.MULTILINE,
+        )
+        if arrays:
+            schema_name = pdl_file.stem
+            urn_arrays[schema_name] = [(item[0], item[1]) for item in arrays]

-    with unittest.mock.patch("avro.schema.Names.add_name", add_name):
-        cleaned_schema = avro.schema.make_avsc_object(merged)
+    # Then, we can patch each schema.
+    patched_schemas = {}
+    for name, schema in schemas.items():
+        patched_schemas[name] = patch_schema(schema, urn_arrays)

-    # Convert back to an Avro schema JSON representation.
-    class MappingProxyEncoder(json.JSONEncoder):
-        def default(self, obj):
-            if isinstance(obj, types.MappingProxyType):
-                return dict(obj)
-            return json.JSONEncoder.default(self, obj)
+    return patched_schemas
+
+
+def patch_schema(schema: dict, urn_arrays: Dict[str, List[Tuple[str, str]]]) -> dict:
+    """
+    This method patches the schema to add an "Urn" property to all urn fields.
+    Because the inner type in an array is not a named Avro schema, for urn arrays
+    we annotate the array field and add an "urn_is_array" property.
+    """
+
+    # We're using Names() to generate a full list of embedded schemas.
+    all_schemas = avro.schema.Names()
+    patched = avro.schema.make_avsc_object(schema, names=all_schemas)
+
+    for nested in all_schemas.names.values():
+        if isinstance(nested, (avro.schema.EnumSchema, avro.schema.FixedSchema)):
+            continue
+        assert isinstance(nested, avro.schema.RecordSchema)
+
+        # Patch normal urn types.
+        field: avro.schema.Field
+        for field in nested.fields:
+            java_class: Optional[str] = field.props.get("java", {}).get("class")
+            if java_class and java_class.startswith(
+                "com.linkedin.pegasus2avro.common.urn."
+            ):
+                field.set_prop("Urn", java_class.split(".")[-1])
+
+        # Patch array urn types.
+        if nested.name in urn_arrays:
+            mapping = urn_arrays[nested.name]
+
+            for field_name, type in mapping:
+                field = nested.fields_dict[field_name]
+                field.set_prop("Urn", type)
+                field.set_prop("urn_is_array", True)
+
+    return patched.to_json()
+
+
+def merge_schemas(schemas_obj: List[dict]) -> str:
+    # Combine schemas as a "union" of all of the types.
+    merged = ["null"] + schemas_obj
+
+    # Patch add_name method to NOT complain about duplicate names.
+    class NamesWithDups(avro.schema.Names):
+        def add_name(self, name_attr, space_attr, new_schema):
+            to_add = avro.schema.Name(name_attr, space_attr, self.default_namespace)
+            self.names[to_add.fullname] = new_schema
+            return to_add
+
+    cleaned_schema = avro.schema.make_avsc_object(merged, names=NamesWithDups())

+    # Convert back to an Avro schema JSON representation.
     out_schema = cleaned_schema.to_json()
-    encoded = json.dumps(out_schema, cls=MappingProxyEncoder, indent=2)
+    encoded = json.dumps(out_schema, indent=2)
     return encoded

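As context for the regex above, a minimal standalone sketch run against a hypothetical PDL fragment (the record and field names are invented for illustration; only the field syntax mirrors the real models):

    import re

    PDL_SNIPPET = """
    record FakeOwnership {
      owners: array[CorpuserUrn]
      groups: optional array[CorpGroupUrn]
      note: string
    }
    """

    matches = re.findall(
        r"^\s*(\w+)\s*:\s*(?:optional\s+)?array\[(\w*Urn)\]",
        PDL_SNIPPET,
        re.MULTILINE,
    )
    print(matches)  # [('owners', 'CorpuserUrn'), ('groups', 'CorpGroupUrn')]

Only fields whose element type name ends in "Urn" are picked up, which is exactly the "TRICKY" assumption the comment calls out; plain fields like note: string are ignored.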

@@ -149,11 +208,11 @@ def add_avro_python3_warning(filepath: Path) -> None:
 import functools
 import pathlib

+@functools.lru_cache(maxsize=None)
 def _load_schema(schema_name: str) -> str:
     return (pathlib.Path(__file__).parent / f"{schema_name}.avsc").read_text()
 """
 individual_schema_method = """
-@functools.lru_cache(maxsize=None)
 def get{schema_name}Schema() -> str:
     return _load_schema("{schema_name}")
 """
@@ -165,6 +224,17 @@ def make_load_schema_methods(schemas: Iterable[str]) -> str:
     )


+def save_raw_schemas(schema_save_dir: Path, schemas: Dict[str, dict]) -> None:
+    # Save raw avsc files.
+    schema_save_dir.mkdir()
+    for name, schema in schemas.items():
+        (schema_save_dir / f"{name}.avsc").write_text(json.dumps(schema, indent=2))
+
+    # Add getXSchema methods.
+    with open(schema_save_dir / "__init__.py", "w") as schema_dir_init:
+        schema_dir_init.write(make_load_schema_methods(schemas.keys()))
+
+
 def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
     schema_classes_lines = schema_class_file.read_text().splitlines()
     line_lookup_table = {line: i for i, line in enumerate(schema_classes_lines)}
@@ -177,9 +247,9 @@ def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
     ] += """

 class _Aspect(DictWrapper):
-    ASPECT_NAME: str = None  # type: ignore
-    ASPECT_TYPE: str = "default"
-    ASPECT_INFO: dict = None  # type: ignore
+    ASPECT_NAME: ClassVar[str] = None  # type: ignore
+    ASPECT_TYPE: ClassVar[str] = "default"
+    ASPECT_INFO: ClassVar[dict] = None  # type: ignore

     def __init__(self):
         if type(self) is _Aspect:
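
The switch to ClassVar is a typing refinement rather than a behavior change: it tells mypy that ASPECT_NAME, ASPECT_TYPE, and ASPECT_INFO are class-level attributes, which generated subclasses may override at class scope but which should not be assigned through an instance. A minimal illustration with invented names:

    from typing import ClassVar

    class Base:
        NAME: ClassVar[str] = "base"

    class Child(Base):
        NAME = "child"  # fine: overrides the class-level attribute

    c = Child()
    c.NAME = "oops"  # runs, but mypy rejects assigning to a ClassVar via an instance
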
@@ -225,16 +295,18 @@ def get_aspect_info(cls) -> dict:
         schema_classes_lines[
             empty_line
         ] += f"\n    ASPECT_TYPE = '{aspect['Aspect']['type']}'"
-        schema_classes_lines[empty_line] += f"\n    ASPECT_INFO = {aspect['Aspect']}"
+
+        aspect_info = {
+            k: v for k, v in aspect["Aspect"].items() if k not in {"name", "type"}
+        }
+        schema_classes_lines[empty_line] += f"\n    ASPECT_INFO = {aspect_info}"

         schema_classes_lines[empty_line + 1] += "\n"

     # Finally, generate a big list of all available aspects.
     newline = "\n"
     schema_classes_lines.append(
         f"""
-from typing import Type
-
 ASPECT_CLASSES: List[Type[_Aspect]] = [
     {f',{newline}    '.join(f"{aspect['name']}Class" for aspect in aspects)}
 ]
@@ -252,14 +324,22 @@ def get_aspect_info(cls) -> dict:
 @click.argument(
     "entity_registry", type=click.Path(exists=True, dir_okay=False), required=True
 )
+@click.argument(
+    "pdl_path", type=click.Path(exists=True, file_okay=False), required=True
+)
 @click.argument(
     "schemas_path", type=click.Path(exists=True, file_okay=False), required=True
 )
 @click.argument("outdir", type=click.Path(), required=True)
-def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
+def generate(
+    entity_registry: str, pdl_path: str, schemas_path: str, outdir: str
+) -> None:
     entities = load_entity_registry(Path(entity_registry))
     schemas = load_schemas(schemas_path)

+    # Patch the avsc files.
+    schemas = patch_schemas(schemas, Path(pdl_path))
+
     # Special handling for aspects.
     aspects = {
         schema["Aspect"]["name"]: schema
@@ -288,8 +368,8 @@ def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:

     # Check for unused aspects. We currently have quite a few.
     # unused_aspects = set(aspects.keys()) - set().union(
-    #    {entity.keyAspect for entity in entities},
-    #    *(set(entity.aspects) for entity in entities),
+    #     {entity.keyAspect for entity in entities},
+    #     *(set(entity.aspects) for entity in entities),
     # )

     merged_schema = merge_schemas(list(schemas.values()))
@@ -303,17 +383,17 @@ def generate(entity_registry: str, schemas_path: str, outdir: str) -> None:
         Path(outdir) / "schema_classes.py",
     )

-    # Save raw schema files in codegen as well.
+    # Keep a copy of a few raw avsc files.
+    required_avsc_schemas = {"MetadataChangeEvent", "MetadataChangeProposal"}
     schema_save_dir = Path(outdir) / "schemas"
-    schema_save_dir.mkdir()
-    for schema_out_file, schema in schemas.items():
-        (schema_save_dir / f"{schema_out_file}.avsc").write_text(
-            json.dumps(schema, indent=2)
-        )
-
-    # Add load_schema method.
-    with open(schema_save_dir / "__init__.py", "a") as schema_dir_init:
-        schema_dir_init.write(make_load_schema_methods(schemas.keys()))
+    save_raw_schemas(
+        schema_save_dir,
+        {
+            name: schema
+            for name, schema in schemas.items()
+            if name in required_avsc_schemas
+        },
+    )

     # Add headers for all generated files
     generated_files = Path(outdir).glob("**/*.py")
metadata-ingestion/scripts/codegen.sh

+4 -2
@@ -5,8 +5,10 @@ OUTDIR=./src/datahub/metadata

 # Note: this assumes that datahub has already been built with `./gradlew build`.
 DATAHUB_ROOT=..
-SCHEMAS_ROOT="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
+
+SCHEMAS_PDL="$DATAHUB_ROOT/metadata-models/src/main/pegasus/com/linkedin"
+SCHEMAS_AVSC="$DATAHUB_ROOT/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin"
 ENTITY_REGISTRY="$DATAHUB_ROOT/metadata-models/src/main/resources/entity-registry.yml"

 rm -r $OUTDIR 2>/dev/null || true
-python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_ROOT $OUTDIR
+python scripts/avro_codegen.py $ENTITY_REGISTRY $SCHEMAS_PDL $SCHEMAS_AVSC $OUTDIR

metadata-ingestion/setup.py

+1 -1
@@ -37,7 +37,7 @@ def get_long_description():
         "entrypoints",
         "docker",
         "expandvars>=0.6.5",
-        "avro-gen3==0.7.8",
+        "avro-gen3==0.7.10",
         # "avro-gen3 @ git+https://github.com/acryldata/avro_gen@master#egg=avro-gen3",
         "avro>=1.10.2,<1.11",
         "python-dateutil>=2.8.0",

metadata-ingestion/src/datahub/ingestion/graph/client.py

+4 -12
@@ -144,9 +144,7 @@ def get_aspect(
         response_json = response.json()

         # Figure out what field to look in.
-        record_schema: RecordSchema = aspect_type.__getattribute__(
-            aspect_type, "RECORD_SCHEMA"
-        )
+        record_schema: RecordSchema = aspect_type.RECORD_SCHEMA
         aspect_type_name = record_schema.fullname.replace(".pegasus2avro", "")

         # Deserialize the aspect json into the aspect type.
@@ -335,15 +333,9 @@ def get_aspects_for_entity(

         result: Dict[str, Optional[Aspect]] = {}
         for aspect_type in aspect_types:
-            record_schema: RecordSchema = aspect_type.__getattribute__(
-                aspect_type, "RECORD_SCHEMA"
-            )
-            if not record_schema:
-                logger.warning(
-                    f"Failed to infer type name of the aspect from the aspect type class {aspect_type}. Continuing, but this will fail."
-                )
-            else:
-                aspect_type_name = record_schema.props["Aspect"]["name"]
+            record_schema = aspect_type.RECORD_SCHEMA
+            aspect_type_name = record_schema.props["Aspect"]["name"]
+
             aspect_json = response_json.get("aspects", {}).get(aspect_type_name)
             if aspect_json:
                 # need to apply a transform to the response to match rest.li and avro serialization
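
Both call sites simplify for the same reason: avrogen emits RECORD_SCHEMA as an ordinary class attribute on every generated class, so the aspect_type.__getattribute__(aspect_type, "RECORD_SCHEMA") incantation was unnecessary, and the not record_schema fallback was effectively dead code. A sketch of the simplified lookup, assuming the generated StatusClass aspect from datahub.metadata.schema_classes:

    from datahub.metadata.schema_classes import StatusClass

    record_schema = StatusClass.RECORD_SCHEMA  # plain class-attribute access
    print(record_schema.fullname)                 # e.g. com.linkedin.pegasus2avro.common.Status
    print(record_schema.props["Aspect"]["name"])  # e.g. "status"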

metadata-ingestion/src/datahub/ingestion/sink/sink_registry.py

-5
@@ -13,8 +13,3 @@ def _check_sink_classes(cls: Type[Sink]) -> None:

 sink_registry = PluginRegistry[Sink](extra_cls_check=_check_sink_classes)
 sink_registry.register_from_entrypoint("datahub.ingestion.sink.plugins")
-
-# These sinks are always enabled
-assert sink_registry.get("console")
-assert sink_registry.get("file")
-assert sink_registry.get("blackhole")
