-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(ingest/dynamodb): add support for classification (#10138)
- Loading branch information
1 parent
85c54f5
commit 0361f24
Showing
28 changed files
with
330 additions
and
198 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
51 changes: 51 additions & 0 deletions
51
metadata-ingestion/src/datahub/ingestion/source/common/data_reader.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from abc import abstractmethod | ||
from typing import Dict, List, Optional | ||
|
||
from datahub.ingestion.api.closeable import Closeable | ||
|
||
|
||
class DataReader(Closeable):
    """Abstract interface for fetching sample data from a table.

    Implementations back the classification workflow, which needs raw
    column values to run classifiers against.
    """

    def get_sample_data_for_column(
        self, table_id: List[str], column_name: str, sample_size: int
    ) -> list:
        # Optional capability: per-column sampling. Implementations that only
        # support whole-table sampling leave this unimplemented.
        raise NotImplementedError()

    @abstractmethod
    def get_sample_data_for_table(
        self,
        table_id: List[str],
        sample_size: int,
        *,
        sample_size_percent: Optional[float] = None,
        filter: Optional[str] = None,
    ) -> Dict[str, list]:
        """Fetch approximately ``sample_size`` rows of table values.

        Note: ideally the collected sample would contain no null values, but
        that would require one query per column, which is expensive. To
        compensate for nulls in the sample, implementations are usually
        recommended to fetch ~20% more rows than the configured sample_size.

        Args:
            table_id (List[str]): Table name identifier. One of
                - [<db_name>, <schema_name>, <table_name>] or
                - [<schema_name>, <table_name>] or
                - [<table_name>]
            sample_size (int): sample size

        Keyword Args:
            sample_size_percent (float, between 0 and 1): For bigquery-like
                data platforms that provide only percentage based sampling
                methods. If present, actual sample_size may be ignored.
            filter (string): For bigquery-like data platforms that need a
                mandatory filter on the partition column for some cases.

        Returns:
            Dict[str, list]: dictionary of (column name -> list of column values)
        """
        pass
75 changes: 75 additions & 0 deletions
75
metadata-ingestion/src/datahub/ingestion/source/dynamodb/data_reader.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
from collections import defaultdict | ||
from typing import TYPE_CHECKING, Any, Dict, List | ||
|
||
from datahub.ingestion.source.common.data_reader import DataReader | ||
|
||
if TYPE_CHECKING: | ||
from mypy_boto3_dynamodb import DynamoDBClient | ||
|
||
PAGE_SIZE = 100 | ||
|
||
|
||
class DynamoDBTableItemsReader(DataReader):
    """
    Reads sample items from DynamoDB tables.

    DynamoDB is a NoSQL database, so different items (rows) of the same
    table may carry different attributes (columns).
    """

    @staticmethod
    def create(client: "DynamoDBClient") -> "DynamoDBTableItemsReader":
        # Convenience factory mirroring other DataReader implementations.
        return DynamoDBTableItemsReader(client)

    def __init__(self, client: "DynamoDBClient") -> None:
        # The lifecycle of this client is managed externally; close() below
        # is therefore a no-op.
        self.client = client

    def get_sample_data_for_table(
        self, table_id: List[str], sample_size: int, **kwargs: Any
    ) -> Dict[str, list]:
        """
        For dynamoDB, table_id should be in formation ( table-name ) or (region, table-name );
        only the last element (the table name) is used for the scan.
        """
        samples: Dict[str, list] = defaultdict(list)
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html
        pages = self.client.get_paginator("scan").paginate(
            TableName=table_id[-1],
            PaginationConfig={
                "MaxItems": sample_size,
                "PageSize": PAGE_SIZE,
            },
        )
        # Walk the pagination result and collect values per attribute name.
        for page in pages:
            for item in page["Items"]:
                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/scan.html#scan
                # Each attribute of an item maps its name to a single-entry
                # dict of {data type: data}; complex types - L (list) or
                # M (map) - are recursively unpacked into json-like form.
                for attr_name, typed_value in item.items():
                    samples[attr_name].append(self._get_value(typed_value))

        # Note: Consider including items configured via `include_table_item` in sample data ?

        return samples

    def _get_value(self, attribute_value: Dict[str, Any]) -> Any:
        # An attribute value is a single name-value pair: the name is the
        # DynamoDB data type and the value is the data itself, so the loop
        # returns on its first (only) entry.
        for data_type, value in attribute_value.items():
            if data_type == "L" and isinstance(value, list):
                return [self._get_value(element) for element in value]
            if data_type == "M" and isinstance(value, dict):
                return {k: self._get_value(v) for k, v in value.items()}
            return value

    def close(self) -> None:
        pass
Oops, something went wrong.