Skip to content

Commit

Permalink
deny topics absent in config
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate committed Feb 11, 2025
1 parent ffc46fd commit a057762
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,6 @@ def _get_connector_tasks(self, connector_name: str) -> dict:
def _get_connector_topics(
self, connector_name: str, config: Dict[str, str], connector_type: str
) -> List[str]:
if connector_type == SINK and config.get("topics"):
# Sink connectors may configure `topics` as List of topics to consume, separated by commas
# https://kafka.apache.org/documentation/#sinkconnectorconfigs_topics
# https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics
return [topic.strip() for topic in config["topics"].split(",")]

try:
response = self.session.get(
f"{self.config.connect_uri}/connectors/{connector_name}/topics",
Expand All @@ -225,7 +219,24 @@ def _get_connector_topics(
)
return []

return response.json()[connector_name]["topics"]
processed_topics = response.json()[connector_name]["topics"]

if connector_type == SINK:
try:
## Sink connectors may configure `topics` or `topics.regex`
# https://kafka.apache.org/documentation/#sinkconnectorconfigs_topics
# https://docs.confluent.io/platform/current/installation/configuration/connect/sink-connect-configs.html#topics
return SinkTopicFilter().filter_stale_topics(processed_topics, config)
except Exception as e:
self.report.warning(
title="Error parsing sink conector topics configuration",
message="Some stale lineage tasks might show up for connector",
context=connector_name,
exc=e,
)
return processed_topics
else:
return processed_topics

def construct_flow_workunit(self, connector: ConnectorManifest) -> MetadataWorkUnit:
connector_name = connector.name
Expand Down Expand Up @@ -369,3 +380,63 @@ def make_lineage_dataset_urn(
return builder.make_dataset_urn_with_platform_instance(
platform, name, platform_instance, self.config.env
)


class SinkTopicFilter:
    """Helper class to filter Kafka Connect topics based on configuration."""

    def filter_stale_topics(
        self,
        processed_topics: List[str],
        sink_config: Dict[str, str],
    ) -> List[str]:
        """Filter out stale topics based on sink connector configuration.

        Args:
            processed_topics: List of topics currently being processed
            sink_config: Configuration dictionary for the sink connector

        Returns:
            List of topics from ``processed_topics`` that match the
            connector's ``topics`` / ``topics.regex`` configuration. If the
            config declares neither key, the input list is returned
            unchanged (we cannot tell which topics are stale).
        """
        # Validate required config exists; without it, keep everything.
        if not self._has_topic_config(sink_config):
            return processed_topics

        # Kafka Connect sinks configure exactly one of `topics` or
        # `topics.regex`; prefer the explicit list when present.
        if sink_config.get("topics"):
            return self._filter_by_topic_list(processed_topics, sink_config["topics"])
        else:
            # Handle regex pattern
            return self._filter_by_topic_regex(
                processed_topics, sink_config["topics.regex"]
            )

    def _has_topic_config(self, sink_config: Dict[str, str]) -> bool:
        """Check if sink config has either topics or topics.regex."""
        return bool(sink_config.get("topics") or sink_config.get("topics.regex"))

    def _filter_by_topic_list(
        self, processed_topics: List[str], topics_config: str
    ) -> List[str]:
        """Filter topics based on explicit comma-separated topic list from config."""
        # Build a set for O(1) membership tests; drop empty entries caused
        # by stray commas or surrounding whitespace in the config value.
        config_topics = {
            topic.strip() for topic in topics_config.split(",") if topic.strip()
        }
        return [topic for topic in processed_topics if topic in config_topics]

    def _filter_by_topic_regex(
        self, processed_topics: List[str], regex_pattern: str
    ) -> List[str]:
        """Filter topics based on regex pattern from config."""
        # Deliberately use Java's regex engine (via jpype) so the pattern
        # semantics match what the Kafka Connect runtime itself applies to
        # `topics.regex`; Python's `re` differs in some constructs.
        from java.util.regex import Pattern

        regex_matcher = Pattern.compile(regex_pattern)

        # Pattern.matcher(...).matches() requires the FULL topic name to
        # match the pattern, not just a prefix/substring.
        return [
            topic
            for topic in processed_topics
            if regex_matcher.matcher(topic).matches()
        ]
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
from typing import Any, Dict, List, Optional, cast
from unittest import mock

import jpype
import jpype.imports
import pytest
import requests
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.kafka_connect.kafka_connect import SinkTopicFilter
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
Expand Down Expand Up @@ -684,3 +687,43 @@ def test_kafka_connect_bigquery_sink_ingest(
golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json",
ignore_paths=[],
)


def test_filter_stale_topics_topics_list():
    """
    Test case for filter_stale_topics method when sink_config has 'topics' key.
    """
    # Filter under test
    topic_filter = SinkTopicFilter()

    # Connector currently reports four topics; the config lists three, one
    # of which ("topic5") is not among the processed topics at all.
    current_topics = ["topic1", "topic2", "topic3", "topic4"]
    config = {"topics": "topic1,topic3,topic5"}

    result = topic_filter.filter_stale_topics(current_topics, config)

    # Only the intersection survives, in processed-topic order.
    expected_result = ["topic1", "topic3"]
    assert result == expected_result, f"Expected {expected_result}, but got {result}"


def test_filter_stale_topics_regex_filtering():
    """
    Test filter_stale_topics when using topics.regex for filtering.
    """
    # The regex path delegates to java.util.regex, so a JVM must be running.
    if not jpype.isJVMStarted():
        jpype.startJVM()

    topic_filter = SinkTopicFilter()

    candidate_topics = ["topic1", "topic2", "other_topic", "test_topic"]
    regex_config = {"topics.regex": "topic.*"}

    result = topic_filter.filter_stale_topics(candidate_topics, regex_config)

    # Java's matches() requires a full match, so "other_topic" and
    # "test_topic" are excluded even though they contain "topic".
    assert result == ["topic1", "topic2"]

0 comments on commit a057762

Please sign in to comment.