Skip to content

Commit

Permalink
(fixup) make commit whenever update even though the HW doesn't progress
Browse files Browse the repository at this point in the history
  • Loading branch information
kawamuray committed Dec 25, 2024
1 parent f5a5a89 commit 4a1a249
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 35 deletions.
1 change: 1 addition & 0 deletions processor/src/it/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
</root>
<logger name="com.linecorp.decaton" level="DEBUG" />
<logger name="com.linecorp.decaton.benchmark" level="INFO" />
<logger name="com.linecorp.decaton.processor.runtime.internal.OutOfOrderCommitControl" level="TRACE" />

<logger name="org.apache.kafka" level="ERROR" />
<logger name="kafka" level="ERROR" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
* it.
* - They are passed into threads pool consists of {@link #NUM_WORKER_THREADS}, and then completed immediately.
* - Consumer loop thread calls {@link OutOfOrderCommitControl#updateHighWatermark()} after feeding
* {@link #BATCH_SIZE} and calls {@link OutOfOrderCommitControl#commitReadyOffset()}.
* - Loop the above steps until {@link OutOfOrderCommitControl#commitReadyOffset()} returns the value equals to
* {@link #BATCH_SIZE} and calls {@link OutOfOrderCommitControl#commitReadyOffset(long)}.
* - Loop the above steps until {@link OutOfOrderCommitControl#commitReadyOffset(long)} returns the value equals to
* {@link #NUM_OFFSETS}.
* - Measure entire execution duration as a performance indicator.
*/
Expand Down Expand Up @@ -244,7 +244,7 @@ public void outOfOrderCommitControlV4(BmStateV4 state) throws InterruptedExcepti
}

control.updateHighWatermark();
while (control.commitReadyOffset() < NUM_OFFSETS) {
while (control.commitReadyOffset(0).offset() < NUM_OFFSETS + 1) {
Thread.yield();
control.updateHighWatermark();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import com.linecorp.decaton.protocol.Decaton.OffsetStorageComplexProto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -35,7 +34,6 @@
*/
@Slf4j
@Accessors(fluent = true)
@AllArgsConstructor
public class OutOfOrderCommitControl implements AutoCloseable {
@Getter
private final TopicPartition topicPartition;
Expand All @@ -47,14 +45,21 @@ public class OutOfOrderCommitControl implements AutoCloseable {
* The current maximum offset which it and all it's previous offsets were committed.
*/
private volatile long highWatermark;
private volatile boolean anyOffsetsUpdated;

public OutOfOrderCommitControl(TopicPartition topicPartition, int capacity,
OffsetStateReaper offsetStateReaper) {
OffsetStateReaper offsetStateReaper,
OffsetStorageComplex complex, long highWatermark) {
this.topicPartition = topicPartition;
this.capacity = capacity;
this.offsetStateReaper = offsetStateReaper;
complex = new OffsetStorageComplex(capacity);
highWatermark = -1;
this.complex = complex;
this.highWatermark = highWatermark;
}

public OutOfOrderCommitControl(TopicPartition topicPartition, int capacity,
OffsetStateReaper offsetStateReaper) {
this(topicPartition, capacity, offsetStateReaper, new OffsetStorageComplex(capacity), -1);
}

public static OutOfOrderCommitControl fromOffsetMeta(TopicPartition tp,
Expand Down Expand Up @@ -103,6 +108,7 @@ void onComplete(long offset, int ringIndex) {
log.debug("Offset complete on {}: {}", topicPartition, offset);
}
complex.complete(ringIndex);
anyOffsetsUpdated = true;
}

public synchronized void updateHighWatermark() {
Expand Down Expand Up @@ -139,18 +145,21 @@ public synchronized int pendingOffsetsCount() {
return complex.size();
}

public OffsetAndMetadata commitReadyOffset() {
if (highWatermark < 0) {
public OffsetAndMetadata commitReadyOffset(long lastCommittedOffset) {
if (highWatermark < lastCommittedOffset && !anyOffsetsUpdated) {
return null;
}
OffsetStorageComplexProto complexProto = complex.toProto();
final OffsetAndMetadata offsetMeta;
try {
String meta = JsonFormat.printer().omittingInsignificantWhitespace().print(complexProto);
long commitOffset = highWatermark + 1;
return new OffsetAndMetadata(commitOffset, meta);
offsetMeta = new OffsetAndMetadata(commitOffset, meta);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("failed to serialize offset metadata into proto", e);
}
anyOffsetsUpdated = false;
return offsetMeta;
}

static OffsetStorageComplex complexFromMeta(String metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private static SubPartitions createSubPartitions(PartitionScope scope, Processor
metrics = metricsCtor.new PartitionStateMetrics(
commitControl::pendingOffsetsCount, () -> paused() ? 1 : 0,
() -> lastCommittedOffset, () -> latestConsumedOffset);
lastCommittedOffset = -1;
lastCommittedOffset = 0;
pausedTimeNanos = -1;
lastQueueStarvedTime = -1;
}
Expand All @@ -133,11 +133,7 @@ private static SubPartitions createSubPartitions(PartitionScope scope, Processor
* @return optional long value representing an offset waiting for commit.
*/
public Optional<OffsetAndMetadata> offsetWaitingCommit() {
OffsetAndMetadata offsetMeta = commitControl.commitReadyOffset();
if (offsetMeta != null && offsetMeta.offset() > lastCommittedOffset) {
return Optional.of(offsetMeta);
}
return Optional.empty();
return Optional.ofNullable(commitControl.commitReadyOffset(lastCommittedOffset));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ public void testInOrderOffsetCompletion() {
state1.completion().complete();
commitControl.updateHighWatermark();
assertEquals(2, commitControl.pendingOffsetsCount());
assertEquals(2, commitControl.commitReadyOffset().offset());
assertEquals(2, commitControl.commitReadyOffset(0).offset());

state2.completion().complete();
commitControl.updateHighWatermark();
assertEquals(1, commitControl.pendingOffsetsCount());
assertEquals(3, commitControl.commitReadyOffset().offset());
assertEquals(3, commitControl.commitReadyOffset(0).offset());

state3.completion().complete();
commitControl.updateHighWatermark();
assertEquals(0, commitControl.pendingOffsetsCount());
assertEquals(4, commitControl.commitReadyOffset().offset());
assertEquals(4, commitControl.commitReadyOffset(0).offset());
}

@Test
Expand All @@ -74,33 +74,33 @@ public void testOutOfOrderOffsetCompletion() {
state3.completion().complete();
commitControl.updateHighWatermark();
assertEquals(4, commitControl.pendingOffsetsCount());
assertNull(commitControl.commitReadyOffset());
assertEquals(0, commitControl.commitReadyOffset(0).offset());

state2.completion().complete();
commitControl.updateHighWatermark();
assertEquals(4, commitControl.pendingOffsetsCount());
assertNull(commitControl.commitReadyOffset());
assertEquals(0, commitControl.commitReadyOffset(0).offset());

state1.completion().complete();
commitControl.updateHighWatermark();
assertEquals(1, commitControl.pendingOffsetsCount());
assertEquals(4, commitControl.commitReadyOffset().offset());
assertEquals(4, commitControl.commitReadyOffset(0).offset());

state4.completion().complete();
commitControl.updateHighWatermark();
assertEquals(0, commitControl.pendingOffsetsCount());
assertEquals(5, commitControl.commitReadyOffset().offset());
assertEquals(5, commitControl.commitReadyOffset(0).offset());
}

@Test
public void testDoubleCompletingSameOffset() {
OffsetState state1 = commitControl.reportFetchedOffset(1);

state1.completion().complete();
assertNull(commitControl.commitReadyOffset());
assertEquals(0, commitControl.commitReadyOffset(0).offset());
state1.completion().complete(); // nothing happens
commitControl.updateHighWatermark();
assertEquals(2, commitControl.commitReadyOffset().offset());
assertEquals(2, commitControl.commitReadyOffset(0).offset());
state1.completion().complete(); // nothing happens
}

Expand All @@ -120,10 +120,10 @@ public void testDoubleCompletingSameOffsetCaseDuplicateInCommitted() {

state2.completion().complete(); // now committedOffsets contains 2
commitControl.updateHighWatermark();
assertNull(commitControl.commitReadyOffset());
assertEquals(0, commitControl.commitReadyOffset(0).offset());
state2.completion().complete(); // commit again
commitControl.updateHighWatermark();
assertNull(commitControl.commitReadyOffset());
assertNull(commitControl.commitReadyOffset(0));
}

@Test
Expand All @@ -137,7 +137,7 @@ public void testPendingRecordsCountWithGaps() {
state1.completion().complete();
state3.completion().complete();
commitControl.updateHighWatermark();
assertEquals(4, commitControl.commitReadyOffset().offset());
assertEquals(4, commitControl.commitReadyOffset(0).offset());
assertEquals(0, commitControl.pendingOffsetsCount());
}

Expand All @@ -153,7 +153,7 @@ public void testPendingRecordsCountWithLargeGap() {
state1.completion().complete();
stateLarge.completion().complete();
commitControl.updateHighWatermark();
assertEquals(largeGapOffset + 1, commitControl.commitReadyOffset().offset());
assertEquals(largeGapOffset + 1, commitControl.commitReadyOffset(0).offset());
assertEquals(0, commitControl.pendingOffsetsCount());
}

Expand All @@ -178,16 +178,16 @@ public void testTimeoutOffsetReaping() throws InterruptedException {
// 1 is blocking watermark to progress
state2.completion().complete();
ooocc.updateHighWatermark();
assertNull(ooocc.commitReadyOffset());
assertEquals(0, ooocc.commitReadyOffset(0).offset());

doReturn(20L).when(clock).millis();
// offset reaping performed but does not proceed watermark yet
ooocc.updateHighWatermark();
state1.completion().asFuture().toCompletableFuture().join();
assertNull(ooocc.commitReadyOffset());
assertEquals(0, ooocc.commitReadyOffset(0).offset());
// offset should progress as offset 1 has reaped in previous call
ooocc.updateHighWatermark();
assertEquals(3, ooocc.commitReadyOffset().offset());
assertEquals(3, ooocc.commitReadyOffset(0).offset());
}

@Test
Expand All @@ -200,7 +200,7 @@ void perOffsetCompleteTest() {
state3.completion().complete();

commitControl.updateHighWatermark();
OffsetAndMetadata om = commitControl.commitReadyOffset();
OffsetAndMetadata om = commitControl.commitReadyOffset(0);

assertEquals(om.offset(), 102L);
OffsetStorageComplex complex = OutOfOrderCommitControl.complexFromMeta(om.metadata());
Expand All @@ -210,7 +210,7 @@ void perOffsetCompleteTest() {

state2.completion().complete();
commitControl.updateHighWatermark();
om = commitControl.commitReadyOffset();
om = commitControl.commitReadyOffset(0);

assertEquals(om.offset(), 104L);
complex = OutOfOrderCommitControl.complexFromMeta(om.metadata());
Expand Down

0 comments on commit 4a1a249

Please sign in to comment.