119 changes: 110 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
@@ -23,7 +23,10 @@
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -41,6 +44,7 @@
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
SqlWorkUnit,
get_schema_metadata,
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import (
@@ -59,6 +63,14 @@
RecordTypeClass,
SchemaField,
)
from datahub.metadata.schema_classes import (
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
)
from datahub.utilities.urns.field_paths import (
get_simple_field_path_from_v2_field_path,
)

register_custom_type(datatype.ROW, RecordTypeClass)
register_custom_type(datatype.MAP, MapTypeClass)
@@ -237,6 +249,12 @@ class TrinoConfig(BasicSQLAlchemyConfig):
description="Whether lineage of datasets to connectors should be ingested",
)

include_column_lineage: bool = Field(
default=True,
description="When emitting upstreamLineage to connector sources (e.g. Iceberg, Hive), "
"include column-level (fine-grained) lineage. Requires schema to be available.",
)
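    # Recipe note: set `include_column_lineage: false` under source.config to
    # fall back to table-level lineage only.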

trino_as_primary: bool = Field(
default=True,
description="Experimental feature. Whether trino dataset should be primary entity of the set of siblings",
@@ -259,15 +277,23 @@ def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str:
SourceCapabilityModifier.VIEW,
],
)
@capability(
SourceCapability.LINEAGE_FINE,
"Column-level lineage when emitting lineage to connector sources (e.g. Iceberg, Hive)",
subtype_modifier=[
SourceCapabilityModifier.TABLE,
SourceCapabilityModifier.VIEW,
],
)
class TrinoSource(SQLAlchemySource):
"""
Extracts metadata and two distinct relations to connector datasets (e.g. Iceberg, Hive):

This plugin extracts the following:

- Metadata for databases, schemas, and tables
- Column types and schema associated with each table
- Table, row, and column statistics via optional SQL profiling
- **Siblings**: Same logical dataset in two platforms (Trino catalog table ↔ connector table).
- **UpstreamLineage**: This Trino dataset reads from the connector dataset (table or view).
Optionally includes column-level lineage (fineGrainedLineages) when schema is available.

Also extracts: databases, schemas, tables, column types, optional profiling.
"""

config: TrinoConfig
@@ -348,16 +374,44 @@ def gen_lineage_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
schema_metadata: Optional[Any] = None,
) -> Iterable[MetadataWorkUnit]:
"""
Generate dataset to source connector lineage workunit
Emit upstreamLineage from this Trino dataset to the connector dataset.
When include_column_lineage is True and schema_metadata has fields,
also emits fineGrainedLineages (column-level lineage) with 1:1 field mapping.
"""
fine_grained_lineages: Optional[List[FineGrainedLineageClass]] = None
if (
self.config.include_column_lineage
and schema_metadata is not None
and getattr(schema_metadata, "fields", None)
):
fine_grained_lineages = []
for field in schema_metadata.fields:
try:
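                    # Strip the v2 field path (e.g. "[version=2.0].[type=string].col")
                    # down to the plain column name used on both sides of the mapping.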
path = get_simple_field_path_from_v2_field_path(field.fieldPath)
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=[make_schema_field_urn(source_dataset_urn, path)],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[make_schema_field_urn(dataset_urn, path)],
)
)
except Exception as e:
logging.debug(
"Skipping column lineage for field %s: %s",
getattr(field, "fieldPath", field),
e,
)
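        # fine_grained_lineages stays None when column lineage is disabled or no
        # schema was supplied, so the aspect then carries table-level lineage only.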
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(
upstreams=[
Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW)
                ],
fineGrainedLineages=fine_grained_lineages,
),
).as_workunit()

@@ -385,7 +439,30 @@ def _process_table(
)
if source_dataset_urn:
yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
schema_metadata: Optional[Any] = None
if self.config.include_column_lineage:
try:
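                    # Rebuild the schema with the same helpers that back the
                    # schemaMetadata aspect so the column-lineage field paths line up.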
columns = inspector.get_columns(table, schema)
pk_constraints = inspector.get_pk_constraint(table, schema)
schema_fields = self.get_schema_fields(
dataset_name, columns, inspector, pk_constraints
)
schema_metadata = get_schema_metadata(
self.report,
dataset_name,
self.platform,
columns,
pk_constraints,
None,
schema_fields,
)
except Exception as e:
logging.debug(
"Could not build schema for column lineage: %s", e
)
yield from self.gen_lineage_workunit(
dataset_urn, source_dataset_urn, schema_metadata
)

def _process_view(
self,
@@ -410,6 +487,30 @@ def _process_view(
)
if source_dataset_urn:
yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
schema_metadata_view: Optional[Any] = None
if self.config.include_column_lineage:
try:
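                    # Same schema rebuild as in _process_table, driven by the view's columns.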
columns = inspector.get_columns(view, schema)
pk_constraints = inspector.get_pk_constraint(view, schema)
schema_fields = self.get_schema_fields(
dataset_name, columns, inspector, pk_constraints
)
schema_metadata_view = get_schema_metadata(
self.report,
dataset_name,
self.platform,
columns,
pk_constraints,
None,
schema_fields,
)
except Exception as e:
logging.debug(
"Could not build schema for view column lineage: %s", e
)
yield from self.gen_lineage_workunit(
dataset_urn, source_dataset_urn, schema_metadata_view
)

@classmethod
def create(cls, config_dict, ctx):