[server] ConsumptionTask Refactor #1318

Open · wants to merge 32 commits into base: main

Commits (32), all by KaiSernLim:
- d53fe23 Refactored `produceToStoreBufferServiceOrKafka()` from `StoreIngestio… (Oct 18, 2024)
- 4bb2c5a Refactored `produceToStoreBufferServiceOrKafkaInBatch()` from `StoreI… (Oct 18, 2024)
- 776d65f Refactored `waitReadyToProcessRecord()` and the call chain down to ri… (Oct 18, 2024)
- f1be5cf Refactored `handleSingleMessage()` from `StoreIngestionTask` into `St… (Oct 19, 2024)
- 8eda4a0 Refactored `validateAndFilterOutDuplicateMessagesFromLeaderTopic()` f… (Oct 19, 2024)
- 6eea45c Refactored delegateConsumerRecord() (which was an absolute behemoth) … (Feb 5, 2025)
- d764c81 Refactored `validateRecordBeforeProducingToLocalKafka()` from `Leader… (Oct 19, 2024)
- 865bda3 Refactored `recordRegionHybridConsumptionStats()` from `LeaderFollowe… (Oct 19, 2024)
- 0e5d0e1 Refactored propagateHeartbeatFromUpstreamTopicToLocalVersionTopic() f… (Feb 5, 2025)
- c9f8a78 Refactored `processMessageAndMaybeProduceToKafka()` from `LeaderFollo… (Feb 5, 2025)
- 983792d Refactored `producePutOrDeleteToKafka()` from `ActiveActiveStoreInges… (Oct 19, 2024)
- 9435ddc Refactored `getProduceToTopicFunction()` from `ActiveActiveStoreInges… (Oct 19, 2024)
- 6f6c6eb Refactored `produceToLocalKafka()` from `LeaderFollowerStoreIngestion… (Oct 21, 2024)
- dc08f49 Refactored `createProducerCallback()` from `LeaderFollowerStoreIngest… (Oct 19, 2024)
- 98c0023 Moved `IngestionBatchProcessor` from `LeaderFollowerStoreIngestionTas… (Oct 19, 2024)
- 46fb7e8 Refactored `processMessage()` from `LeaderFollowerStoreIngestionTask`… (Oct 19, 2024)
- d311658 Refactored `processActiveActiveMessage()` from `ActiveActiveStoreInge… (Oct 19, 2024)
- 1270f39 Refactored `validatePostOperationResultsAndRecord()` from `ActiveActi… (Oct 19, 2024)
- 1f9f4c2 Refactored `getValueBytesForKey()` and `getCurrentValueFromTransientR… (Oct 19, 2024)
- 23544b6 Refactored `maybeCompressData()` from `LeaderFollowerStoreIngestionTa… (Oct 19, 2024)
- b713011 Refactored `readStoredValueRecord()` from `LeaderFollowerStoreIngesti… (Oct 19, 2024)
- 6b7493d Refactored `getAndUpdateLeaderCompletedState()` from `LeaderFollowerS… (Oct 21, 2024)
- 5293304 Refactored `shouldProduceInBatch()` from `StoreIngestionTask` into `S… (Oct 19, 2024)
- cee3705 Refactored `shouldProcessRecord()` from `StoreIngestionTask` / `Leade… (Oct 19, 2024)
- a0b92c9 Refactored `getReplicationMetadataAndSchemaId()` and `getRmdWithValue… (Oct 20, 2024)
- bb6b5e2 Refactored fields `mergeConflictResolver`, `rmdSerDe`, `keyLevelLocks… (Oct 20, 2024)
- 7ff77b6 Undo refactoring fields `mergeConflictResolver` and `rmdSerDe` from `… (Jan 3, 2025)
- 85ff1af Removed `aggVersionedIngestionStats` from `ActiveActiveStoreIngestion… (Oct 20, 2024)
- bb61d5a Removed all the dangling commented out code blocks that were moved in… (Oct 20, 2024)
- 273d634 Changed the StoreIngestionTask proxy methods to be package-private. 🧹 (Jan 6, 2025)
- c0f6853 Minor tweaks to the proxy methods to help with abstraction. 🧹 (Jan 6, 2025)
- 283b670 Reverted `IngestionBatchProcessor` and `KeyLevelLocksManager` back to… (Feb 8, 2025)
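
The commits above all push in one direction: per-record and per-partition consumption logic moves out of `StoreIngestionTask` and its `LeaderFollowerStoreIngestionTask` / `ActiveActiveStoreIngestionTask` subclasses into `StorePartitionDataReceiver`. A minimal sketch of the resulting shape, assuming only the constructor signature the updated tests below actually use; the field names, comments, and import paths are illustrative, not the exact implementation:

```java
import com.linkedin.davinci.kafka.consumer.StoreIngestionTask; // assumed package, per the Venice repo layout
import com.linkedin.venice.pubsub.api.PubSubTopicPartition; // assumed package

public class StorePartitionDataReceiver {
  private final StoreIngestionTask ingestionTask; // delegation target
  private final PubSubTopicPartition topicPartition;
  private final String kafkaUrl;
  private final int kafkaClusterId;

  public StorePartitionDataReceiver(
      StoreIngestionTask ingestionTask,
      PubSubTopicPartition topicPartition,
      String kafkaUrl,
      int kafkaClusterId) {
    this.ingestionTask = ingestionTask;
    this.topicPartition = topicPartition;
    this.kafkaUrl = kafkaUrl;
    this.kafkaClusterId = kafkaClusterId;
  }

  // Methods named in the commit messages (produceToStoreBufferServiceOrKafka(),
  // handleSingleMessage(), processMessageAndMaybeProduceToKafka(),
  // produceToLocalKafka(), ...) now live here and reach back into the task
  // only through its package-private proxy methods (commits 273d634, c0f6853).
}
```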

Large diffs are not rendered by default.

@@ -48,23 +48,20 @@ PubSubMessageProcessedResult apply(
   private final KeyLevelLocksManager lockManager;
   private final boolean isWriteComputationEnabled;
   private final boolean isActiveActiveReplicationEnabled;
-  private final ProcessingFunction processingFunction;
   private final AggVersionedIngestionStats aggVersionedIngestionStats;
   private final HostLevelIngestionStats hostLevelIngestionStats;
 
   public IngestionBatchProcessor(
       String storeVersionName,
       ExecutorService batchProcessingThreadPool,
       KeyLevelLocksManager lockManager,
-      ProcessingFunction processingFunction,
       boolean isWriteComputationEnabled,
       boolean isActiveActiveReplicationEnabled,
       AggVersionedIngestionStats aggVersionedIngestionStats,
       HostLevelIngestionStats hostLevelIngestionStats) {
     this.storeVersionName = storeVersionName;
     this.batchProcessingThreadPool = batchProcessingThreadPool;
     this.lockManager = lockManager;
-    this.processingFunction = processingFunction;
     this.isWriteComputationEnabled = isWriteComputationEnabled;
     this.isActiveActiveReplicationEnabled = isActiveActiveReplicationEnabled;
     this.aggVersionedIngestionStats = aggVersionedIngestionStats;
@@ -130,7 +127,8 @@ public List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope,
       String kafkaUrl,
       int kafkaClusterId,
       long beforeProcessingRecordTimestampNs,
-      long beforeProcessingBatchRecordsTimestampMs) {
+      long beforeProcessingBatchRecordsTimestampMs,
+      ProcessingFunction processingFunction) {
     long currentTimestampInNs = System.nanoTime();
     if (records.isEmpty()) {
       return Collections.emptyList();
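
The net effect of the hunks above: `IngestionBatchProcessor` no longer captures a `ProcessingFunction` at construction time; each caller now passes one to `process()`. A before/after sketch of the call sites, with variable names invented for illustration (only the two signatures are taken from the diff):

```java
// Before: the processing function was baked in when the processor was built.
IngestionBatchProcessor before = new IngestionBatchProcessor(
    storeVersionName, threadPool, lockManager, processingFunction,
    isWriteComputationEnabled, isActiveActiveReplicationEnabled, versionedStats, hostStats);
before.process(records, pcs, partition, kafkaUrl, kafkaClusterId, beforeNs, beforeBatchMs);

// After: one processor instance, and the function travels with each call,
// so callers on different code paths can process a batch differently.
IngestionBatchProcessor after = new IngestionBatchProcessor(
    storeVersionName, threadPool, lockManager,
    isWriteComputationEnabled, isActiveActiveReplicationEnabled, versionedStats, hostStats);
after.process(records, pcs, partition, kafkaUrl, kafkaClusterId, beforeNs, beforeBatchMs, processingFunction);
```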

Large diffs are not rendered by default.

@@ -216,8 +216,8 @@ public void removePartition(int partition) {
   /**
    * Enforce partition level quota for the map.
    * This function could be invoked by multiple threads when shared consumer is being used.
-   * Check {@link StoreIngestionTask#produceToStoreBufferServiceOrKafka} and {@link StoreIngestionTask#checkIngestionProgress}
-   * to find more details.
+   * Check {@link StorePartitionDataReceiver#produceToStoreBufferServiceOrKafka} and
+   * {@link StoreIngestionTask#checkIngestionProgress} to find more details.
    */
   public void checkAllPartitionsQuota() {
     try (AutoCloseableLock ignored = AutoCloseableLock.of(hybridStoreDiskQuotaLock)) {

Large diffs are not rendered by default.

Large diffs are not rendered by default.

@@ -4,14 +4,14 @@
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyDouble;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -130,30 +130,35 @@ public static Object[] compressionStrategyProvider() {
@Test
public void testHandleDeleteBeforeEOP() {
ActiveActiveStoreIngestionTask ingestionTask = mock(ActiveActiveStoreIngestionTask.class);
doCallRealMethod().when(ingestionTask)
.processMessageAndMaybeProduceToKafka(any(), any(), anyInt(), anyString(), anyInt(), anyLong(), anyLong());
PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
when(serverConfig.isComputeFastAvroEnabled()).thenReturn(false);
when(ingestionTask.getServerConfig()).thenReturn(serverConfig);
when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class));
StorePartitionDataReceiver storePartitionDataReceiver =
spy(new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0));
PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
when(pcs.isEndOfPushReceived()).thenReturn(false);
when(pcs.getVeniceWriterLazyRef()).thenReturn(Lazy.of(() -> mock(VeniceWriter.class)));
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord = mock(PubSubMessage.class);
KafkaKey kafkaKey = mock(KafkaKey.class);
KafkaKey kafkaKey = new KafkaKey(MessageType.DELETE, new byte[] { 1 });
when(consumerRecord.getKey()).thenReturn(kafkaKey);
KafkaMessageEnvelope kafkaValue = new KafkaMessageEnvelope();
when(consumerRecord.getValue()).thenReturn(kafkaValue);
when(consumerRecord.getOffset()).thenReturn(1L);
when(consumerRecord.getTopicPartition()).thenReturn(topicPartition);
kafkaValue.messageType = MessageType.DELETE.getValue();
Delete deletePayload = new Delete();
kafkaValue.payloadUnion = deletePayload;
PubSubMessageProcessedResult result =
new PubSubMessageProcessedResult(new WriteComputeResultWrapper(null, null, true));
PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> resultWrapper =
new PubSubMessageProcessedResultWrapper<>(consumerRecord);
resultWrapper.setProcessedResult(result);
ArgumentCaptor<LeaderProducedRecordContext> leaderProducedRecordContextArgumentCaptor =
ArgumentCaptor.forClass(LeaderProducedRecordContext.class);
ingestionTask.processMessageAndMaybeProduceToKafka(
new PubSubMessageProcessedResultWrapper<>(consumerRecord),
pcs,
0,
"dummyUrl",
0,
0L,
0L);
verify(ingestionTask, times(1)).produceToLocalKafka(
storePartitionDataReceiver.processMessageAndMaybeProduceToKafka(resultWrapper, pcs, 0, "dummyUrl", 0, 0L, 0L);
verify(storePartitionDataReceiver, times(1)).produceToLocalKafka(
any(),
any(),
leaderProducedRecordContextArgumentCaptor.capture(),
@@ -172,7 +177,9 @@ public void testGetValueBytesFromTransientRecords(CompressionStrategy strategy)
VeniceCompressor compressor = getCompressor(strategy);
when(ingestionTask.getCompressor()).thenReturn(Lazy.of(() -> compressor));
when(ingestionTask.getCompressionStrategy()).thenReturn(strategy);
when(ingestionTask.getCurrentValueFromTransientRecord(any())).thenCallRealMethod();
PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
StorePartitionDataReceiver storePartitionDataReceiver =
new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);

byte[] dataBytes = "Hello World".getBytes();
byte[] transientRecordValueBytes = dataBytes;
@@ -187,7 +194,7 @@ public void testGetValueBytesFromTransientRecords(CompressionStrategy strategy)
when(transientRecord.getValue()).thenReturn(transientRecordValueBytes);
when(transientRecord.getValueOffset()).thenReturn(startPosition);
when(transientRecord.getValueLen()).thenReturn(dataLength);
ByteBuffer result = ingestionTask.getCurrentValueFromTransientRecord(transientRecord);
ByteBuffer result = storePartitionDataReceiver.getCurrentValueFromTransientRecord(transientRecord);
Assert.assertEquals(result.remaining(), dataBytes.length);
byte[] resultByteArray = new byte[result.remaining()];
result.get(resultByteArray);
@@ -321,17 +328,18 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio

HostLevelIngestionStats mockHostLevelIngestionStats = mock(HostLevelIngestionStats.class);
ActiveActiveStoreIngestionTask ingestionTask = mock(ActiveActiveStoreIngestionTask.class);
when(ingestionTask.isActiveActiveReplicationEnabled()).thenReturn(true);
when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mockHostLevelIngestionStats);
when(ingestionTask.getVersionIngestionStats()).thenReturn(mock(AggVersionedIngestionStats.class));
when(ingestionTask.getVersionedDIVStats()).thenReturn(mock(AggVersionedDIVStats.class));
when(ingestionTask.getKafkaVersionTopic()).thenReturn(testTopic);
when(ingestionTask.createProducerCallback(any(), any(), any(), anyInt(), anyString(), anyLong()))
.thenCallRealMethod();
when(ingestionTask.getProduceToTopicFunction(any(), any(), any(), any(), any(), any(), anyInt(), anyBoolean()))
.thenCallRealMethod();
when(ingestionTask.getRmdProtocolVersionId()).thenReturn(rmdProtocolVersionID);
doCallRealMethod().when(ingestionTask)
.produceToLocalKafka(any(), any(), any(), any(), anyInt(), anyString(), anyInt(), anyLong());
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
PubSubTopicPartition topicPartition =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(testTopic), partition);
StorePartitionDataReceiver storePartitionDataReceiver =
new StorePartitionDataReceiver(ingestionTask, topicPartition, kafkaUrl, kafkaClusterId);

byte[] key = "foo".getBytes();
byte[] updatedKeyBytes = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key);

@@ -400,11 +408,11 @@ public void testLeaderCanSendValueChunksIntoDrainer() throws InterruptedExceptio
KafkaKey kafkaKey = mock(KafkaKey.class);
when(consumerRecord.getKey()).thenReturn(kafkaKey);
when(kafkaKey.getKey()).thenReturn(new byte[] { 0xa });
ingestionTask.produceToLocalKafka(
storePartitionDataReceiver.produceToLocalKafka(
consumerRecord,
partitionConsumptionState,
leaderProducedRecordContext,
ingestionTask.getProduceToTopicFunction(
storePartitionDataReceiver.getProduceToTopicFunction(
partitionConsumptionState,
updatedKeyBytes,
updatedValueBytes,
@@ -520,14 +528,16 @@ public void testReadingChunkedRmdFromStorage() {
when(ingestionTask.getStorageEngine()).thenReturn(storageEngine);
when(ingestionTask.getSchemaRepo()).thenReturn(schemaRepository);
when(ingestionTask.getServerConfig()).thenReturn(serverConfig);
when(ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(anyInt(), any(), any(), anyLong()))
.thenCallRealMethod();
when(ingestionTask.isChunked()).thenReturn(true);
when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class));
ChunkedValueManifestContainer container = new ChunkedValueManifestContainer();
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(topLevelKey1)))
.thenReturn(expectedNonChunkedValue);
byte[] result = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
StorePartitionDataReceiver storePartitionDataReceiver =
new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0);
byte[] result =
storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
Assert.assertNotNull(result);
Assert.assertNull(container.getManifest());
Assert.assertEquals(result, expectedNonChunkedValue);
@@ -557,7 +567,8 @@ public void testReadingChunkedRmdFromStorage() {
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(topLevelKey2)))
.thenReturn(chunkedManifestBytes.array());
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey2))).thenReturn(chunkedValue1);
byte[] result2 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
byte[] result2 =
storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
Assert.assertNotNull(result2);
Assert.assertNotNull(container.getManifest());
Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 1);
@@ -593,7 +604,8 @@ public void testReadingChunkedRmdFromStorage() {
.thenReturn(chunkedManifestBytes.array());
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey3))).thenReturn(chunkedValue1);
when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey2InKey3))).thenReturn(chunkedValue2);
byte[] result3 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
byte[] result3 =
storePartitionDataReceiver.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
Assert.assertNotNull(result3);
Assert.assertNotNull(container.getManifest());
Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 2);
@@ -602,11 +614,11 @@ public void testReadingChunkedRmdFromStorage() {

@Test
public void testUnwrapByteBufferFromOldValueProvider() {
Lazy<ByteBuffer> lazyBB = ActiveActiveStoreIngestionTask.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
Lazy<ByteBuffer> lazyBB = StorePartitionDataReceiver.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
assertNotNull(lazyBB);
assertNull(lazyBB.get());

lazyBB = ActiveActiveStoreIngestionTask.unwrapByteBufferFromOldValueProvider(
lazyBB = StorePartitionDataReceiver.unwrapByteBufferFromOldValueProvider(
Lazy.of(() -> new ByteBufferValueRecord<>(ByteBuffer.wrap(new byte[1]), 1)));
assertNotNull(lazyBB);
assertNotNull(lazyBB.get());
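
Every test updated above follows the same recipe: rather than `doCallRealMethod()` / `thenCallRealMethod()` on a mocked ingestion task, the test wraps the mock in a real `StorePartitionDataReceiver` (spied when its own interactions are verified) and calls the moved method on the receiver. A condensed sketch of that recipe; fixture setup and stubbing (e.g. `getServerConfig()`) are omitted, and `resultWrapper` / `pcs` stand for the fixtures built in the diff:

```java
ActiveActiveStoreIngestionTask ingestionTask = mock(ActiveActiveStoreIngestionTask.class);
PubSubTopicPartition topicPartition = mock(PubSubTopicPartition.class);
// Real object under test; the mocked task only backs the receiver's proxy calls.
StorePartitionDataReceiver receiver =
    spy(new StorePartitionDataReceiver(ingestionTask, topicPartition, "dummyUrl", 0));

receiver.processMessageAndMaybeProduceToKafka(resultWrapper, pcs, 0, "dummyUrl", 0, 0L, 0L);
verify(receiver, times(1))
    .produceToLocalKafka(any(), any(), any(), any(), anyInt(), anyString(), anyInt(), anyLong());
```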
@@ -113,7 +113,6 @@ public void lockKeysTest() {
         "store_v1",
         mock(ExecutorService.class),
         mockKeyLevelLocksManager,
-        (ignored1, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> null,
         true,
         true,
         mock(AggVersionedIngestionStats.class),
@@ -181,6 +180,19 @@ public void processTest() {
"store_v1",
Executors.newFixedThreadPool(1, new DaemonThreadFactory("test")),
mockKeyLevelLocksManager,
true,
true,
mockAggVersionedIngestionStats,
mockHostLevelIngestionStats);

List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> result = batchProcessor.process(
Arrays.asList(rtMessage1, rtMessage2),
mock(PartitionConsumptionState.class),
1,
"test_kafka",
1,
1,
1,
(consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> {
if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) {
Put put = new Put();
@@ -194,20 +206,7 @@ public void processTest() {
return new PubSubMessageProcessedResult(writeComputeResultWrapper);
}
return null;
},
true,
true,
mockAggVersionedIngestionStats,
mockHostLevelIngestionStats);

List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> result = batchProcessor.process(
Arrays.asList(rtMessage1, rtMessage2),
mock(PartitionConsumptionState.class),
1,
"test_kafka",
1,
1,
1);
});

assertEquals(result.size(), 2);
PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> resultForKey1 = result.get(0);
@@ -228,17 +227,6 @@ public void processTest() {
"store_v1",
Executors.newFixedThreadPool(1, new DaemonThreadFactory("test")),
mockKeyLevelLocksManager,
(consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> {
if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) {
Put put = new Put();
put.setPutValue(ByteBuffer.wrap("value1".getBytes()));
WriteComputeResultWrapper writeComputeResultWrapper = new WriteComputeResultWrapper(put, null, true);
return new PubSubMessageProcessedResult(writeComputeResultWrapper);
} else if (Arrays.equals(consumerRecord.getKey().getKey(), "key2".getBytes())) {
throw new VeniceException("Fake");
}
return null;
},
true,
true,
mockAggVersionedIngestionStats,
@@ -253,7 +241,18 @@ public void processTest() {
"test_kafka",
1,
1,
1));
1,
(consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> {
if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) {
Put put = new Put();
put.setPutValue(ByteBuffer.wrap("value1".getBytes()));
WriteComputeResultWrapper writeComputeResultWrapper = new WriteComputeResultWrapper(put, null, true);
return new PubSubMessageProcessedResult(writeComputeResultWrapper);
} else if (Arrays.equals(consumerRecord.getKey().getKey(), "key2".getBytes())) {
throw new VeniceException("Fake");
}
return null;
}));
assertTrue(exception.getMessage().contains("Failed to execute the batch processing"));
verify(mockAggVersionedIngestionStats).recordBatchProcessingRequestError("store", 1);
verify(mockHostLevelIngestionStats).recordBatchProcessingRequestError();