Skip to content

Commit 587c3cf

Browse files
committed
KAFKA-17872: Update consumed offsets on records with invalid timestamp (apache#17710)
TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress. Reviewers: Bill Bejeck <[email protected]>
1 parent 014fdfe commit 587c3cf

File tree

3 files changed

+282
-119
lines changed

3 files changed

+282
-119
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ private void updateHead() {
228228
deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
229229
);
230230
droppedRecordsSensor.record();
231+
lastCorruptedRecord = raw;
231232
continue;
232233
}
233234
headRecord = new StampedRecord(deserialized, timestamp);

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.kafka.common.serialization.Deserializer;
2828
import org.apache.kafka.common.serialization.IntegerDeserializer;
2929
import org.apache.kafka.common.serialization.IntegerSerializer;
30-
import org.apache.kafka.common.serialization.Serdes;
30+
import org.apache.kafka.common.serialization.LongSerializer;
3131
import org.apache.kafka.common.serialization.Serializer;
3232
import org.apache.kafka.common.utils.Bytes;
3333
import org.apache.kafka.common.utils.LogContext;
@@ -75,8 +75,7 @@ public class RecordQueueTest {
7575
private final StreamsMetricsImpl streamsMetrics =
7676
new StreamsMetricsImpl(metrics, "mock", StreamsConfig.METRICS_LATEST, new MockTime());
7777

78-
@SuppressWarnings("rawtypes")
79-
final InternalMockProcessorContext context = new InternalMockProcessorContext<>(
78+
final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
8079
StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
8180
new MockRecordCollector(),
8281
metrics
@@ -89,19 +88,28 @@ public class RecordQueueTest {
8988
timestampExtractor,
9089
new LogAndFailExceptionHandler(),
9190
context,
92-
new LogContext());
91+
new LogContext()
92+
);
9393
private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
9494
new TopicPartition("topic", 1),
9595
mockSourceNodeWithMetrics,
9696
timestampExtractor,
9797
new LogAndContinueExceptionHandler(),
9898
context,
99-
new LogContext());
99+
new LogContext()
100+
);
101+
private final RecordQueue queueThatSkipsInvalidTimestamps = new RecordQueue(
102+
new TopicPartition("topic", 1),
103+
mockSourceNodeWithMetrics,
104+
new LogAndSkipOnInvalidTimestamp(),
105+
new LogAndFailExceptionHandler(),
106+
context,
107+
new LogContext()
108+
);
100109

101110
private final byte[] recordValue = intSerializer.serialize(null, 10);
102111
private final byte[] recordKey = intSerializer.serialize(null, 1);
103112

104-
@SuppressWarnings("unchecked")
105113
@BeforeEach
106114
public void before() {
107115
mockSourceNodeWithMetrics.init(context);
@@ -328,7 +336,7 @@ public void shouldSetTimestampAndRespectMaxTimestampPolicy() {
328336

329337
@Test
330338
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
331-
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
339+
final byte[] key = new LongSerializer().serialize("foo", 1L);
332340
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
333341
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue,
334342
new RecordHeaders(), Optional.empty()));
@@ -342,7 +350,7 @@ public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
342350

343351
@Test
344352
public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
345-
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
353+
final byte[] value = new LongSerializer().serialize("foo", 1L);
346354
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
347355
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value,
348356
new RecordHeaders(), Optional.empty()));
@@ -356,7 +364,7 @@ public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
356364

357365
@Test
358366
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
359-
final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
367+
final byte[] key = new LongSerializer().serialize("foo", 1L);
360368
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
361369
TimestampType.CREATE_TIME, 0, 0, key, recordValue,
362370
new RecordHeaders(), Optional.empty());
@@ -369,7 +377,7 @@ public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHan
369377

370378
@Test
371379
public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
372-
final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
380+
final byte[] value = new LongSerializer().serialize("foo", 1L);
373381
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("topic", 1, 1, 0L,
374382
TimestampType.CREATE_TIME, 0, 0, recordKey, value,
375383
new RecordHeaders(), Optional.empty());
@@ -392,7 +400,7 @@ public void shouldThrowOnNegativeTimestamp() {
392400
mockSourceNodeWithMetrics,
393401
new FailOnInvalidTimestamp(),
394402
new LogAndContinueExceptionHandler(),
395-
new InternalMockProcessorContext(),
403+
new InternalMockProcessorContext<>(),
396404
new LogContext());
397405

398406
final StreamsException exception = assertThrows(
@@ -409,20 +417,25 @@ public void shouldThrowOnNegativeTimestamp() {
409417

410418
@Test
411419
public void shouldDropOnNegativeTimestamp() {
412-
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
413-
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue,
414-
new RecordHeaders(), Optional.empty()));
420+
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
421+
"topic",
422+
1,
423+
1,
424+
-1L, // negative timestamp
425+
TimestampType.CREATE_TIME,
426+
0,
427+
0,
428+
recordKey,
429+
recordValue,
430+
new RecordHeaders(),
431+
Optional.empty()
432+
);
433+
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);
415434

416-
final RecordQueue queue = new RecordQueue(
417-
new TopicPartition("topic", 1),
418-
mockSourceNodeWithMetrics,
419-
new LogAndSkipOnInvalidTimestamp(),
420-
new LogAndContinueExceptionHandler(),
421-
new InternalMockProcessorContext(),
422-
new LogContext());
423-
queue.addRawRecords(records);
435+
queueThatSkipsInvalidTimestamps.addRawRecords(records);
424436

425-
assertEquals(0, queue.size());
437+
assertEquals(1, queueThatSkipsInvalidTimestamps.size());
438+
assertEquals(new CorruptedRecord(record), queueThatSkipsInvalidTimestamps.poll(0));
426439
}
427440

428441
@Test

0 commit comments

Comments
 (0)