From 0361f2463db5f64dead3ea9b24af853498bbf28f Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Wed, 27 Mar 2024 23:58:58 +0530 Subject: [PATCH] feat(ingest/dynamodb): add support for classification (#10138) --- metadata-ingestion/scripts/docgen.py | 1 + metadata-ingestion/setup.py | 4 +- .../glossary/classification_mixin.py | 22 ++- .../ingestion/source/bigquery_v2/bigquery.py | 3 +- .../bigquery_v2/bigquery_data_reader.py | 2 +- .../ingestion/source/common/data_reader.py | 51 +++++ .../ingestion/source/dynamodb/data_reader.py | 75 ++++++++ .../ingestion/source/dynamodb/dynamodb.py | 175 +++++++++++------- .../ingestion/source/redshift/redshift.py | 2 +- .../source/redshift/redshift_data_reader.py | 2 +- .../source/schema_inference/object.py | 2 +- .../source/snowflake/snowflake_data_reader.py | 2 +- .../datahub/ingestion/source/sql/athena.py | 5 - .../ingestion/source/sql/clickhouse.py | 5 - .../src/datahub/ingestion/source/sql/druid.py | 5 - .../src/datahub/ingestion/source/sql/hana.py | 5 - .../src/datahub/ingestion/source/sql/hive.py | 5 - .../src/datahub/ingestion/source/sql/mysql.py | 5 - .../datahub/ingestion/source/sql/oracle.py | 5 - .../datahub/ingestion/source/sql/postgres.py | 5 - .../ingestion/source/sql/presto_on_hive.py | 8 +- .../ingestion/source/sql/sql_common.py | 23 ++- ...ta_reader.py => sqlalchemy_data_reader.py} | 58 +----- .../datahub/ingestion/source/sql/teradata.py | 5 - .../src/datahub/ingestion/source/sql/trino.py | 7 +- .../datahub/ingestion/source/sql/vertica.py | 7 +- ...ynamodb_platform_instance_mces_golden.json | 11 ++ .../integration/dynamodb/test_dynamodb.py | 28 +++ 28 files changed, 330 insertions(+), 198 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/common/data_reader.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/dynamodb/data_reader.py rename metadata-ingestion/src/datahub/ingestion/source/sql/{data_reader.py => sqlalchemy_data_reader.py} (57%) diff --git a/metadata-ingestion/scripts/docgen.py b/metadata-ingestion/scripts/docgen.py index 10679ad770c45..c9818b936f929 100644 --- a/metadata-ingestion/scripts/docgen.py +++ b/metadata-ingestion/scripts/docgen.py @@ -413,6 +413,7 @@ def get_capability_text(src_capability: SourceCapability) -> str: SourceCapability.DOMAINS: "../../../domains.md", SourceCapability.PLATFORM_INSTANCE: "../../../platform-instances.md", SourceCapability.DATA_PROFILING: "../../../../metadata-ingestion/docs/dev_guides/sql_profiles.md", + SourceCapability.CLASSIFICATION: "../../../../metadata-ingestion/docs/dev_guides/classification.md", } capability_doc = capability_docs_mapping.get(src_capability) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 65fa00c2acd0e..111321eed303a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -103,7 +103,7 @@ } classification_lib = { - "acryl-datahub-classify==0.0.9", + "acryl-datahub-classify==0.0.10", } sql_common = ( @@ -445,7 +445,7 @@ "types-click==0.1.12", # The boto3-stubs package seems to have regularly breaking minor releases, # we pin to a specific version to avoid this. 
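+    # Note (descriptive, not part of the upstream change rationale): the new `dynamodb` extra
+    # pulls in mypy-boto3-dynamodb, which supplies the DynamoDBClient and type-def stubs
+    # imported under TYPE_CHECKING by the DynamoDB source later in this patch.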
- "boto3-stubs[s3,glue,sagemaker,sts]==1.28.15", + "boto3-stubs[s3,glue,sagemaker,sts,dynamodb]==1.28.15", "mypy-boto3-sagemaker==1.28.15", # For some reason, above pin only restricts `mypy-boto3-sagemaker<1.29.0,>=1.28.0` "types-tabulate", # avrogen package requires this diff --git a/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py b/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py index c0de827b21131..346357cd0a863 100644 --- a/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py +++ b/metadata-ingestion/src/datahub/ingestion/glossary/classification_mixin.py @@ -14,12 +14,14 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.glossary.classifier import ClassificationConfig, Classifier from datahub.ingestion.glossary.classifier_registry import classifier_registry -from datahub.ingestion.source.sql.data_reader import DataReader +from datahub.ingestion.source.common.data_reader import DataReader +from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, GlossaryTermAssociation, GlossaryTerms, ) +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata from datahub.utilities.lossy_collections import LossyDict, LossyList from datahub.utilities.perf_timer import PerfTimer @@ -289,12 +291,22 @@ def classification_workunit_processor( table_id: List[str], data_reader_kwargs: dict = {}, ) -> Iterable[MetadataWorkUnit]: + """ + Classification handling for a particular table. + Currently works only for workunits having MCP or MCPW + """ table_name = ".".join(table_id) if not classification_handler.is_classification_enabled_for_table(table_name): yield from table_wu_generator for wu in table_wu_generator: maybe_schema_metadata = wu.get_aspect_of_type(SchemaMetadata) - if maybe_schema_metadata: + if ( + isinstance(wu.metadata, MetadataChangeEvent) + and len(wu.metadata.proposedSnapshot.aspects) > 1 + ) or not maybe_schema_metadata: + yield wu + continue + else: # This is MCP or MCPW workunit with SchemaMetadata aspect try: classification_handler.classify_schema_fields( table_name, @@ -304,7 +316,7 @@ def classification_workunit_processor( data_reader.get_sample_data_for_table, table_id, classification_handler.config.classification.sample_size - * 1.2, + * SAMPLE_SIZE_MULTIPLIER, **data_reader_kwargs, ) if data_reader @@ -317,10 +329,8 @@ def classification_workunit_processor( is_primary_source=wu.is_primary_source, ) except Exception as e: - logger.debug( + logger.warning( f"Failed to classify table columns for {table_name} due to error -> {e}", exc_info=e, ) yield wu - else: - yield wu diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index aeb6a524eb77c..674446e60d6d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -77,6 +77,7 @@ gen_schema_container, get_domain_wu, ) +from datahub.ingestion.source.sql.sqlalchemy_data_reader import SAMPLE_SIZE_MULTIPLIER from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantLineageRunSkipHandler, @@ -764,7 +765,7 @@ def 
_process_schema( data_reader_kwargs=dict( sample_size_percent=( self.config.classification.sample_size - * 1.2 + * SAMPLE_SIZE_MULTIPLIER / table.rows_count if table.rows_count else None diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_data_reader.py index 37dfd14ce125e..cc9f6d4765658 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_data_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_data_reader.py @@ -4,7 +4,7 @@ from google.cloud import bigquery -from datahub.ingestion.source.sql.data_reader import DataReader +from datahub.ingestion.source.common.data_reader import DataReader from datahub.utilities.perf_timer import PerfTimer logger = logging.Logger(__name__) diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/common/data_reader.py new file mode 100644 index 0000000000000..8a0a492ca5d33 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/common/data_reader.py @@ -0,0 +1,51 @@ +from abc import abstractmethod +from typing import Dict, List, Optional + +from datahub.ingestion.api.closeable import Closeable + + +class DataReader(Closeable): + def get_sample_data_for_column( + self, table_id: List[str], column_name: str, sample_size: int + ) -> list: + raise NotImplementedError() + + @abstractmethod + def get_sample_data_for_table( + self, + table_id: List[str], + sample_size: int, + *, + sample_size_percent: Optional[float] = None, + filter: Optional[str] = None, + ) -> Dict[str, list]: + """ + Fetches table values , approx sample_size rows + + Args: + table_id (List[str]): Table name identifier. One of + - [, , ] or + - [, ] or + - [] + sample_size (int): sample size + + Keyword Args: + sample_size_percent(float, between 0 and 1): For bigquery-like data platforms that provide only + percentage based sampling methods. If present, actual sample_size + may be ignored. + + filter (string): For bigquery-like data platforms that need mandatory filter on partition + column for some cases + + + Returns: + Dict[str, list]: dictionary of (column name -> list of column values) + """ + + # Ideally we do not want null values in sample data for a column. + # However that would require separate query per column and + # that would be expensive, hence not done. To compensate for possibility + # of some null values in collected sample, its usually recommended to + # fetch extra (20% more) rows than configured sample_size. + + pass diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/data_reader.py new file mode 100644 index 0000000000000..13804d703ac93 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/data_reader.py @@ -0,0 +1,75 @@ +from collections import defaultdict +from typing import TYPE_CHECKING, Any, Dict, List + +from datahub.ingestion.source.common.data_reader import DataReader + +if TYPE_CHECKING: + from mypy_boto3_dynamodb import DynamoDBClient + +PAGE_SIZE = 100 + + +class DynamoDBTableItemsReader(DataReader): + """ + DynamoDB is a NoSQL database and may have different attributes (columns) present + in different items (rows) of the table. 
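+    The reader scans up to the requested sample size of items and returns the values
+    observed per attribute name, so classification can run even without a fixed schema.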
+ """ + + @staticmethod + def create(client: "DynamoDBClient") -> "DynamoDBTableItemsReader": + return DynamoDBTableItemsReader(client) + + def __init__(self, client: "DynamoDBClient") -> None: + # The lifecycle of this client is managed externally + self.client = client + + def get_sample_data_for_table( + self, table_id: List[str], sample_size: int, **kwargs: Any + ) -> Dict[str, list]: + """ + For dynamoDB, table_id should be in formation ( table-name ) or (region, table-name ) + """ + column_values: Dict[str, list] = defaultdict(list) + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html + paginator = self.client.get_paginator("scan") + response_iterator = paginator.paginate( + TableName=table_id[-1], + PaginationConfig={ + "MaxItems": sample_size, + "PageSize": PAGE_SIZE, + }, + ) + # iterate through pagination result to retrieve items + for page in response_iterator: + items: List[Dict] = page["Items"] + if len(items) > 0: + for item in items: + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html#scan + # For an item (row), the key is the attribute name and the value is a dict with only one entry, + # whose key is the data type and value is the data + # for complex data types - L (list) or M (map) - we will recursively process the value into json-like form + for attribute_name, attribute_value in item.items(): + column_values[attribute_name].append( + self._get_value(attribute_value) + ) + + # Note: Consider including items configured via `include_table_item` in sample data ? + + return column_values + + def _get_value(self, attribute_value: Dict[str, Any]) -> Any: + # Each attribute value is described as a name-value pair. + # The name is the data type, and the value is the data itself. 
+ for data_type, value in attribute_value.items(): + if data_type == "L" and isinstance(value, list): + return [self._get_value(e) for e in value] + elif data_type == "M" and isinstance(value, dict): + return { + nested_attr: self._get_value(nested_value) + for nested_attr, nested_value in value.items() + } + else: + return value + + def close(self) -> None: + pass diff --git a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py index f917c38f947f4..3d85238d9422f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py @@ -1,10 +1,19 @@ import logging from dataclasses import dataclass, field -from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Type, Union +from typing import ( + TYPE_CHECKING, + Counter, + Dict, + Iterable, + List, + Optional, + Tuple, + Type, + Union, +) import boto3 import pydantic -from botocore.client import BaseClient from pydantic.fields import Field from datahub.configuration.common import AllowDenyPattern @@ -27,6 +36,13 @@ ) from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceCapability from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.glossary.classification_mixin import ( + ClassificationHandler, + ClassificationReportMixin, + ClassificationSourceConfigMixin, + classification_workunit_processor, +) +from datahub.ingestion.source.dynamodb.data_reader import DynamoDBTableItemsReader from datahub.ingestion.source.schema_inference.object import SchemaDescription from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, @@ -65,8 +81,19 @@ logger: logging.Logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from mypy_boto3_dynamodb import DynamoDBClient + from mypy_boto3_dynamodb.type_defs import ( + AttributeValueTypeDef, + TableDescriptionTypeDef, + ) + -class DynamoDBConfig(DatasetSourceConfigMixin, StatefulIngestionConfigBase): +class DynamoDBConfig( + DatasetSourceConfigMixin, + StatefulIngestionConfigBase, + ClassificationSourceConfigMixin, +): # TODO: refactor the config to use AwsConnectionConfig and create a method get_dynamodb_client # in the class to provide optional region name input aws_access_key_id: str = Field(description="AWS Access Key ID.") @@ -95,7 +122,7 @@ class DynamoDBConfig(DatasetSourceConfigMixin, StatefulIngestionConfigBase): @dataclass -class DynamoDBSourceReport(StaleEntityRemovalSourceReport): +class DynamoDBSourceReport(StaleEntityRemovalSourceReport, ClassificationReportMixin): filtered: List[str] = field(default_factory=list) def report_dropped(self, name: str) -> None: @@ -163,6 +190,7 @@ def __init__(self, ctx: PipelineContext, config: DynamoDBConfig, platform: str): self.config = config self.report = DynamoDBSourceReport() self.platform = platform + self.classification_handler = ClassificationHandler(self.config, self.report) if self.config.domain: self.domain_registry = DomainRegistry( @@ -201,6 +229,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: aws_access_key_id=self.config.aws_access_key_id, aws_secret_access_key=self.config.aws_secret_access_key.get_secret_value(), ) + data_reader = DynamoDBTableItemsReader.create(dynamodb_client) for table_name in self._list_tables(dynamodb_client): dataset_name = f"{region}.{table_name}" @@ -209,66 +238,82 @@ def get_workunits_internal(self) -> 
Iterable[MetadataWorkUnit]: self.report.report_dropped(dataset_name) continue - logger.debug(f"Processing table: {dataset_name}") - table_info = dynamodb_client.describe_table(TableName=table_name)[ - "Table" - ] - account_id = table_info["TableArn"].split(":")[4] - platform_instance = self.config.platform_instance or account_id - dataset_urn = make_dataset_urn_with_platform_instance( - platform=self.platform, - platform_instance=platform_instance, - name=dataset_name, - ) - dataset_properties = DatasetPropertiesClass( - tags=[], - customProperties={ - "table.arn": table_info["TableArn"], - "table.totalItems": str(table_info["ItemCount"]), - }, + table_wu_generator = self._process_table( + region, dynamodb_client, table_name, dataset_name ) - primary_key_dict = self.extract_primary_key_from_key_schema(table_info) - table_schema = self.construct_schema_from_dynamodb( - dynamodb_client, region, table_name - ) - schema_metadata = self.construct_schema_metadata( - table_name, - dataset_urn, - dataset_properties, - table_schema, - primary_key_dict, + yield from classification_workunit_processor( + table_wu_generator, + self.classification_handler, + data_reader, + [region, table_name], ) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=schema_metadata, - ).as_workunit() + def _process_table( + self, + region: str, + dynamodb_client: "DynamoDBClient", + table_name: str, + dataset_name: str, + ) -> Iterable[MetadataWorkUnit]: + + logger.debug(f"Processing table: {dataset_name}") + table_info = dynamodb_client.describe_table(TableName=table_name)["Table"] + account_id = table_info["TableArn"].split(":")[4] + platform_instance = self.config.platform_instance or account_id + dataset_urn = make_dataset_urn_with_platform_instance( + platform=self.platform, + platform_instance=platform_instance, + name=dataset_name, + ) + dataset_properties = DatasetPropertiesClass( + tags=[], + customProperties={ + "table.arn": table_info["TableArn"], + "table.totalItems": str(table_info["ItemCount"]), + }, + ) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=dataset_properties, - ).as_workunit() + primary_key_dict = self.extract_primary_key_from_key_schema(table_info) + table_schema = self.construct_schema_from_dynamodb( + dynamodb_client, region, table_name + ) - yield from self._get_domain_wu( - dataset_name=table_name, - entity_urn=dataset_urn, - ) + schema_metadata = self.construct_schema_metadata( + table_name, + dataset_urn, + dataset_properties, + table_schema, + primary_key_dict, + ) - platform_instance_aspect = DataPlatformInstanceClass( - platform=make_data_platform_urn(self.platform), - instance=make_dataplatform_instance_urn( - self.platform, platform_instance - ), - ) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=schema_metadata, + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=dataset_properties, + ).as_workunit() + + yield from self._get_domain_wu( + dataset_name=table_name, + entity_urn=dataset_urn, + ) + + platform_instance_aspect = DataPlatformInstanceClass( + platform=make_data_platform_urn(self.platform), + instance=make_dataplatform_instance_urn(self.platform, platform_instance), + ) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=platform_instance_aspect, - ).as_workunit() + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=platform_instance_aspect, + ).as_workunit() def _list_tables( self, - dynamodb_client: BaseClient, + 
dynamodb_client: "DynamoDBClient", ) -> Iterable[str]: # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/paginator/ListTables.html try: @@ -283,7 +328,7 @@ def _list_tables( def construct_schema_from_dynamodb( self, - dynamodb_client: BaseClient, + dynamodb_client: "DynamoDBClient", region: str, table_name: str, ) -> Dict[Tuple[str, ...], SchemaDescription]: @@ -321,7 +366,7 @@ def construct_schema_from_dynamodb( def include_table_item_to_schema( self, - dynamodb_client: Any, + dynamodb_client: "DynamoDBClient", region: str, table_name: str, schema: Dict[Tuple[str, ...], SchemaDescription], @@ -342,7 +387,7 @@ def include_table_item_to_schema( f"the provided primary keys list size exceeded the max size for table {dataset_name}, we'll only process the first {MAX_PRIMARY_KEYS_SIZE} items" ) primary_key_list = primary_key_list[0:MAX_PRIMARY_KEYS_SIZE] - items = [] + items: List[Dict[str, "AttributeValueTypeDef"]] = [] response = dynamodb_client.batch_get_item( RequestItems={table_name: {"Keys": primary_key_list}} ).get("Responses") @@ -354,13 +399,13 @@ def include_table_item_to_schema( logger.debug( f"successfully retrieved {len(primary_key_list)} items based on supplied primary key list" ) - items = response.get(table_name) + items = response.get(table_name) or [] self.construct_schema_from_items(items, schema) def construct_schema_from_items( self, - items: List[Dict[str, Dict]], + items: List[Dict[str, "AttributeValueTypeDef"]], schema: Dict[Tuple[str, ...], SchemaDescription], ) -> None: """ @@ -375,7 +420,7 @@ def construct_schema_from_items( def append_schema( self, schema: Dict[Tuple[str, ...], SchemaDescription], - document: Dict[str, Dict], + document: Dict[str, "AttributeValueTypeDef"], parent_field_path: Tuple[str, ...] 
= (), ) -> None: # the key is the attribute name and the value is a dict with only one entry, @@ -384,7 +429,7 @@ def append_schema( for key, value in document.items(): if value is not None: data_type = list(value.keys())[0] - attribute_value = value[data_type] + attribute_value = value[data_type] # type:ignore current_field_path = parent_field_path + (key,) # Handle nested maps by recursive calls if data_type == "M": @@ -487,14 +532,14 @@ def construct_schema_metadata( return schema_metadata def extract_primary_key_from_key_schema( - self, table_info: Dict[str, Any] + self, table_info: "TableDescriptionTypeDef" ) -> Dict[str, str]: key_schema = table_info.get("KeySchema") - primary_key_dict = {} + primary_key_dict: Dict[str, str] = {} assert isinstance(key_schema, List) for key in key_schema: - attribute_name = key.get("AttributeName") - key_type = key.get("KeyType") + attribute_name = key["AttributeName"] + key_type = key["KeyType"] primary_key_dict[attribute_name] = key_type return primary_key_dict diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index ad19386d41fad..6be7eedf976bd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -39,6 +39,7 @@ ClassificationHandler, classification_workunit_processor, ) +from datahub.ingestion.source.common.data_reader import DataReader from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, DatasetSubTypes, @@ -57,7 +58,6 @@ ) from datahub.ingestion.source.redshift.report import RedshiftReport from datahub.ingestion.source.redshift.usage import RedshiftUsageExtractor -from datahub.ingestion.source.sql.data_reader import DataReader from datahub.ingestion.source.sql.sql_common import SqlWorkUnit from datahub.ingestion.source.sql.sql_types import resolve_postgres_modified_type from datahub.ingestion.source.sql.sql_utils import ( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_data_reader.py index 5b92cf5c45688..8af436ae49979 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_data_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_data_reader.py @@ -3,7 +3,7 @@ import redshift_connector -from datahub.ingestion.source.sql.data_reader import DataReader +from datahub.ingestion.source.common.data_reader import DataReader from datahub.utilities.perf_timer import PerfTimer logger = logging.Logger(__name__) diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py index 5a11d020547e8..bbcb114ee40c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py +++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/object.py @@ -5,7 +5,7 @@ class BasicSchemaDescription(TypedDict): - types: CounterType[type] # field types and times seen + types: CounterType[Union[type, str]] # field types and times seen count: int # times the field was seen diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_data_reader.py index afb8cca707160..9fa81cb1bd20c 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_data_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_data_reader.py @@ -4,7 +4,7 @@ import pandas as pd from snowflake.connector import SnowflakeConnection -from datahub.ingestion.source.sql.data_reader import DataReader +from datahub.ingestion.source.common.data_reader import DataReader from datahub.utilities.perf_timer import PerfTimer logger = logging.Logger(__name__) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index eed5b1cb6c9eb..c3759875b2769 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -291,11 +291,6 @@ def get_sql_alchemy_url(self): ) @capability(SourceCapability.LINEAGE_COARSE, "Supported for S3 tables") @capability(SourceCapability.DESCRIPTIONS, "Enabled by default") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class AthenaSource(SQLAlchemySource): """ This plugin supports extracting the following metadata from Athena diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py index 7d32b5a20df11..84c1d3844a7b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py @@ -380,11 +380,6 @@ def get_columns(self, connection, table_name, schema=None, **kw): @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class ClickHouseSource(TwoTierSQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/druid.py b/metadata-ingestion/src/datahub/ingestion/source/sql/druid.py index fdec869baa583..3f20e0a0f18b6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/druid.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/druid.py @@ -61,11 +61,6 @@ def get_identifier(self, schema: str, table: str) -> str: @config_class(DruidConfig) @support_status(SupportStatus.INCUBATING) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class DruidSource(SQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py index 40875809120de..5c9c8f063a1a9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hana.py @@ -28,11 +28,6 @@ class HanaConfig(BasicSQLAlchemyConfig): @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class 
HanaSource(SQLAlchemySource): def __init__(self, config: HanaConfig, ctx: PipelineContext): super().__init__(config, ctx, "hana") diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py index 2975bfe820d1b..003732236ba80 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py @@ -134,11 +134,6 @@ def clean_host_port(cls, v): @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class HiveSource(TwoTierSQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py index af036a5fa0d8e..12ce271e9f5ef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mysql.py @@ -66,11 +66,6 @@ def get_identifier(self, *, schema: str, table: str) -> str: @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class MySQLSource(TwoTierSQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py index cf7bdc982ee80..bcf0f26008ae3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py @@ -560,11 +560,6 @@ def __getattr__(self, item: str) -> Any: @config_class(OracleConfig) @support_status(SupportStatus.INCUBATING) @capability(SourceCapability.DOMAINS, "Enabled by default") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class OracleSource(SQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py index 20976c91f7878..5d1e37fbb68a3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py @@ -132,11 +132,6 @@ class PostgresConfig(BasePostgresConfig): @capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") @capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class PostgresSource(SQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py index 98e2f2ecfbd5a..74cdabc84ad3c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py @@ -159,12 +159,8 @@ def get_sql_alchemy_url( @config_class(PrestoOnHiveConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") -@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) +@capability(SourceCapability.DATA_PROFILING, "Not Supported", False) +@capability(SourceCapability.CLASSIFICATION, "Not Supported", False) class PrestoOnHiveSource(SQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 70826c14e13c7..8e4b6ea83b459 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -37,10 +37,12 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.sql_parsing_builder import SqlParsingBuilder from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import capability from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, + SourceCapability, TestableSource, TestConnectionReport, ) @@ -49,14 +51,11 @@ ClassificationHandler, ClassificationReportMixin, ) +from datahub.ingestion.source.common.data_reader import DataReader from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, DatasetSubTypes, ) -from datahub.ingestion.source.sql.data_reader import ( - DataReader, - SqlAlchemyTableDataReader, -) from datahub.ingestion.source.sql.sql_config import SQLCommonConfig from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, @@ -68,6 +67,10 @@ get_domain_wu, schema_requires_v2, ) +from datahub.ingestion.source.sql.sqlalchemy_data_reader import ( + SAMPLE_SIZE_MULTIPLIER, + SqlAlchemyTableDataReader, +) from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, StaleEntityRemovalSourceReport, @@ -314,6 +317,11 @@ class ProfileMetadata: dataset_name_to_storage_bytes: Dict[str, int] = field(default_factory=dict) +@capability( + SourceCapability.CLASSIFICATION, + "Optionally enabled via `classification.enabled`", + supported=True, +) class SQLAlchemySource(StatefulIngestionSourceBase, TestableSource): """A Base class for all SQL Sources that use SQLAlchemy to extend""" @@ -674,7 +682,7 @@ def loop_tables( # noqa: C901 ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]: tables_seen: Set[str] = set() data_reader = self.make_data_reader(inspector) - with (data_reader or contextlib.nullcontext()): + with data_reader or contextlib.nullcontext(): try: for table in inspector.get_table_names(schema): dataset_name = self.get_identifier( @@ -827,7 +835,10 @@ def _classify( partial( data_reader.get_sample_data_for_table, [schema, table], - int(self.config.classification.sample_size * 1.2), + int( + self.config.classification.sample_size + * SAMPLE_SIZE_MULTIPLIER + ), ), ) except Exception as e: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/data_reader.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py similarity index 57% rename from 
metadata-ingestion/src/datahub/ingestion/source/sql/data_reader.py rename to metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py index 75d3236a0a5ad..d4eacd0b8fc76 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/data_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sqlalchemy_data_reader.py @@ -1,65 +1,17 @@ import logging -from abc import abstractmethod from collections import defaultdict -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Union import sqlalchemy as sa from sqlalchemy.engine import Connection, Engine from sqlalchemy.engine.reflection import Inspector -from datahub.ingestion.api.closeable import Closeable +from datahub.ingestion.source.common.data_reader import DataReader from datahub.utilities.perf_timer import PerfTimer logger: logging.Logger = logging.getLogger(__name__) -class DataReader(Closeable): - def get_sample_data_for_column( - self, table_id: List[str], column_name: str, sample_size: int - ) -> list: - raise NotImplementedError() - - @abstractmethod - def get_sample_data_for_table( - self, - table_id: List[str], - sample_size: int, - *, - sample_size_percent: Optional[float] = None, - filter: Optional[str] = None, - ) -> Dict[str, list]: - """ - Fetches table values , approx sample_size rows - - Args: - table_id (List[str]): Table name identifier. One of - - [, , ] or - - [, ] or - - [] - sample_size (int): sample size - - Keyword Args: - sample_size_percent(float, between 0 and 1): For bigquery-like data platforms that provide only - percentage based sampling methods. If present, actual sample_size - may be ignored. - - filter (string): For bigquery-like data platforms that need mandatory filter on partition - column for some cases - - - Returns: - Dict[str, list]: dictionary of (column name -> list of column values) - """ - - # Ideally we do not want null values in sample data for a column. - # However that would require separate query per column and - # that would be expensive, hence not done. To compensate for possibility - # of some null values in collected sample, its usually recommended to - # fetch extra (20% more) rows than configured sample_size. 
- - pass - - class SqlAlchemyTableDataReader(DataReader): @staticmethod def create(inspector: Inspector) -> "SqlAlchemyTableDataReader": @@ -81,6 +33,9 @@ def _table(self, table_id: List[str]) -> sa.Table: def get_sample_data_for_table( self, table_id: List[str], sample_size: int, **kwargs: Any ) -> Dict[str, list]: + """ + For sqlalchemy, table_id should be in form (schema_name, table_name) + """ logger.debug(f"Collecting sample values for table {'.'.join(table_id)}") @@ -117,3 +72,6 @@ def get_sample_data_for_table( def close(self) -> None: self.connection.close() + + +SAMPLE_SIZE_MULTIPLIER = 1.2 diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 3d0bacba74a69..53b1ddfcde595 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -447,11 +447,6 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig): @capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration") @capability(SourceCapability.LINEAGE_FINE, "Optionally enabled via configuration") @capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class TeradataSource(TwoTierSQLAlchemySource): """ This plugin extracts the following: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py index 1828c5101d4f3..e1c47acbc4b87 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py @@ -35,7 +35,7 @@ ) from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.extractor import schema_util -from datahub.ingestion.source.sql.data_reader import DataReader +from datahub.ingestion.source.common.data_reader import DataReader from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, SqlWorkUnit, @@ -226,11 +226,6 @@ def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str: @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.DOMAINS, "Supported via the `domain` config field") @capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration") -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class TrinoSource(SQLAlchemySource): """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py index 9800660a9ad54..738cc7e321764 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py @@ -24,7 +24,7 @@ support_status, ) from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.sql.data_reader import DataReader +from datahub.ingestion.source.common.data_reader import DataReader from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, SQLSourceReport, @@ -120,11 +120,6 @@ def clean_host_port(cls, v): "Optionally enabled via `stateful_ingestion.remove_stale_metadata`", supported=True, ) -@capability( - SourceCapability.CLASSIFICATION, - "Optionally enabled via `classification.enabled`", - supported=True, -) class VerticaSource(SQLAlchemySource): def 
__init__(self, config: VerticaConfig, ctx: PipelineContext): # self.platform = platform diff --git a/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json b/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json index 2cfc559ed7744..6509844c87d29 100644 --- a/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json +++ b/metadata-ingestion/tests/integration/dynamodb/dynamodb_platform_instance_mces_golden.json @@ -56,6 +56,17 @@ }, "nativeDataType": "List", "recursive": false, + "glossaryTerms": { + "terms": [ + { + "urn": "urn:li:glossaryTerm:Phone_Number" + } + ], + "auditStamp": { + "time": 1693396800000, + "actor": "urn:li:corpuser:datahub" + } + }, "isPartOfKey": false }, { diff --git a/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py b/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py index 0bca1e8ac66de..33ecd0dcd7e07 100644 --- a/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py +++ b/metadata-ingestion/tests/integration/dynamodb/test_dynamodb.py @@ -5,6 +5,13 @@ from freezegun import freeze_time from moto import mock_dynamodb +from datahub.ingestion.glossary.classification_mixin import ClassificationConfig +from datahub.ingestion.glossary.classifier import DynamicTypedClassifierConfig +from datahub.ingestion.glossary.datahub_classifier import ( + DataHubClassifierConfig, + InfoTypeConfig, + PredictionFactorsAndWeights, +) from datahub.ingestion.run.pipeline import Pipeline from tests.test_helpers import mce_helpers @@ -90,6 +97,27 @@ def test_dynamodb(pytestconfig, tmp_path): "platform_instance": "dynamodb_test", "aws_access_key_id": "test", "aws_secret_access_key": "test", + "classification": ClassificationConfig( + enabled=True, + classifiers=[ + DynamicTypedClassifierConfig( + type="datahub", + config=DataHubClassifierConfig( + minimum_values_threshold=1, + info_types_config={ + "Phone_Number": InfoTypeConfig( + prediction_factors_and_weights=PredictionFactorsAndWeights( + name=0.7, + description=0, + datatype=0, + values=0.3, + ) + ) + }, + ), + ) + ], + ), }, }, "sink": {