KAFKA-20462: Preserve original source-record headers in ErrorHandlerContext#22155
Open
1fanwang wants to merge 1 commit intoapache:trunkfrom
Open
KAFKA-20462: Preserve original source-record headers in ErrorHandlerContext#221551fanwang wants to merge 1 commit intoapache:trunkfrom
1fanwang wants to merge 1 commit intoapache:trunkfrom
Conversation
…ontext
A user-supplied Deserializer is allowed to mutate the live Headers
reference (e.g. a LargeMessageDeserializer that strips serde-internal
headers so they don't leak downstream). The same Headers reference
flows into ErrorHandlerContext used by both the
DeserializationExceptionHandler and the ProcessingExceptionHandler,
so when these handlers read context.headers() -- and when
ExceptionHandlerUtils#buildDeadLetterQueueRecord copies those headers
into a DLQ record -- they observe the post-mutation state, even
though the javadoc on ErrorHandlerContext#headers() promises "the
headers of the current source record".
Snapshot rawRecord.headers() in RecordQueue#updateHead before
invoking RecordDeserializer#deserialize, and propagate the snapshot
through two paths:
- As an explicit parameter to RecordDeserializer#deserialize so the
DeserializationExceptionHandler error path uses the original
source headers when the Deserializer threw.
- On a new transient sourceRawHeaders field on StampedRecord and
ProcessorRecordContext (mirroring the existing
sourceRawKey/sourceRawValue lifecycle: not serialized, cleared
by freeRawRecord), so that ProcessorNode -- when invoking the
ProcessingExceptionHandler after a downstream processor failed
on a successfully-deserialized record -- builds the
DefaultErrorHandlerContext from the snapshot rather than the
live (possibly mutated) Headers reference.
GlobalStateUpdateTask and GlobalStateManagerImpl get the same
treatment for the global-state restore path (deserialization-failure
and processing-failure respectively).
The downstream Record passed to processors still carries the live
(possibly mutated) Headers reference, so processors that legitimately
rely on a Deserializer mutating headers (the LargeMessageDeserializer
pattern) continue to see the mutated view -- only the error-handler
context is changed to honor its documented contract.
Two regression tests:
- RecordDeserializerTest covers the
DeserializationExceptionHandler path: a Deserializer that mutates
headers then throws.
- ProcessorNodeTest covers the ProcessingExceptionHandler path: a
ProcessorRecordContext where the live headers have already been
mutated but sourceRawHeaders preserves the originals, and a
processor throws.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
ErrorHandlerContext#headers()is documented to expose "the headers of the current source record". Both error paths in Streams violate that contract today: a user-suppliedDeserializeris allowed to mutate the liveHeadersreference (e.g. aLargeMessageDeserializerthat removes its serde-internal marker header so it doesn't leak downstream), and the same mutable reference flows into theDefaultErrorHandlerContextthat theDeserializationExceptionHandlerandProcessingExceptionHandlersee.ExceptionHandlerUtils#buildDeadLetterQueueRecordthen copies the post-mutation headers into the DLQ record, so a DLQ record produced from a mutating-Deserializer pipeline can no longer be deserialized by the sameDeserializeron a replay (its marker header is gone).This complements #21370 (which made
buildDeadLetterQueueRecordcopycontext.headers()into the DLQ record): the DLQ now copies something, but per KAFKA-20462 that something must be the original source-record headers, which is what this PR provides.Evidence
Deserializerthat callsheaders.remove("source-only")and then throws causesErrorHandlerContext#headers()in theDeserializationExceptionHandlerto return headers without"source-only". New testRecordDeserializerTest#shouldExposeOriginalSourceRecordHeadersToErrorHandlerWhenDeserializerMutatesHeadersis verified to fail before this fix and pass after.ProcessorRecordContextwhere the live headers were mutated to empty butsourceRawHeaderscarries the original"source-only"header. With the live headers fed through to the error context,ErrorHandlerContext#headers()returns the empty (mutated) view; withsourceRawHeadersfed through, it returns the original. New testProcessorNodeTest#shouldExposeSourceRawHeadersToProcessingExceptionHandlerWhenDeserializerMutatedHeaderscovers this.Fix
Single snapshot per record, taken in
RecordQueue#updateHead(and the analogous spots inGlobalStateUpdateTask/GlobalStateManagerImpl) before the user-suppliedDeserializerruns. The snapshot is propagated along two paths:RecordDeserializer#deserializeso that, when deserialization throws,RecordDeserializer#handleDeserializationFailurebuildsDefaultErrorHandlerContextwith the snapshot rather than the (possibly mutated) live reference. (This is the path my earlier commit on this branch addressed.)sourceRawHeadersfield onStampedRecordandProcessorRecordContext, mirroring the existingsourceRawKey/sourceRawValuelifecycle: not serialized, cleared byfreeRawRecord().ProcessorNodereads the snapshot from theProcessorRecordContext(cast through a small helper, so the publicRecordContextinterface is unchanged — no KIP) when constructing theDefaultErrorHandlerContextfor theProcessingExceptionHandler.The downstream
Recordpassed to processors still carries the live (possibly mutated)Headersreference, so theLargeMessageDeserializerpattern — Deserializers that legitimately mutate headers so processors don't see serde-internal data — continues to work unchanged. OnlyErrorHandlerContext#headers()is changed to honor its documented contract.Cost: one
RecordHeaderscopy (smallArrayList<Header>clone, typically 0–3 entries) per record on the deserialization path. Negligible relative to the existing per-record allocations inRecordQueue/RecordDeserializer.Compatibility
RecordContextorErrorHandlerContext.sourceRawHeaderslives only on the internalProcessorRecordContextand is read via a type-narrowing cast insideProcessorNode. No KIP needed.sourceRawHeadersis transient (likesourceRawKey/sourceRawValue), not serialized intoProcessorRecordContext#serialize()and not surfaced in changelog topics or state-store buffers.ErrorHandlerContext#headers()now returns the original source-record headers (as documented) rather than the post-mutation reference. Users whose handlers depended on the buggy post-mutation behavior will see the unmutated headers instead.Tests
RecordDeserializerTest#shouldExposeOriginalSourceRecordHeadersToErrorHandlerWhenDeserializerMutatesHeaders— DeserializationExceptionHandler path.ProcessorNodeTest#shouldExposeSourceRawHeadersToProcessingExceptionHandlerWhenDeserializerMutatedHeaders— ProcessingExceptionHandler path.RecordDeserializerTest,ProcessorNodeTest,StreamTaskTest,RecordQueueTest,GlobalStateTaskTest,GlobalStateManagerImplTest,ProcessingExceptionHandlerIntegrationTest,ExceptionHandlerUtilsTestall pass.:streams:checkstyleMain :streams:checkstyleTest :streams:spotbugsMainall green.JIRA: https://issues.apache.org/jira/browse/KAFKA-20462