Skip to content

Commit

Permalink
feat(ingest): Couchbase source profiling updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mminichino committed Jan 22, 2025
1 parent 8591db8 commit bb14433
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ source:
- type: datahub
profiling:
enabled: true
profile_nested_fields: true

sink:
# sink configs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
ClassificationReportMixin,
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.ge_profiling_config import GEProfilingBaseConfig
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulIngestionConfigBase,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulProfilingConfigMixin,
)
from datahub.ingestion.source_config.operation_config import is_profiling_enabled
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.utilities.perf_timer import PerfTimer
Expand Down Expand Up @@ -47,6 +50,7 @@ class CouchbaseDBConfig(
EnvConfigMixin,
StatefulIngestionConfigBase,
ClassificationSourceConfigMixin,
StatefulProfilingConfigMixin,
):
connect_string: str = Field(
default="couchbases://127.0.0.1", description="Couchbase connect string."
Expand Down Expand Up @@ -82,8 +86,8 @@ class CouchbaseDBConfig(
description="Regex patterns for tables to profile",
)

profiling: GEProfilingBaseConfig = Field(
default=GEProfilingBaseConfig(),
profiling: GEProfilingConfig = Field(
default=GEProfilingConfig(),
description="Configuration for profiling",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ def __init__(
self.report = report
self.client = client

if self.config.profiling.use_sampling:
self.sample_size = self.config.profiling.sample_size
else:
self.sample_size = 0

Check warning on line 73 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L73

Added line #L73 was not covered by tests

self.field_sample_count = self.config.profiling.field_sample_values_limit

if self.config.profiling.max_number_of_fields_to_profile:
self.sample_fields = self.config.profiling.max_number_of_fields_to_profile

Check warning on line 78 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L78

Added line #L78 was not covered by tests
else:
self.sample_fields = 0

try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
Expand Down Expand Up @@ -95,7 +107,10 @@ def generate_profile(self, keyspace: str) -> Iterable[MetadataWorkUnit]:
platform_instance=self.config.cluster_name,
)

if not self.config.profile_pattern.allowed(keyspace):
if (
not self.config.profile_pattern.allowed(keyspace)
and self.config.profiling.report_dropped_profiles
):
self.report.profiling_skipped_table_profile_pattern[keyspace] += 1
logger.info(f"Profiling not allowed for Keyspace {keyspace}")
return

Check warning on line 116 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L114-L116

Added lines #L114 - L116 were not covered by tests
Expand Down Expand Up @@ -193,8 +208,12 @@ async def _collect_column_data(
self, keyspace: str, profile_data: ProfileData
) -> ProfileData:
document_total_count: int = 0
dropped_fields = set()
dropped_nested_fields = set()

aggregator = CouchbaseAggregate(self.client, keyspace)
aggregator = CouchbaseAggregate(
self.client, keyspace, max_sample_size=self.sample_size
)

async for chunk in aggregator.get_documents():
for document in chunk:
Expand All @@ -204,7 +223,18 @@ async def _collect_column_data(
for _field, data in flatten([], document):
column_values[_field].append(data)

for field_name, values in column_values.items():
for n, (field_name, values) in enumerate(column_values.items()):
if 0 < self.sample_fields <= n:
dropped_fields.add(field_name)
continue

Check warning on line 229 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L228-L229

Added lines #L228 - L229 were not covered by tests

if (
not self.config.profiling.profile_nested_fields
and len(field_name.split(".")) > 1
):
dropped_nested_fields.add(field_name)
continue

Check warning on line 236 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L235-L236

Added lines #L235 - L236 were not covered by tests

if field_name not in profile_data.column_metrics:
profile_data.column_metrics[field_name] = ColumnMetric()
if not profile_data.column_count:
Expand All @@ -229,8 +259,23 @@ async def _collect_column_data(
else:
profile_data.column_metrics[field_name].values.append(value)

if len(dropped_fields) > 0:
if self.config.profiling.report_dropped_profiles:
self.report.report_dropped(

Check warning on line 264 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L263-L264

Added lines #L263 - L264 were not covered by tests
f"The max_number_of_fields_to_profile={self.sample_fields} reached. Dropped fields for {keyspace} ({', '.join(sorted(dropped_fields))})"
)

if len(dropped_nested_fields) > 0:
if self.config.profiling.report_dropped_profiles:
self.report.report_dropped(

Check warning on line 270 in metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py#L269-L270

Added lines #L269 - L270 were not covered by tests
f"Dropped nested fields for {keyspace} ({', '.join(sorted(dropped_nested_fields))})"
)

profile_data.row_count = document_total_count

return self._add_field_statistics(profile_data)

def _add_field_statistics(self, profile_data: ProfileData) -> ProfileData:
for field_name, column_metrics in profile_data.column_metrics.items():
if column_metrics.values:
try:
Expand Down Expand Up @@ -277,7 +322,9 @@ def _compute_field_statistics(self, column_metrics: ColumnMetric) -> None:
]

if values and self.config.profiling.include_field_sample_values:
column_metrics.sample_values = [str(v) for v in values[:5]]
column_metrics.sample_values = [
str(v) for v in values[: self.field_sample_count]
]

@staticmethod
def _is_numeric_type(data_type: Union[str, None]) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ def collection_wait():
"username": "Administrator",
"password": "password",
"cluster_name": "testdb",
"profiling": {"enabled": True},
"profiling": {
"enabled": True,
"profile_nested_fields": True,
},
"classification": ClassificationConfig(
enabled=True,
classifiers=[
Expand Down

0 comments on commit bb14433

Please sign in to comment.