Skip to content

Commit 6b185ed

Browse files
igormqspring-builds
authored andcommitted
GH-3950: Propagate scope in async failures
Fixes: #3950 Fix trace context loss in async Kafka error handling Problem When async returns are enabled and a consumer failure occurs, the trace context from the original message is not propagated. This leads to each step of the retry/DLT flow starting a new trace instead of continuing the original one. Example (current behavior): • Producer → trace 1 • Consumer → trace 1, fails → message goes to retry topic • Retry listener → trace 2, fails → message goes to DLT topic • DLT listener → trace 3 This breaks end-to-end traceability, as each listener receives a new trace ID. Root cause The issue stems from the `handleAsyncFailure` method, which runs in a different thread but does not propagate the original Observation (trace) context associated with the failed record. Fix Ensure that the observation context is correctly propagated when handling async failures. This preserves the trace ID across retry and DLT flows. Signed-off-by: Igor Macedo Quintanilha <[email protected]> Co-authored-with: Artem Bilan <[email protected]> (cherry picked from commit 318cfb4)
1 parent 919ad3d commit 6b185ed

File tree

3 files changed

+155
-15
lines changed

3 files changed

+155
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,7 +1487,7 @@ protected void handleAsyncFailure() {
14871487
// We will give up on retrying with the remaining copied and failed Records.
14881488
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
14891489
try {
1490-
invokeErrorHandlerBySingleRecord(copyFailedRecord);
1490+
copyFailedRecord.observation.scoped(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
14911491
}
14921492
catch (Exception e) {
14931493
this.logger.warn(() ->
@@ -3413,8 +3413,13 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
34133413
.values();
34143414
}
34153415

3416+
private Observation getCurrentObservation() {
3417+
Observation currentObservation = this.observationRegistry.getCurrentObservation();
3418+
return currentObservation == null ? Observation.NOOP : currentObservation;
3419+
}
3420+
34163421
private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
3417-
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
3422+
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex, getCurrentObservation()));
34183423
}
34193424

34203425
@Override
@@ -4031,6 +4036,6 @@ private static class StopAfterFenceException extends KafkaException {
40314036

40324037
}
40334038

4034-
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
4039+
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex, Observation observation) { }
40354040

40364041
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ else if (!(result instanceof CompletableFuture<?>)) {
548548
}
549549

