
Reverted `IngestionBatchProcessor` and `KeyLevelLocksManager` back to `StoreIngestionTask`. 🧹
KaiSernLim committed Feb 8, 2025
1 parent c0f6853 commit da2ed5e
Showing 7 changed files with 102 additions and 90 deletions.
File: ActiveActiveStoreIngestionTask.java
@@ -59,7 +59,9 @@ public class ActiveActiveStoreIngestionTask extends LeaderFollowerStoreIngestionTask
private final int rmdProtocolVersionId;
private final MergeConflictResolver mergeConflictResolver;
private final RmdSerDe rmdSerDe;
private final Lazy<KeyLevelLocksManager> keyLevelLocksManager;
private final RemoteIngestionRepairService remoteIngestionRepairService;
private final Lazy<IngestionBatchProcessor> ingestionBatchProcessorLazy;

public ActiveActiveStoreIngestionTask(
StorageService storageService,
@@ -90,6 +92,14 @@ public ActiveActiveStoreIngestionTask(

this.rmdProtocolVersionId = version.getRmdVersionId();

int knownKafkaClusterNumber = serverConfig.getKafkaClusterIdToUrlMap().size();

int initialPoolSize = knownKafkaClusterNumber + 1;
this.keyLevelLocksManager = Lazy.of(
() -> new KeyLevelLocksManager(
getVersionTopic().getName(),
initialPoolSize,
getKeyLevelLockMaxPoolSizeBasedOnServerConfig(serverConfig, storeVersionPartitionCount)));
StringAnnotatedStoreSchemaCache annotatedReadOnlySchemaRepository =
new StringAnnotatedStoreSchemaCache(storeName, schemaRepository);

@@ -105,6 +115,21 @@ public ActiveActiveStoreIngestionTask(
isWriteComputationEnabled,
getServerConfig().isComputeFastAvroEnabled());
this.remoteIngestionRepairService = builder.getRemoteIngestionRepairService();
this.ingestionBatchProcessorLazy = Lazy.of(() -> {
if (!serverConfig.isAAWCWorkloadParallelProcessingEnabled()) {
LOGGER.info("AA/WC workload parallel processing is disabled for store version: {}", getKafkaVersionTopic());
return null;
}
LOGGER.info("AA/WC workload parallel processing is enabled for store version: {}", getKafkaVersionTopic());
return new IngestionBatchProcessor(
kafkaVersionTopic,
parallelProcessingThreadPool,
keyLevelLocksManager.get(),
isWriteComputationEnabled,
isActiveActiveReplicationEnabled(),
versionedIngestionStats,
getHostLevelIngestionStats());
});
}
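
For readers skimming the hunk above: Venice's `Lazy.of(...)` wraps a `Supplier` so that neither field is materialized until the first `get()` call. Below is a minimal stand-in sketch of that memoizing-supplier behavior, not Venice's actual `Lazy` implementation, whose synchronization and null-handling details may differ:

```java
import java.util.function.Supplier;

// Stand-in sketch: the delegate runs at most once, on the first get(),
// and the result is cached afterwards.
final class MemoizingSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private boolean initialized;
  private T value;

  MemoizingSupplier(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public synchronized T get() {
    if (!initialized) {
      value = delegate.get();
      initialized = true;
    }
    return value;
  }
}
```

This is why the constructor can wire up both fields unconditionally: the lock pool and the batch processor are only allocated if some code path actually dereferences them.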

public static int getKeyLevelLockMaxPoolSizeBasedOnServerConfig(VeniceServerConfig serverConfig, int partitionCount) {
@@ -131,7 +156,6 @@ public static int getKeyLevelLockMaxPoolSizeBasedOnServerConfig(
@Override
protected void putInStorageEngine(int partition, byte[] keyBytes, Put put) {
try {

// TODO: Honor BatchConflictResolutionPolicy and maybe persist RMD for batch push records.
StorageOperationType storageOperationType =
getStorageOperationType(partition, put.putValue, put.replicationMetadataPayload);
Expand Down Expand Up @@ -224,6 +248,16 @@ private byte[] prependReplicationMetadataBytesWithValueSchemaId(ByteBuffer metad
return replicationMetadataBytesWithValueSchemaId;
}

@Override
KeyLevelLocksManager getKeyLevelLocksManager() {
return keyLevelLocksManager.get();
}

@Override
IngestionBatchProcessor getIngestionBatchProcessor() {
return ingestionBatchProcessorLazy.get();
}

@Override
protected Map<String, Long> calculateLeaderUpstreamOffsetWithTopicSwitch(
PartitionConsumptionState partitionConsumptionState,
File: IngestionBatchProcessor.java
@@ -48,23 +48,20 @@ PubSubMessageProcessedResult apply(
private final KeyLevelLocksManager lockManager;
private final boolean isWriteComputationEnabled;
private final boolean isActiveActiveReplicationEnabled;
private final ProcessingFunction processingFunction;
private final AggVersionedIngestionStats aggVersionedIngestionStats;
private final HostLevelIngestionStats hostLevelIngestionStats;

public IngestionBatchProcessor(
String storeVersionName,
ExecutorService batchProcessingThreadPool,
KeyLevelLocksManager lockManager,
ProcessingFunction processingFunction,
boolean isWriteComputationEnabled,
boolean isActiveActiveReplicationEnabled,
AggVersionedIngestionStats aggVersionedIngestionStats,
HostLevelIngestionStats hostLevelIngestionStats) {
this.storeVersionName = storeVersionName;
this.batchProcessingThreadPool = batchProcessingThreadPool;
this.lockManager = lockManager;
this.processingFunction = processingFunction;
this.isWriteComputationEnabled = isWriteComputationEnabled;
this.isActiveActiveReplicationEnabled = isActiveActiveReplicationEnabled;
this.aggVersionedIngestionStats = aggVersionedIngestionStats;
@@ -130,7 +127,8 @@ public List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> process(
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs,
long beforeProcessingBatchRecordsTimestampMs) {
long beforeProcessingBatchRecordsTimestampMs,
ProcessingFunction processingFunction) {
long currentTimestampInNs = System.nanoTime();
if (records.isEmpty()) {
return Collections.emptyList();
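
The net effect of this hunk is that `ProcessingFunction` moves from constructor state to a per-call argument of `process(...)`, so a single `IngestionBatchProcessor` no longer pins one processing strategy. A hedged sketch of a call site under the new signature, with the local variable names assumed (they mirror the `StorePartitionDataReceiver` change later in this commit):

```java
// Sketch only: records, partitionConsumptionState, partition, kafkaUrl,
// kafkaClusterId and both timestamps are assumed to be in scope.
// The caller now chooses the per-record path at call time.
List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> processedResults =
    ingestionBatchProcessor.process(
        records,
        partitionConsumptionState,
        partition,
        kafkaUrl,
        kafkaClusterId,
        beforeProcessingPerRecordTimestampNs,
        beforeProcessingBatchRecordsTimestampMs,
        storeIngestionTask.isActiveActiveReplicationEnabled()
            ? this::processActiveActiveMessage
            : this::processMessage);
```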
File: LeaderFollowerStoreIngestionTask.java
@@ -174,6 +174,7 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {

private final AtomicLong lastSendIngestionHeartbeatTimestamp = new AtomicLong(0);

private final Lazy<IngestionBatchProcessor> ingestionBatchProcessingLazy;
private final Version version;

public LeaderFollowerStoreIngestionTask(
@@ -315,6 +316,21 @@ public LeaderFollowerStoreIngestionTask(
builder.getSchemaRepo(),
getStoreName(),
serverConfig.isComputeFastAvroEnabled());
this.ingestionBatchProcessingLazy = Lazy.of(() -> {
if (!serverConfig.isAAWCWorkloadParallelProcessingEnabled()) {
LOGGER.info("AA/WC workload parallel processing is disabled for store version: {}", getKafkaVersionTopic());
return null;
}
LOGGER.info("AA/WC workload parallel processing is enabled for store version: {}", getKafkaVersionTopic());
return new IngestionBatchProcessor(
kafkaVersionTopic,
parallelProcessingThreadPool,
null,
isWriteComputationEnabled,
isActiveActiveReplicationEnabled(),
builder.getVersionedStorageIngestionStats(),
getHostLevelIngestionStats());
});
}

public static VeniceWriter<byte[], byte[], byte[]> constructVeniceWriter(
Expand Down Expand Up @@ -347,6 +363,11 @@ public void closeVeniceWriters(boolean doFlush) {
}
}

@Override
IngestionBatchProcessor getIngestionBatchProcessor() {
return ingestionBatchProcessingLazy.get();
}

@Override
public Map<String, VeniceViewWriter> getViewWriters() {
return viewWriters;
File: StoreIngestionTask.java
@@ -4310,12 +4310,14 @@ RmdSerDe getRmdSerDe() {

protected abstract boolean hasViewWriters();

void setWriteComputeFailureCode(int code) {
this.writeComputeFailureCode = code;
KeyLevelLocksManager getKeyLevelLocksManager() {
throw new VeniceException("getKeyLevelLocksManager() should only be called in active active mode");
}

ExecutorService getParallelProcessingThreadPool() {
return parallelProcessingThreadPool;
abstract IngestionBatchProcessor getIngestionBatchProcessor();

void setWriteComputeFailureCode(int code) {
this.writeComputeFailureCode = code;
}

boolean isDataRecovery() {
@@ -4330,10 +4332,6 @@ int getLocalKafkaClusterId() {
return localKafkaClusterId;
}

int getStoreVersionPartitionCount() {
return storeVersionPartitionCount;
}

// For unit test purpose.
void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
this.versionRole = versionRole;
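
The shape of this hunk is a guarded base-class accessor: `StoreIngestionTask` fails fast on the AA-only lock manager while forcing every subclass to answer for the batch processor. A condensed sketch of that contract (the class name is a hypothetical stand-in; the other types are the Venice types referenced in this diff, packages omitted):

```java
// Condensed, hypothetical sketch of the accessor contract in this hunk.
abstract class BaseIngestionTaskSketch {
  // Only meaningful in active-active mode; all other subclasses fail fast.
  KeyLevelLocksManager getKeyLevelLocksManager() {
    throw new VeniceException("getKeyLevelLocksManager() should only be called in active active mode");
  }

  // Every subclass must say whether batch processing is available.
  abstract IngestionBatchProcessor getIngestionBatchProcessor();
}
```

`ActiveActiveStoreIngestionTask` overrides both accessors (first file above), while `LeaderFollowerStoreIngestionTask` overrides only the processor accessor, so a key-level-lock request on a non-AA task surfaces as an immediate exception rather than a null.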
File: StorePartitionDataReceiver.java
@@ -94,8 +94,6 @@ public class StorePartitionDataReceiver
private final String kafkaUrl;
private final String kafkaUrlForLogger;
private final int kafkaClusterId;
private final Lazy<KeyLevelLocksManager> keyLevelLocksManager;
private final Lazy<IngestionBatchProcessor> ingestionBatchProcessorLazy;

private long receivedRecordsCount;

@@ -120,39 +118,6 @@ public StorePartitionDataReceiver(
this.kafkaClusterId = kafkaClusterId;
this.LOGGER = LogManager.getLogger(this.getClass().getSimpleName() + " [" + kafkaUrlForLogger + "]");
this.receivedRecordsCount = 0L;
this.keyLevelLocksManager = Lazy.of(() -> {
final int knownKafkaClusterNumber = storeIngestionTask.getServerConfig().getKafkaClusterIdToUrlMap().size();
final int initialPoolSize = knownKafkaClusterNumber + 1;
return new KeyLevelLocksManager(
storeIngestionTask.getVersionTopic().getName(),
initialPoolSize,
ActiveActiveStoreIngestionTask.getKeyLevelLockMaxPoolSizeBasedOnServerConfig(
storeIngestionTask.getServerConfig(),
storeIngestionTask.getStoreVersionPartitionCount()));
});
this.ingestionBatchProcessorLazy = Lazy.of(() -> {
final String kafkaVersionTopic = storeIngestionTask.getKafkaVersionTopic();
if (!storeIngestionTask.getServerConfig().isAAWCWorkloadParallelProcessingEnabled()) {
LOGGER.info("AA/WC workload parallel processing is disabled for store version: {}", kafkaVersionTopic);
return null;
}
IngestionBatchProcessor.ProcessingFunction processingFunction =
(storeIngestionTask.isActiveActiveReplicationEnabled())
? this::processActiveActiveMessage
: this::processMessage;
KeyLevelLocksManager lockManager =
(storeIngestionTask.isActiveActiveReplicationEnabled()) ? keyLevelLocksManager.get() : null;
LOGGER.info("AA/WC workload parallel processing is enabled for store version: {}", kafkaVersionTopic);
return new IngestionBatchProcessor(
storeIngestionTask.getKafkaVersionTopic(),
storeIngestionTask.getParallelProcessingThreadPool(),
lockManager,
processingFunction,
storeIngestionTask.isTransientRecordBufferUsed(),
storeIngestionTask.isActiveActiveReplicationEnabled(),
storeIngestionTask.getVersionIngestionStats(),
storeIngestionTask.getHostLevelIngestionStats());
});
}

@Override
@@ -308,7 +273,7 @@ public void produceToStoreBufferServiceOrKafkaInBatch(
if (batches.isEmpty()) {
return;
}
IngestionBatchProcessor ingestionBatchProcessor = ingestionBatchProcessorLazy.get();
IngestionBatchProcessor ingestionBatchProcessor = storeIngestionTask.getIngestionBatchProcessor();
if (ingestionBatchProcessor == null) {
throw new VeniceException(
"IngestionBatchProcessor object should present for store version: "
@@ -329,7 +294,10 @@ public void produceToStoreBufferServiceOrKafkaInBatch(
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs);
beforeProcessingBatchRecordsTimestampMs,
(storeIngestionTask.isActiveActiveReplicationEnabled())
? this::processActiveActiveMessage
: this::processMessage);

for (PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> processedRecord: processedResults) {
totalBytesRead += handleSingleMessage(
@@ -796,7 +764,7 @@ private DelegateConsumerRecordResult delegateConsumerRecordMaybeWithLock(
* -> [fabric A thread]produce to VT
*/
final ByteArrayKey byteArrayKey = ByteArrayKey.wrap(consumerRecordWrapper.getMessage().getKey().getKey());
ReentrantLock keyLevelLock = keyLevelLocksManager.get().acquireLockByKey(byteArrayKey);
ReentrantLock keyLevelLock = storeIngestionTask.getKeyLevelLocksManager().acquireLockByKey(byteArrayKey);
keyLevelLock.lock();
try {
return delegateConsumerRecord(
Expand All @@ -808,7 +776,7 @@ private DelegateConsumerRecordResult delegateConsumerRecordMaybeWithLock(
beforeProcessingBatchRecordsTimestampMs);
} finally {
keyLevelLock.unlock();
keyLevelLocksManager.get().releaseLock(byteArrayKey);
storeIngestionTask.getKeyLevelLocksManager().releaseLock(byteArrayKey);
}
}
}
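
The try/finally shape above carries the whole safety argument: `unlock()` must come before `releaseLock(...)`, because releasing hands the pooled lock back for reuse by other keys. The same discipline in isolation, assuming `KeyLevelLocksManager` pools `ReentrantLock`s keyed by the record key:

```java
// Isolation sketch of the per-key locking discipline; lockManager and
// record are assumed to be in scope.
ByteArrayKey byteArrayKey = ByteArrayKey.wrap(record.getKey().getKey());
ReentrantLock keyLevelLock = lockManager.acquireLockByKey(byteArrayKey);
keyLevelLock.lock();
try {
  // per-key critical section: conflict-resolve, then produce to the VT
} finally {
  keyLevelLock.unlock();                 // give up mutual exclusion first
  lockManager.releaseLock(byteArrayKey); // then return the lock to the pool
}
```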
@@ -2550,8 +2518,4 @@ public String toString() {
int getKafkaClusterId() {
return this.kafkaClusterId;
}

IngestionBatchProcessor getIngestionBatchProcessor() {
return ingestionBatchProcessorLazy.get();
}
}
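
One consequence of this revert: `StorePartitionDataReceiver` no longer owns lazy copies of the lock manager or batch processor, so everything flows through the task's accessors, and the batch path must still cope with the null that `getIngestionBatchProcessor()` returns when parallel processing is disabled. A recap sketch of that consumer-side contract (exception text follows the diff; surrounding names are assumed):

```java
// Recap of the fail-fast check in produceToStoreBufferServiceOrKafkaInBatch.
IngestionBatchProcessor processor = storeIngestionTask.getIngestionBatchProcessor();
if (processor == null) {
  throw new VeniceException(
      "IngestionBatchProcessor object should be present for store version: "
          + storeIngestionTask.getKafkaVersionTopic());
}
```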
File: IngestionBatchProcessorTest.java
@@ -113,7 +113,6 @@ public void lockKeysTest() {
"store_v1",
mock(ExecutorService.class),
mockKeyLevelLocksManager,
(ignored1, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> null,
true,
true,
mock(AggVersionedIngestionStats.class),
@@ -181,6 +180,19 @@ public void processTest() {
"store_v1",
Executors.newFixedThreadPool(1, new DaemonThreadFactory("test")),
mockKeyLevelLocksManager,
true,
true,
mockAggVersionedIngestionStats,
mockHostLevelIngestionStats);

List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> result = batchProcessor.process(
Arrays.asList(rtMessage1, rtMessage2),
mock(PartitionConsumptionState.class),
1,
"test_kafka",
1,
1,
1,
(consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> {
if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) {
Put put = new Put();
@@ -194,20 +206,7 @@ public void processTest() {
return new PubSubMessageProcessedResult(writeComputeResultWrapper);
}
return null;
},
true,
true,
mockAggVersionedIngestionStats,
mockHostLevelIngestionStats);

List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> result = batchProcessor.process(
Arrays.asList(rtMessage1, rtMessage2),
mock(PartitionConsumptionState.class),
1,
"test_kafka",
1,
1,
1);
});

assertEquals(result.size(), 2);
PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> resultForKey1 = result.get(0);
@@ -228,17 +227,6 @@ public void processTest() {
"store_v1",
Executors.newFixedThreadPool(1, new DaemonThreadFactory("test")),
mockKeyLevelLocksManager,
(consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> {
if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) {
Put put = new Put();
put.setPutValue(ByteBuffer.wrap("value1".getBytes()));
WriteComputeResultWrapper writeComputeResultWrapper = new WriteComputeResultWrapper(put, null, true);
return new PubSubMessageProcessedResult(writeComputeResultWrapper);
} else if (Arrays.equals(consumerRecord.getKey().getKey(), "key2".getBytes())) {
throw new VeniceException("Fake");
}
return null;
},
true,
true,
mockAggVersionedIngestionStats,
@@ -253,7 +241,18 @@
"test_kafka",
1,
1,
1));
1,
(consumerRecord, ignored2, ignored3, ignored4, ignored5, ignored6, ignored7) -> {
if (Arrays.equals(consumerRecord.getKey().getKey(), "key1".getBytes())) {
Put put = new Put();
put.setPutValue(ByteBuffer.wrap("value1".getBytes()));
WriteComputeResultWrapper writeComputeResultWrapper = new WriteComputeResultWrapper(put, null, true);
return new PubSubMessageProcessedResult(writeComputeResultWrapper);
} else if (Arrays.equals(consumerRecord.getKey().getKey(), "key2".getBytes())) {
throw new VeniceException("Fake");
}
return null;
}));
assertTrue(exception.getMessage().contains("Failed to execute the batch processing"));
verify(mockAggVersionedIngestionStats).recordBatchProcessingRequestError("store", 1);
verify(mockHostLevelIngestionStats).recordBatchProcessingRequestError();
File: StoreIngestionTaskTest.java
@@ -3393,13 +3393,11 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeType nodeType
null);

if (hybridConfig.equals(HYBRID) && nodeType.equals(LEADER) && isAaWCParallelProcessingEnabled()) {
localConsumedDataReceiver = new StorePartitionDataReceiver(
storeIngestionTaskUnderTest,
fooTopicPartition,
inMemoryLocalKafkaBroker.getKafkaBootstrapServer(),
1);
assertNotNull(localConsumedDataReceiver.getIngestionBatchProcessor());
assertNotNull(localConsumedDataReceiver.getIngestionBatchProcessor().getLockManager());
assertTrue(storeIngestionTaskUnderTest instanceof ActiveActiveStoreIngestionTask);
ActiveActiveStoreIngestionTask activeActiveStoreIngestionTask =
(ActiveActiveStoreIngestionTask) storeIngestionTaskUnderTest;
assertNotNull(activeActiveStoreIngestionTask.getIngestionBatchProcessor());
assertNotNull(activeActiveStoreIngestionTask.getIngestionBatchProcessor().getLockManager());
}

String rtTopicName = Utils.getRealTimeTopicName(mockStore);
