From ad3e34d33b4d265d9ad747aa5171536378802a11 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 9 Nov 2024 17:46:05 -0800 Subject: [PATCH] HOTFIX: fix broken commit Fixes broken commit for KAFKA-17872 --- .../streams/processor/internals/StreamTaskTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 2650f70865277..875c7440b0f97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -167,7 +167,7 @@ public class StreamTaskTest { private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode source3 = new MockSourceNode<>(intDeserializer, intDeserializer) { + private final MockSourceNode source3 = new MockSourceNode(intDeserializer, intDeserializer) { @Override public void process(final Record record) { throw new RuntimeException("KABOOM!"); @@ -178,7 +178,7 @@ public void close() { throw new RuntimeException("KABOOM!"); } }; - private final MockSourceNode timeoutSource = new MockSourceNode<>(intDeserializer, intDeserializer) { + private final MockSourceNode timeoutSource = new MockSourceNode(intDeserializer, intDeserializer) { @Override public void process(final Record record) { throw new TimeoutException("Kaboom!"); @@ -442,7 +442,7 @@ public void shouldAutoOffsetResetIfNoCommittedOffsetFound() { task.addPartitionsForOffsetReset(Collections.singleton(partition1)); final AtomicReference shouldNotSeek = new AtomicReference<>(); - try (final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST) { + try (final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override public void seek(final TopicPartition partition, final long offset) { final AssertionError error = shouldNotSeek.get(); @@ -2203,6 +2203,7 @@ public void shouldCheckpointOffsetsOnPostCommit() { when(stateManager.taskId()).thenReturn(taskId); when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); final long offset = 543L; + final long consumedOffset = 345L; when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset)); when(stateManager.changelogOffsets()) @@ -2600,7 +2601,6 @@ public void shouldUpdateOffsetIfAllRecordsHaveInvalidTimestamp() { task.resumePollingForPartitionsWithAvailableSpace(); consumer.poll(Duration.ZERO); task.addRecords(partition1, records); - task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), "")); task.updateLags(); assertTrue(task.process(offset)); @@ -2632,7 +2632,6 @@ public void shouldUpdateOffsetIfValidRecordFollowsInvalidTimestamp() { task.resumePollingForPartitionsWithAvailableSpace(); consumer.poll(Duration.ZERO); task.addRecords(partition1, records); - task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), "")); task.updateLags(); assertTrue(task.process(offset)); @@ -2661,8 +2660,6 @@ public void shouldUpdateOffsetIfInvalidTimestampeRecordFollowsValid() { task.resumePollingForPartitionsWithAvailableSpace(); consumer.poll(Duration.ZERO); task.addRecords(partition1, records); - task.updateNextOffsets(partition1, new OffsetAndMetadata(offset + 1, Optional.empty(), "")); - task.updateLags(); assertTrue(task.process(offset));