550550
completableFutureResult.whenComplete((r, t) -> {
551-
try {
551+
try (var scope = observation.openScope()) {
552552
if (t == null) {
553553
asyncSuccess(r, replyTopic, source, messageReturnType);
554554
if (isAsyncReplies()) {
@@ -734,13 +734,15 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
734734
"Async Fail", source.getPayload()), cause));
735735
}
736736
catch (Throwable ex) {
737-
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
738737
acknowledge(acknowledgment);
739738
if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
740739
@SuppressWarnings("unchecked")
741740
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
742741
this.asyncRetryCallback.accept(record, (RuntimeException) ex);
743742
}
743+
else {
744+
this.logger.error(ex, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
745+
}
744746
}
745747
}
746748

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 143 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,16 @@
3838
import io.micrometer.observation.Observation;
3939
import io.micrometer.observation.ObservationHandler;
4040
import io.micrometer.observation.ObservationRegistry;
41-
import io.micrometer.observation.tck.TestObservationRegistry;
4241
import io.micrometer.tracing.Span;
4342
import io.micrometer.tracing.TraceContext;
4443
import io.micrometer.tracing.Tracer;
4544
import io.micrometer.tracing.handler.DefaultTracingObservationHandler;
4645
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
4746
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
47+
import io.micrometer.tracing.handler.TracingAwareMeterObservationHandler;
4848
import io.micrometer.tracing.propagation.Propagator;
4949
import io.micrometer.tracing.test.simple.SimpleSpan;
50+
import io.micrometer.tracing.test.simple.SimpleTraceContext;
5051
import io.micrometer.tracing.test.simple.SimpleTracer;
5152
import org.apache.kafka.clients.admin.AdminClientConfig;
5253
import org.apache.kafka.clients.consumer.Consumer;
@@ -69,8 +70,10 @@
6970
import org.springframework.context.annotation.Configuration;
7071
import org.springframework.context.annotation.Primary;
7172
import org.springframework.kafka.KafkaException;
73+
import org.springframework.kafka.annotation.DltHandler;
7274
import org.springframework.kafka.annotation.EnableKafka;
7375
import org.springframework.kafka.annotation.KafkaListener;
76+
import org.springframework.kafka.annotation.RetryableTopic;
7477
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
7578
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
7679
import org.springframework.kafka.core.ConsumerFactory;
@@ -79,6 +82,7 @@
7982
import org.springframework.kafka.core.KafkaAdmin;
8083
import org.springframework.kafka.core.KafkaTemplate;
8184
import org.springframework.kafka.core.ProducerFactory;
85+
import org.springframework.kafka.listener.ContainerProperties;
8286
import org.springframework.kafka.listener.MessageListenerContainer;
8387
import org.springframework.kafka.listener.RecordInterceptor;
8488
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@@ -89,6 +93,9 @@
8993
import org.springframework.kafka.test.context.EmbeddedKafka;
9094
import org.springframework.kafka.test.utils.KafkaTestUtils;
9195
import org.springframework.messaging.handler.annotation.SendTo;
96+
import org.springframework.retry.annotation.Backoff;
97+
import org.springframework.scheduling.TaskScheduler;
98+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
9299
import org.springframework.test.annotation.DirtiesContext;
93100
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
94101
import org.springframework.util.StringUtils;
@@ -112,7 +119,8 @@
112119
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
113120
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
114121
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
115-
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
122+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE, ObservationTests.OBSERVATION_ASYNC_FAILURE_TEST,
123+
ObservationTests.OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST}, partitions = 1)
116124
@DirtiesContext
117125
public class ObservationTests {
118126

@@ -136,6 +144,55 @@ public class ObservationTests {
136144

137145
public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
138146

147+
public final static String OBSERVATION_ASYNC_FAILURE_TEST = "observation.async.failure.test";
148+
149+
public final static String OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST = "observation.async.failure.retry.test";
150+
151+
@Test
152+
void asyncRetryScopePropagation(@Autowired AsyncFailureListener asyncFailureListener,
153+
@Autowired KafkaTemplate<Integer, String> template,
154+
@Autowired SimpleTracer tracer,
155+
@Autowired ObservationRegistry observationRegistry) throws InterruptedException {
156+
157+
// Clear any previous spans
158+
tracer.getSpans().clear();
159+
160+
// Create an observation scope to ensure we have a proper trace context
161+
var testObservation = Observation.createNotStarted("test.message.send", observationRegistry);
162+
163+
// Send a message within the observation scope to ensure trace context is propagated
164+
testObservation.observe(() -> {
165+
try {
166+
template.send(OBSERVATION_ASYNC_FAILURE_TEST, "trigger-async-failure").get(5, TimeUnit.SECONDS);
167+
}
168+
catch (Exception e) {
169+
throw new RuntimeException("Failed to send message", e);
170+
}
171+
});
172+
173+
// Wait for the listener to process the message (initial + retry + DLT = 3 invocations)
174+
assertThat(asyncFailureListener.asyncFailureLatch.await(100000, TimeUnit.SECONDS)).isTrue();
175+
176+
// Verify that the captured spans from the listener contexts are all part of the same trace
177+
// This demonstrates that the tracing context propagates correctly through the retry mechanism
178+
Deque<SimpleSpan> spans = tracer.getSpans();
179+
assertThat(spans).hasSizeGreaterThanOrEqualTo(4); // template + listener + retry + DLT spans
180+
181+
// Verify that spans were captured for each phase and belong to the same trace
182+
assertThat(asyncFailureListener.capturedSpanInListener).isNotNull();
183+
assertThat(asyncFailureListener.capturedSpanInRetry).isNotNull();
184+
assertThat(asyncFailureListener.capturedSpanInDlt).isNotNull();
185+
186+
// All spans should have the same trace ID, demonstrating trace continuity
187+
var originalTraceId = asyncFailureListener.capturedSpanInListener.getTraceId();
188+
assertThat(originalTraceId).isNotBlank();
189+
assertThat(asyncFailureListener.capturedSpanInRetry.getTraceId()).isEqualTo(originalTraceId);
190+
assertThat(asyncFailureListener.capturedSpanInDlt.getTraceId()).isEqualTo(originalTraceId);
191+
192+
// Clear any previous spans
193+
tracer.getSpans().clear();
194+
}
195+
139196
@Test
140197
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
141198
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -628,6 +685,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
628685
if (container.getListenerId().equals("obs3")) {
629686
container.setKafkaAdmin(this.mockAdmin);
630687
}
688+
if (container.getListenerId().contains("asyncFailure")) {
689+
// Enable async acks to trigger async failure handling
690+
container.getContainerProperties().setAsyncAcks(true);
691+
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
692+
}
631693
if (container.getListenerId().equals("obs4")) {
632694
container.setRecordInterceptor(new RecordInterceptor<>() {
633695

@@ -662,17 +724,17 @@ MeterRegistry meterRegistry() {
662724

663725
@Bean
664726
ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, MeterRegistry meterRegistry) {
665-
TestObservationRegistry observationRegistry = TestObservationRegistry.create();
727+
var observationRegistry = ObservationRegistry.create();
666728
observationRegistry.observationConfig().observationHandler(
667729
// Composite will pick the first matching handler
668730
new ObservationHandler.FirstMatchingCompositeObservationHandler(
669-
// This is responsible for creating a child span on the sender side
670-
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
671731
// This is responsible for creating a span on the receiver side
672732
new PropagatingReceiverTracingObservationHandler<>(tracer, propagator),
733+
// This is responsible for creating a child span on the sender side
734+
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
673735
// This is responsible for creating a default span
674736
new DefaultTracingObservationHandler(tracer)))
675-
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
737+
.observationHandler(new TracingAwareMeterObservationHandler<>(new DefaultMeterObservationHandler(meterRegistry), tracer));
676738
return observationRegistry;
677739
}
678740

@@ -683,29 +745,41 @@ Propagator propagator(Tracer tracer) {
683745
// List of headers required for tracing propagation
684746
@Override
685747
public List<String> fields() {
686-
return Arrays.asList("foo", "bar");
748+
return Arrays.asList("traceId", "spanId", "foo", "bar");
687749
}
688750

689751
// This is called on the producer side when the message is being sent
690-
// Normally we would pass information from tracing context - for tests we don't need to
691752
@Override
692753
public <C> void inject(TraceContext context, C carrier, Setter<C> setter) {
693754
setter.set(carrier, "foo", "some foo value");
694755
setter.set(carrier, "bar", "some bar value");
695756

757+
setter.set(carrier, "traceId", context.traceId());
758+
setter.set(carrier, "spanId", context.spanId());
759+
696760
// Add a traceparent header to simulate W3C trace context
697761
setter.set(carrier, "traceparent", "traceparent-from-propagator");
698762
}
699763

700764
// This is called on the consumer side when the message is consumed
701-
// Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span
702765
@Override
703766
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
704767
String foo = getter.get(carrier, "foo");
705768
String bar = getter.get(carrier, "bar");
706-
return tracer.spanBuilder()
769+
770+
var traceId = getter.get(carrier, "traceId");
771+
var spanId = getter.get(carrier, "spanId");
772+
773+
Span.Builder spanBuilder = tracer.spanBuilder()
707774
.tag("foo", foo)
708775
.tag("bar", bar);
776+
777+
var traceContext = new SimpleTraceContext();
778+
traceContext.setTraceId(traceId);
779+
traceContext.setSpanId(spanId);
780+
spanBuilder = spanBuilder.setParent(traceContext);
781+
782+
return spanBuilder;
709783
}
710784
};
711785
}
@@ -720,6 +794,15 @@ ExceptionListener exceptionListener() {
720794
return new ExceptionListener();
721795
}
722796

797+
@Bean
798+
AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
799+
return new AsyncFailureListener(tracer);
800+
}
801+
802+
@Bean
803+
public TaskScheduler taskExecutor() {
804+
return new ThreadPoolTaskScheduler();
805+
}
723806
}
724807

725808
public static class Listener {
@@ -801,4 +884,54 @@ Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
801884

802885
}
803886

887+
public static class AsyncFailureListener {
888+
889+
final CountDownLatch asyncFailureLatch = new CountDownLatch(3);
890+
891+
volatile @Nullable SimpleSpan capturedSpanInListener;
892+
893+
volatile @Nullable SimpleSpan capturedSpanInRetry;
894+
895+
volatile @Nullable SimpleSpan capturedSpanInDlt;
896+
897+
private final SimpleTracer tracer;
898+
899+
public AsyncFailureListener(SimpleTracer tracer) {
900+
this.tracer = tracer;
901+
}
902+
903+
@RetryableTopic(
904+
attempts = "2",
905+
backoff = @Backoff(delay = 1000)
906+
)
907+
@KafkaListener(id = "asyncFailure", topics = OBSERVATION_ASYNC_FAILURE_TEST)
908+
CompletableFuture<Void> handleAsync(ConsumerRecord<Integer, String> record) {
909+
910+
// Use topic name to distinguish between original and retry calls
911+
String topicName = record.topic();
912+
913+
if (topicName.equals(OBSERVATION_ASYNC_FAILURE_TEST)) {
914+
// This is the original call
915+
this.capturedSpanInListener = this.tracer.currentSpan();
916+
}
917+
else {
918+
// This is a retry call (topic name will be different for retry topics)
919+
this.capturedSpanInRetry = this.tracer.currentSpan();
920+
}
921+
922+
this.asyncFailureLatch.countDown();
923+
924+
// Return a failed CompletableFuture to trigger async failure handling
925+
return CompletableFuture.supplyAsync(() -> {
926+
throw new RuntimeException("Async failure for observation test");
927+
});
928+
}
929+
930+
@DltHandler
931+
void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
932+
this.capturedSpanInDlt = this.tracer.currentSpan();
933+
this.asyncFailureLatch.countDown();
934+
}
935+
}
936+
804937
}

0 commit comments

Comments
 (0)