From 20409fd702d2d4454de9170f77bffecbfba8b527 Mon Sep 17 00:00:00 2001 From: Serhii Dimchenko <39801237+svdimchenko@users.noreply.github.com> Date: Tue, 11 Feb 2025 10:42:14 +0100 Subject: [PATCH] feat(ingestion/dbt): Add include_database_name parameter for dbt core (#12411) --- .../ingestion/source/dbt/dbt_common.py | 5 + .../datahub/ingestion/source/dbt/dbt_core.py | 20 +- .../tests/unit/dbt/artifacts/catalog.json | 40 ++++ .../tests/unit/dbt/artifacts/manifest.json | 174 ++++++++++++++++++ .../tests/unit/{ => dbt}/test_dbt_source.py | 38 ++++ 5 files changed, 268 insertions(+), 9 deletions(-) create mode 100644 metadata-ingestion/tests/unit/dbt/artifacts/catalog.json create mode 100644 metadata-ingestion/tests/unit/dbt/artifacts/manifest.json rename metadata-ingestion/tests/unit/{ => dbt}/test_dbt_source.py (92%) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 99fa09b8b20b9b..f810500085295b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -357,6 +357,11 @@ class DBTCommonConfig( default=True, description="When enabled, includes the compiled code in the emitted metadata.", ) + include_database_name: bool = Field( + default=True, + description="Whether to add database name to the table urn. " + "Set to False to skip it for engines like AWS Athena where it's not required.", + ) @validator("target_platform") def validate_target_platform_value(cls, target_platform: str) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index fbb54d211e1b21..ea9a24875d72b5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -167,6 +167,7 @@ def extract_dbt_entities( use_identifiers: bool, tag_prefix: str, only_include_if_in_catalog: bool, + include_database_name: bool, report: DBTSourceReport, ) -> List[DBTNode]: sources_by_id = {x["unique_id"]: x for x in sources_results} @@ -267,7 +268,7 @@ def extract_dbt_entities( dbt_name=key, dbt_adapter=manifest_adapter, dbt_package_name=manifest_node.get("package_name"), - database=manifest_node["database"], + database=manifest_node["database"] if include_database_name else None, schema=manifest_node["schema"], name=name, alias=manifest_node.get("alias"), @@ -543,14 +544,15 @@ def loadManifestAndCatalog( all_catalog_entities = {**catalog_nodes, **catalog_sources} nodes = extract_dbt_entities( - all_manifest_entities, - all_catalog_entities, - sources_results, - manifest_adapter, - self.config.use_identifiers, - self.config.tag_prefix, - self.config.only_include_if_in_catalog, - self.report, + all_manifest_entities=all_manifest_entities, + all_catalog_entities=all_catalog_entities, + sources_results=sources_results, + manifest_adapter=manifest_adapter, + use_identifiers=self.config.use_identifiers, + tag_prefix=self.config.tag_prefix, + only_include_if_in_catalog=self.config.only_include_if_in_catalog, + include_database_name=self.config.include_database_name, + report=self.report, ) return ( diff --git a/metadata-ingestion/tests/unit/dbt/artifacts/catalog.json b/metadata-ingestion/tests/unit/dbt/artifacts/catalog.json new file mode 100644 index 00000000000000..f3029efb13a288 --- /dev/null +++ b/metadata-ingestion/tests/unit/dbt/artifacts/catalog.json @@ -0,0 +1,40 @@ +{ + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json", + "dbt_version": "1.9.0", + "generated_at": "2025-01-22T10:56:18.229568Z", + "invocation_id": "385456c0-b0ca-43b6-aa5a-422539d1f142", + "env": {} + }, + "nodes": { + "model.tdd.simple": { + "metadata": { + "type": "iceberg_table", + "schema": "sandbox", + "name": "simple", + "database": "awsdatacatalog", + "comment": "This model calculates the count of records in the stg_simple table.", + "owner": null + }, + "columns": { + "action_type_id": { + "type": "int", + "index": 0, + "name": "action_type_id", + "comment": null + } + }, + "stats": { + "has_stats": { + "id": "has_stats", + "label": "Has Stats?", + "value": false, + "include": false, + "description": "Indicates whether there are statistics for this table" + } + }, + "unique_id": "model.tdd.simple" + } + }, + "sources": {} +} diff --git a/metadata-ingestion/tests/unit/dbt/artifacts/manifest.json b/metadata-ingestion/tests/unit/dbt/artifacts/manifest.json new file mode 100644 index 00000000000000..94bcef1d85a713 --- /dev/null +++ b/metadata-ingestion/tests/unit/dbt/artifacts/manifest.json @@ -0,0 +1,174 @@ +{ + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v12.json", + "dbt_version": "1.9.0", + "generated_at": "2024-12-16T14:59:29.364802Z", + "invocation_id": "eb043e5d-3a2a-4fe3-b44b-3bc9f55ba7f0", + "env": {}, + "project_name": "tdd", + "project_id": "66d26512fa77cc5ff934201903dd7482", + "user_id": null, + "send_anonymous_usage_stats": false, + "adapter_type": "athena" + }, + "nodes": { + "model.tdd.simple": { + "database": "awsdatacatalog", + "schema": "sandbox", + "name": "simple", + "resource_type": "model", + "package_name": "tdd", + "path": "simple.sql", + "original_file_path": "models/simple.sql", + "unique_id": "model.tdd.simple", + "fqn": [ + "tdd", + "simple" + ], + "alias": "simple", + "checksum": { + "name": "sha256", + "checksum": "67b056428a09fa084c740bfeea17eafa2aa4a0e0ebd3ed100ef52e1bbc04718b" + }, + "config": { + "enabled": true, + "alias": null, + "schema": "sandbox", + "database": null, + "tags": [ + "dbt__tdd", + "dbt_project_tdd" + ], + "meta": {}, + "group": null, + "materialized": "table", + "incremental_strategy": null, + "batch_size": null, + "lookback": 1, + "begin": null, + "persist_docs": { + "relation": true, + "columns": true + }, + "post-hook": [], + "pre-hook": [], + "quoting": {}, + "column_types": {}, + "full_refresh": null, + "unique_key": null, + "on_schema_change": "append_new_columns", + "on_configuration_change": "apply", + "grants": {}, + "packages": [], + "docs": { + "show": true, + "node_color": null + }, + "contract": { + "enforced": true, + "alias_types": true + }, + "event_time": null, + "concurrent_batches": null, + "access": "protected", + "lf_tags_config": { + "enabled": true, + "tags": { + "confidentiality": "internal" + } + }, + "table_type": "iceberg" + }, + "tags": [ + "dbt__tdd", + "dbt_project_tdd" + ], + "description": "This model calculates the count of records in the stg_simple table.", + "columns": { + "action_type_id": { + "name": "action_type_id", + "description": "The count of records in the stg_simple table", + "meta": {}, + "data_type": "integer", + "constraints": [], + "quote": null, + "tags": [], + "granularity": null + }, + "mixed": { + "name": "mixed", + "description": "", + "meta": {}, + "data_type": "varchar", + "constraints": [], + "quote": null, + "tags": [], + "granularity": null + } + }, + "meta": {}, + "group": null, + "docs": { + "show": true, + "node_color": null + }, + "patch_path": "tdd://models/simple.yml", + "build_path": null, + "unrendered_config": { + "tags": "dbt_project_tdd", + "on_schema_change": "append_new_columns", + "lf_tags_config": { + "enabled": true, + "tags": { + "confidentiality": "internal" + } + }, + "persist_docs": { + "relation": true, + "columns": true + }, + "table_type": "iceberg", + "schema": "sandbox", + "materialized": "table", + "contract": { + "enforced": true + } + }, + "created_at": 1734359717.2746701, + "relation_name": "\"awsdatacatalog\".\"sandbox\".\"simple\"", + "raw_code": "select 1 as action_type_id, 2 as mixed from test.test", + "language": "sql", + "refs": [ + { + "name": "stg_simple", + "package": null, + "version": null + } + ], + "sources": [], + "metrics": [], + "depends_on": { + "macros": [ + "macro.dbt_unit_testing.ref" + ], + "nodes": [ + "model.tdd.stg_simple" + ] + }, + "compiled_path": null, + "contract": { + "enforced": true, + "alias_types": true, + "checksum": "a2ea5df63211a33bf954882f96b1f89f1f6f625547d2e119ec10a7e78da3d3ea" + }, + "access": "protected", + "constraints": [], + "version": null, + "latest_version": null, + "deprecation_date": null, + "primary_key": [], + "time_spine": null + } + }, + "sources": {} +} diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/dbt/test_dbt_source.py similarity index 92% rename from metadata-ingestion/tests/unit/test_dbt_source.py rename to metadata-ingestion/tests/unit/dbt/test_dbt_source.py index d7899af69f8405..4abec6cdc15836 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/dbt/test_dbt_source.py @@ -493,3 +493,41 @@ def test_get_column_type_redshift(): messages[0].message == "Got an unexpected column type. The column's parsed field type will not be populated." ) + + +def test_include_database_name_default(): + config_dict = { + "manifest_path": "dummy_path", + "catalog_path": "dummy_path", + "target_platform": "dummy_platform", + } + config = DBTCoreConfig.parse_obj({**config_dict}) + assert config.include_database_name is True + + +@pytest.mark.parametrize( + ("include_database_name", "expected"), [("false", False), ("true", True)] +) +def test_include_database_name(include_database_name: str, expected: bool) -> None: + config_dict = { + "manifest_path": "dummy_path", + "catalog_path": "dummy_path", + "target_platform": "dummy_platform", + } + config_dict.update({"include_database_name": include_database_name}) + config = DBTCoreConfig.parse_obj({**config_dict}) + assert config.include_database_name is expected + + +def test_extract_dbt_entities(): + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") + config = DBTCoreConfig( + manifest_path="tests/unit/dbt/artifacts/manifest.json", + catalog_path="tests/unit/dbt/artifacts/catalog.json", + target_platform="dummy", + ) + source = DBTCoreSource(config, ctx, "dbt") + assert all(node.database is not None for node in source.loadManifestAndCatalog()[0]) + config.include_database_name = False + source = DBTCoreSource(config, ctx, "dbt") + assert all(node.database is None for node in source.loadManifestAndCatalog()[0])