KAFKA-18522: Slice records for share fetch #18804

Merged: 18 commits (Feb 24, 2025)
Commits
b5fdf29
KAFKA-18522: Slice records for share fetch
apoorvmittal10 Feb 4, 2025
cfbfb65
Added co-pilot suggestion
apoorvmittal10 Feb 4, 2025
9c25379
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 5, 2025
55881c5
Adding changes as per upstream change
apoorvmittal10 Feb 5, 2025
847d886
Correcting code comments
apoorvmittal10 Feb 6, 2025
fcf2c47
Removing subset acquired changes
apoorvmittal10 Feb 7, 2025
be63e8d
Correcting variable name
apoorvmittal10 Feb 7, 2025
87936be
Simplifying solution
apoorvmittal10 Feb 10, 2025
a8988a1
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 10, 2025
1cc3e16
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 11, 2025
f8f117f
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 13, 2025
0a1e850
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 14, 2025
e417521
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 14, 2025
0cfb9bf
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 20, 2025
b6c2150
Addressing review comments
apoorvmittal10 Feb 21, 2025
71fb04c
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 21, 2025
fe7a47a
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 22, 2025
d2267c2
Merge remote-tracking branch 'upstream/trunk' into KAFKA-18522
apoorvmittal10 Feb 23, 2025
76 changes: 69 additions & 7 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -25,8 +25,11 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -39,6 +42,7 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -122,13 +126,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
.setAcquiredRecords(Collections.emptyList());
} else {
partitionData
// We set the records to the fetchPartitionData records. We do not alter the records
// fetched from the replica manager as they follow zero copy buffer. The acquired records
// might be a subset of the records fetched from the replica manager, depending
// on the max fetch records or available records in the share partition. The client
// sends the max bytes in request which should limit the bytes sent to the client
// in the response.
.setRecords(fetchPartitionData.records)
.setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
.setAcquiredRecords(shareAcquiredRecords.acquiredRecords());
acquiredRecordsCount += shareAcquiredRecords.count();
}
@@ -196,4 +194,68 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
}
return partition;
}

/**
* Slice the fetch records based on the acquired records. The slicing is done based on the first
* and last offset of the acquired records from the list. The slicing doesn't consider individual
* acquired batches, but rather the boundaries of the acquired list. The method expects the acquired
* records list to be within the fetch records bounds.
*
* @param records The records to be sliced.
* @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
* @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
* of the fetched records. Otherwise, the original records are returned.
*/
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
if (!(records instanceof FileRecords fileRecords)) {
return records;
}
// The acquired records should be non-empty, do not check as the method is called only when the
// acquired records are non-empty.
List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
try {
final Iterator<FileChannelRecordBatch> iterator = fileRecords.batchIterator();
// Track the first overlapping batch with the first acquired offset.
FileChannelRecordBatch firstOverlapBatch = iterator.next();
// If there is only a single fetch batch, then return the original records.
if (!iterator.hasNext()) {
return records;
}
// Find the first and last acquired offset to slice the records.
final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
int startPosition = 0;
int size = 0;
// Start iterating from the second batch.
while (iterator.hasNext()) {
FileChannelRecordBatch batch = iterator.next();
// Iterate until the first batch overlapping the first acquired offset is found. All the
// batches before this first overlap batch should be sliced out, hence increment the start
// position.
if (batch.baseOffset() <= firstAcquiredOffset) {
startPosition += firstOverlapBatch.sizeInBytes();
firstOverlapBatch = batch;
continue;
}
// Break once all the batches up to the last acquired offset have been traversed.
if (batch.baseOffset() > lastAcquiredOffset) {
break;
}
size += batch.sizeInBytes();
}
// Include the first overlap batch's size, as it's the batch that overlaps the first acquired
// offset and is never counted inside the loop.
size += firstOverlapBatch.sizeInBytes();
Review comment (Contributor): Instead of doing this here, it's probably more intuitive to set the initial size when firstOverlapBatch is initialized and is reset later on.

Reply (Collaborator, Author): I tried that earlier and it made the code messy, because then we also need to check that decrementing the size doesn't make it negative and reset it back to 0. So I thought it's much better to just add the first overlap batch's size later on.

// No slicing is needed if neither the start position nor the size changed, i.e. the start
// position is 0 and the computed size covers the whole file.
if (startPosition == 0 && size == fileRecords.sizeInBytes()) {
return records;
}
return fileRecords.slice(startPosition, size);
} catch (Exception e) {
log.error("Error while checking batches for acquired records: {}, skipping slicing.", acquiredRecords, e);
// If there is an exception while slicing, return the original records so that the fetch
// can continue with the original records.
return records;
}
}
}
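
To make the byte arithmetic of maybeSliceFetchRecords concrete, below is a small, self-contained sketch (illustrative only; the Batch record and the numbers are hypothetical, not part of this PR) that runs the same loop over four 100-byte batches covering offsets 0-39 with an acquired range of offsets 15-25. It shows why the slice is taken at batch boundaries: the resulting slice skips the first batch and spans the two batches that overlap the acquired offsets, i.e. offsets 10-29.

// Illustrative sketch only (not code from this PR): the slicing arithmetic applied to
// hypothetical batches. Four 100-byte batches cover offsets 0-39; the acquired range is 15-25.
public class SliceArithmeticSketch {

    // A minimal stand-in for a file batch: its offset range and its size in bytes.
    record Batch(long baseOffset, long lastOffset, int sizeInBytes) { }

    public static void main(String[] args) {
        Batch[] batches = {
            new Batch(0, 9, 100), new Batch(10, 19, 100),
            new Batch(20, 29, 100), new Batch(30, 39, 100)
        };
        long firstAcquiredOffset = 15;
        long lastAcquiredOffset = 25;

        Batch firstOverlapBatch = batches[0];
        int startPosition = 0;
        int size = 0;
        for (int i = 1; i < batches.length; i++) {
            Batch batch = batches[i];
            if (batch.baseOffset() <= firstAcquiredOffset) {
                // The previous candidate ends before the acquired range, so its bytes move
                // into the start position and the current batch becomes the new candidate.
                startPosition += firstOverlapBatch.sizeInBytes();
                firstOverlapBatch = batch;
                continue;
            }
            if (batch.baseOffset() > lastAcquiredOffset) {
                break;
            }
            size += batch.sizeInBytes();
        }
        // The batch overlapping the first acquired offset is always part of the slice, and it
        // is never counted inside the loop, so its size is added once at the end.
        size += firstOverlapBatch.sizeInBytes();

        // Prints: startPosition=100, size=200 -> the slice spans the batches for offsets 10-29.
        System.out.println("startPosition=" + startPosition + ", size=" + size);
    }
}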
28 changes: 14 additions & 14 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -35,7 +35,6 @@
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -74,6 +73,7 @@
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -186,7 +186,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// We are testing the case when the share partition is getting fetched for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
@@ -259,7 +259,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
@@ -312,7 +312,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@@ -427,7 +427,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@@ -591,7 +591,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
// requests, it should add a "check and complete" action for request key tp1 on the purgatory.
@@ -689,7 +689,7 @@ public void testExceptionInMinBytesCalculation() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

// Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
@@ -897,15 +897,15 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab
BROKER_TOPIC_STATS);

when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// All 5 partitions are acquirable.
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -995,9 +995,9 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab
BROKER_TOPIC_STATS);

when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// Only 2 out of 5 partitions are acquirable.
Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();
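
The test changes in this file replace ShareAcquiredRecords.fromAcquiredRecords(...) with the ShareFetchTestUtils.createShareAcquiredRecords(...) helper. As a rough sketch of what such a helper plausibly looks like, inferred from its usage above rather than taken from this PR or the Kafka source, it would wrap a single AcquiredRecords entry and derive the record count from its offset range:

// Hypothetical sketch (assumption inferred from usage, not code from this PR or Kafka trunk):
// build a ShareAcquiredRecords from one AcquiredRecords entry, deriving the count from offsets.
static ShareAcquiredRecords createShareAcquiredRecords(ShareFetchResponseData.AcquiredRecords acquiredRecords) {
    int count = (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1);
    return new ShareAcquiredRecords(List.of(acquiredRecords), count);
}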