11# (c) Copyright IBM Corp. 2025
22
3+ import os
34from typing import Generator
45
56import pytest
1112from confluent_kafka .admin import AdminClient , NewTopic
1213from opentelemetry .trace import SpanKind
1314
15+ from instana .options import StandardOptions
1416from instana .singletons import agent , tracer
17+ from instana .util .config import parse_ignored_endpoints_from_yaml
1518from tests .helpers import get_first_span_by_filter , testenv
1619
1720
@@ -29,7 +32,7 @@ def _resource(self) -> Generator[None, None, None]:
2932 self .kafka_client = AdminClient (self .kafka_config )
3033
3134 try :
32- topics = self .kafka_client .create_topics ( # noqa: F841
35+ _ = self .kafka_client .create_topics ( # noqa: F841
3336 [
3437 NewTopic (
3538 testenv ["kafka_topic" ],
@@ -187,6 +190,130 @@ def test_trace_confluent_kafka_error(self) -> None:
187190 == "num_messages must be between 0 and 1000000 (1M)"
188191 )
189192
193+ def test_ignore_confluent_kafka (self ) -> None :
194+ os .environ ["INSTANA_IGNORE_ENDPOINTS" ] = "kafka"
195+ agent .options = StandardOptions ()
196+
197+ with tracer .start_as_current_span ("test" ):
198+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes" )
199+ self .producer .flush (timeout = 10 )
200+
201+ spans = self .recorder .queued_spans ()
202+ assert len (spans ) == 2
203+
204+ filtered_spans = agent .filter_spans (spans )
205+ assert len (filtered_spans ) == 1
206+
207+ def test_ignore_confluent_kafka_producer (self ) -> None :
208+ os .environ ["INSTANA_IGNORE_ENDPOINTS" ] = "kafka:produce"
209+ agent .options = StandardOptions ()
210+
211+ with tracer .start_as_current_span ("test-span" ):
212+ # Produce some events
213+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes1" )
214+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes2" )
215+ self .producer .flush ()
216+
217+ # Consume the events
218+ consumer_config = self .kafka_config .copy ()
219+ consumer_config ["group.id" ] = "my-group"
220+ consumer_config ["auto.offset.reset" ] = "earliest"
221+
222+ consumer = Consumer (consumer_config )
223+ consumer .subscribe ([testenv ["kafka_topic" ]])
224+ consumer .consume (num_messages = 2 , timeout = 60 )
225+
226+ consumer .close ()
227+
228+ spans = self .recorder .queued_spans ()
229+ assert len (spans ) == 5
230+
231+ filtered_spans = agent .filter_spans (spans )
232+ assert len (filtered_spans ) == 3
233+
234+ def test_ignore_confluent_kafka_consumer (self ) -> None :
235+ os .environ ["INSTANA_IGNORE_ENDPOINTS" ] = "kafka:consume"
236+ agent .options = StandardOptions ()
237+
238+ with tracer .start_as_current_span ("test-span" ):
239+ # Produce some events
240+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes1" )
241+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes2" )
242+ self .producer .flush ()
243+
244+ # Consume the events
245+ consumer_config = self .kafka_config .copy ()
246+ consumer_config ["group.id" ] = "my-group"
247+ consumer_config ["auto.offset.reset" ] = "earliest"
248+
249+ consumer = Consumer (consumer_config )
250+ consumer .subscribe ([testenv ["kafka_topic" ]])
251+ consumer .consume (num_messages = 2 , timeout = 60 )
252+
253+ consumer .close ()
254+
255+ spans = self .recorder .queued_spans ()
256+ assert len (spans ) == 5
257+
258+ filtered_spans = agent .filter_spans (spans )
259+ assert len (filtered_spans ) == 3
260+
261+ def test_ignore_confluent_specific_topic (self ) -> None :
262+ os .environ ["INSTANA_IGNORE_ENDPOINTS" ] = "kafka:consume"
263+ os .environ ["INSTANA_IGNORE_ENDPOINTS_PATH" ] = (
264+ "tests/util/test_configuration-1.yaml"
265+ )
266+
267+ agent .options = StandardOptions ()
268+
269+ with tracer .start_as_current_span ("test-span" ):
270+ # Produce some events
271+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes1" )
272+ self .producer .flush ()
273+
274+ # Consume the events
275+ consumer_config = self .kafka_config .copy ()
276+ consumer_config ["group.id" ] = "my-group"
277+ consumer_config ["auto.offset.reset" ] = "earliest"
278+
279+ consumer = Consumer (consumer_config )
280+ consumer .subscribe ([testenv ["kafka_topic" ]])
281+ consumer .consume (num_messages = 1 , timeout = 60 )
282+
283+ consumer .close ()
284+
285+ spans = self .recorder .queued_spans ()
286+ assert len (spans ) == 3
287+
288+ filtered_spans = agent .filter_spans (spans )
289+ assert len (filtered_spans ) == 1
290+
291+ def test_ignore_confluent_specific_topic_with_config_file (self ) -> None :
292+ agent .options .ignore_endpoints = parse_ignored_endpoints_from_yaml (
293+ "tests/util/test_configuration-1.yaml"
294+ )
295+
296+ with tracer .start_as_current_span ("test-span" ):
297+ # Produce some events
298+ self .producer .produce (testenv ["kafka_topic" ], b"raw_bytes1" )
299+ self .producer .flush ()
300+
301+ # Consume the events
302+ consumer_config = self .kafka_config .copy ()
303+ consumer_config ["group.id" ] = "my-group"
304+ consumer_config ["auto.offset.reset" ] = "earliest"
305+
306+ consumer = Consumer (consumer_config )
307+ consumer .subscribe ([testenv ["kafka_topic" ]])
308+ consumer .consume (num_messages = 1 , timeout = 60 )
309+ consumer .close ()
310+
311+ spans = self .recorder .queued_spans ()
312+ assert len (spans ) == 3
313+
314+ filtered_spans = agent .filter_spans (spans )
315+ assert len (filtered_spans ) == 1
316+
190317 def test_confluent_kafka_consumer_root_exit (self ) -> None :
191318 agent .options .allow_exit_as_root = True
192319
0 commit comments