Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
bd1e649
failed rows wip
tombaeyens Jun 6, 2025
f37e783
Failed rows wip
tombaeyens Jun 6, 2025
b097f27
failed rows wip
tombaeyens Jun 9, 2025
a286dbc
Fixed extension mechanism
tombaeyens Jun 11, 2025
8b47953
Failed rows wip
tombaeyens Jun 12, 2025
1f5ca20
Implement proper status handling for both local and remote contract v…
nielsn Jun 6, 2025
2351523
Add dotenv to deps (#2305)
m1n0 Jun 9, 2025
9b99876
Fix diagnostics, support date-like in freshness (#2307)
m1n0 Jun 10, 2025
6744f3d
Bump contract json schema (#2308)
m1n0 Jun 10, 2025
2ced0a8
Fix freshness if no data, handle corner cases better (#2309)
m1n0 Jun 10, 2025
b919b45
Use question mark char for not_evaluated (#2310)
m1n0 Jun 10, 2025
83d75ac
DTL-807: Floor the current freshness in seconds so it returns an `int…
nielsn Jun 12, 2025
587ab8c
V4 datatime2 (#2311)
tombaeyens Jun 16, 2025
100dd75
Default check names (#2317)
tombaeyens Jun 16, 2025
2e31e46
failed rows wip
tombaeyens Jun 16, 2025
f9e5fff
Failed rows extension wip
tombaeyens Jun 17, 2025
11b2eb8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 17, 2025
ad9134e
Failed rows wip
tombaeyens Jun 17, 2025
eeb99e6
Failed rows keys wip
tombaeyens Jun 18, 2025
c5e4877
Failed rows keys wip
tombaeyens Jun 18, 2025
460a483
Failed rows wip
tombaeyens Jun 19, 2025
d1d78f6
failed rows wip
tombaeyens Jun 19, 2025
920778a
Failed rows wip
tombaeyens Jun 20, 2025
8560ca6
Added diagnostics check results table
tombaeyens Jun 21, 2025
7971d23
Merge branch 'v4' into v4-failedrowsext
tombaeyens Jun 23, 2025
a23444b
Failed rows config WIP
m1n0 Jun 23, 2025
2e94aac
Failed rows config WIP
m1n0 Jun 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile dev-requirements.in
Expand All @@ -18,8 +18,6 @@ distlib==0.3.8
# via virtualenv
docopt==0.6.2
# via tbump
exceptiongroup==1.3.0
# via pytest
filelock==3.16.1
# via virtualenv
freezegun==1.5.1
Expand Down Expand Up @@ -62,15 +60,8 @@ tabulate==0.8.10
# via cli-ui
tbump==6.11.0
# via -r dev-requirements.in
tomli==2.2.1
# via
# build
# pip-tools
# pytest
tomlkit==0.11.8
# via tbump
typing-extensions==4.14.0
# via exceptiongroup
unidecode==1.3.8
# via cli-ui
virtualenv==20.26.6
Expand Down
13 changes: 10 additions & 3 deletions soda-core/src/soda_core/common/data_source_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from soda_core.common.data_source_connection import DataSourceConnection
from soda_core.common.data_source_results import QueryResult, UpdateResult
from soda_core.common.dataset_identifier import DatasetIdentifier
from soda_core.common.exceptions import DataSourceConnectionException
from soda_core.common.logging_constants import soda_logger
from soda_core.common.sql_dialect import SqlDialect
Expand Down Expand Up @@ -145,16 +146,16 @@ def get_max_aggregation_query_length(self) -> int:
# BigQuery: No documented limit on query size, but practical limits on complexity and performance.
return 63 * 1024 * 1024

def is_different_data_type(self, expected_column: ColumnMetadata, actual_column_metadata: ColumnMetadata) -> bool:
def is_different_data_type(self, expected_column: ColumnMetadata, actual_column: ColumnMetadata) -> bool:
canonical_expected_data_type: str = self.get_canonical_data_type(expected_column.data_type)
canonical_actual_data_type: str = self.get_canonical_data_type(actual_column_metadata.data_type)
canonical_actual_data_type: str = self.get_canonical_data_type(actual_column.data_type)

if canonical_expected_data_type != canonical_actual_data_type:
return True

if (
isinstance(expected_column.character_maximum_length, int)
and expected_column.character_maximum_length != actual_column_metadata.character_maximum_length
and expected_column.character_maximum_length != actual_column.character_maximum_length
):
return True

Expand Down Expand Up @@ -203,6 +204,12 @@ def test_connection_error_message(self) -> Optional[str]:
def build_data_source(self) -> DataSource:
return DataSource(name=self.name, type=self.type_name)

def qualify_dataset_name(self, dataset_identifier: DatasetIdentifier) -> str:
    """
    Returns the qualified name of the given dataset as produced by this data source's SQL dialect.

    The identifier's data_source_name is asserted to be equal to the name of this data source.
    The identifier's prefixes and dataset_name are passed to SqlDialect.qualify_dataset_name.
    """
    assert dataset_identifier.data_source_name == self.name
    prefixes = dataset_identifier.prefixes
    dataset_name = dataset_identifier.dataset_name
    return self.sql_dialect.qualify_dataset_name(dataset_prefix=prefixes, dataset_name=dataset_name)

def quote_identifier(self, identifier: str) -> str:
    """
    Wraps the identifier in the default quote character of this data source's SQL dialect.

    The identifier is placed between the quote characters as-is.
    """
    quote_char = self.sql_dialect._get_default_quote_char()
    return "{0}{1}{0}".format(quote_char, identifier)
9 changes: 3 additions & 6 deletions soda-core/src/soda_core/common/extensions.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from importlib import import_module
from typing import Callable, Optional

from soda_core.common.exceptions import ExtensionException


class Extensions:
@classmethod
Expand All @@ -11,7 +9,6 @@ def find_class_method(cls, module_name: str, class_name: str, method_name: str)
module = import_module(module_name)
class_ = getattr(module, class_name)
return getattr(class_, method_name)
except AttributeError as e:
raise ExtensionException(
message=f"Feature '{class_name}.{method_name}' requires the Soda Extensions to be installed."
)
except (AttributeError, ModuleNotFoundError) as e:
# Extension not installed
return None
36 changes: 30 additions & 6 deletions soda-core/src/soda_core/common/soda_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ def upload_contract_file(self, contract: Contract) -> Optional[str]:
soda_cloud_file_path: str = f"{contract.soda_qualified_dataset_name.lower()}.yml"
return self._upload_scan_yaml_file(yaml_str=contract_yaml_source_str, soda_cloud_file_path=soda_cloud_file_path)

def send_contract_result(self, contract_verification_result: ContractVerificationResult) -> bool:
def send_contract_result(self, contract_verification_result: ContractVerificationResult) -> Optional[dict]:
"""
Returns True if a 200 OK was received, False otherwise
Returns A scanId string if a 200 OK was received, None otherwise
"""
contract_verification_result = _build_contract_result_json(
contract_verification_result=contract_verification_result
Expand All @@ -241,14 +241,13 @@ def send_contract_result(self, contract_verification_result: ContractVerificatio
)
if response.status_code == 200:
logger.info(f"{Emoticons.OK_HAND} Results sent to Soda Cloud")
response_json = response.json()
response_json: dict = response.json()
if isinstance(response_json, dict):
cloud_url: Optional[str] = response_json.get("cloudUrl")
if isinstance(cloud_url, str):
logger.info(f"To view the dataset on Soda Cloud, see {cloud_url}")
return True
else:
return False
return response_json
return None

def send_contract_skeleton(self, contract_yaml_str: str, soda_cloud_file_path: str) -> None:
file_id: Optional[str] = self._upload_scan_yaml_file(
Expand Down Expand Up @@ -879,6 +878,9 @@ def _get_token(self) -> str:
assert self.token, "No token in login response?!"
return self.token

def send_failed_rows_diagnostics(self, scan_id: str, failed_rows_diagnostics: list[FailedRowsDiagnostic]):
    """
    Placeholder: nothing is sent yet.  Only prints a TODO line to stdout that contains the
    scan id and the given failed rows diagnostics.
    """
    todo_message = (
        f"TODO sending failed rows diagnostics for scan {scan_id} to Soda Cloud: {failed_rows_diagnostics}"
    )
    print(todo_message)


def to_jsonnable(o) -> object:
if o is None or isinstance(o, str) or isinstance(o, int) or isinstance(o, float) or isinstance(o, bool):
Expand Down Expand Up @@ -1196,3 +1198,25 @@ def _append_exception_to_cloud_log_dicts(cloud_log_dicts: list[dict], exception:
exc_cloud_log_dict["index"] = len(cloud_log_dicts)
cloud_log_dicts.append(exc_cloud_log_dict)
return cloud_log_dicts


class FailedRowsDiagnostic:
    """
    Value holder for one failed rows diagnostic: a check identity, a name and a query.

    Instances only store the three constructor arguments as attributes.
    """

    def __init__(self, check_identity: str, name: str, query: str):
        self.check_identity: str = check_identity
        self.name: str = name
        self.query: str = query

    def __repr__(self) -> str:
        # Without this, formatting a list of diagnostics (e.g. in a log or print statement)
        # only shows opaque object addresses.  type(self).__name__ keeps the output correct
        # for subclasses as well.
        return (
            f"{type(self).__name__}(check_identity={self.check_identity!r}, "
            f"name={self.name!r}, query={self.query!r})"
        )


class QuerySourceFailedRowsDiagnostic(FailedRowsDiagnostic):
    """
    FailedRowsDiagnostic subtype that adds no state or behaviour of its own;
    the constructor forwards all arguments to the base class.
    """

    def __init__(self, check_identity: str, name: str, query: str):
        super().__init__(check_identity=check_identity, name=name, query=query)


class StoreKeysFailedRowsDiagnostic(FailedRowsDiagnostic):
    """
    FailedRowsDiagnostic subtype that adds no state or behaviour of its own;
    the constructor forwards all arguments to the base class.
    """

    def __init__(self, check_identity: str, name: str, query: str):
        super().__init__(check_identity=check_identity, name=name, query=query)


class StoreDataFailedRowsDiagnostic(FailedRowsDiagnostic):
    """
    FailedRowsDiagnostic subtype that adds no state or behaviour of its own;
    the constructor forwards all arguments to the base class.
    """

    def __init__(self, check_identity: str, name: str, query: str):
        super().__init__(check_identity=check_identity, name=name, query=query)
14 changes: 12 additions & 2 deletions soda-core/src/soda_core/common/sql_dialect.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from numbers import Number
from textwrap import dedent, indent

from soda_core.common.dataset_identifier import DatasetIdentifier
from soda_core.common.sql_ast import *


Expand All @@ -28,6 +29,11 @@ def quote_default(self, identifier: Optional[str]) -> Optional[str]:
else None
)

def build_fully_qualified_sql_name(self, dataset_identifier: DatasetIdentifier) -> str:
    """
    Convenience wrapper around qualify_dataset_name that takes a DatasetIdentifier and
    passes on its prefixes and dataset_name.
    """
    prefixes = dataset_identifier.prefixes
    dataset_name = dataset_identifier.dataset_name
    return self.qualify_dataset_name(dataset_prefix=prefixes, dataset_name=dataset_name)

def qualify_dataset_name(self, dataset_prefix: list[str], dataset_name: str) -> str:
"""
Creates a fully qualified table name, optionally quoting the table name
Expand Down Expand Up @@ -85,14 +91,18 @@ def escape_string(self, value: str):
def escape_regex(self, value: str):
return value

def build_select_sql(self, select_elements: list) -> str:
def create_schema_if_not_exists_sql(self, schema_name: str) -> str:
    """
    Builds a CREATE SCHEMA IF NOT EXISTS statement, terminated with a semicolon,
    for the schema name after it has been passed through quote_default.
    """
    return "CREATE SCHEMA IF NOT EXISTS {};".format(self.quote_default(schema_name))

def build_select_sql(self, select_elements: list, add_semicolon: bool = True) -> str:
statement_lines: list[str] = []
statement_lines.extend(self._build_cte_sql_lines(select_elements))
statement_lines.extend(self._build_select_sql_lines(select_elements))
statement_lines.extend(self._build_from_sql_lines(select_elements))
statement_lines.extend(self._build_where_sql_lines(select_elements))
statement_lines.extend(self._build_order_by_lines(select_elements))
return "\n".join(statement_lines) + ";"
return "\n".join(statement_lines) + (";" if add_semicolon else "")

def _build_select_sql_lines(self, select_elements: list) -> list[str]:
select_field_sqls: list[str] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ class ColumnMetadata:
data_type: str
character_maximum_length: Optional[int]

def get_data_type_ddl(self) -> str:
    """
    Returns the data type, followed by the character maximum length between
    parentheses when a length is set (i.e. is not None).
    """
    max_length = self.character_maximum_length
    return self.data_type if max_length is None else f"{self.data_type}({max_length})"


class MetadataColumnsQuery:
def __init__(self, sql_dialect: SqlDialect, data_source_connection: DataSourceConnection):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(
)
else:
self.invalid_count_metric_impl = self._resolve_metric(
InvalidCountMetric(contract_impl=contract_impl, column_impl=column_impl, check_impl=self)
InvalidCountMetricImpl(contract_impl=contract_impl, column_impl=column_impl, check_impl=self)
)

self.row_count_metric = self._resolve_metric(RowCountMetricImpl(contract_impl=contract_impl, check_impl=self))
Expand Down Expand Up @@ -145,8 +145,11 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) ->
diagnostic_metric_values=diagnostic_metric_values,
)

def get_threshold_metric_impl(self) -> Optional[MetricImpl]:
    """Returns the invalid count metric impl held by this check."""
    threshold_metric_impl = self.invalid_count_metric_impl
    return threshold_metric_impl

class InvalidCountMetric(AggregationMetricImpl):

class InvalidCountMetricImpl(AggregationMetricImpl):
def __init__(
self,
contract_impl: ContractImpl,
Expand All @@ -162,15 +165,17 @@ def __init__(
)

def sql_expression(self) -> SqlExpression:
    """
    Builds SUM(CASE_WHEN(<condition>, LITERAL(1))) where the condition comes from
    sql_condition_expression().  No else value is passed to CASE_WHEN.
    """
    condition: SqlExpression = self.sql_condition_expression()
    return SUM(CASE_WHEN(condition, LITERAL(1)))

def sql_condition_expression(self) -> SqlExpression:
column_name: str = self.column_impl.column_yaml.name
invalid_count_condition: SqlExpression = AND.optional(
return AND.optional(
[
SqlExpressionStr.optional(self.check_filter),
NOT.optional(self.missing_and_validity.is_missing_expr(column_name)),
self.missing_and_validity.is_invalid_expr(column_name),
]
)
return SUM(CASE_WHEN(invalid_count_condition, LITERAL(1)))

def convert_db_value(self, value) -> int:
# Note: expression SUM(CASE WHEN "id" IS NULL THEN 1 ELSE 0 END) gives NULL / None as a result if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
)

self.metric_name = "missing_percent" if check_yaml.metric == "percent" else "missing_count"
self.missing_count_metric = self._resolve_metric(
self.missing_count_metric_impl = self._resolve_metric(
MissingCountMetricImpl(contract_impl=contract_impl, column_impl=column_impl, check_impl=self)
)

Expand All @@ -69,7 +69,7 @@ def __init__(
self.missing_percent_metric_impl: MetricImpl = self.contract_impl.metrics_resolver.resolve_metric(
DerivedPercentageMetricImpl(
metric_type="missing_percent",
fraction_metric_impl=self.missing_count_metric,
fraction_metric_impl=self.missing_count_metric_impl,
total_metric_impl=self.row_count_metric_impl,
)
)
Expand All @@ -79,7 +79,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) ->

diagnostic_metric_values: dict[str, float] = {}

missing_count: int = measurement_values.get_value(self.missing_count_metric)
missing_count: int = measurement_values.get_value(self.missing_count_metric_impl)
diagnostic_metric_values["missing_count"] = missing_count

row_count: int = measurement_values.get_value(self.row_count_metric_impl)
Expand All @@ -104,6 +104,9 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) ->
diagnostic_metric_values=diagnostic_metric_values,
)

def get_threshold_metric_impl(self) -> Optional[MetricImpl]:
    """Returns the missing count metric impl held by this check."""
    threshold_metric_impl = self.missing_count_metric_impl
    return threshold_metric_impl


class MissingCountMetricImpl(AggregationMetricImpl):
def __init__(
Expand All @@ -121,14 +124,16 @@ def __init__(
)

def sql_expression(self) -> SqlExpression:
    """
    Builds SUM(CASE_WHEN(<condition>, LITERAL(1))) where the condition comes from
    sql_condition_expression().  No else value is passed to CASE_WHEN.
    """
    condition: SqlExpression = self.sql_condition_expression()
    return SUM(CASE_WHEN(condition, LITERAL(1)))

def sql_condition_expression(self) -> SqlExpression:
column_name: str = self.column_impl.column_yaml.name
not_missing_and_invalid_expr = self.missing_and_validity.is_missing_expr(column_name)
missing_count_condition: SqlExpression = (
return (
not_missing_and_invalid_expr
if not self.check_filter
else AND([SqlExpressionStr(self.check_filter), not_missing_and_invalid_expr])
)
return SUM(CASE_WHEN(missing_count_condition, LITERAL(1), LITERAL(0)))

def convert_db_value(self, value) -> int:
# Note: expression SUM(CASE WHEN "id" IS NULL THEN 1 ELSE 0 END) gives NULL / None as a result if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def evaluate(self, measurement_values: MeasurementValues, contract: Contract) ->
actual_column_metadata
and expected_column.data_type
and self.contract_impl.data_source_impl.is_different_data_type(
expected_column=expected_column, actual_column_metadata=actual_column_metadata
expected_column=expected_column, actual_column=actual_column_metadata
)
):
column_data_type_mismatches.append(
Expand Down
Loading