
Commit 9ab0bc3

KAFKA-20462: Preserve original source-record headers in ErrorHandlerContext
A user-supplied Deserializer is allowed to mutate the live Headers reference (e.g. a LargeMessageDeserializer that strips serde-internal headers so they don't leak downstream). The same Headers reference flows into the ErrorHandlerContext used by both the DeserializationExceptionHandler and the ProcessingExceptionHandler, so when these handlers read context.headers() -- and when ExceptionHandlerUtils#buildDeadLetterQueueRecord copies those headers into a DLQ record -- they observe the post-mutation state, even though the javadoc on ErrorHandlerContext#headers() promises "the headers of the current source record".

Snapshot rawRecord.headers() in RecordQueue#updateHead before invoking RecordDeserializer#deserialize, and propagate the snapshot through two paths:

- As an explicit parameter to RecordDeserializer#deserialize, so the DeserializationExceptionHandler error path uses the original source headers when the Deserializer threw.
- On a new transient sourceRawHeaders field on StampedRecord and ProcessorRecordContext (mirroring the existing sourceRawKey/sourceRawValue lifecycle: not serialized, cleared by freeRawRecord), so that ProcessorNode -- when invoking the ProcessingExceptionHandler after a downstream processor failed on a successfully-deserialized record -- builds the DefaultErrorHandlerContext from the snapshot rather than the live (possibly mutated) Headers reference.

GlobalStateUpdateTask and GlobalStateManagerImpl get the same treatment for the global-state restore path (deserialization failure and processing failure, respectively).

The downstream Record passed to processors still carries the live (possibly mutated) Headers reference, so processors that legitimately rely on a Deserializer mutating headers (the LargeMessageDeserializer pattern) continue to see the mutated view -- only the error-handler context is changed, to honor its documented contract.

Two regression tests:

- RecordDeserializerTest covers the DeserializationExceptionHandler path: a Deserializer that mutates headers and then throws.
- ProcessorNodeTest covers the ProcessingExceptionHandler path: a ProcessorRecordContext where the live headers have already been mutated but sourceRawHeaders preserves the originals, and a processor throws.
1 parent 75052bb commit 9ab0bc3
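For context, a minimal hypothetical sketch of the kind of header-mutating Deserializer the commit message describes. The class name, header key, and payload handling are illustrative only and not part of the commit; it just shows a deserializer that mutates the caller's live Headers instance and may then throw, which is the case the DeserializationExceptionHandler sees.

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the "LargeMessageDeserializer" pattern mentioned above:
// it strips a serde-internal header from the live Headers instance so it does not
// leak downstream, and fails if the header is missing.
public class HeaderStrippingDeserializer implements Deserializer<String> {

    private static final String SEGMENT_HEADER = "serde.internal.segment-id"; // illustrative key

    @Override
    public String deserialize(final String topic, final byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(final String topic, final Headers headers, final byte[] data) {
        final Header marker = headers.lastHeader(SEGMENT_HEADER);
        headers.remove(SEGMENT_HEADER); // mutates the caller's live Headers reference
        if (marker == null) {
            // By the time the DeserializationExceptionHandler runs, the live headers
            // have already been mutated -- hence the snapshot introduced by this commit.
            throw new IllegalStateException("missing " + SEGMENT_HEADER + " header");
        }
        return deserialize(topic, data);
    }
}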

10 files changed

Lines changed: 293 additions & 26 deletions


streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

Lines changed: 10 additions & 1 deletion
@@ -23,6 +23,8 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -369,6 +371,12 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
         if (record.key() != null) {
             // Deserialization phase
             final Record<?, ?> deserializedRecord;
+            // Snapshot headers before invoking the user-supplied
+            // Deserializers so the ErrorHandlerContext seen by the
+            // DeserializationExceptionHandler reflects the original
+            // source-record headers, even if a Deserializer mutates
+            // the live Headers reference.
+            final Headers sourceRecordHeaders = new RecordHeaders(record.headers());
             try {
                 deserializedRecord = new Record<>(
                     reprocessFactory.keyDeserializer().deserialize(record.topic(), record.headers(), record.key()),
@@ -384,6 +392,7 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
                     globalProcessorContext,
                     deserializationException,
                     record,
+                    sourceRecordHeaders,
                     log,
                     droppedRecordsSensor,
                     null
@@ -406,7 +415,7 @@ private void reprocessState(final StateStoreMetadata storeMetadata) {
                 record.topic(),
                 record.partition(),
                 record.offset(),
-                record.headers(),
+                sourceRecordHeaders,
                 reprocessFactory.processorName(),
                 globalProcessorContext.taskId(),
                 record.timestamp(),

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java

Lines changed: 13 additions & 2 deletions
@@ -18,6 +18,8 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -109,7 +111,13 @@ public Map<TopicPartition, Long> initialize() {
     @Override
     public void update(final ConsumerRecord<byte[], byte[]> record) {
         final RecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
-        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserialize(processorContext, record);
+        // Snapshot headers before invoking the user-supplied Deserializer so
+        // ErrorHandlerContext#headers() observed by the
+        // DeserializationExceptionHandler / ProcessingExceptionHandler exposes
+        // the original source-record headers, even if a Deserializer mutates
+        // the live Headers reference.
+        final Headers sourceRawHeaders = new RecordHeaders(record.headers());
+        final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserialize(processorContext, record, sourceRawHeaders);
 
         if (deserialized != null) {
             final ProcessorRecordContext recordContext =
@@ -118,7 +126,10 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
                     deserialized.offset(),
                     deserialized.partition(),
                     deserialized.topic(),
-                    deserialized.headers());
+                    deserialized.headers(),
+                    null,
+                    null,
+                    sourceRawHeaders);
             processorContext.setRecordContext(recordContext);
             processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
             final Record<Object, Object> toProcess = new Record<>(

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java

Lines changed: 22 additions & 1 deletion
@@ -17,7 +17,9 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -215,12 +217,21 @@ public void process(final Record<KIn, VIn> record) {
                 throw processingException;
             }
 
+            // ErrorHandlerContext#headers() is documented to expose the source
+            // record's headers. When the live recordContext is a
+            // ProcessorRecordContext that captured a sourceRawHeaders snapshot
+            // (i.e. the record came from a deserialized ConsumerRecord), use
+            // that snapshot so the handler -- and any DLQ record built from it
+            // -- sees the original headers even if a Deserializer mutated the
+            // live Headers reference. Fall back to the live headers when no
+            // snapshot is available (e.g. internal/synthetic record contexts).
+            final Headers sourceRecordHeaders = sourceRawHeadersOrLive(internalProcessorContext.recordContext());
             final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                 null, // only required to pass for DeserializationExceptionHandler
                 internalProcessorContext.recordContext().topic(),
                 internalProcessorContext.recordContext().partition(),
                 internalProcessorContext.recordContext().offset(),
-                internalProcessorContext.recordContext().headers(),
+                sourceRecordHeaders,
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId(),
                 internalProcessorContext.recordContext().timestamp(),
@@ -310,4 +321,14 @@ public String toString(final String indent) {
         }
         return sb.toString();
     }
+
+    private static Headers sourceRawHeadersOrLive(final RecordContext recordContext) {
+        if (recordContext instanceof ProcessorRecordContext) {
+            final Headers snapshot = ((ProcessorRecordContext) recordContext).sourceRawHeaders();
+            if (snapshot != null) {
+                return snapshot;
+            }
+        }
+        return recordContext.headers();
+    }
 }
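For illustration, a hedged sketch of the kind of ErrorHandlerContext#headers() consumer this change protects -- a DLQ-style helper similar in spirit to what the commit message attributes to ExceptionHandlerUtils#buildDeadLetterQueueRecord. The class name, method name, and DLQ topic are hypothetical, and the ErrorHandlerContext accessors used are assumptions based on the DefaultErrorHandlerContext arguments visible in the diff above.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

// Hypothetical DLQ helper; not part of this commit.
final class DlqRecords {

    private DlqRecords() {
    }

    static ProducerRecord<byte[], byte[]> buildDlqRecord(final String dlqTopic,
                                                         final ErrorHandlerContext context,
                                                         final byte[] key,
                                                         final byte[] value) {
        // With this commit, context.headers() is the pre-deserialization snapshot for
        // records that came from a ConsumerRecord, so the DLQ record carries the
        // original source-record headers rather than a Deserializer-mutated view.
        return new ProducerRecord<>(dlqTopic, null, context.timestamp(), key, value, context.headers());
    }
}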

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java

Lines changed: 36 additions & 8 deletions
@@ -40,19 +40,21 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
     private final Headers headers;
     private byte[] sourceRawKey;
     private byte[] sourceRawValue;
+    // Snapshot of the source record's headers taken before any user-supplied
+    // Deserializer ran. Like {@link #sourceRawKey}/{@link #sourceRawValue},
+    // this is transient (not serialized) and is cleared by
+    // {@link #freeRawRecord()}. Used by the error-handler construction sites
+    // in StreamTask and ProcessorNode so that ErrorHandlerContext#headers()
+    // can return the original source-record headers even when a Deserializer
+    // mutated the live Headers instance.
+    private Headers sourceRawHeaders;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-        this.timestamp = timestamp;
-        this.offset = offset;
-        this.topic = topic;
-        this.partition = partition;
-        this.headers = Objects.requireNonNull(headers);
-        this.sourceRawKey = null;
-        this.sourceRawValue = null;
+        this(timestamp, offset, partition, topic, headers, null, null, null);
     }
 
     public ProcessorRecordContext(final long timestamp,
@@ -62,13 +64,25 @@ public ProcessorRecordContext(final long timestamp,
                                   final Headers headers,
                                   final byte[] sourceRawKey,
                                   final byte[] sourceRawValue) {
+        this(timestamp, offset, partition, topic, headers, sourceRawKey, sourceRawValue, null);
+    }
+
+    public ProcessorRecordContext(final long timestamp,
+                                  final long offset,
+                                  final int partition,
+                                  final String topic,
+                                  final Headers headers,
+                                  final byte[] sourceRawKey,
+                                  final byte[] sourceRawValue,
+                                  final Headers sourceRawHeaders) {
         this.timestamp = timestamp;
        this.offset = offset;
         this.topic = topic;
         this.partition = partition;
         this.headers = Objects.requireNonNull(headers);
         this.sourceRawKey = sourceRawKey;
         this.sourceRawValue = sourceRawValue;
+        this.sourceRawHeaders = sourceRawHeaders;
     }
 
     @Override
@@ -106,6 +120,18 @@ public byte[] sourceRawValue() {
         return sourceRawValue;
     }
 
+    /**
+     * Returns the snapshot of the source record's headers taken before any
+     * user-supplied {@code Deserializer} ran, or {@code null} if no snapshot
+     * was captured (e.g., for {@code ProcessorRecordContext} instances that
+     * were not constructed from a live consumer record). This is consumed by
+     * Streams error-handler construction sites and is not part of the public
+     * {@link RecordContext} interface.
+     */
+    public Headers sourceRawHeaders() {
+        return sourceRawHeaders;
+    }
+
     public long residentMemorySizeEstimate() {
         long size = 0;
         size += Long.BYTES; // value.context.timestamp
@@ -210,6 +236,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
     public void freeRawRecord() {
         this.sourceRawKey = null;
         this.sourceRawValue = null;
+        this.sourceRawHeaders = null;
     }
 
     @Override
@@ -227,7 +254,8 @@ public boolean equals(final Object o) {
             Objects.equals(topic, that.topic) &&
             Objects.equals(headers, that.headers) &&
             Arrays.equals(sourceRawKey, that.sourceRawKey) &&
-            Arrays.equals(sourceRawValue, that.sourceRawValue);
+            Arrays.equals(sourceRawValue, that.sourceRawValue) &&
+            Objects.equals(sourceRawHeaders, that.sourceRawHeaders);
     }
 
     /**

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java

Lines changed: 13 additions & 3 deletions
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
@@ -55,8 +56,16 @@ public class RecordDeserializer {
      * or throws an exception itself
      */
     ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processorContext,
-                                               final ConsumerRecord<byte[], byte[]> rawRecord) {
+                                               final ConsumerRecord<byte[], byte[]> rawRecord,
+                                               final Headers sourceRecordHeaders) {
 
+        // sourceRecordHeaders is a snapshot of rawRecord.headers() captured by
+        // the caller BEFORE this method runs, because user-supplied
+        // Deserializers are allowed to mutate the live Headers reference (e.g.
+        // a LargeMessageDeserializer that strips serde-internal headers) and
+        // we still need ErrorHandlerContext#headers() to expose the original
+        // source-record headers per its javadoc, so that DLQ records can be
+        // reconstructed faithfully.
        try {
             return new ConsumerRecord<>(
                 rawRecord.topic(),
@@ -75,7 +84,7 @@ ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?> processo
             // while Java distinguishes checked vs unchecked exceptions, other languages
             // like Scala or Kotlin do not, and thus we need to catch `Exception`
             // (instead of `RuntimeException`) to work well with those languages
-            handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, sourceNode().name());
+            handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, sourceRecordHeaders, log, droppedRecordsSensor, sourceNode().name());
             return null; // 'handleDeserializationFailure' would either throw or swallow -- if we swallow we need to skip the record by returning 'null'
         }
     }
@@ -84,6 +93,7 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
                                                     final ProcessorContext<?, ?> processorContext,
                                                     final Exception deserializationException,
                                                     final ConsumerRecord<byte[], byte[]> rawRecord,
+                                                    final Headers sourceRecordHeaders,
                                                     final Logger log,
                                                     final Sensor droppedRecordsSensor,
                                                     final String sourceNodeName) {
@@ -93,7 +103,7 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
             rawRecord.topic(),
             rawRecord.partition(),
             rawRecord.offset(),
-            rawRecord.headers(),
+            sourceRecordHeaders,
             sourceNodeName,
             processorContext.taskId(),
             rawRecord.timestamp(),

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java

Lines changed: 12 additions & 2 deletions
@@ -18,6 +18,8 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -212,8 +214,16 @@ private void updateHead() {
 
         while (headRecord == null && !fifoQueue.isEmpty()) {
             final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
+            // Snapshot raw.headers() before invoking the user-supplied
+            // Deserializer in deserialize(...). The same snapshot is then
+            // attached to the StampedRecord so it can be carried through to
+            // the ProcessingExceptionHandler error path -- a Deserializer is
+            // free to mutate the live Headers reference, but
+            // ErrorHandlerContext#headers() must continue to expose the
+            // original source-record headers per its javadoc.
+            final Headers sourceRawHeaders = new RecordHeaders(raw.headers());
             final ConsumerRecord<Object, Object> deserialized =
-                recordDeserializer.deserialize(processorContext, raw);
+                recordDeserializer.deserialize(processorContext, raw, sourceRawHeaders);
 
             if (deserialized == null) {
                 // this only happens if the deserializer decides to skip. It has already logged the reason.
@@ -243,7 +253,7 @@ private void updateHead() {
                 lastCorruptedRecord = raw;
                 continue;
             }
-            headRecord = new StampedRecord(deserialized, timestamp, raw.key(), raw.value());
+            headRecord = new StampedRecord(deserialized, timestamp, raw.key(), raw.value(), sourceRawHeaders);
             headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
         }
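A minimal, self-contained sketch of the copy semantics the snapshot idiom above relies on: constructing a RecordHeaders from an existing Headers copies the header list, so later add/remove calls on the live instance do not show up in the snapshot (the header byte[] values themselves are not deep-copied). The class and header names are illustrative only.

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;

public class HeaderSnapshotDemo {
    public static void main(final String[] args) {
        final Headers live = new RecordHeaders();
        live.add("trace-id", "abc".getBytes(StandardCharsets.UTF_8));

        // Same idiom as updateHead() above: copy the header list before the Deserializer runs.
        final Headers snapshot = new RecordHeaders(live);

        // A Deserializer is free to mutate the live instance...
        live.remove("trace-id");
        live.add("decoded-by", "serde".getBytes(StandardCharsets.UTF_8));

        // ...but the snapshot still reflects the original source-record headers.
        System.out.println(snapshot.lastHeader("trace-id") != null);   // true
        System.out.println(snapshot.lastHeader("decoded-by") == null); // true
        System.out.println(live.lastHeader("trace-id") == null);       // true
    }
}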

streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java

Lines changed: 19 additions & 3 deletions
@@ -25,20 +25,32 @@ public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
 
     private final byte[] rawKey;
     private final byte[] rawValue;
+    // Snapshot of the source record's headers taken before any Deserializer
+    // ran, propagated downstream so that ErrorHandlerContext#headers() can
+    // expose the original source-record headers even after a Deserializer
+    // mutated the live Headers instance.
+    private final Headers rawHeaders;
 
     public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
-        super(record, timestamp);
-        this.rawKey = null;
-        this.rawValue = null;
+        this(record, timestamp, null, null, null);
     }
 
     public StampedRecord(final ConsumerRecord<?, ?> record,
                          final long timestamp,
                          final byte[] rawKey,
                          final byte[] rawValue) {
+        this(record, timestamp, rawKey, rawValue, null);
+    }
+
+    public StampedRecord(final ConsumerRecord<?, ?> record,
+                         final long timestamp,
+                         final byte[] rawKey,
+                         final byte[] rawValue,
+                         final Headers rawHeaders) {
         super(record, timestamp);
         this.rawKey = rawKey;
         this.rawValue = rawValue;
+        this.rawHeaders = rawHeaders;
     }
 
     public String topic() {
@@ -77,6 +89,10 @@ public byte[] rawValue() {
         return rawValue;
     }
 
+    public Headers rawHeaders() {
+        return rawHeaders;
+    }
+
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 2 additions & 1 deletion
@@ -874,7 +874,8 @@ private void doProcess(final long wallClockTime) {
             record.topic(),
             record.headers(),
             record.rawKey(),
-            record.rawValue()
+            record.rawValue(),
+            record.rawHeaders()
         );
         updateProcessorContext(currNode, wallClockTime, recordContext);
