Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bf6b555
feat(ingest/sigma): add parallel SQL parsing and default db/schema co…
kyungsoo-datahub Feb 2, 2026
a49ec35
fix(ingest/sigma): handle 400 Bad Request for elements without lineag…
kyungsoo-datahub Feb 2, 2026
fd3e891
debug: add logging for Sigma SQL parsing and URN generation
kyungsoo-datahub Feb 4, 2026
f0c6a3c
feat(ingest/sigma): parallelize API calls and add column lineage batc…
kyungsoo-datahub Feb 4, 2026
af04da6
feat(ingest/sigma): add rate limiting for parallel API calls
kyungsoo-datahub Feb 4, 2026
09cc3e9
fix(ingest/sigma): lower rate limit threshold from 5 to 3 workers
kyungsoo-datahub Feb 5, 2026
f07ca7a
feat(ingest/sigma): pass graph to SQL parser for schema resolution
kyungsoo-datahub Feb 5, 2026
2ed25dd
debug(ingest/sigma): add logging to investigate column-level lineage
kyungsoo-datahub Feb 5, 2026
57dc94d
Revert "debug(ingest/sigma): add logging to investigate column-level …
kyungsoo-datahub Feb 5, 2026
8935f12
feat(ingest/sigma): add enable_column_lineage config for lighter inge…
kyungsoo-datahub Feb 5, 2026
250e16b
feat(ingest/sigma): add dataset source lineage extraction
kyungsoo-datahub Feb 5, 2026
55937be
fix(sql_parsing): remove excessive TABLE_EXTRACT_DEBUG logging
kyungsoo-datahub Feb 5, 2026
ec5dc0e
fix(ingest/sigma): improve rate limiting and use /sources endpoint fo…
kyungsoo-datahub Feb 5, 2026
ba45d3d
chore: trigger CI
kyungsoo-datahub Feb 5, 2026
bf66f74
Merge origin/master into dev/sigma-parallel-sql-parsing
kyungsoo-datahub Feb 5, 2026
decf159
fix(ingest/sigma): resolve table inodes to get actual connection/tabl…
kyungsoo-datahub Feb 5, 2026
8205658
fix(ingest/sigma): resolve connectionId from path for dataset lineage
kyungsoo-datahub Feb 6, 2026
b21e5d0
fix(ingest/sigma): traverse parent chain to find connectionId
kyungsoo-datahub Feb 6, 2026
a3d9611
revert: remove dataset lineage connectionId resolution attempts
kyungsoo-datahub Feb 6, 2026
bf5d5be
feat(ingest/sigma): add config options for rate limiting and retry 503
kyungsoo-datahub Feb 6, 2026
ea77b27
fix(ingest/sigma): handle list response from dataset sources API
kyungsoo-datahub Feb 6, 2026
adbca50
refactor(ingest/sigma): simplify config and remove unused features
kyungsoo-datahub Feb 6, 2026
70b30b0
revert(sql_parsing): remove column_lineage_batch_size from shared code
kyungsoo-datahub Feb 6, 2026
0c84dcf
refactor(ingest/sigma): remove dead code for dataset lineage
kyungsoo-datahub Feb 6, 2026
8ce533f
refactor(ingest/sigma): remove unused constants and debug logging
kyungsoo-datahub Feb 6, 2026
43f2f79
refactor(ingest/sigma): remove verbose SQL parsing progress logging
kyungsoo-datahub Feb 6, 2026
e36a09a
fix(ingest/sigma): address PR review feedback
kyungsoo-datahub Feb 7, 2026
9e790d4
fix(ingest/sigma): standardize logging and update docstring
kyungsoo-datahub Feb 7, 2026
274075d
fix(ingest/sigma): adjust sql_parsing_threads default and fix comment
kyungsoo-datahub Feb 7, 2026
842c30b
refactor(ingest/sigma): simplify SQL parsing cache lookup and use deb…
kyungsoo-datahub Feb 7, 2026
f75935f
docs(ingest/sigma): add clarifying comments for config and memory usage
kyungsoo-datahub Feb 7, 2026
e8a5c5d
refactor(ingest/sigma): remove threading and add generate_column_line…
kyungsoo-datahub Feb 7, 2026
f12b9b2
refactor(ingest/sigma): simplify SQL parsing to on-demand approach
kyungsoo-datahub Feb 7, 2026
3d3cc27
refactor(ingest/sigma): minimize changes, restore original code struc…
kyungsoo-datahub Feb 7, 2026
2390383
style(sql_parsing): use named args in sqlglot_lineage for clarity
kyungsoo-datahub Feb 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sigma/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ class PlatformDetail(PlatformInstanceConfigMixin, EnvConfigMixin):
data_source_platform: str = pydantic.Field(
description="A chart's data sources platform name.",
)
default_db: Optional[str] = pydantic.Field(
default=None,
description="Default database name to use when parsing SQL queries. "
"Used to generate fully qualified table URNs (e.g., 'prod' for 'prod.public.table').",
)
default_schema: Optional[str] = pydantic.Field(
default=None,
description="Default schema name to use when parsing SQL queries. "
"Used to generate fully qualified table URNs (e.g., 'public' for 'prod.public.table').",
)


