Skip to content

Commit bb85f98

Browse files
fcfangccemdnetoxrmx
authored
Fix aiokafka multiple values headers error. (#3332)
* Fix aiokafka multiple values headers * modify testcase * add changelog --------- Co-authored-by: Emídio Neto <[email protected]> Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 04f9e8d commit bb85f98

File tree

3 files changed

+52
-0
lines changed

3 files changed

+52
-0
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2929
([#3113](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3113))
3030
- `opentelemetry-instrumentation-grpc` Fix error when using gprc versions <= 1.50.0 with unix sockets.
3131
([[#3393](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3393)])
32+
- `opentelemetry-instrumentation-aiokafka` Fix send_and_wait method no headers kwargs error.
33+
([[#3332](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3332)])
3234

3335
## Version 1.31.0/0.52b0 (2025-03-12)
3436

instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/utils.py

+10
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
6868
return _extract_argument("headers", 5, None, args, kwargs)
6969

7070

71+
def _move_headers_to_kwargs(
72+
args: Tuple[Any], kwargs: Dict[str, Any]
73+
) -> Tuple[Tuple[Any], Dict[str, Any]]:
74+
"""Move headers from args to kwargs"""
75+
if len(args) > 5:
76+
kwargs["headers"] = args[5]
77+
return args[:5], kwargs
78+
79+
7180
async def _extract_send_partition(
7281
instance: aiokafka.AIOKafkaProducer,
7382
args: Tuple[Any],
@@ -260,6 +269,7 @@ async def _traced_send(
260269
args: Tuple[Any],
261270
kwargs: Dict[str, Any],
262271
) -> None:
272+
args, kwargs = _move_headers_to_kwargs(args, kwargs)
263273
headers = _extract_send_headers(args, kwargs)
264274
if headers is None:
265275
headers = []

instrumentation/opentelemetry-instrumentation-aiokafka/tests/test_instrumentation.py

+40
Original file line numberDiff line numberDiff line change
@@ -318,3 +318,43 @@ def _compare_spans(
318318
self.assertEqual(
319319
expected_span["attributes"], dict(span.attributes)
320320
)
321+
322+
async def test_send_and_wait(self) -> None:
323+
AIOKafkaInstrumentor().uninstrument()
324+
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
325+
326+
producer = await self.producer_factory()
327+
add_message_mock: mock.AsyncMock = (
328+
producer._message_accumulator.add_message
329+
)
330+
add_message_mock.side_effect = [mock.AsyncMock()(), mock.AsyncMock()()]
331+
332+
tracer = self.tracer_provider.get_tracer(__name__)
333+
with tracer.start_as_current_span("test_span") as span:
334+
await producer.send_and_wait("topic_1", b"value_1")
335+
336+
add_message_mock.assert_awaited_with(
337+
TopicPartition(topic="topic_1", partition=1),
338+
None,
339+
b"value_1",
340+
40.0,
341+
timestamp_ms=None,
342+
headers=[("traceparent", mock.ANY)],
343+
)
344+
assert (
345+
add_message_mock.call_args_list[0]
346+
.kwargs["headers"][0][1]
347+
.startswith(
348+
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
349+
)
350+
)
351+
352+
await producer.send_and_wait("topic_2", b"value_2")
353+
add_message_mock.assert_awaited_with(
354+
TopicPartition(topic="topic_2", partition=1),
355+
None,
356+
b"value_2",
357+
40.0,
358+
timestamp_ms=None,
359+
headers=[("traceparent", mock.ANY)],
360+
)

0 commit comments

Comments
 (0)