Skip to content

Commit

Permalink
feat(ingestion/dbt): Add include_database_name parameter for dbt core (
Browse files Browse the repository at this point in the history
  • Loading branch information
svdimchenko authored Feb 11, 2025
1 parent 6098e97 commit 20409fd
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ class DBTCommonConfig(
default=True,
description="When enabled, includes the compiled code in the emitted metadata.",
)
include_database_name: bool = Field(
default=True,
description="Whether to add database name to the table urn. "
"Set to False to skip it for engines like AWS Athena where it's not required.",
)

@validator("target_platform")
def validate_target_platform_value(cls, target_platform: str) -> str:
Expand Down
20 changes: 11 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def extract_dbt_entities(
use_identifiers: bool,
tag_prefix: str,
only_include_if_in_catalog: bool,
include_database_name: bool,
report: DBTSourceReport,
) -> List[DBTNode]:
sources_by_id = {x["unique_id"]: x for x in sources_results}
Expand Down Expand Up @@ -267,7 +268,7 @@ def extract_dbt_entities(
dbt_name=key,
dbt_adapter=manifest_adapter,
dbt_package_name=manifest_node.get("package_name"),
database=manifest_node["database"],
database=manifest_node["database"] if include_database_name else None,
schema=manifest_node["schema"],
name=name,
alias=manifest_node.get("alias"),
Expand Down Expand Up @@ -543,14 +544,15 @@ def loadManifestAndCatalog(
all_catalog_entities = {**catalog_nodes, **catalog_sources}

nodes = extract_dbt_entities(
all_manifest_entities,
all_catalog_entities,
sources_results,
manifest_adapter,
self.config.use_identifiers,
self.config.tag_prefix,
self.config.only_include_if_in_catalog,
self.report,
all_manifest_entities=all_manifest_entities,
all_catalog_entities=all_catalog_entities,
sources_results=sources_results,
manifest_adapter=manifest_adapter,
use_identifiers=self.config.use_identifiers,
tag_prefix=self.config.tag_prefix,
only_include_if_in_catalog=self.config.only_include_if_in_catalog,
include_database_name=self.config.include_database_name,
report=self.report,
)

return (
Expand Down
40 changes: 40 additions & 0 deletions metadata-ingestion/tests/unit/dbt/artifacts/catalog.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"dbt_version": "1.9.0",
"generated_at": "2025-01-22T10:56:18.229568Z",
"invocation_id": "385456c0-b0ca-43b6-aa5a-422539d1f142",
"env": {}
},
"nodes": {
"model.tdd.simple": {
"metadata": {
"type": "iceberg_table",
"schema": "sandbox",
"name": "simple",
"database": "awsdatacatalog",
"comment": "This model calculates the count of records in the stg_simple table.",
"owner": null
},
"columns": {
"action_type_id": {
"type": "int",
"index": 0,
"name": "action_type_id",
"comment": null
}
},
"stats": {
"has_stats": {
"id": "has_stats",
"label": "Has Stats?",
"value": false,
"include": false,
"description": "Indicates whether there are statistics for this table"
}
},
"unique_id": "model.tdd.simple"
}
},
"sources": {}
}
174 changes: 174 additions & 0 deletions metadata-ingestion/tests/unit/dbt/artifacts/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v12.json",
"dbt_version": "1.9.0",
"generated_at": "2024-12-16T14:59:29.364802Z",
"invocation_id": "eb043e5d-3a2a-4fe3-b44b-3bc9f55ba7f0",
"env": {},
"project_name": "tdd",
"project_id": "66d26512fa77cc5ff934201903dd7482",
"user_id": null,
"send_anonymous_usage_stats": false,
"adapter_type": "athena"
},
"nodes": {
"model.tdd.simple": {
"database": "awsdatacatalog",
"schema": "sandbox",
"name": "simple",
"resource_type": "model",
"package_name": "tdd",
"path": "simple.sql",
"original_file_path": "models/simple.sql",
"unique_id": "model.tdd.simple",
"fqn": [
"tdd",
"simple"
],
"alias": "simple",
"checksum": {
"name": "sha256",
"checksum": "67b056428a09fa084c740bfeea17eafa2aa4a0e0ebd3ed100ef52e1bbc04718b"
},
"config": {
"enabled": true,
"alias": null,
"schema": "sandbox",
"database": null,
"tags": [
"dbt__tdd",
"dbt_project_tdd"
],
"meta": {},
"group": null,
"materialized": "table",
"incremental_strategy": null,
"batch_size": null,
"lookback": 1,
"begin": null,
"persist_docs": {
"relation": true,
"columns": true
},
"post-hook": [],
"pre-hook": [],
"quoting": {},
"column_types": {},
"full_refresh": null,
"unique_key": null,
"on_schema_change": "append_new_columns",
"on_configuration_change": "apply",
"grants": {},
"packages": [],
"docs": {
"show": true,
"node_color": null
},
"contract": {
"enforced": true,
"alias_types": true
},
"event_time": null,
"concurrent_batches": null,
"access": "protected",
"lf_tags_config": {
"enabled": true,
"tags": {
"confidentiality": "internal"
}
},
"table_type": "iceberg"
},
"tags": [
"dbt__tdd",
"dbt_project_tdd"
],
"description": "This model calculates the count of records in the stg_simple table.",
"columns": {
"action_type_id": {
"name": "action_type_id",
"description": "The count of records in the stg_simple table",
"meta": {},
"data_type": "integer",
"constraints": [],
"quote": null,
"tags": [],
"granularity": null
},
"mixed": {
"name": "mixed",
"description": "",
"meta": {},
"data_type": "varchar",
"constraints": [],
"quote": null,
"tags": [],
"granularity": null
}
},
"meta": {},
"group": null,
"docs": {
"show": true,
"node_color": null
},
"patch_path": "tdd://models/simple.yml",
"build_path": null,
"unrendered_config": {
"tags": "dbt_project_tdd",
"on_schema_change": "append_new_columns",
"lf_tags_config": {
"enabled": true,
"tags": {
"confidentiality": "internal"
}
},
"persist_docs": {
"relation": true,
"columns": true
},
"table_type": "iceberg",
"schema": "sandbox",
"materialized": "table",
"contract": {
"enforced": true
}
},
"created_at": 1734359717.2746701,
"relation_name": "\"awsdatacatalog\".\"sandbox\".\"simple\"",
"raw_code": "select 1 as action_type_id, 2 as mixed from test.test",
"language": "sql",
"refs": [
{
"name": "stg_simple",
"package": null,
"version": null
}
],
"sources": [],
"metrics": [],
"depends_on": {
"macros": [
"macro.dbt_unit_testing.ref"
],
"nodes": [
"model.tdd.stg_simple"
]
},
"compiled_path": null,
"contract": {
"enforced": true,
"alias_types": true,
"checksum": "a2ea5df63211a33bf954882f96b1f89f1f6f625547d2e119ec10a7e78da3d3ea"
},
"access": "protected",
"constraints": [],
"version": null,
"latest_version": null,
"deprecation_date": null,
"primary_key": [],
"time_spine": null
}
},
"sources": {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,41 @@ def test_get_column_type_redshift():
messages[0].message
== "Got an unexpected column type. The column's parsed field type will not be populated."
)


def test_include_database_name_default():
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
}
config = DBTCoreConfig.parse_obj({**config_dict})
assert config.include_database_name is True


@pytest.mark.parametrize(
("include_database_name", "expected"), [("false", False), ("true", True)]
)
def test_include_database_name(include_database_name: str, expected: bool) -> None:
config_dict = {
"manifest_path": "dummy_path",
"catalog_path": "dummy_path",
"target_platform": "dummy_platform",
}
config_dict.update({"include_database_name": include_database_name})
config = DBTCoreConfig.parse_obj({**config_dict})
assert config.include_database_name is expected


def test_extract_dbt_entities():
ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source")
config = DBTCoreConfig(
manifest_path="tests/unit/dbt/artifacts/manifest.json",
catalog_path="tests/unit/dbt/artifacts/catalog.json",
target_platform="dummy",
)
source = DBTCoreSource(config, ctx, "dbt")
assert all(node.database is not None for node in source.loadManifestAndCatalog()[0])
config.include_database_name = False
source = DBTCoreSource(config, ctx, "dbt")
assert all(node.database is None for node in source.loadManifestAndCatalog()[0])

0 comments on commit 20409fd

Please sign in to comment.