10 changes: 9 additions & 1 deletion metadata-ingestion/docs/sources/trino/trino_recipe.yml
@@ -21,7 +21,15 @@ source:
# connector_database: db_name
# connector_platform: connector_platform_name
# platform_instance: cloud_instance
# env: DEV
# env: DEV

# Optional -- Whether to ingest lineage to upstream connectors (default: true)
# ingest_lineage_to_connectors: true

# Optional -- Whether to ingest column-level lineage to upstream connectors (default: true)
# Requires ingest_lineage_to_connectors to be enabled.
# This creates 1:1 column mappings between Trino tables/views and their upstream source tables.
# include_column_lineage: true

sink:
# sink configs
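
For reference, a minimal sketch of running this recipe programmatically with the two new flags set explicitly. The host, catalog, credentials, and the console sink are placeholder values for illustration, not part of this change; both new options default to `true`.

```python
# Hypothetical illustration: running a Trino ingestion with the new lineage
# flags through DataHub's Python pipeline API. Connection details below are
# placeholders only.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "trino",
            "config": {
                "host_port": "localhost:8080",   # placeholder
                "database": "example_catalog",   # placeholder Trino catalog
                "username": "datahub",           # placeholder
                # Options added in this change (both default to True):
                "ingest_lineage_to_connectors": True,
                "include_column_lineage": True,
            },
        },
        "sink": {"type": "console"},  # any configured sink works here
    }
)
pipeline.run()
pipeline.raise_from_status()
```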
@@ -1,5 +1,5 @@
{
"generated_at": "2025-11-14T14:26:00.526772+00:00",
"generated_at": "2025-11-27T16:51:15.797044+00:00",
"generated_by": "metadata-ingestion/scripts/capability_summary.py",
"plugin_details": {
"abs": {
@@ -2441,8 +2441,9 @@
},
{
"capability": "LINEAGE_FINE",
"description": "Enabled by default to get lineage for views via `include_view_column_lineage`",
"description": "Enabled by default for views via `include_view_column_lineage`, and to upstream connectors via `include_column_lineage`",
"subtype_modifier": [
"Table",
"View"
],
"supported": true
@@ -2479,7 +2480,7 @@
},
{
"capability": "LINEAGE_COARSE",
"description": "Extract table-level lineage",
"description": "Enabled by default for views via `include_view_lineage`, and to upstream connectors via `ingest_lineage_to_connectors`",
"subtype_modifier": [
"Table",
"View"
@@ -3444,8 +3445,9 @@
},
{
"capability": "LINEAGE_FINE",
"description": "Enabled by default to get lineage for views via `include_view_column_lineage`",
"description": "Enabled by default for views via `include_view_column_lineage`, and to upstream connectors via `include_column_lineage`",
"subtype_modifier": [
"Table",
"View"
],
"supported": true
@@ -3482,7 +3484,7 @@
},
{
"capability": "LINEAGE_COARSE",
"description": "Extract table-level lineage",
"description": "Enabled by default for views via `include_view_lineage`, and to upstream connectors via `ingest_lineage_to_connectors`",
"subtype_modifier": [
"Table",
"View"
147 changes: 139 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
@@ -23,7 +23,10 @@
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -41,6 +44,7 @@
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
SqlWorkUnit,
get_schema_metadata,
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import (
@@ -50,6 +54,9 @@
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
)
@@ -58,6 +65,7 @@
NumberTypeClass,
RecordTypeClass,
SchemaField,
SchemaMetadata,
)

register_custom_type(datatype.ROW, RecordTypeClass)
@@ -66,20 +74,50 @@


KNOWN_CONNECTOR_PLATFORM_MAPPING = {
"bigquery": "bigquery",
"cassandra": "cassandra",
"clickhouse": "clickhouse",
"hive": "hive",
"databricks": "databricks",
"db2": "db2",
"delta_lake": "delta-lake",
"delta-lake": "delta-lake",
"druid": "druid",
"elasticsearch": "elasticsearch",
"glue": "glue",
"hive": "hive",
"hudi": "hudi",
"iceberg": "iceberg",
"mariadb": "mariadb",
"mongodb": "mongodb",
"mysql": "mysql",
"oracle": "oracle",
"pinot": "pinot",
"postgresql": "postgres",
"redshift": "redshift",
"bigquery": "bigquery",
"snowflake_distributed": "snowflake",
"snowflake_parallel": "snowflake",
"snowflake_jdbc": "snowflake",
"sqlserver": "mssql",
"teradata": "teradata",
"vertica": "vertica",
}

TWO_TIER_CONNECTORS = ["clickhouse", "hive", "glue", "mysql", "iceberg"]
TWO_TIER_CONNECTORS = [
"cassandra",
"clickhouse",
"delta-lake",
"delta_lake",
"druid",
"elasticsearch",
"glue",
"hive",
"hudi",
"iceberg",
"mariadb",
"mongodb",
"mysql",
"pinot",
]

PROPERTIES_TABLE_SUPPORTED_CONNECTORS = ["hive", "iceberg"]
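
The constants above drive how an upstream dataset URN is derived for a Trino catalog: the connector name is normalized to a DataHub platform via `KNOWN_CONNECTOR_PLATFORM_MAPPING`, and connectors listed in `TWO_TIER_CONNECTORS` use a `schema.table` name rather than `database.schema.table`. A rough sketch of that resolution follows; the helper and its signature are illustrative only and assume the two constants above are in scope, they are not the module's actual code.

```python
# Illustrative only: approximates how the mapping tables above could be used
# to build the upstream platform and dataset name for lineage/sibling URNs.
from typing import Optional, Tuple


def resolve_upstream(
    connector_name: str, database: Optional[str], schema: str, table: str
) -> Optional[Tuple[str, str]]:
    platform = KNOWN_CONNECTOR_PLATFORM_MAPPING.get(connector_name)
    if platform is None:
        return None  # unknown connector: skip connector-level lineage
    if connector_name in TWO_TIER_CONNECTORS or not database:
        # Two-tier systems (e.g. hive, mysql) have no database level upstream.
        name = f"{schema}.{table}"
    else:
        name = f"{database}.{schema}.{table}"
    return platform, name


# e.g. resolve_upstream("snowflake_jdbc", "analytics", "public", "orders")
# -> ("snowflake", "analytics.public.orders")
```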

@@ -237,6 +275,12 @@ class TrinoConfig(BasicSQLAlchemyConfig):
description="Whether lineage of datasets to connectors should be ingested",
)

include_column_lineage: bool = Field(
default=True,
description="Whether column-level lineage to upstream connectors should be ingested. "
"Requires ingest_lineage_to_connectors to be enabled.",
)

trino_as_primary: bool = Field(
default=True,
description="Experimental feature. Whether trino dataset should be primary entity of the set of siblings",
@@ -253,7 +297,15 @@ def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str:
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(
SourceCapability.LINEAGE_COARSE,
"Extract table-level lineage",
"Enabled by default for views via `include_view_lineage`, and to upstream connectors via `ingest_lineage_to_connectors`",
subtype_modifier=[
SourceCapabilityModifier.TABLE,
SourceCapabilityModifier.VIEW,
],
)
@capability(
SourceCapability.LINEAGE_FINE,
"Enabled by default for views via `include_view_column_lineage`, and to upstream connectors via `include_column_lineage`",
subtype_modifier=[
SourceCapabilityModifier.TABLE,
SourceCapabilityModifier.VIEW,
@@ -348,16 +400,43 @@ def gen_lineage_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
schema_metadata: Optional[SchemaMetadata] = None,
) -> Iterable[MetadataWorkUnit]:
"""
Generate dataset to source connector lineage workunit
Generate dataset to source connector lineage workunit with optional column-level lineage
"""
fine_grained_lineages: Optional[List[FineGrainedLineage]] = None

# Generate column-level lineage if enabled and schema metadata is available
if (
self.config.include_column_lineage
and schema_metadata is not None
and schema_metadata.fields
):
fine_grained_lineages = []
for field in schema_metadata.fields:
# Create 1:1 column mapping between Trino and upstream source
# Assumes column names match between Trino view/table and upstream source
fine_grained_lineages.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
make_schema_field_urn(source_dataset_urn, field.fieldPath)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(dataset_urn, field.fieldPath)
],
)
)

yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(
upstreams=[
Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW)
]
],
fineGrainedLineages=fine_grained_lineages or None,
),
).as_workunit()
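
To make the emitted aspect concrete, here is a minimal standalone sketch of the 1:1 column mapping this method now produces when `include_column_lineage` is enabled. The URNs and the single `order_id` column are made-up examples.

```python
# Hypothetical example of the UpstreamLineage aspect shape emitted by
# gen_lineage_workunit with column-level lineage; URNs and the column
# name are invented for illustration.
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

trino_urn = make_dataset_urn("trino", "catalog.schema.orders", "PROD")
source_urn = make_dataset_urn("postgres", "db.schema.orders", "PROD")

mcp = MetadataChangeProposalWrapper(
    entityUrn=trino_urn,
    aspect=UpstreamLineage(
        upstreams=[Upstream(dataset=source_urn, type=DatasetLineageType.VIEW)],
        fineGrainedLineages=[
            # One 1:1 mapping per Trino column, using the same field path
            # on both sides, mirroring the loop above.
            FineGrainedLineage(
                upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
                upstreams=[make_schema_field_urn(source_urn, "order_id")],
                downstreamType=FineGrainedLineageDownstreamType.FIELD,
                downstreams=[make_schema_field_urn(trino_urn, "order_id")],
            )
        ],
    ),
)
```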

@@ -384,8 +463,34 @@ def _process_table(
dataset_name, inspector, schema, table
)
if source_dataset_urn:
# Get schema metadata for column-level lineage
schema_metadata = None
if self.config.include_column_lineage:
columns = self._get_columns(dataset_name, inspector, schema, table)
pk_constraints = inspector.get_pk_constraint(table, schema)
partitions = self.get_partitions(inspector, schema, table)
extra_tags = self.get_extra_tags(inspector, schema, table)
schema_fields = self.get_schema_fields(
dataset_name,
columns,
inspector,
pk_constraints,
tags=extra_tags,
partition_keys=partitions,
)
schema_metadata = get_schema_metadata(
self.report,
dataset_name,
self.platform,
columns,
pk_constraints,
canonical_schema=schema_fields,
)

yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(
dataset_urn, source_dataset_urn, schema_metadata
)

def _process_view(
self,
@@ -409,7 +514,33 @@ def _process_view(
dataset_name, inspector, schema, view
)
if source_dataset_urn:
# Get schema metadata for column-level lineage
schema_metadata = None
if self.config.include_column_lineage:
columns = self._get_columns(dataset_name, inspector, schema, view)
pk_constraints = inspector.get_pk_constraint(view, schema)
extra_tags = self.get_extra_tags(inspector, schema, view)
schema_fields = self.get_schema_fields(
dataset_name,
columns,
inspector,
pk_constraints,
tags=extra_tags,
)
schema_metadata = get_schema_metadata(
self.report,
dataset_name,
self.platform,
columns,
pk_constraints,
canonical_schema=schema_fields,
)

yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
# For views, also generate lineage workunit (not just siblings)
yield from self.gen_lineage_workunit(
dataset_urn, source_dataset_urn, schema_metadata
)

@classmethod
def create(cls, config_dict, ctx):