
Commit bb092b9

[server] Allocated a dedicated thread pool for ingestion DB lookup (#1525)
Based on a prod experiment, the SSD in use does not perform well when parallel processing is enabled for the AA/WC workload: database lookup concurrency ends up roughly the same as the parallel-processing concurrency, and we saw higher lookup latency with high CPU wait. This PR introduces a separate thread pool for ingestion database lookups (value and RMD), so lookup concurrency can be controlled independently of the parallel-processing concurrency. In a canary test, a lower DB lookup concurrency reduced CPU wait while delivering slightly better overall throughput. New config: server.aa.wc.ingestion.storage.lookup.thread.pool.size (default: 4)
1 parent b68eb16 commit bb092b9
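
The mechanism behind the change is small: every value/RMD lookup on the AA/WC hot path is funneled through a dedicated fixed-size executor and awaited synchronously, so storage-lookup concurrency is capped by the new pool instead of by the parallel-processing pool. Below is a simplified, standalone sketch of that pattern; the real implementation is the databaseLookupWithConcurrencyLimit helper added to LeaderFollowerStoreIngestionTask in this diff, and the class and field names here are illustrative only.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Illustrative only -- not Venice code. Mirrors the shape of the
// databaseLookupWithConcurrencyLimit helper introduced by this commit.
public class IngestionLookupPoolSketch {
  // Sized via server.aa.wc.ingestion.storage.lookup.thread.pool.size (default 4),
  // independently of the AA/WC parallel-processing pool.
  private final ExecutorService lookupPool = Executors.newFixedThreadPool(4);
  private final boolean parallelProcessingEnabled;

  public IngestionLookupPoolSketch(boolean parallelProcessingEnabled) {
    this.parallelProcessingEnabled = parallelProcessingEnabled;
  }

  <T> T lookupWithConcurrencyLimit(Supplier<T> lookup) {
    if (!parallelProcessingEnabled) {
      // Without parallel processing there is no lookup fan-out, so run inline.
      return lookup.get();
    }
    try {
      // Submitting and blocking on the result caps concurrent storage reads at the
      // lookup pool size, no matter how many processing threads call in.
      return lookupPool.submit(() -> lookup.get()).get();
    } catch (InterruptedException | ExecutionException e) {
      // The commit wraps these in VeniceException; a plain runtime exception keeps
      // this sketch self-contained.
      throw new RuntimeException(e);
    }
  }
}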

File tree

9 files changed: +99 -31 lines


clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java

Lines changed: 9 additions & 1 deletion
@@ -54,6 +54,7 @@
 import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE;
 import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE;
 import static com.linkedin.venice.ConfigKeys.ROUTER_PRINCIPAL_NAME;
+import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_INGESTION_STORAGE_LOOKUP_THREAD_POOL_SIZE;
 import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND;
 import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE;
@@ -587,6 +588,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final boolean deleteUnassignedPartitionsOnStartup;
   private final int aclInMemoryCacheTTLMs;

+  private final int aaWCIngestionStorageLookupThreadPoolSize;
+
   public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
     this(serverProperties, Collections.emptyMap());
   }
@@ -991,7 +994,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
     deleteUnassignedPartitionsOnStartup =
         serverProperties.getBoolean(SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP, false);
     aclInMemoryCacheTTLMs = serverProperties.getInt(ACL_IN_MEMORY_CACHE_TTL_MS, -1); // acl caching is disabled by
-                                                                                     // default
+    aaWCIngestionStorageLookupThreadPoolSize =
+        serverProperties.getInt(SERVER_AA_WC_INGESTION_STORAGE_LOOKUP_THREAD_POOL_SIZE, 4);
   }

   long extractIngestionMemoryLimit(
@@ -1809,4 +1813,8 @@ public boolean isDeleteUnassignedPartitionsOnStartupEnabled() {
   public int getAclInMemoryCacheTTLMs() {
     return aclInMemoryCacheTTLMs;
   }
+
+  public int getAaWCIngestionStorageLookupThreadPoolSize() {
+    return aaWCIngestionStorageLookupThreadPoolSize;
+  }
 }

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java

Lines changed: 23 additions & 12 deletions
@@ -389,8 +389,8 @@ byte[] getRmdWithValueSchemaByteBufferFromStorage(
       ChunkedValueManifestContainer rmdManifestContainer,
       long currentTimeForMetricsMs) {
     final long lookupStartTimeInNS = System.nanoTime();
-    ValueRecord result = SingleGetChunkingAdapter
-        .getReplicationMetadata(getStorageEngine(), partition, key, isChunked(), rmdManifestContainer);
+    ValueRecord result = databaseLookupWithConcurrencyLimit(
+        () -> getRmdWithValueSchemaByteBufferFromStorageInternal(partition, key, rmdManifestContainer));
     getHostLevelIngestionStats().recordIngestionReplicationMetadataLookUpLatency(
         LatencyUtils.getElapsedTimeFromNSToMS(lookupStartTimeInNS),
         currentTimeForMetricsMs);
@@ -400,6 +400,15 @@ byte[] getRmdWithValueSchemaByteBufferFromStorage(
     return result.serialize();
   }

+  // For testing purpose
+  ValueRecord getRmdWithValueSchemaByteBufferFromStorageInternal(
+      int partition,
+      byte[] key,
+      ChunkedValueManifestContainer rmdManifestContainer) {
+    return SingleGetChunkingAdapter
+        .getReplicationMetadata(getStorageEngine(), partition, key, isChunked(), rmdManifestContainer);
+  }
+
   @Override
   protected IngestionBatchProcessor getIngestionBatchProcessor() {
     return ingestionBatchProcessorLazy.get();
@@ -755,16 +764,18 @@ private ByteBufferValueRecord<ByteBuffer> getValueBytesForKey(
       ReusableObjects reusableObjects = threadLocalReusableObjects.get();
       ByteBuffer reusedRawValue = reusableObjects.reusedByteBuffer;
       BinaryDecoder binaryDecoder = reusableObjects.binaryDecoder;
-      originalValue = RawBytesChunkingAdapter.INSTANCE.getWithSchemaId(
-          storageEngine,
-          topicPartition.getPartitionNumber(),
-          ByteBuffer.wrap(key),
-          isChunked,
-          reusedRawValue,
-          binaryDecoder,
-          RawBytesStoreDeserializerCache.getInstance(),
-          compressor.get(),
-          valueManifestContainer);
+
+      originalValue = databaseLookupWithConcurrencyLimit(
+          () -> RawBytesChunkingAdapter.INSTANCE.getWithSchemaId(
+              storageEngine,
+              topicPartition.getPartitionNumber(),
+              ByteBuffer.wrap(key),
+              isChunked,
+              reusedRawValue,
+              binaryDecoder,
+              RawBytesStoreDeserializerCache.getInstance(),
+              compressor.get(),
+              valueManifestContainer));
       hostLevelIngestionStats.recordIngestionValueBytesLookUpLatency(
           LatencyUtils.getElapsedTimeFromNSToMS(lookupStartTimeInNS),
           currentTimeForMetricsMs);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 11 additions & 0 deletions
@@ -197,6 +197,8 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements

   private Lazy<ZKHelixAdmin> zkHelixAdmin;

+  private final ExecutorService aaWCIngestionStorageLookupThreadPool;
+
   public KafkaStoreIngestionService(
       StorageService storageService,
       VeniceConfigLoader veniceConfigLoader,
@@ -456,6 +458,13 @@ public void handleStoreDeleted(Store store) {
       this.aaWCWorkLoadProcessingThreadPool = null;
     }

+    this.aaWCIngestionStorageLookupThreadPool = Executors.newFixedThreadPool(
+        serverConfig.getAaWCIngestionStorageLookupThreadPoolSize(),
+        new DaemonThreadFactory("AA_WC_INGESTION_STORAGE_LOOKUP"));
+    LOGGER.info(
+        "Enabled a thread pool for AA/WC ingestion lookup with {} threads.",
+        serverConfig.getAaWCIngestionStorageLookupThreadPoolSize());
+
     ingestionTaskFactory = StoreIngestionTaskFactory.builder()
         .setVeniceWriterFactory(veniceWriterFactory)
         .setStorageEngineRepository(storageService.getStorageEngineRepository())
@@ -482,6 +491,7 @@ public void handleStoreDeleted(Store store) {
             serverConfig.getIngestionMemoryLimit() > 0 ? () -> killConsumptionTaskForNonCurrentVersions() : null)
         .setHeartbeatMonitoringService(heartbeatMonitoringService)
         .setAAWCWorkLoadProcessingThreadPool(aaWCWorkLoadProcessingThreadPool)
+        .setAAWCIngestionStorageLookupThreadPool(aaWCIngestionStorageLookupThreadPool)
         .build();
   }

@@ -605,6 +615,7 @@ public void stopInner() {
     Utils.closeQuietlyWithErrorLogged(metaStoreWriter);

     shutdownExecutorService(aaWCWorkLoadProcessingThreadPool, "aaWCWorkLoadProcessingThreadPool", true);
+    shutdownExecutorService(aaWCIngestionStorageLookupThreadPool, "aaWCIngestionStorageLookupThreadPool", true);

     kafkaMessageEnvelopeSchemaReader.ifPresent(Utils::closeQuietlyWithErrorLogged);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java

Lines changed: 29 additions & 12 deletions
@@ -102,6 +102,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -208,6 +209,8 @@ public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask {
   private final Lazy<IngestionBatchProcessor> ingestionBatchProcessingLazy;
   private final Version version;

+  protected final ExecutorService aaWCIngestionStorageLookupThreadPool;
+
   public LeaderFollowerStoreIngestionTask(
       StorageService storageService,
       StoreIngestionTaskFactory.Builder builder,
@@ -363,6 +366,7 @@ public LeaderFollowerStoreIngestionTask(
           builder.getVersionedStorageIngestionStats(),
           getHostLevelIngestionStats());
     });
+    this.aaWCIngestionStorageLookupThreadPool = builder.getAaWCIngestionStorageLookupThreadPool();
   }

   public static VeniceWriter<byte[], byte[], byte[]> constructVeniceWriter(
@@ -3536,18 +3540,19 @@ private GenericRecord readStoredValueRecord(
     if (transientRecord == null) {
       try {
         long lookupStartTimeInNS = System.nanoTime();
-        currValue = GenericRecordChunkingAdapter.INSTANCE.get(
-            storageEngine,
-            topicPartition.getPartitionNumber(),
-            ByteBuffer.wrap(keyBytes),
-            isChunked,
-            null,
-            null,
-            NoOpReadResponseStats.SINGLETON,
-            readerValueSchemaID,
-            storeDeserializerCache,
-            compressor.get(),
-            manifestContainer);
+        currValue = databaseLookupWithConcurrencyLimit(
+            () -> GenericRecordChunkingAdapter.INSTANCE.get(
+                storageEngine,
+                topicPartition.getPartitionNumber(),
+                ByteBuffer.wrap(keyBytes),
+                isChunked,
+                null,
+                null,
+                NoOpReadResponseStats.SINGLETON,
+                readerValueSchemaID,
+                storeDeserializerCache,
+                compressor.get(),
+                manifestContainer));
         hostLevelIngestionStats
             .recordWriteComputeLookUpLatency(LatencyUtils.getElapsedTimeFromNSToMS(lookupStartTimeInNS));
       } catch (Exception e) {
@@ -4026,4 +4031,16 @@ private void maybeQueueCMWritesToVersionTopic(
       produceCall.run();
     }
   }
+
+  <T> T databaseLookupWithConcurrencyLimit(Supplier<T> supplier) {
+    if (serverConfig.isAAWCWorkloadParallelProcessingEnabled()) {
+      try {
+        return aaWCIngestionStorageLookupThreadPool.submit(() -> supplier.get()).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new VeniceException(e);
+      }
+    } else {
+      return supplier.get();
+    }
+  }
 }
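
Because each lookup in databaseLookupWithConcurrencyLimit is a synchronous submit-then-get against the dedicated pool, the pool size is a hard upper bound on concurrent storage reads, no matter how many parallel-processing threads are calling in. The standalone snippet below (not part of the commit; all names are illustrative) demonstrates that property with plain java.util.concurrent primitives.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone demo only -- not Venice code.
public class LookupConcurrencyCapDemo {
  public static void main(String[] args) throws Exception {
    int lookupPoolSize = 4;       // plays the role of the new lookup pool (default 4)
    int processingThreads = 16;   // plays the role of the AA/WC parallel-processing pool
    ExecutorService lookupPool = Executors.newFixedThreadPool(lookupPoolSize);
    ExecutorService processors = Executors.newFixedThreadPool(processingThreads);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();

    Callable<Void> processOneRecord = () -> {
      // The "processing" thread blocks here, so at most lookupPoolSize lookups run at once.
      lookupPool.submit(() -> {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
          Thread.sleep(10); // simulated storage lookup
        } finally {
          inFlight.decrementAndGet();
        }
        return null;
      }).get();
      return null;
    };

    for (int i = 0; i < 200; i++) {
      processors.submit(processOneRecord);
    }
    processors.shutdown();
    processors.awaitTermination(1, TimeUnit.MINUTES);
    lookupPool.shutdown();
    System.out.println("peak concurrent lookups = " + peak.get()); // never more than lookupPoolSize
  }
}

With 16 submitting threads and a 4-thread lookup pool, the printed peak never exceeds 4; lowering that lookup concurrency is what reduced CPU wait in the canary test described in the commit message.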

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java

Lines changed: 9 additions & 0 deletions
@@ -126,6 +126,7 @@ public static class Builder {
     private PubSubTopicRepository pubSubTopicRepository;
     private Runnable runnableForKillIngestionTasksForNonCurrentVersions;
     private ExecutorService aaWCWorkLoadProcessingThreadPool;
+    private ExecutorService aaWCIngestionStorageLookupThreadPool;

     private interface Setter {
       void apply();
@@ -333,6 +334,14 @@ public Builder setAAWCWorkLoadProcessingThreadPool(ExecutorService executorServi
       return set(() -> this.aaWCWorkLoadProcessingThreadPool = executorService);
     }

+    public Builder setAAWCIngestionStorageLookupThreadPool(ExecutorService executorService) {
+      return set(() -> this.aaWCIngestionStorageLookupThreadPool = executorService);
+    }
+
+    public ExecutorService getAaWCIngestionStorageLookupThreadPool() {
+      return aaWCIngestionStorageLookupThreadPool;
+    }
+
     public ExecutorService getAAWCWorkLoadProcessingThreadPool() {
       return this.aaWCWorkLoadProcessingThreadPool;
     }

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java

Lines changed: 7 additions & 5 deletions
@@ -520,14 +520,14 @@ public void testReadingChunkedRmdFromStorage() {
     when(ingestionTask.getStorageEngine()).thenReturn(storageEngine);
     when(ingestionTask.getSchemaRepo()).thenReturn(schemaRepository);
     when(ingestionTask.getServerConfig()).thenReturn(serverConfig);
-    when(ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(anyInt(), any(), any(), anyLong()))
-        .thenCallRealMethod();
+    when(ingestionTask.getRmdWithValueSchemaByteBufferFromStorageInternal(anyInt(), any(), any())).thenCallRealMethod();
     when(ingestionTask.isChunked()).thenReturn(true);
     when(ingestionTask.getHostLevelIngestionStats()).thenReturn(mock(HostLevelIngestionStats.class));
     ChunkedValueManifestContainer container = new ChunkedValueManifestContainer();
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(topLevelKey1)))
         .thenReturn(expectedNonChunkedValue);
-    byte[] result = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key1, container, 0L);
+    byte[] result =
+        ingestionTask.getRmdWithValueSchemaByteBufferFromStorageInternal(partition, key1, container).serialize();
     Assert.assertNotNull(result);
     Assert.assertNull(container.getManifest());
     Assert.assertEquals(result, expectedNonChunkedValue);
@@ -557,7 +557,8 @@ public void testReadingChunkedRmdFromStorage() {
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(topLevelKey2)))
         .thenReturn(chunkedManifestBytes.array());
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey2))).thenReturn(chunkedValue1);
-    byte[] result2 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
+    byte[] result2 =
+        ingestionTask.getRmdWithValueSchemaByteBufferFromStorageInternal(partition, key2, container).serialize();
     Assert.assertNotNull(result2);
     Assert.assertNotNull(container.getManifest());
     Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 1);
@@ -593,7 +594,8 @@ public void testReadingChunkedRmdFromStorage() {
         .thenReturn(chunkedManifestBytes.array());
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey1InKey3))).thenReturn(chunkedValue1);
     when(storageEngine.getReplicationMetadata(partition, ByteBuffer.wrap(chunkedKey2InKey3))).thenReturn(chunkedValue2);
-    byte[] result3 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
+    byte[] result3 =
+        ingestionTask.getRmdWithValueSchemaByteBufferFromStorageInternal(partition, key3, container).serialize();
     Assert.assertNotNull(result3);
     Assert.assertNotNull(container.getManifest());
     Assert.assertEquals(container.getManifest().getKeysWithChunkIdSuffix().size(), 2);

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java

Lines changed: 1 addition & 0 deletions
@@ -131,6 +131,7 @@ private void setupMockConfig() {
     doReturn(Object2IntMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterUrlToIdMap();
     doReturn(KafkaConsumerServiceDelegator.ConsumerPoolStrategyType.DEFAULT).when(mockVeniceServerConfig)
         .getConsumerPoolStrategyType();
+    doReturn(2).when(mockVeniceServerConfig).getAaWCIngestionStorageLookupThreadPoolSize();

     // Consumer related configs for preparing kafka consumer service.
     doReturn(dummyKafkaUrl).when(mockVeniceServerConfig).getKafkaBootstrapServers();

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java

Lines changed: 3 additions & 1 deletion
@@ -1127,7 +1127,9 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder(
         .setPartitionStateSerializer(partitionStateSerializer)
         .setRunnableForKillIngestionTasksForNonCurrentVersions(runnableForKillNonCurrentVersion)
         .setAAWCWorkLoadProcessingThreadPool(
-            Executors.newFixedThreadPool(2, new DaemonThreadFactory("AA_WC_PARALLEL_PROCESSING")));
+            Executors.newFixedThreadPool(2, new DaemonThreadFactory("AA_WC_PARALLEL_PROCESSING")))
+        .setAAWCIngestionStorageLookupThreadPool(
+            Executors.newFixedThreadPool(1, new DaemonThreadFactory("AA_WC_INGESTION_STORAGE_LOOKUP")));
   }

   abstract KafkaConsumerService.ConsumerAssignmentStrategy getConsumerAssignmentStrategy();

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 7 additions & 0 deletions
@@ -2379,6 +2379,13 @@ private ConfigKeys() {
       "server.aa.wc.workload.parallel.processing.thread.pool.size";
   public static final String SERVER_GLOBAL_RT_DIV_ENABLED = "server.global.rt.div.enabled";

+  /**
+   * This config is used to control the RocksDB lookup concurrency when handling AA/WC workload with parallel processing enabled.
+   * Check {@link #SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED} for more details.
+   */
+  public static final String SERVER_AA_WC_INGESTION_STORAGE_LOOKUP_THREAD_POOL_SIZE =
+      "server.aa.wc.ingestion.storage.lookup.thread.pool.size";
+
   /**
    * Whether to enable producer throughput optimization for realtime workload or not.
    * Two strategies:
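
For operators, the new knob is tuned independently of the parallel-processing pool size. A hedged sketch of what the relevant server properties might look like is below; only the two key strings and the default of 4 come from this commit, while the concrete values and the way properties are loaded into VeniceServerConfig depend on the deployment.

import java.util.Properties;

// Illustrative only: example values, not recommendations.
public class IngestionLookupPoolProperties {
  public static Properties example() {
    Properties props = new Properties();
    // Existing knob: concurrency of AA/WC parallel processing.
    props.setProperty("server.aa.wc.workload.parallel.processing.thread.pool.size", "8");
    // New in this commit: caps concurrent ingestion storage (value/RMD) lookups.
    // Defaults to 4 when unset; the commit message reports that a lower value
    // reduced CPU wait in a canary test.
    props.setProperty("server.aa.wc.ingestion.storage.lookup.thread.pool.size", "2");
    return props;
  }
}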
