Skip to content
15 changes: 15 additions & 0 deletions metadata-ingestion/docs/sources/snowflake/snowflake_post.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,21 @@ stage_pattern:
- "MY_DB.MY_SCHEMA.*"
```

###### External Stage Lineage via Graph (`resolve_external_stage_lineage_via_graph`)

By default, DataHub generates a dataset URN for each external stage directly from its raw S3/GCS/Azure URL. If your data lake source was ingested with a `platform_instance`, the auto-generated URN may not match the existing dataset in DataHub, causing broken lineage.

Set `resolve_external_stage_lineage_via_graph: true` to look up existing dataset URNs in the DataHub graph whose paths share the stage URL as a prefix, instead of generating a URN from the raw path. This requires the corresponding data lake source (S3, GCS, or Azure) to have been ingested first; otherwise stage lineage will be skipped on the current run and resolved on a subsequent run.

If the underlying data lake source was ingested with a `platform_instance`, set `external_stage_platform_instance` to that value so the graph lookup uses the correct URN prefix.

```yaml
include_stages: true
resolve_external_stage_lineage_via_graph: true
# Only needed if the data lake source was ingested with a platform_instance:
external_stage_platform_instance: my-s3-instance
```

##### Tasks (`include_tasks: true`)

Tasks are ingested as DataJob entities grouped under a per-schema DataFlow. Predecessor dependencies between tasks are captured as `inputDatajobs` on the DataJobInputOutput aspect, preserving the DAG structure.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

if TYPE_CHECKING:
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.graph.filters import RawSearchFilterRule

logger: logging.Logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class DataLakeUrnLookup:
matched_urns: Tuple[str, ...] = ()
transient_error: Optional[Exception] = None

def __post_init__(self) -> None:
if self.transient_error is not None and self.matched_urns:
raise ValueError(
"DataLakeUrnLookup is either a transient-failure result or a "
"(possibly empty) match — never both."
)


@dataclass
class DataLakePathResolver:
"""Resolve a data-lake storage path to existing dataset URNs rooted at it.

Bulk-fetches every dataset URN under a bucket once per
``(platform, platform_instance, bucket)`` and matches client-side, so N paths
under a bucket cost a single graph call rather than one wildcard query each.
Transient failures are surfaced via ``DataLakeUrnLookup.transient_error`` so
callers can log and count without this module owning a report type.
"""

graph: "DataHubGraph"
env: str
_bucket_index: Dict[Tuple[str, Optional[str], str], Tuple[str, ...]] = field(
default_factory=dict
)

def resolve_datasets_under_path(
self,
*,
platform: str,
bucket: str,
path: str,
platform_instance: Optional[str] = None,
) -> DataLakeUrnLookup:
"""Return existing dataset URNs whose path equals or sits under ``path``."""
key = (platform, platform_instance, bucket)
bucket_urns = self._bucket_index.get(key)
if bucket_urns is None:
try:
bucket_urns = self._bulk_fetch_bucket(
platform=platform,
platform_instance=platform_instance,
bucket=bucket,
)
except Exception as e:
logger.warning(
f"Transient failure fetching {platform} dataset URNs for bucket "
f"{bucket!r}; lookup will not be cached.",
exc_info=True,
)
return DataLakeUrnLookup(transient_error=e)
self._bucket_index[key] = bucket_urns

path_prefix = self._urn_prefix(platform, platform_instance, path)
matches = tuple(
u for u in bucket_urns if dataset_path_is_rooted_at(u, path_prefix)
)
return DataLakeUrnLookup(matched_urns=matches)

def _bulk_fetch_bucket(
self, *, platform: str, platform_instance: Optional[str], bucket: str
) -> Tuple[str, ...]:
bucket_prefix = self._urn_prefix(platform, platform_instance, bucket)
extra_filters: List["RawSearchFilterRule"] = [
{"field": "urn", "condition": "START_WITH", "values": [bucket_prefix]}
]
candidate_urns = self.graph.get_urns_by_filter(
entity_types=["dataset"],
platform=platform,
platform_instance=platform_instance,
env=self.env,
extraFilters=extra_filters,
)
# The START_WITH wildcard is case-insensitive and prefix-only, so it can
# over-match sibling buckets (e.g. `bucket` vs `bucket-other`). Re-check the
# bucket boundary case-sensitively before caching.
return tuple(
urn
for urn in candidate_urns
if dataset_path_is_rooted_at(urn, bucket_prefix)
)

@staticmethod
def _urn_prefix(platform: str, platform_instance: Optional[str], path: str) -> str:
name = f"{platform_instance}.{path}" if platform_instance else path
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name}"


def dataset_path_is_rooted_at(dataset_urn: str, urn_prefix: str) -> bool:
"""Whether ``dataset_urn``'s path equals or sits strictly under ``urn_prefix``.

Rejects false positives where a plain prefix match would treat a sibling path as
a child (e.g. ``foo`` matching ``foobar``): the character immediately after the
prefix must be ``/`` (a child path) or ``,`` (the URN's env separator, i.e. an
exact match).
"""
if not dataset_urn.startswith(urn_prefix):
return False
return dataset_urn[len(urn_prefix) : len(urn_prefix) + 1] in ("/", ",")
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,29 @@ class SnowflakeV2Config(
description="If enabled, Snowflake Snowpipe objects will be ingested as DataJobs with COPY INTO lineage.",
)

Comment thread
alokr-dhub marked this conversation as resolved.
resolve_external_stage_lineage_via_graph: bool = Field(
default=False,
description=(
"If enabled, resolve external stage lineage by looking up existing dataset "
"URNs in the DataHub graph that share the stage path as a prefix, instead "
"of generating a URN from the raw stage path. Requires the corresponding "
"data lake source (S3, GCS, or Azure) to have been ingested first; "
"otherwise stage lineage will be skipped on this run and resolved on a "
"subsequent run."
),
)

external_stage_platform_instance: Optional[str] = Field(
default=None,
description=(
"Platform instance to use when looking up dataset URNs for external "
"stage lineage via the graph. Only relevant when "
"`resolve_external_stage_lineage_via_graph` is enabled and the "
"underlying data lake source (S3, GCS, or Azure) was ingested with "
"a platform_instance set."
),
)

marketplace: SnowflakeMarketplaceConfig = Field(
default_factory=SnowflakeMarketplaceConfig,
description="Configuration for Snowflake Internal Marketplace (private data exchange) ingestion.",
Expand Down Expand Up @@ -728,6 +751,35 @@ def validate_include_column_lineage(cls, v, info):
)
return v

@field_validator("external_stage_platform_instance", mode="after")
@classmethod
def normalize_external_stage_platform_instance(
cls, v: Optional[str]
) -> Optional[str]:
# Reject blank/whitespace strings: they would silently produce a malformed
# URN prefix like `urn:li:dataset:(urn:li:dataPlatform:s3,.<path>` and yield
# zero matches at lookup time. Treat them as if the field were unset.
if v is None:
return None
stripped = v.strip()
return stripped or None

@model_validator(mode="after")
def validate_external_stage_platform_instance_requires_resolve_flag(
self,
) -> "SnowflakeV2Config":
if (
self.external_stage_platform_instance is not None
and not self.resolve_external_stage_lineage_via_graph
):
add_global_warning(
"`external_stage_platform_instance` is set but "
"`resolve_external_stage_lineage_via_graph` is False. The platform "
"instance is only consulted when graph-based stage resolution is "
"enabled, so this setting will have no effect."
)
return self

@field_validator("include_externally_managed_dmfs", mode="after")
@classmethod
def validate_include_externally_managed_dmfs(cls, v, info):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,20 @@ def _gen_data_job(
output_datasets.append(target_urn)

stages_without_urn: List[str] = []
# dict.fromkeys preserves insertion order and dedups in O(n); the previous
# ``if urn not in input_datasets`` was O(n*m) per pipe.
deduped: Dict[str, None] = dict.fromkeys(input_datasets)
for entry in stage_entries:
if entry.dataset_urn is None:
if not entry.dataset_urns:
stages_without_urn.append(
f"{entry.stage.database_name}."
f"{entry.stage.schema_name}."
f"{entry.stage.name}"
)
elif entry.dataset_urn not in input_datasets:
input_datasets.append(entry.dataset_urn)
continue
for urn in entry.dataset_urns:
deduped.setdefault(urn)
input_datasets = list(deduped)

if stages_without_urn:
# Stage was found, but its underlying dataset URN couldn't be resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class SnowflakeV2Report(
tasks_scanned: int = 0
pipes_scanned: int = 0

external_stage_lineage_resolved: int = 0
external_stage_lineage_unresolved: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
include_technical_schema: bool = False
Expand Down
Loading
Loading