Skip to content

Commit 0eae750

Browse files
committed
KAFKA-17635: Ensure only committed offsets are returned for purging (apache#17686)
Kafka Streams actively purges records from repartition topics. Prior to this PR, Kafka Streams would retrieve the offset from the consumedOffsets map, but here are a couple of edge cases where the consumedOffsets can get ahead of the commitedOffsets map. In these cases, this means Kafka Streams will potentially purge a repartition record before it's committed. Updated the current StreamTask test to cover this case Reviewers: Matthias Sax <[email protected]>
1 parent c21ad10 commit 0eae750

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,10 +1074,13 @@ private void initializeTaskTimeAndProcessorMetadata(final Map<TopicPartition, Of
10741074
@Override
10751075
public Map<TopicPartition, Long> purgeableOffsets() {
10761076
final Map<TopicPartition, Long> purgeableConsumedOffsets = new HashMap<>();
1077-
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
1077+
for (final Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
10781078
final TopicPartition tp = entry.getKey();
10791079
if (topology.isRepartitionTopic(tp.topic())) {
1080-
purgeableConsumedOffsets.put(tp, entry.getValue() + 1);
1080+
// committedOffsets map is initialized at -1 so no purging until there's a committed offset
1081+
if (entry.getValue() > -1) {
1082+
purgeableConsumedOffsets.put(tp, entry.getValue());
1083+
}
10811084
}
10821085
}
10831086

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
import org.junit.jupiter.api.BeforeEach;
7979
import org.junit.jupiter.api.Test;
8080
import org.junit.jupiter.api.extension.ExtendWith;
81+
import org.junit.jupiter.params.ParameterizedTest;
82+
import org.junit.jupiter.params.provider.ValueSource;
8183
import org.mockito.InOrder;
8284
import org.mockito.Mock;
8385
import org.mockito.junit.jupiter.MockitoExtension;
@@ -1863,8 +1865,9 @@ public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
18631865
verify(stateManager).close();
18641866
}
18651867

1866-
@Test
1867-
public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
1868+
@ParameterizedTest
1869+
@ValueSource(booleans = {true, false})
1870+
public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final boolean doCommit) {
18681871
when(stateManager.taskId()).thenReturn(taskId);
18691872
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
18701873
final TopicPartition repartition = new TopicPartition("repartition", 1);
@@ -1916,10 +1919,17 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
19161919
assertTrue(task.process(0L));
19171920

19181921
task.prepareCommit();
1922+
if (doCommit) {
1923+
task.updateCommittedOffsets(repartition, 10L);
1924+
}
19191925

19201926
final Map<TopicPartition, Long> map = task.purgeableOffsets();
19211927

1922-
assertThat(map, equalTo(singletonMap(repartition, 11L)));
1928+
if (doCommit) {
1929+
assertThat(map, equalTo(singletonMap(repartition, 10L)));
1930+
} else {
1931+
assertThat(map, equalTo(Collections.emptyMap()));
1932+
}
19231933
}
19241934

19251935
@Test

0 commit comments

Comments
 (0)