class SigmaSourceConfig(
Expand Down Expand Up @@ -173,6 +183,12 @@ class SigmaSourceConfig(
default={},
description="A mapping of the sigma workspace/workbook/chart folder path to all chart's data sources platform details present inside that folder path.",
)
generate_column_lineage: bool = pydantic.Field(
default=False,
description="Whether to generate column-level lineage. "
"Disabled by default as Sigma only uses table-level lineage and "
"column lineage computation can be expensive for complex queries.",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Sigma Stateful Ingestion Config."
)
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,15 @@ def _get_element_input_details(
try:
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=element.query.strip(),
default_db=None,
default_db=data_source_platform_details.default_db,
default_schema=data_source_platform_details.default_schema,
platform=data_source_platform_details.data_source_platform,
env=data_source_platform_details.env,
platform_instance=data_source_platform_details.platform_instance,
generate_column_lineage=self.config.generate_column_lineage,
).in_tables
except Exception:
logging.debug(f"Unable to parse query of element {element.name}")
logger.debug(f"Unable to parse query of element {element.name}")

# Add sigma dataset as input of element if present
# and its matched sql parser in_table as its upstream dataset
Expand Down Expand Up @@ -671,6 +673,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

for dataset in self.sigma_api.get_sigma_datasets():
yield from self._gen_dataset_workunit(dataset)

for workbook in self.sigma_api.get_sigma_workbooks():
yield from self._gen_workbook_workunit(workbook)

Expand Down
43 changes: 39 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/sigma/sigma_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import logging
import sys
import time
from typing import Any, Dict, List, Optional

import requests
Expand All @@ -22,6 +23,10 @@
# Logger instance
logger = logging.getLogger(__name__)

# Retry constants for 429/503 errors
RETRY_MAX_ATTEMPTS = 3
RETRY_BASE_DELAY_SECONDS = 2.0 # Exponential backoff: 2s, 4s, 8s


class SigmaAPI:
def __init__(self, config: SigmaSourceConfig, report: SigmaSourceReport) -> None:
Expand All @@ -31,6 +36,7 @@ def __init__(self, config: SigmaSourceConfig, report: SigmaSourceReport) -> None
self.users: Dict[str, str] = {}
self.session = requests.Session()
self.refresh_token: Optional[str] = None

# Test connection by generating access token
logger.info(f"Trying to connect to {self.config.api_url}")
self._generate_token()
Expand Down Expand Up @@ -87,11 +93,35 @@ def _refresh_access_token(self):
)

def _get_api_call(self, url: str) -> requests.Response:
get_response = self.session.get(url)
if get_response.status_code == 401 and self.refresh_token:
logger.debug("Access token might expired. Refreshing access token.")
self._refresh_access_token()
"""Make an API call with retry on 429/503 errors."""
get_response: requests.Response
for attempt in range(RETRY_MAX_ATTEMPTS):
get_response = self.session.get(url)

# Handle token refresh on 401
if get_response.status_code == 401 and self.refresh_token:
logger.debug("Access token might expired. Refreshing access token.")
self._refresh_access_token()
get_response = self.session.get(url)

# Success or non-retryable error
if get_response.status_code not in (429, 503):
break

# Retry with exponential backoff, or give up on last attempt
if attempt < RETRY_MAX_ATTEMPTS - 1:
delay = RETRY_BASE_DELAY_SECONDS * (2**attempt)
logger.debug(
f"Retryable error ({get_response.status_code}) on {url}, "
f"retrying in {delay}s (attempt {attempt + 1}/{RETRY_MAX_ATTEMPTS})"
)
time.sleep(delay)
else:
logger.warning(
f"Retryable error ({get_response.status_code}) on {url}, "
f"max retries exceeded"
)

return get_response

def get_workspace(self, workspace_id: str) -> Optional[Workspace]:
Expand Down Expand Up @@ -299,6 +329,11 @@ def _get_element_upstream_sources(
f"Lineage metadata not accessible for element {element.name} of workbook '{workbook.name}'"
)
return upstream_sources
if response.status_code == 400:
logger.debug(
f"Lineage not supported for element {element.name} of workbook '{workbook.name}' (400 Bad Request)"
)
return upstream_sources

