KAFKA-18522: Slice records for share fetch #18804

Open · wants to merge 4 commits into trunk
Changes from 1 commit
86 changes: 79 additions & 7 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -25,8 +25,11 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -39,6 +42,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
@@ -113,13 +117,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
.setAcquiredRecords(Collections.emptyList());
} else {
partitionData
// We set the records to the fetchPartitionData records. We do not alter the records
// fetched from the replica manager as they follow zero copy buffer. The acquired records
// might be a subset of the records fetched from the replica manager, depending
// on the max fetch records or available records in the share partition. The client
// sends the max bytes in request which should limit the bytes sent to the client
// in the response.
.setRecords(fetchPartitionData.records)
.setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
.setAcquiredRecords(shareAcquiredRecords.acquiredRecords());
acquiredRecordsCount += shareAcquiredRecords.count();
}
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
}
return partition;
}

/**
* Slices the fetch records based on the acquired records. The slice boundaries are determined by
* the first and last offsets across the acquired records list; individual acquired batches are not
* considered, only the overall boundaries of the list.
*
* @param records The records to be sliced.
* @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
* @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
* of the fetched records. Otherwise, the original records are returned.
*/
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
return records;
}
// The acquired records are guaranteed to be non-empty; no explicit check is needed as this
// method is only called when records have been acquired.
List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
try {
final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
int startPosition = 0;
int size = 0;
// Track the previous batch to adjust the start position in case the first acquired offset
// falls in the middle of a batch.
FileChannelRecordBatch previousBatch = null;
for (FileChannelRecordBatch batch : fileRecords.batches()) {
// If the batch base offset is less than the first acquired offset, then the start position
// should be updated to skip the batch.
if (batch.baseOffset() < firstAcquiredOffset) {
startPosition += batch.sizeInBytes();
previousBatch = batch;
continue;
}
// If the first acquired offset falls in the middle of a batch, adjust the start position so
// that the previous batch is not skipped, i.e. if a batch spans offsets 10-15 and the first
// acquired offset is 12, the start position is moved back to include the batch containing
// the first acquired offset. Generally the first acquired offset is the base offset of a
// batch, but the batch can effectively be split when the initial load from the persister
// contains only a subset of the acknowledged records from that batch.
// This adjustment should be applied only once, for the batch containing the first acquired
// offset, hence the previous batch is reset to null after the adjustment.
if (previousBatch != null && batch.baseOffset() != firstAcquiredOffset) {
startPosition -= previousBatch.sizeInBytes();
size += previousBatch.sizeInBytes();
}
previousBatch = null;
// Consider the full batch size for slicing irrespective of the batch's last offset, i.e.
// even if the batch's last offset is greater than the last acquired offset, the full
// batch size is still included in the slice.
if (batch.baseOffset() <= lastAcquiredOffset) {
size += batch.sizeInBytes();
} else {
break;
}
}
// If the fetch resulted in a single batch and the first acquired offset is not the base offset
// of that batch, then the position and size must still be adjusted to include it. This can
// happen, rarely, when the batch is split because of the initial load from the persister. In
// that case the last offset of the previous batch is checked to decide whether to include it.
// The lastOffset() call on a batch is expensive, hence the loop above avoids it; it is only
// made for this edge case.
if (previousBatch != null && previousBatch.lastOffset() >= lastAcquiredOffset) {
startPosition -= previousBatch.sizeInBytes();
size += previousBatch.sizeInBytes();
}
return fileRecords.slice(startPosition, size);
} catch (Exception e) {
log.error("Error while checking batches for acquired records: {}, skipping slicing.", acquiredRecords, e);
// If an exception occurs while slicing, return the original records so the fetch can
// still proceed.
return records;
}
}
}
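
To illustrate the slicing arithmetic above, here is a minimal, self-contained sketch of the same walk over batch boundaries. The Batch record and sliceBounds helper below are hypothetical stand-ins for FileChannelRecordBatch metadata, not Kafka classes; they assume batches are contiguous and ordered by offset, as fetched file records are.

import java.util.List;

public class SliceBoundarySketch {

    // Hypothetical stand-in for the batch metadata the slicing walk needs.
    record Batch(long baseOffset, long lastOffset, int sizeInBytes) { }

    // Returns {startPosition, size} for a fileRecords.slice(startPosition, size) call,
    // following the same walk over batch boundaries as maybeSliceFetchRecords.
    static int[] sliceBounds(List<Batch> batches, long firstAcquiredOffset, long lastAcquiredOffset) {
        int startPosition = 0;
        int size = 0;
        Batch previousBatch = null;
        for (Batch batch : batches) {
            if (batch.baseOffset() < firstAcquiredOffset) {
                // Batch starts before the acquired range; tentatively skip it.
                startPosition += batch.sizeInBytes();
                previousBatch = batch;
                continue;
            }
            if (previousBatch != null && batch.baseOffset() != firstAcquiredOffset) {
                // First acquired offset sits inside the previous batch; pull it back into the slice.
                startPosition -= previousBatch.sizeInBytes();
                size += previousBatch.sizeInBytes();
            }
            previousBatch = null;
            if (batch.baseOffset() <= lastAcquiredOffset) {
                // Full batch is included, even if its last offset exceeds the acquired range.
                size += batch.sizeInBytes();
            } else {
                break;
            }
        }
        if (previousBatch != null && previousBatch.lastOffset() >= lastAcquiredOffset) {
            // Single-batch edge case: the acquired range starts mid-batch and no later batch was seen.
            startPosition -= previousBatch.sizeInBytes();
            size += previousBatch.sizeInBytes();
        }
        return new int[] {startPosition, size};
    }

    public static void main(String[] args) {
        // Batches 0-9 (100 bytes), 10-15 (80 bytes), 16-24 (120 bytes); acquired range 12..20.
        List<Batch> batches = List.of(
            new Batch(0, 9, 100), new Batch(10, 15, 80), new Batch(16, 24, 120));
        int[] bounds = sliceBounds(batches, 12, 20);
        System.out.println(bounds[0] + ", " + bounds[1]);
    }
}

Running the main method prints "100, 200": the first batch (offsets 0-9) is skipped, while the batch containing offset 12 and the following batch are kept in full, matching the comment above that a whole batch is included even when the last acquired offset falls inside it.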
15 changes: 12 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -638,12 +638,17 @@ public ShareAcquiredRecords acquire(
List<AcquiredRecords> result = new ArrayList<>();
// The acquired count is used to track the number of records acquired for the request.
int acquiredCount = 0;
// Tracks whether only a subset of the fetched batches has been acquired.
boolean subsetAcquired = false;
// The fetched records are already part of the in-flight records. The records might
// be available for re-delivery hence try acquiring same. The request batches could
// be an exact match, subset or span over multiple already fetched batches.
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
// If the acquired count is equal to the max fetch records then break the loop.
if (acquiredCount >= maxFetchRecords) {
// If the limit on records to acquire has been reached, then there exist additional
// fetched batches which cannot be acquired.
subsetAcquired = true;
break;
}

@@ -714,8 +719,9 @@ public ShareAcquiredRecords acquire(
lastBatch.lastOffset(), batchSize, maxFetchRecords - acquiredCount);
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
subsetAcquired = shareAcquiredRecords.subsetAcquired();
}
return new ShareAcquiredRecords(result, acquiredCount);
return new ShareAcquiredRecords(result, acquiredCount, subsetAcquired);
} finally {
lock.writeLock().unlock();
}
@@ -1213,7 +1219,10 @@ private ShareAcquiredRecords acquireNewBatchRecords(
startOffset = firstAcquiredOffset;
}
endOffset = lastAcquiredOffset;
return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
return new ShareAcquiredRecords(
acquiredRecords,
(int) (lastAcquiredOffset - firstAcquiredOffset + 1) /* acquired records count */,
lastOffset > lastAcquiredOffset /* subset acquired */);
} finally {
lock.writeLock().unlock();
}
@@ -2138,7 +2147,7 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
}
}

private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
private long startOffsetDuringInitialization(long partitionDataStartOffset) {
// Set the state epoch and end offset from the persisted state.
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
return partitionDataStartOffset;
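
For context, a plausible shape of the extended ShareAcquiredRecords carrier, inferred from the three-argument constructor calls and the subsetAcquired() accessor used above, is sketched below. This is an assumption about the class, not the actual file in the server-common module, which may differ.

import java.util.List;

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;

public class ShareAcquiredRecords {

    private final List<AcquiredRecords> acquiredRecords;
    private final int count;
    // True when only part of the fetchable data was acquired, e.g. the max fetch records
    // limit was hit mid-way or only a prefix of a newly fetched batch could be acquired.
    private final boolean subsetAcquired;

    public ShareAcquiredRecords(List<AcquiredRecords> acquiredRecords, int count, boolean subsetAcquired) {
        this.acquiredRecords = acquiredRecords;
        this.count = count;
        this.subsetAcquired = subsetAcquired;
    }

    public List<AcquiredRecords> acquiredRecords() {
        return acquiredRecords;
    }

    public int count() {
        return count;
    }

    public boolean subsetAcquired() {
        return subsetAcquired;
    }
}

The flag is what lets ShareFetchUtils.maybeSliceFetchRecords return early and keep the zero-copy buffer untouched when the entire fetched data was acquired, only paying the slicing cost when a true subset was taken.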
28 changes: 14 additions & 14 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -34,7 +34,6 @@
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
@@ -71,6 +70,7 @@
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -174,7 +174,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// We are testing the case when the share partition is getting fetched for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
@@ -236,7 +236,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
@@ -289,7 +289,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@@ -382,7 +382,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@@ -530,7 +530,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
// requests, it should add a "check and complete" action for request key tp1 on the purgatory.
@@ -628,7 +628,7 @@ public void testExceptionInMinBytesCalculation() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

// Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
@@ -824,15 +824,15 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab
BROKER_TOPIC_STATS);

when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp2.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp3.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp4.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// All 5 partitions are acquirable.
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -922,9 +922,9 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab
BROKER_TOPIC_STATS);

when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// Only 2 out of 5 partitions are acquirable.
Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();
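
The tests above switch from ShareAcquiredRecords.fromAcquiredRecords(...) to a createShareAcquiredRecords(...) helper in ShareFetchTestUtils. One hypothetical implementation of that helper, assuming it simply wraps a single acquired-records entry in the new three-argument constructor, is sketched below; whether the real helper marks the acquisition as a subset is an assumption, not confirmed by this diff.

// Hypothetical sketch of the helper referenced in the tests above; the real method in
// ShareFetchTestUtils may differ.
public static ShareAcquiredRecords createShareAcquiredRecords(
    ShareFetchResponseData.AcquiredRecords acquiredRecords
) {
    return new ShareAcquiredRecords(
        List.of(acquiredRecords),
        (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1),
        true /* assumed: treat the acquisition as a subset so the slicing path is exercised */);
}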