From 6ea2ed3db050b89b1a491329a55f6a66a035d53c Mon Sep 17 00:00:00 2001
From: Kamal Chandraprakash
Date: Sun, 10 Nov 2024 22:54:26 +0530
Subject: [PATCH] KAFKA-17801: RemoteLogManager may compute inaccurate
 upperBoundOffset for aborted txns (#17676) (#17733)

Reviewers: Jun Rao
---
 .../kafka/log/remote/RemoteLogManager.java    | 39 ++++++---
 .../log/remote/RemoteLogManagerTest.java      | 79 ++++++++++++++++++-
 2 files changed, 103 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6f4f5daa9c0f0..577f9b080de2f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1610,25 +1610,26 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         }
 
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        EnrichedRecordBatch enrichedRecordBatch = new EnrichedRecordBatch(null, 0);
         InputStream remoteSegInputStream = null;
         try {
             int startPos = 0;
-            RecordBatch firstBatch = null;
-
             // Iteration over multiple RemoteSegmentMetadata is required in case of log compaction.
             // It may be possible the offset is log compacted in the current RemoteLogSegmentMetadata
             // And we need to iterate over the next segment metadata to fetch messages higher than the given offset.
-            while (firstBatch == null && rlsMetadataOptional.isPresent()) {
+            while (enrichedRecordBatch.batch == null && rlsMetadataOptional.isPresent()) {
                 remoteLogSegmentMetadata = rlsMetadataOptional.get();
                 // Search forward for the position of the last offset that is greater than or equal to the target offset
                 startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
                 remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
                 RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream(remoteSegInputStream);
-                firstBatch = findFirstBatch(remoteLogInputStream, offset);
-                if (firstBatch == null) {
+                enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset);
+                if (enrichedRecordBatch.batch == null) {
+                    Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
                     rlsMetadataOptional = findNextSegmentMetadata(rlsMetadataOptional.get(), logOptional.get().leaderEpochCache());
                 }
             }
+            RecordBatch firstBatch = enrichedRecordBatch.batch;
             if (firstBatch == null)
                 return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
                         includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
@@ -1659,8 +1660,9 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
             }
 
             buffer.flip();
+            startPos = startPos + enrichedRecordBatch.skippedBytes;
             FetchDataInfo fetchDataInfo = new FetchDataInfo(
-                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    new LogOffsetMetadata(firstBatch.baseOffset(), remoteLogSegmentMetadata.startOffset(), startPos),
                     MemoryRecords.readableRecords(buffer));
             if (includeAbortedTxns) {
                 fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
@@ -1668,7 +1670,9 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
             return fetchDataInfo;
         } finally {
-            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            if (enrichedRecordBatch.batch != null) {
+                Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            }
         }
     }
 
     // for testing
@@ -1763,15 +1767,18 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
     }
 
     // Visible for testing
-    RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
-        RecordBatch nextBatch;
+    EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+        int skippedBytes = 0;
+        RecordBatch nextBatch = null;
         // Look for the batch which has the desired offset
         // We will always have a batch in that segment as it is a non-compacted topic.
         do {
+            if (nextBatch != null) {
+                skippedBytes += nextBatch.sizeInBytes();
+            }
             nextBatch = remoteLogInputStream.nextBatch();
         } while (nextBatch != null && nextBatch.lastOffset() < offset);
-
-        return nextBatch;
+        return new EnrichedRecordBatch(nextBatch, skippedBytes);
     }
 
     OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
@@ -2122,4 +2129,14 @@ public String toString() {
             '}';
         }
     }
+
+    static class EnrichedRecordBatch {
+        private final RecordBatch batch;
+        private final int skippedBytes;
+
+        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
+            this.batch = batch;
+            this.skippedBytes = skippedBytes;
+        }
+    }
 }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 81d7e3f100fb0..24c89ce3970c1 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -135,6 +135,7 @@
 import scala.collection.JavaConverters;
 
 import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -150,6 +151,7 @@
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -2932,8 +2934,8 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
             }
 
             // This is the key scenario that we are testing here
-            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
-                return null;
+            EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+                return new EnrichedRecordBatch(null, 0);
             }
         }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -2999,10 +3001,10 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
                 return 1;
             }
 
-            RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+            EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
                 when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
                 doNothing().when(firstBatch).writeTo(capture.capture());
-                return firstBatch;
+                return new EnrichedRecordBatch(firstBatch, 0);
             }
         }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -3396,6 +3398,75 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
         assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
     }
 
+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition), anyInt(), anyLong()))
+                .thenAnswer(ans -> {
+                    long offset = ans.getArgument(2);
+                    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                    RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(segmentId,
+                            offset - 10, offset + 99, 1024, totalEpochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                    return Optional.of(segmentMetadata);
+                });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> currentLogStartOffset.set(offset),
+                brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
+                1048576, true, leaderTopicIdPartition.topicPartition(),
+                partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, offset, compression, CREATE_TIME,
+                            records.toArray(new SimpleRecord[0])));
+                    offset += records.size();
+                    records.clear();
+                }
+            }
+            fileRecords.flush();
+        }
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
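
The accounting at the heart of this fix can be exercised standalone: findFirstBatch now
sums the size of every batch it scans past, so read() can report the base offset and file
position of the batch it actually returns rather than the caller's fetch offset and the
index-lookup position. Below is a minimal sketch of that logic; Batch, Result, and
firstBatchFrom are hypothetical stand-ins (not the Kafka API), and the 91-byte batch size
is inferred from the test's 273-byte assertion.

import java.util.Iterator;
import java.util.List;

public class FindFirstBatchSketch {
    // Simplified stand-in for org.apache.kafka.common.record.RecordBatch.
    record Batch(long baseOffset, long lastOffset, int sizeInBytes) { }

    // Plays the role of EnrichedRecordBatch: the located batch plus bytes skipped.
    record Result(Batch batch, int skippedBytes) { }

    // Same loop shape as the fixed findFirstBatch: skip whole batches whose
    // lastOffset is below the target offset, summing their sizes.
    static Result firstBatchFrom(Iterator<Batch> batches, long targetOffset) {
        int skippedBytes = 0;
        Batch next = null;
        do {
            if (next != null) {
                skippedBytes += next.sizeInBytes(); // count each batch scanned past
            }
            next = batches.hasNext() ? batches.next() : null;
        } while (next != null && next.lastOffset() < targetOffset);
        return new Result(next, skippedBytes);
    }

    public static void main(String[] args) {
        // Mirrors testRemoteReadFetchDataInfo: 3-record batches, fetch offset 10.
        List<Batch> segment = List.of(
                new Batch(0, 2, 91), new Batch(3, 5, 91),
                new Batch(6, 8, 91), new Batch(9, 11, 91));
        Result r = firstBatchFrom(segment.iterator(), 10);
        // Prints "9 @ +273": three whole batches (3 * 91 bytes) are skipped and
        // the reported offset is the found batch's baseOffset, matching the
        // test's assertEquals(9, ...) and assertEquals(273, ...).
        System.out.println(r.batch().baseOffset() + " @ +" + r.skippedBytes());
    }
}

Run with `java FindFirstBatchSketch` (Java 16+, for records); the printed pair matches
what the new test asserts on fetchOffsetMetadata.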