response.raise_for_status()
response_dict = response.json()
Expand Down
71 changes: 44 additions & 27 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ def _sqlglot_lineage_inner(
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
override_dialect: Optional[DialectOrStr] = None,
generate_column_lineage: bool = True,
) -> SqlParsingResult:
if override_dialect:
dialect = get_dialect(override_dialect)
Expand Down Expand Up @@ -1957,31 +1958,32 @@ def _sqlglot_lineage_inner(

column_lineage: Optional[List[_ColumnLineageInfo]] = None
joins = None
try:
with cooperative_timeout(
timeout=(
SQL_LINEAGE_TIMEOUT_SECONDS if SQL_LINEAGE_TIMEOUT_ENABLED else None
)
):
column_lineage_debug_info = _column_level_lineage(
statement,
dialect=dialect,
downstream_table=downstream_table,
table_name_schema_mapping=table_name_schema_mapping,
default_db=default_db,
default_schema=default_schema,
)
column_lineage = column_lineage_debug_info.column_lineage
joins = column_lineage_debug_info.joins
except CooperativeTimeoutError as e:
logger.debug(f"Timed out while generating column-level lineage: {e}")
debug_info.column_error = e
except UnsupportedStatementTypeError as e:
# For this known exception type, we assume the error is logged at the point of failure.
debug_info.column_error = e
except Exception as e:
logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True)
debug_info.column_error = e
if generate_column_lineage:
try:
with cooperative_timeout(
timeout=(
SQL_LINEAGE_TIMEOUT_SECONDS if SQL_LINEAGE_TIMEOUT_ENABLED else None
)
):
column_lineage_debug_info = _column_level_lineage(
statement,
dialect=dialect,
downstream_table=downstream_table,
table_name_schema_mapping=table_name_schema_mapping,
default_db=default_db,
default_schema=default_schema,
)
column_lineage = column_lineage_debug_info.column_lineage
joins = column_lineage_debug_info.joins
except CooperativeTimeoutError as e:
logger.debug(f"Timed out while generating column-level lineage: {e}")
debug_info.column_error = e
except UnsupportedStatementTypeError as e:
# For this known exception type, we assume the error is logged at the point of failure.
debug_info.column_error = e
except Exception as e:
logger.debug(f"Failed to generate column-level lineage: {e}", exc_info=True)
debug_info.column_error = e

# TODO: Can we generate a common JOIN tables / keys section?
# TODO: Can we generate a common WHERE clauses section?
Expand Down Expand Up @@ -2043,6 +2045,7 @@ def _sqlglot_lineage_nocache(
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
override_dialect: Optional[DialectOrStr] = None,
generate_column_lineage: bool = True,
) -> SqlParsingResult:
"""Parse a SQL statement and generate lineage information.

Expand Down Expand Up @@ -2102,6 +2105,7 @@ def _sqlglot_lineage_nocache(
default_db=default_db,
default_schema=default_schema,
override_dialect=override_dialect,
generate_column_lineage=generate_column_lineage,
)
except Exception as e:
return SqlParsingResult.make_from_error(e)
Expand Down Expand Up @@ -2140,14 +2144,25 @@ def sqlglot_lineage(
default_db: Optional[str] = None,
default_schema: Optional[str] = None,
override_dialect: Optional[DialectOrStr] = None,
generate_column_lineage: bool = True,
) -> SqlParsingResult:
if schema_resolver.includes_temp_tables():
return _sqlglot_lineage_nocache(
sql, schema_resolver, default_db, default_schema, override_dialect
sql=sql,
schema_resolver=schema_resolver,
default_db=default_db,
default_schema=default_schema,
override_dialect=override_dialect,
generate_column_lineage=generate_column_lineage,
)
else:
return _sqlglot_lineage_cached(
sql, schema_resolver, default_db, default_schema, override_dialect
sql=sql,
schema_resolver=schema_resolver,
default_db=default_db,
default_schema=default_schema,
override_dialect=override_dialect,
generate_column_lineage=generate_column_lineage,
)


Expand Down Expand Up @@ -2200,6 +2215,7 @@ def create_lineage_sql_parsed_result(
graph: Optional[DataHubGraph] = None,
schema_aware: bool = True,
override_dialect: Optional[DialectOrStr] = None,
generate_column_lineage: bool = True,
) -> SqlParsingResult:
schema_resolver = create_schema_resolver(
platform=platform,
Expand All @@ -2220,6 +2236,7 @@ def create_lineage_sql_parsed_result(
default_db=default_db,
default_schema=default_schema,
override_dialect=override_dialect,
generate_column_lineage=generate_column_lineage,
)
except Exception as e:
return SqlParsingResult.make_from_error(e)
Expand Down
Loading