diff --git a/metadata-ingestion/docs/sources/couchbase/couchbase_recipe.yml b/metadata-ingestion/docs/sources/couchbase/couchbase_recipe.yml index 3fa7c97d8d3b3..3816995bf342a 100644 --- a/metadata-ingestion/docs/sources/couchbase/couchbase_recipe.yml +++ b/metadata-ingestion/docs/sources/couchbase/couchbase_recipe.yml @@ -11,6 +11,7 @@ source: - type: datahub profiling: enabled: true + profile_nested_fields: true sink: # sink configs diff --git a/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_common.py b/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_common.py index 86a4ce367160b..a0cc17c92b7f3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_common.py @@ -13,11 +13,14 @@ ClassificationReportMixin, ClassificationSourceConfigMixin, ) -from datahub.ingestion.source.ge_profiling_config import GEProfilingBaseConfig +from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalSourceReport, StatefulIngestionConfigBase, ) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulProfilingConfigMixin, +) from datahub.ingestion.source_config.operation_config import is_profiling_enabled from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.utilities.perf_timer import PerfTimer @@ -47,6 +50,7 @@ class CouchbaseDBConfig( EnvConfigMixin, StatefulIngestionConfigBase, ClassificationSourceConfigMixin, + StatefulProfilingConfigMixin, ): connect_string: str = Field( default="couchbases://127.0.0.1", description="Couchbase connect string." @@ -82,8 +86,8 @@ class CouchbaseDBConfig( description="Regex patterns for tables to profile", ) - profiling: GEProfilingBaseConfig = Field( - default=GEProfilingBaseConfig(), + profiling: GEProfilingConfig = Field( + default=GEProfilingConfig(), description="Configuration for profiling", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py index 35303e5780bae..fd35f525e5f55 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/couchbase/couchbase_profiling.py @@ -67,6 +67,18 @@ def __init__( self.report = report self.client = client + if self.config.profiling.use_sampling: + self.sample_size = self.config.profiling.sample_size + else: + self.sample_size = 0 + + self.field_sample_count = self.config.profiling.field_sample_values_limit + + if self.config.profiling.max_number_of_fields_to_profile: + self.sample_fields = self.config.profiling.max_number_of_fields_to_profile + else: + self.sample_fields = 0 + try: self.loop = asyncio.get_running_loop() except RuntimeError: @@ -95,7 +107,10 @@ def generate_profile(self, keyspace: str) -> Iterable[MetadataWorkUnit]: platform_instance=self.config.cluster_name, ) - if not self.config.profile_pattern.allowed(keyspace): + if ( + not self.config.profile_pattern.allowed(keyspace) + and self.config.profiling.report_dropped_profiles + ): self.report.profiling_skipped_table_profile_pattern[keyspace] += 1 logger.info(f"Profiling not allowed for Keyspace {keyspace}") return @@ -193,8 +208,12 @@ async def _collect_column_data( self, keyspace: str, profile_data: ProfileData ) -> ProfileData: document_total_count: int = 0 + dropped_fields = set() + dropped_nested_fields = set() - aggregator = CouchbaseAggregate(self.client, keyspace) + aggregator = CouchbaseAggregate( + self.client, keyspace, max_sample_size=self.sample_size + ) async for chunk in aggregator.get_documents(): for document in chunk: @@ -204,7 +223,18 @@ async def _collect_column_data( for _field, data in flatten([], document): column_values[_field].append(data) - for field_name, values in column_values.items(): + for n, (field_name, values) in enumerate(column_values.items()): + if 0 < self.sample_fields <= n: + dropped_fields.add(field_name) + continue + + if ( + not self.config.profiling.profile_nested_fields + and len(field_name.split(".")) > 1 + ): + dropped_nested_fields.add(field_name) + continue + if field_name not in profile_data.column_metrics: profile_data.column_metrics[field_name] = ColumnMetric() if not profile_data.column_count: @@ -229,8 +259,23 @@ async def _collect_column_data( else: profile_data.column_metrics[field_name].values.append(value) + if len(dropped_fields) > 0: + if self.config.profiling.report_dropped_profiles: + self.report.report_dropped( + f"The max_number_of_fields_to_profile={self.sample_fields} reached. Dropped fields for {keyspace} ({', '.join(sorted(dropped_fields))})" + ) + + if len(dropped_nested_fields) > 0: + if self.config.profiling.report_dropped_profiles: + self.report.report_dropped( + f"Dropped nested fields for {keyspace} ({', '.join(sorted(dropped_nested_fields))})" + ) + profile_data.row_count = document_total_count + return self._add_field_statistics(profile_data) + + def _add_field_statistics(self, profile_data: ProfileData) -> ProfileData: for field_name, column_metrics in profile_data.column_metrics.items(): if column_metrics.values: try: @@ -277,7 +322,9 @@ def _compute_field_statistics(self, column_metrics: ColumnMetric) -> None: ] if values and self.config.profiling.include_field_sample_values: - column_metrics.sample_values = [str(v) for v in values[:5]] + column_metrics.sample_values = [ + str(v) for v in values[: self.field_sample_count] + ] @staticmethod def _is_numeric_type(data_type: Union[str, None]) -> bool: diff --git a/metadata-ingestion/tests/integration/couchbase/test_couchbase.py b/metadata-ingestion/tests/integration/couchbase/test_couchbase.py index 74d11794cb27d..05864a2227c76 100644 --- a/metadata-ingestion/tests/integration/couchbase/test_couchbase.py +++ b/metadata-ingestion/tests/integration/couchbase/test_couchbase.py @@ -110,7 +110,10 @@ def collection_wait(): "username": "Administrator", "password": "password", "cluster_name": "testdb", - "profiling": {"enabled": True}, + "profiling": { + "enabled": True, + "profile_nested_fields": True, + }, "classification": ClassificationConfig( enabled=True, classifiers=[