Skip to content

Commit 47490ec

Browse files
authored
feat(ingestion/iceberg): Add capability to extract namespace properties to the iceberg ingestor (#13238)
1 parent 72aab9f commit 47490ec

File tree

5 files changed

+77
-8
lines changed

5 files changed

+77
-8
lines changed

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
)
1717
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
1818
from pyiceberg.table import Table
19-
from pyiceberg.typedef import Identifier
19+
from pyiceberg.typedef import Identifier, Properties
2020
from pyiceberg.types import (
2121
BinaryType,
2222
BooleanType,
@@ -387,8 +387,13 @@ def _process_dataset(
387387
env=self.config.env,
388388
)
389389
)
390+
namespace_properties: Properties = catalog.load_namespace_properties(
391+
namespace
392+
)
390393
namespaces.append((namespace, namespace_urn))
391-
for aspect in self._create_iceberg_namespace_aspects(namespace):
394+
for aspect in self._create_iceberg_namespace_aspects(
395+
namespace, namespace_properties
396+
):
392397
yield stamping_processor.stamp_wu(
393398
MetadataChangeProposalWrapper(
394399
entityUrn=namespace_urn, aspect=aspect
@@ -608,12 +613,23 @@ def get_report(self) -> SourceReport:
608613
return self.report
609614

610615
def _create_iceberg_namespace_aspects(
611-
self, namespace: Identifier
616+
self, namespace: Identifier, properties: Properties
612617
) -> Iterable[_Aspect]:
613618
namespace_repr = ".".join(namespace)
619+
custom_properties: Dict[str, str] = {}
620+
for k, v in properties.items():
621+
try:
622+
custom_properties[str(k)] = str(v)
623+
except Exception as e:
624+
LOGGER.warning(
625+
f"Exception when trying to parse namespace properties for {namespace_repr}. Exception: {e}"
626+
)
614627
yield Status(removed=False)
615628
yield ContainerProperties(
616-
name=namespace_repr, qualifiedName=namespace_repr, env=self.config.env
629+
name=namespace_repr,
630+
qualifiedName=namespace_repr,
631+
env=self.config.env,
632+
customProperties=custom_properties,
617633
)
618634
yield SubTypes(typeNames=[DatasetContainerSubTypes.NAMESPACE])
619635
dpi = self._get_dataplatform_instance_aspect()

metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mcps_golden.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
"aspectName": "containerProperties",
2424
"aspect": {
2525
"json": {
26-
"customProperties": {},
26+
"customProperties": {
27+
"location": "s3a://warehouse/wh/nyc"
28+
},
2729
"name": "nyc",
2830
"qualifiedName": "nyc",
2931
"env": "PROD"

metadata-ingestion/tests/integration/iceberg/iceberg_ingest_mcps_golden.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
"aspectName": "containerProperties",
2323
"aspect": {
2424
"json": {
25-
"customProperties": {},
25+
"customProperties": {
26+
"location": "s3a://warehouse/wh/nyc"
27+
},
2628
"name": "nyc",
2729
"qualifiedName": "nyc",
2830
"env": "PROD"

metadata-ingestion/tests/integration/iceberg/iceberg_profile_mcps_golden.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
"aspectName": "containerProperties",
2323
"aspect": {
2424
"json": {
25-
"customProperties": {},
25+
"customProperties": {
26+
"location": "s3a://warehouse/wh/nyc"
27+
},
2628
"name": "nyc",
2729
"qualifiedName": "nyc",
2830
"env": "PROD"

metadata-ingestion/tests/unit/test_iceberg.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import uuid
2+
from collections import defaultdict
23
from decimal import Decimal
34
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
45
from unittest import TestCase
@@ -540,13 +541,20 @@ def test_avro_decimal_bytes_nullable() -> None:
540541

541542

542543
class MockCatalog:
543-
def __init__(self, tables: Dict[str, Dict[str, Callable[[], Table]]]):
544+
def __init__(
    self,
    tables: Dict[str, Dict[str, Callable[[], Table]]],
    namespace_properties: Optional[Dict[str, Dict[str, str]]] = None,
):
    """
    Set up the mock catalog with its tables and namespace properties.

    :param tables: Dictionary containing namespaces as keys and dictionaries containing names of tables (keys) and
        their metadata as values
    :param namespace_properties: Optional dictionary mapping a namespace name to its properties. When it is
        omitted or empty, a ``defaultdict(dict)`` is stored instead.
    """
    self.tables = tables
    # A falsy argument (None or an empty dict) falls back to a defaultdict.
    if namespace_properties:
        self.namespace_properties = namespace_properties
    else:
        self.namespace_properties = defaultdict(dict)
550558

551559
def list_namespaces(self) -> Iterable[Tuple[str]]:
    """Return each key of ``self.tables`` wrapped in a one-element tuple."""
    return [(namespace_name,) for namespace_name in self.tables]
@@ -557,6 +565,9 @@ def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
557565
def load_table(self, dataset_path: Tuple[str, str]) -> Table:
    """
    Look up the table factory for ``dataset_path`` and return the result of calling it.

    :param dataset_path: Tuple whose first element is the namespace and whose second element is the table name.
    """
    namespace_name = dataset_path[0]
    table_name = dataset_path[1]
    table_factory = self.tables[namespace_name][table_name]
    return table_factory()
559567

568+
def load_namespace_properties(self, namespace: Tuple[str, ...]) -> Dict[str, str]:
    """
    Return the properties stored for a namespace.

    :param namespace: Namespace identifier; only its first element is used as the lookup key.
    :return: The stored properties, or an empty dictionary when nothing is stored for the namespace.
    """
    # ``.get`` gives the same answer for the default ``defaultdict`` and for a
    # caller-supplied plain dict that omits this namespace (no KeyError).
    return self.namespace_properties.get(namespace[0], {})
570+
560571

561572
class MockCatalogExceptionListingTables(MockCatalog):
562573
def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
@@ -1189,3 +1200,39 @@ def _raise_other_value_error_exception():
11891200
"('namespaceA', 'table5') <class 'Exception'>: ",
11901201
],
11911202
)
1203+
1204+
1205+
def test_ingesting_namespace_properties() -> None:
    """Check that the namespace properties served by the catalog appear as container ``customProperties``."""
    source = with_iceberg_source(processing_threads=2)
    custom_properties = {"prop1": "foo", "prop2": "bar"}
    # Container URNs the two namespaces are mapped to.
    namespace_a_urn = "urn:li:container:390e031441265aae5b7b7ae8d51b0c1f"  # namespaceA
    namespace_b_urn = "urn:li:container:74727446a56420d80ff3b1abf2a18087"  # namespaceB
    mock_catalog = MockCatalog(
        tables={"namespaceA": {}, "namespaceB": {}},
        namespace_properties={"namespaceA": {}, "namespaceB": custom_properties},
    )
    with patch(
        "datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
    ) as get_catalog:
        get_catalog.return_value = mock_catalog
        work_units: List[MetadataWorkUnit] = list(source.get_workunits_internal())
        # Index every emitted aspect by entity URN, then by aspect name.
        aspects_by_urn: Dict[str, Dict[str, Any]] = defaultdict(dict)
        for work_unit in work_units:
            assert isinstance(work_unit.metadata, MetadataChangeProposalWrapper)
            mcpw: MetadataChangeProposalWrapper = work_unit.metadata
            assert mcpw.entityUrn
            assert mcpw.aspectName
            aspects_by_urn[mcpw.entityUrn][mcpw.aspectName] = mcpw.aspect
        namespace_a_props = aspects_by_urn[namespace_a_urn]["containerProperties"]
        assert namespace_a_props.customProperties == {}
        namespace_b_props = aspects_by_urn[namespace_b_urn]["containerProperties"]
        assert namespace_b_props.customProperties == custom_properties

0 commit comments

Comments
 (0)