From a057762d88974d1b4495856e6698bc0a4569503f Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Tue, 11 Feb 2025 19:40:58 +0530 Subject: [PATCH] deny topics absent in config --- .../source/kafka_connect/kafka_connect.py | 85 +++++++++++++++++-- .../kafka-connect/test_kafka_connect.py | 43 ++++++++++ 2 files changed, 121 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py index c10e87970db4b..29d35251a9c4e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py @@ -208,12 +208,6 @@ def _get_connector_tasks(self, connector_name: str) -> dict: def _get_connector_topics( self, connector_name: str, config: Dict[str, str], connector_type: str ) -> List[str]: - if connector_type == SINK and config.get("topics"): - # Sink connectors may configure `topics` as List of topics to consume, separated by commas - # https://kafka.apache.org/documentation/#sinkconnectorconfigs_topics - # https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics - return [topic.strip() for topic in config["topics"].split(",")] - try: response = self.session.get( f"{self.config.connect_uri}/connectors/{connector_name}/topics", @@ -225,7 +219,24 @@ def _get_connector_topics( ) return [] - return response.json()[connector_name]["topics"] + processed_topics = response.json()[connector_name]["topics"] + + if connector_type == SINK: + try: + ## Sink connectors may configure `topics` or `topics.regex` + # https://kafka.apache.org/documentation/#sinkconnectorconfigs_topics + # https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics + return SinkTopicFilter().filter_stale_topics(processed_topics, config) + except Exception as e: + 
self.report.warning(
+                    title="Error parsing sink connector topics configuration",
+                    message="Some stale lineage tasks might show up for connector",
+                    context=connector_name,
+                    exc=e,
+                )
+                return processed_topics
+        else:
+            return processed_topics
 
     def construct_flow_workunit(self, connector: ConnectorManifest) -> MetadataWorkUnit:
         connector_name = connector.name
@@ -369,3 +380,63 @@ def make_lineage_dataset_urn(
         return builder.make_dataset_urn_with_platform_instance(
             platform, name, platform_instance, self.config.env
         )
+
+
+class SinkTopicFilter:
+    """Helper class to filter Kafka Connect topics based on configuration."""
+
+    def filter_stale_topics(
+        self,
+        processed_topics: List[str],
+        sink_config: Dict[str, str],
+    ) -> List[str]:
+        """Filter out stale topics based on sink connector configuration.
+
+        Args:
+            processed_topics: List of topics currently being processed,
+                as reported by the connector's runtime topics endpoint
+            sink_config: Configuration dictionary for the sink connector
+
+        Returns:
+            List of filtered topics that match the configuration
+
+        """
+        # Validate required config exists
+        if not self._has_topic_config(sink_config):
+            return processed_topics
+
+        # Handle explicit topic list
+        if sink_config.get("topics"):
+            return self._filter_by_topic_list(processed_topics, sink_config["topics"])
+        else:
+            # Handle regex pattern
+            return self._filter_by_topic_regex(
+                processed_topics, sink_config["topics.regex"]
+            )
+
+    def _has_topic_config(self, sink_config: Dict[str, str]) -> bool:
+        """Check if sink config has either topics or topics.regex."""
+        return bool(sink_config.get("topics") or sink_config.get("topics.regex"))
+
+    def _filter_by_topic_list(
+        self, processed_topics: List[str], topics_config: str
+    ) -> List[str]:
+        """Filter topics based on explicit topic list from config."""
+        config_topics = [
+            topic.strip() for topic in topics_config.split(",") if topic.strip()
+        ]
+        return [topic for topic in processed_topics if topic in config_topics]
+
+    def 
_filter_by_topic_regex( + self, processed_topics: List[str], regex_pattern: str + ) -> List[str]: + """Filter topics based on regex pattern from config.""" + from java.util.regex import Pattern + + regex_matcher = Pattern.compile(regex_pattern) + + return [ + topic + for topic in processed_topics + if regex_matcher.matcher(topic).matches() + ] diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 404baa8935392..606a2c81a9ad8 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -2,11 +2,14 @@ from typing import Any, Dict, List, Optional, cast from unittest import mock +import jpype +import jpype.imports import pytest import requests from freezegun import freeze_time from datahub.ingestion.run.pipeline import Pipeline +from datahub.ingestion.source.kafka_connect.kafka_connect import SinkTopicFilter from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState from tests.test_helpers import mce_helpers from tests.test_helpers.click_helpers import run_datahub_cmd @@ -684,3 +687,43 @@ def test_kafka_connect_bigquery_sink_ingest( golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json", ignore_paths=[], ) + + +def test_filter_stale_topics_topics_list(): + """ + Test case for filter_stale_topics method when sink_config has 'topics' key. 
+ """ + # Create an instance of SinkTopicFilter + sink_filter = SinkTopicFilter() + + # Set up test data + processed_topics = ["topic1", "topic2", "topic3", "topic4"] + sink_config = {"topics": "topic1,topic3,topic5"} + + # Call the method under test + result = sink_filter.filter_stale_topics(processed_topics, sink_config) + + # Assert the expected result + expected_result = ["topic1", "topic3"] + assert result == expected_result, f"Expected {expected_result}, but got {result}" + + +def test_filter_stale_topics_regex_filtering(): + """ + Test filter_stale_topics when using topics.regex for filtering. + """ + if not jpype.isJVMStarted(): + jpype.startJVM() + + # Create an instance of SinkTopicFilter + sink_filter = SinkTopicFilter() + + # Set up test data + processed_topics = ["topic1", "topic2", "other_topic", "test_topic"] + sink_config = {"topics.regex": "topic.*"} + + # Call the method under test + result = sink_filter.filter_stale_topics(processed_topics, sink_config) + + # Assert the result matches the expected filtered topics + assert result == ["topic1", "topic2"]