diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py index fa5032a5a2f5a9..2511ee8298309c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py @@ -23,7 +23,10 @@ EnvConfigMixin, PlatformInstanceConfigMixin, ) -from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance +from datahub.emitter.mce_builder import ( + make_dataset_urn_with_platform_instance, + make_schema_field_urn, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -41,6 +44,7 @@ from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, SqlWorkUnit, + get_schema_metadata, register_custom_type, ) from datahub.ingestion.source.sql.sql_config import ( @@ -59,6 +63,14 @@ RecordTypeClass, SchemaField, ) +from datahub.metadata.schema_classes import ( + FineGrainedLineageClass, + FineGrainedLineageDownstreamTypeClass, + FineGrainedLineageUpstreamTypeClass, +) +from datahub.utilities.urns.field_paths import ( + get_simple_field_path_from_v2_field_path, +) register_custom_type(datatype.ROW, RecordTypeClass) register_custom_type(datatype.MAP, MapTypeClass) @@ -237,6 +249,12 @@ class TrinoConfig(BasicSQLAlchemyConfig): description="Whether lineage of datasets to connectors should be ingested", ) + include_column_lineage: bool = Field( + default=True, + description="When emitting upstreamLineage to connector sources (e.g. Iceberg, Hive), " + "include column-level (fine-grained) lineage. Requires schema to be available.", + ) + trino_as_primary: bool = Field( default=True, description="Experimental feature. Whether trino dataset should be primary entity of the set of siblings", @@ -259,15 +277,23 @@ def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str: SourceCapabilityModifier.VIEW, ], ) +@capability( + SourceCapability.LINEAGE_FINE, + "Column-level lineage when emitting lineage to connector sources (e.g. Iceberg, Hive)", + subtype_modifier=[ + SourceCapabilityModifier.TABLE, + SourceCapabilityModifier.VIEW, + ], +) class TrinoSource(SQLAlchemySource): """ + Extracts metadata and two distinct relations to connector datasets (e.g. Iceberg, Hive): - This plugin extracts the following: - - - Metadata for databases, schemas, and tables - - Column types and schema associated with each table - - Table, row, and column statistics via optional SQL profiling + - **Siblings**: Same logical dataset in two platforms (Trino catalog table ↔ connector table). + - **UpstreamLineage**: This Trino dataset reads from the connector dataset (table or view). + Optionally includes column-level lineage (fineGrainedLineages) when schema is available. + Also extracts: databases, schemas, tables, column types, optional profiling. """ config: TrinoConfig @@ -348,16 +374,44 @@ def gen_lineage_workunit( self, dataset_urn: str, source_dataset_urn: str, + schema_metadata: Optional[Any] = None, ) -> Iterable[MetadataWorkUnit]: """ - Generate dataset to source connector lineage workunit + Emit upstreamLineage from this Trino dataset to the connector dataset. + When include_column_lineage is True and schema_metadata has fields, + also emits fineGrainedLineages (column-level lineage) with 1:1 field mapping. """ + fine_grained_lineages: Optional[List[FineGrainedLineageClass]] = None + if ( + self.config.include_column_lineage + and schema_metadata is not None + and getattr(schema_metadata, "fields", None) + ): + fine_grained_lineages = [] + for field in schema_metadata.fields: + try: + path = get_simple_field_path_from_v2_field_path(field.fieldPath) + fine_grained_lineages.append( + FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + upstreams=[make_schema_field_urn(source_dataset_urn, path)], + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + downstreams=[make_schema_field_urn(dataset_urn, path)], + ) + ) + except Exception as e: + logging.debug( + "Skipping column lineage for field %s: %s", + getattr(field, "fieldPath", field), + e, + ) yield MetadataChangeProposalWrapper( entityUrn=dataset_urn, aspect=UpstreamLineage( upstreams=[ Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW) - ] + ], + fineGrainedLineages=fine_grained_lineages, ), ).as_workunit() @@ -385,7 +439,30 @@ def _process_table( ) if source_dataset_urn: yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn) - yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn) + schema_metadata: Optional[Any] = None + if self.config.include_column_lineage: + try: + columns = inspector.get_columns(table, schema) + pk_constraints = inspector.get_pk_constraint(table, schema) + schema_fields = self.get_schema_fields( + dataset_name, columns, inspector, pk_constraints + ) + schema_metadata = get_schema_metadata( + self.report, + dataset_name, + self.platform, + columns, + pk_constraints, + None, + schema_fields, + ) + except Exception as e: + logging.debug( + "Could not build schema for column lineage: %s", e + ) + yield from self.gen_lineage_workunit( + dataset_urn, source_dataset_urn, schema_metadata + ) def _process_view( self, @@ -410,6 +487,30 @@ def _process_view( ) if source_dataset_urn: yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn) + schema_metadata_view: Optional[Any] = None + if self.config.include_column_lineage: + try: + columns = inspector.get_columns(view, schema) + pk_constraints = inspector.get_pk_constraint(view, schema) + schema_fields = self.get_schema_fields( + dataset_name, columns, inspector, pk_constraints + ) + schema_metadata_view = get_schema_metadata( + self.report, + dataset_name, + self.platform, + columns, + pk_constraints, + None, + schema_fields, + ) + except Exception as e: + logging.debug( + "Could not build schema for view column lineage: %s", e + ) + yield from self.gen_lineage_workunit( + dataset_urn, source_dataset_urn, schema_metadata_view + ) @classmethod def create(cls, config_dict, ctx): diff --git a/metadata-ingestion/tests/integration/trino/trino_hive_instance_mces_golden.json b/metadata-ingestion/tests/integration/trino/trino_hive_instance_mces_golden.json index 7886ef04507ee5..d6d45fba123894 100644 --- a/metadata-ingestion/tests/integration/trino/trino_hive_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/trino/trino_hive_instance_mces_golden.json @@ -246,7 +246,7 @@ "numrows": "1", "rawdatasize": "32", "totalsize": "33", - "transient_lastddltime": "1759227091" + "transient_lastddltime": "1771824853" }, "name": "array_struct_test", "description": "This table has array of structs", @@ -461,6 +461,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -504,7 +550,7 @@ "numrows": "3", "rawdatasize": "94", "totalsize": "97", - "transient_lastddltime": "1759227097" + "transient_lastddltime": "1771824858" }, "name": "classification_test", "tags": [] @@ -720,6 +766,63 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.classification_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.classification_test,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.classification_test,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.classification_test,PROD),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.classification_test,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.classification_test,PROD),email)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.classification_test,PROD),email)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.classification_test,PROD),gender)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.classification_test,PROD),gender)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.classification_test,PROD),age)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.classification_test,PROD),age)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -763,7 +866,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227094" + "transient_lastddltime": "1771824856" }, "name": "map_test", "tags": [] @@ -947,6 +1050,30 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.map_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.map_test,PROD),keyvalue)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.map_test,PROD),keyvalue)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.map_test,PROD),recordid)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.map_test,PROD),recordid)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -990,7 +1117,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227094" + "transient_lastddltime": "1771824856" }, "name": "nested_struct_test", "tags": [] @@ -1218,6 +1345,74 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.nested_struct_test,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.nested_struct_test,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.nested_struct_test,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.nested_struct_test,PROD),service.provider)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD),service.provider.name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.nested_struct_test,PROD),service.provider.name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.nested_struct_test,PROD),service.provider.id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.nested_struct_test,PROD),service.provider.id)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1446,6 +1641,41 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.pokes,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.pokes,PROD),foo)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.pokes,PROD),foo)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.pokes,PROD),bar)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.pokes,PROD),bar)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.pokes,PROD),baz)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.pokes,PROD),baz)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1489,7 +1719,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227088" + "transient_lastddltime": "1771824850" }, "name": "struct_test", "tags": [] @@ -1697,6 +1927,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1737,7 +2013,7 @@ "customProperties": { "numfiles": "0", "totalsize": "0", - "transient_lastddltime": "1759227093" + "transient_lastddltime": "1771824855" }, "name": "struct_test_view_materialized", "tags": [] @@ -1945,6 +2221,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test_view_materialized,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test_view_materialized,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test_view_materialized,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test_view_materialized,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test_view_materialized,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test_view_materialized,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test_view_materialized,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.struct_test_view_materialized,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.struct_test_view_materialized,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1988,7 +2310,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227087" + "transient_lastddltime": "1771824850" }, "name": "_test_table_underscore", "tags": [] @@ -2168,6 +2490,30 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1._test_table_underscore,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1._test_table_underscore,PROD),foo)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1._test_table_underscore,PROD),foo)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1._test_table_underscore,PROD),bar)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1._test_table_underscore,PROD),bar)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -2211,7 +2557,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227094" + "transient_lastddltime": "1771824856" }, "name": "union_test", "tags": [] @@ -2467,6 +2813,96 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.tag)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.tag)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.field0)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.field0)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.field1)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.field1)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.field2)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.field2)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.field3)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.field3)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.field3.a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.field3.a)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.union_test,PROD),foo.field3.b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.union_test,PROD),foo.field3.b)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -2720,6 +3156,77 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test_view,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test_view,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test_view,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test_view,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,local_server.db1.array_struct_test_view,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "trino-hive-instance-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD)", @@ -2819,13 +3326,13 @@ "entity": "urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),property_id)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),service)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),property_id)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test,PROD),service)" }, { "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,production_warehouse.hivedb.db1.array_struct_test_view,PROD),property_id)" diff --git a/metadata-ingestion/tests/integration/trino/trino_hive_mces_golden.json b/metadata-ingestion/tests/integration/trino/trino_hive_mces_golden.json index 97477331f6a9f1..ecf3160e261f8a 100644 --- a/metadata-ingestion/tests/integration/trino/trino_hive_mces_golden.json +++ b/metadata-ingestion/tests/integration/trino/trino_hive_mces_golden.json @@ -233,7 +233,7 @@ "numrows": "1", "rawdatasize": "32", "totalsize": "33", - "transient_lastddltime": "1759227091" + "transient_lastddltime": "1771824853" }, "name": "array_struct_test", "description": "This table has array of structs", @@ -427,6 +427,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -470,7 +516,7 @@ "numrows": "3", "rawdatasize": "94", "totalsize": "97", - "transient_lastddltime": "1759227097" + "transient_lastddltime": "1771824858" }, "name": "classification_test", "tags": [] @@ -709,6 +755,63 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.classification_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.classification_test,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.classification_test,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.classification_test,PROD),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.classification_test,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.classification_test,PROD),email)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.classification_test,PROD),email)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.classification_test,PROD),gender)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.classification_test,PROD),gender)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.classification_test,PROD),age)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.classification_test,PROD),age)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -752,7 +855,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227094" + "transient_lastddltime": "1771824856" }, "name": "map_test", "tags": [] @@ -915,6 +1018,30 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.map_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.map_test,PROD),keyvalue)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.map_test,PROD),keyvalue)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.map_test,PROD),recordid)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.map_test,PROD),recordid)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -958,7 +1085,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227094" + "transient_lastddltime": "1771824856" }, "name": "nested_struct_test", "tags": [] @@ -1165,6 +1292,74 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.nested_struct_test,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.nested_struct_test,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.nested_struct_test,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.nested_struct_test,PROD),service.provider)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD),service.provider.name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.nested_struct_test,PROD),service.provider.name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.nested_struct_test,PROD),service.provider.id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.nested_struct_test,PROD),service.provider.id)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1372,6 +1567,41 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.pokes,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.pokes,PROD),foo)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.pokes,PROD),foo)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.pokes,PROD),bar)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.pokes,PROD),bar)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.pokes,PROD),baz)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.pokes,PROD),baz)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1415,7 +1645,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227088" + "transient_lastddltime": "1771824850" }, "name": "struct_test", "tags": [] @@ -1602,6 +1832,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1642,7 +1918,7 @@ "customProperties": { "numfiles": "0", "totalsize": "0", - "transient_lastddltime": "1759227093" + "transient_lastddltime": "1771824855" }, "name": "struct_test_view_materialized", "tags": [] @@ -1829,6 +2105,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test_view_materialized,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test_view_materialized,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test_view_materialized,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test_view_materialized,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test_view_materialized,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test_view_materialized,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test_view_materialized,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.struct_test_view_materialized,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.struct_test_view_materialized,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1872,7 +2194,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227087" + "transient_lastddltime": "1771824850" }, "name": "_test_table_underscore", "tags": [] @@ -2031,6 +2353,30 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1._test_table_underscore,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1._test_table_underscore,PROD),foo)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1._test_table_underscore,PROD),foo)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1._test_table_underscore,PROD),bar)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1._test_table_underscore,PROD),bar)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -2074,7 +2420,7 @@ "numrows": "0", "rawdatasize": "0", "totalsize": "0", - "transient_lastddltime": "1759227094" + "transient_lastddltime": "1771824856" }, "name": "union_test", "tags": [] @@ -2309,6 +2655,96 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.tag)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.tag)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.field0)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.field0)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.field1)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.field1)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.field2)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.field2)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.field3)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.field3)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.field3.a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.field3.a)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.union_test,PROD),foo.field3.b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.union_test,PROD),foo.field3.b)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -2541,6 +2977,77 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test_view,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test_view,PROD),property_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD),property_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test_view,PROD),service)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD),service)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test_view,PROD),service.type)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD),service.type)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,db1.array_struct_test_view,PROD),service.provider)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD),service.provider)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "trino-hive-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD)", @@ -2640,13 +3147,13 @@ "entity": "urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),property_id)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD)" }, { - "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),service)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),property_id)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD)" + "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test,PROD),service)" }, { "entity": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,hivedb.db1.array_struct_test_view,PROD),property_id)" diff --git a/metadata-ingestion/tests/integration/trino/trino_mces_golden.json b/metadata-ingestion/tests/integration/trino/trino_mces_golden.json index 240e4ffbc821f9..2aeda362ec7072 100644 --- a/metadata-ingestion/tests/integration/trino/trino_mces_golden.json +++ b/metadata-ingestion/tests/integration/trino/trino_mces_golden.json @@ -435,6 +435,74 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD),author)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book,PROD),author)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD),publisher)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book,PROD),publisher)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD),tags)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book,PROD),tags)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book,PROD),genre_ids)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book,PROD),genre_ids)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -654,6 +722,52 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.issue_history,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.issue_history,PROD),book_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.issue_history,PROD),book_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.issue_history,PROD),member_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.issue_history,PROD),member_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.issue_history,PROD),issue_date)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.issue_history,PROD),issue_date)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.issue_history,PROD),return_date)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.issue_history,PROD),return_date)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -849,6 +963,30 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.member,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.member,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.member,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.member,PROD),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.member,PROD),name)" + ], + "confidenceScore": 1.0 + } ] } }, @@ -1092,6 +1230,74 @@ "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD)", "type": "VIEW" } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book_in_circulation,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD),name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book_in_circulation,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD),author)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book_in_circulation,PROD),author)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD),publisher)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book_in_circulation,PROD),publisher)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD),member_id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book_in_circulation,PROD),member_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,local_server.postgres.librarydb.book_in_circulation,PROD),issue_date)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:trino,postgresqldb.librarydb.book_in_circulation,PROD),issue_date)" + ], + "confidenceScore": 1.0 + } ] } }, diff --git a/metadata-ingestion/tests/unit/test_sql_common.py b/metadata-ingestion/tests/unit/test_sql_common.py index 9f3a1912b1e33c..11f1f0431be4fe 100644 --- a/metadata-ingestion/tests/unit/test_sql_common.py +++ b/metadata-ingestion/tests/unit/test_sql_common.py @@ -3,15 +3,20 @@ import pytest +from datahub.emitter.mce_builder import make_schema_field_urn from datahub.ingestion.source.sql.sql_common import PipelineContext, SQLAlchemySource from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( get_platform_from_sqlalchemy_uri, ) +from datahub.ingestion.source.sql.trino import TrinoConfig, TrinoSource from datahub.metadata.schema_classes import ( SchemaFieldClass, SchemaFieldDataTypeClass, + SchemalessClass, + SchemaMetadataClass, StringTypeClass, + UpstreamLineageClass, ) @@ -200,3 +205,275 @@ def get_field_from_urn(urn): assert actual_downstream == expected_simplified_downstream assert actual_upstream == expected_simplified_upstream + + +def get_test_trino_source(include_column_lineage: bool = True) -> TrinoSource: + config = TrinoConfig( + host_port="localhost:8080", + database="iceberg_catalog", + username="test", + include_column_lineage=include_column_lineage, + ingest_lineage_to_connectors=True, + ) + return TrinoSource( + config=config, ctx=PipelineContext(run_id="test"), platform="trino" + ) + + +def get_test_trino_schema_metadata( + field_paths: list[str], +) -> SchemaMetadataClass: + return SchemaMetadataClass( + schemaName="iceberg_catalog.contextad.accountcontact", + platform="urn:li:dataPlatform:trino", + version=0, + hash="", + platformSchema=SchemalessClass(), + fields=[ + SchemaFieldClass( + fieldPath=path, + nativeDataType="varchar", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + ) + for path in field_paths + ], + ) + + +def test_trino_gen_lineage_workunit_includes_fine_grained_lineage_when_schema_provided(): + source = get_test_trino_source(include_column_lineage=True) + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:trino,iceberg_catalog.contextad.accountcontact,PROD)" + source_dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:iceberg,contextad.accountcontact,PROD)" + ) + schema_metadata = get_test_trino_schema_metadata( + ["accountid", "accountmanagerid", "businessdevid", "accountservicetype"] + ) + + workunits = list( + source.gen_lineage_workunit(dataset_urn, source_dataset_urn, schema_metadata) + ) + assert len(workunits) == 1 + upstream_lineage = workunits[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + assert hasattr(upstream_lineage, "fineGrainedLineages") + fgl = getattr(upstream_lineage, "fineGrainedLineages", None) + assert fgl is not None + assert len(fgl) == 4 + for fg in fgl: + assert len(fg.upstreams) == 1 + assert len(fg.downstreams) == 1 + assert "iceberg" in fg.upstreams[0] + assert "trino" in fg.downstreams[0] + assert make_schema_field_urn(source_dataset_urn, "accountid") in [ + fg.upstreams[0] for fg in fgl + ] + assert make_schema_field_urn(dataset_urn, "accountid") in [ + fg.downstreams[0] for fg in fgl + ] + + +def test_trino_gen_lineage_workunit_no_fine_grained_lineage_when_disabled(): + source = get_test_trino_source(include_column_lineage=False) + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:trino,iceberg_catalog.contextad.accountcontact,PROD)" + source_dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:iceberg,contextad.accountcontact,PROD)" + ) + schema_metadata = get_test_trino_schema_metadata(["accountid"]) + + workunits = list( + source.gen_lineage_workunit(dataset_urn, source_dataset_urn, schema_metadata) + ) + assert len(workunits) == 1 + upstream_lineage = workunits[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + fgl = getattr(upstream_lineage, "fineGrainedLineages", None) + assert fgl is None + + +def test_trino_gen_lineage_workunit_no_fine_grained_lineage_when_schema_none(): + source = get_test_trino_source(include_column_lineage=True) + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:trino,iceberg_catalog.contextad.accountcontact,PROD)" + source_dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:iceberg,contextad.accountcontact,PROD)" + ) + + workunits = list(source.gen_lineage_workunit(dataset_urn, source_dataset_urn, None)) + assert len(workunits) == 1 + upstream_lineage = workunits[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + fgl = getattr(upstream_lineage, "fineGrainedLineages", None) + assert fgl is None + + +def test_trino_gen_lineage_workunit_no_fine_grained_lineage_when_schema_empty_fields(): + source = get_test_trino_source(include_column_lineage=True) + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:trino,iceberg_catalog.contextad.accountcontact,PROD)" + source_dataset_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:iceberg,contextad.accountcontact,PROD)" + ) + schema_metadata = get_test_trino_schema_metadata([]) + + workunits = list( + source.gen_lineage_workunit(dataset_urn, source_dataset_urn, schema_metadata) + ) + assert len(workunits) == 1 + upstream_lineage = workunits[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + fgl = getattr(upstream_lineage, "fineGrainedLineages", None) + assert fgl is None + + +def test_trino_gen_lineage_workunit_upstreams_present_with_or_without_cll(): + source = get_test_trino_source(include_column_lineage=True) + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:trino,catalog.schema.table,PROD)" + source_dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,schema.table,PROD)" + + workunits = list(source.gen_lineage_workunit(dataset_urn, source_dataset_urn, None)) + assert len(workunits) == 1 + upstream_lineage = workunits[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + assert len(upstream_lineage.upstreams) == 1 + assert upstream_lineage.upstreams[0].dataset == source_dataset_urn + + +def test_trino_process_table_emits_connector_lineage_with_schema(): + """Covers _process_table path: schema build + gen_lineage_workunit when connector source exists.""" + source = get_test_trino_source(include_column_lineage=True) + dataset_name = "iceberg_catalog.ctx.t1" + schema, table = "ctx", "t1" + source_urn = "urn:li:dataset:(urn:li:dataPlatform:iceberg,ctx.t1,PROD)" + mock_inspector = mock.Mock() + mock_inspector.get_columns.return_value = [ + {"name": "col1", "type": None, "nullable": True, "full_type": None} + ] + mock_inspector.get_pk_constraint.return_value = {} + sql_config = mock.Mock(spec=["view_pattern", "table_pattern"]) + sql_config.view_pattern.allowed.return_value = True + sql_config.table_pattern.allowed.return_value = True + + schema_fields = [ + SchemaFieldClass( + fieldPath="col1", + nativeDataType="varchar", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + ) + ] + schema_metadata = get_test_trino_schema_metadata(["col1"]) + + with ( + mock.patch.object(SQLAlchemySource, "_process_table", return_value=iter([])), + mock.patch.object(source, "_get_source_dataset_urn", return_value=source_urn), + mock.patch.object(source, "get_schema_fields", return_value=schema_fields), + mock.patch( + "datahub.ingestion.source.sql.trino.get_schema_metadata", + return_value=schema_metadata, + ), + ): + workunits = list( + source._process_table( + dataset_name, + mock_inspector, + schema, + table, + sql_config, + data_reader=None, + ) + ) + + mock_inspector.get_columns.assert_called_once_with(table, schema) + lineage_wus = [w for w in workunits if w.get_aspect_of_type(UpstreamLineageClass)] + assert len(lineage_wus) == 1 + upstream_lineage = lineage_wus[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + assert upstream_lineage.fineGrainedLineages is not None + assert len(upstream_lineage.fineGrainedLineages) == 1 + assert upstream_lineage.upstreams[0].dataset == source_urn + + +def test_trino_process_view_emits_connector_lineage_with_schema(): + """Covers _process_view path: schema build + gen_lineage_workunit when connector source exists.""" + source = get_test_trino_source(include_column_lineage=True) + dataset_name = "iceberg_catalog.ctx.v1" + schema, view = "ctx", "v1" + source_urn = "urn:li:dataset:(urn:li:dataPlatform:iceberg,ctx.v1,PROD)" + mock_inspector = mock.Mock() + mock_inspector.get_columns.return_value = [ + {"name": "col1", "type": None, "nullable": True, "full_type": None} + ] + mock_inspector.get_pk_constraint.return_value = {} + sql_config = mock.Mock(spec=["view_pattern", "table_pattern"]) + sql_config.view_pattern.allowed.return_value = True + sql_config.table_pattern.allowed.return_value = True + + schema_fields = [ + SchemaFieldClass( + fieldPath="col1", + nativeDataType="varchar", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + ) + ] + schema_metadata = get_test_trino_schema_metadata(["col1"]) + + with ( + mock.patch.object(SQLAlchemySource, "_process_view", return_value=iter([])), + mock.patch.object(source, "_get_source_dataset_urn", return_value=source_urn), + mock.patch.object(source, "get_schema_fields", return_value=schema_fields), + mock.patch( + "datahub.ingestion.source.sql.trino.get_schema_metadata", + return_value=schema_metadata, + ), + ): + workunits = list( + source._process_view( + dataset_name, + mock_inspector, + schema, + view, + sql_config, + ) + ) + + mock_inspector.get_columns.assert_called_once_with(view, schema) + lineage_wus = [w for w in workunits if w.get_aspect_of_type(UpstreamLineageClass)] + assert len(lineage_wus) == 1 + upstream_lineage = lineage_wus[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + assert upstream_lineage.fineGrainedLineages is not None + assert len(upstream_lineage.fineGrainedLineages) == 1 + assert upstream_lineage.upstreams[0].dataset == source_urn + + +def test_trino_process_table_emits_lineage_without_cll_when_schema_build_fails(): + """Covers _process_table path when schema building raises (e.g. get_columns fails).""" + source = get_test_trino_source(include_column_lineage=True) + dataset_name = "iceberg_catalog.ctx.t1" + schema, table = "ctx", "t1" + source_urn = "urn:li:dataset:(urn:li:dataPlatform:iceberg,ctx.t1,PROD)" + mock_inspector = mock.Mock() + mock_inspector.get_columns.side_effect = Exception("connection failed") + sql_config = mock.Mock(spec=["view_pattern", "table_pattern"]) + sql_config.view_pattern.allowed.return_value = True + sql_config.table_pattern.allowed.return_value = True + + with ( + mock.patch.object(SQLAlchemySource, "_process_table", return_value=iter([])), + mock.patch.object(source, "_get_source_dataset_urn", return_value=source_urn), + ): + workunits = list( + source._process_table( + dataset_name, + mock_inspector, + schema, + table, + sql_config, + data_reader=None, + ) + ) + + lineage_wus = [w for w in workunits if w.get_aspect_of_type(UpstreamLineageClass)] + assert len(lineage_wus) == 1 + upstream_lineage = lineage_wus[0].get_aspect_of_type(UpstreamLineageClass) + assert upstream_lineage is not None + assert upstream_lineage.fineGrainedLineages is None + assert len(upstream_lineage.upstreams) == 1