Skip to content

Commit 3392d62

Browse files
authored
[router] Reduced allocations in router read path (#1367)
A lot of multi-key specific state was moved out of VenicePath and into its subclasses, which makes single get queries require less heap space. This includes streaming (chunkedResponse), HAR (helixGroupId and requestId), longTailRetryThresholdMs and responseHeaders. In addition, all kinds of VenicePaths are also getting their heap size reduced by leveraging shared instances where possible. For example: - The resourceName, storeName and versionNumber properties are replaced by a single reference to a shared instance of StoreVersionName, which is a new class obtained from a NameRepository. - The smartLongTailRetryEnabled, smartLongTailRetryAbortThresholdMs and longTailRetryThresholdMs properties are replaced by a single pointer to a RouterRetryConfig object, which is a simple facade wrapping the VeniceRouterConfig. - Removed the time property from VenicePath since the only time it is used by tests to inject a MockTime, it is done from a subclass, and therefore can be achieved via extension, rather than composition. - The responseDecompressor is now coming from a map of shared instances in the VenicePathParser. New config: - name.repository.max.entry.count : controls the maximum number of entries (per type) to be cached in the NameRepository class. For now this config is used only in the router, but it would likely become used in the server and controller as well, later on. Miscellaneous: - Various refactorings enable the VenicePath constructors to have fewer params and to make more of the properties final. - Made use of StoreName rather than String in the VenicePathParser's RetryManager maps. - Added a default IntSet getVersionNumbers function to the Store interface. This is a convenience function since oftentimes we only want the version numbers, and not the complete Version objects. - Cleaned up generics in the VenicePathParser and ScatterGatherHelper builder. - Deleted VeniceMetricsProvider which can be trivially inlined.
1 parent d300c52 commit 3392d62

File tree

34 files changed

+797
-681
lines changed

34 files changed

+797
-681
lines changed

internal/alpini/router/alpini-router-api/src/main/java/com/linkedin/alpini/router/api/ScatterGatherHelper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ScatterGatherHelper<H, P extends ResourcePath<K>, K, R, BASIC_HTTP_
4343
private final @Nullable ResponseAggregatorFactory<BASIC_HTTP_REQUEST, HTTP_RESPONSE> _responseAggregatorFactory;
4444
private final @Nonnull Function<Headers, Long> _requestTimeout;
4545
private final @Nonnull LongTailRetrySupplier<P, K> _longTailRetrySupplier;
46-
private final @Nonnull Function<BasicRequest, Metrics> _metricsProvider;
46+
private final @Nonnull Function<BASIC_HTTP_REQUEST, Metrics> _metricsProvider;
4747
private final @Nonnull BiFunction<Headers, Metrics, Headers> _metricsDecorator;
4848
private final @Nonnull Function<Headers, Metrics> _responseMetrics;
4949
private final @Nonnull Function<P, ScatterGatherStats> _scatterGatherStatsProvider;
@@ -73,7 +73,7 @@ protected ScatterGatherHelper(
7373
@Nonnull Optional<ResponseAggregatorFactory<BASIC_HTTP_REQUEST, HTTP_RESPONSE>> responseAggregatorFactory,
7474
@Nonnull Function<Headers, Long> requestTimeout,
7575
@Nonnull LongTailRetrySupplier<P, K> longTailRetrySupplier,
76-
@Nonnull Function<BasicRequest, Metrics> metricsProvider,
76+
@Nonnull Function<BASIC_HTTP_REQUEST, Metrics> metricsProvider,
7777
@Nonnull BiFunction<Headers, Metrics, Headers> metricsDecorator,
7878
@Nonnull Function<Headers, Metrics> responseMetrics,
7979
@Nonnull Function<P, ScatterGatherStats> scatterGatherStatsProvider,
@@ -306,7 +306,7 @@ public void decorateResponse(@Nonnull Headers responseHeaders, @Nonnull Headers
306306
}
307307
}
308308

309-
public Metrics initializeMetrics(@Nonnull BasicRequest request) {
309+
public Metrics initializeMetrics(@Nonnull BASIC_HTTP_REQUEST request) {
310310
return _metricsProvider.apply(request);
311311
}
312312

@@ -318,7 +318,7 @@ public Metrics responseMetrics(@Nonnull Headers headers) {
318318
return _scatterGatherStatsProvider.apply(path);
319319
}
320320

321-
public static Builder<?, ?, ?, ?, ?, ?, ?> builder() {
321+
public static <H, P extends ResourcePath<K>, K, R, HTTP_REQUEST extends BasicRequest, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> Builder<H, P, K, R, HTTP_REQUEST, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> builder() {
322322
return new Builder<>();
323323
}
324324

@@ -354,7 +354,7 @@ public static class Builder<H, P extends ResourcePath<K>, K, R, HTTP_REQUEST ext
354354
Optional.empty();
355355
private Function<Headers, Long> _requestTimeout = headers -> null;
356356
private LongTailRetrySupplier<P, K> _longTailRetrySupplier = (resourceName, methodName) -> AsyncFuture.cancelled();
357-
private Function<BasicRequest, Metrics> _metricsProvider = http -> null;
357+
private Function<HTTP_REQUEST, Metrics> _metricsProvider = http -> null;
358358
private Function<Headers, Metrics> _responseMetrics = headers -> null;
359359
private BiFunction<Headers, Metrics, Headers> _metricsDecorator = (headers, metrics) -> Headers.EMPTY_HEADERS;
360360
private Function<P, ScatterGatherStats> _scatterGatherStatsProvider = path -> new ScatterGatherStats();
@@ -486,7 +486,7 @@ public Builder<H, P, K, R, HTTP_REQUEST, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> lo
486486
}
487487

488488
public Builder<H, P, K, R, HTTP_REQUEST, HTTP_RESPONSE, HTTP_RESPONSE_STATUS> metricsProvider(
489-
@Nonnull Function<BasicRequest, Metrics> metricsProvider) {
489+
@Nonnull Function<HTTP_REQUEST, Metrics> metricsProvider) {
490490
_metricsProvider = Objects.requireNonNull(metricsProvider, "metricsProvider");
491491
return this;
492492
}

internal/venice-client-common/src/main/java/com/linkedin/venice/HttpConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class HttpConstants {
4040
public static final String VENICE_ALLOW_REDIRECT = "X-VENICE-ALLOW-REDIRECT";
4141

4242
public static final String VENICE_CLIENT_COMPUTE = "X-VENICE-CLIENT-COMPUTE";
43+
public static final String VENICE_CLIENT_COMPUTE_TRUE = "1";
44+
public static final String VENICE_CLIENT_COMPUTE_FALSE = "0";
4345

4446
public static final int SC_MISDIRECTED_REQUEST = 421;
4547

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2381,4 +2381,11 @@ private ConfigKeys() {
23812381
public static final String CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE =
23822382
"controller.enable.hybrid.store.partition.count.update";
23832383
public static final String PUSH_JOB_VIEW_CONFIGS = "push.job.view.configs";
2384+
2385+
/**
2386+
* The maximum number of entries (per type) to be cached in the {@link com.linkedin.venice.meta.NameRepository}.
2387+
*
2388+
* Default: {@value com.linkedin.venice.meta.NameRepository#DEFAULT_MAXIMUM_ENTRY_COUNT}
2389+
*/
2390+
public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count";
23842391
}

internal/venice-common/src/main/java/com/linkedin/venice/meta/NameRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* A repository of {@link StoreName} and {@link StoreVersionName}, which are intended to be used as shared instances.
99
*/
1010
public class NameRepository {
11-
private static final int DEFAULT_MAXIMUM_ENTRY_COUNT = 2000;
11+
public static final int DEFAULT_MAXIMUM_ENTRY_COUNT = 2000;
1212
private final LoadingCache<String, StoreName> storeNameCache;
1313
private final LoadingCache<String, StoreVersionName> storeVersionNameCache;
1414

internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.linkedin.venice.meta;
22

3+
import com.fasterxml.jackson.annotation.JsonIgnore;
34
import com.linkedin.venice.common.VeniceSystemStoreType;
45
import com.linkedin.venice.compression.CompressionStrategy;
56
import com.linkedin.venice.exceptions.StoreVersionNotFoundException;
7+
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
8+
import it.unimi.dsi.fastutil.ints.IntSet;
69
import java.util.List;
710
import java.util.Map;
811
import java.util.concurrent.TimeUnit;
@@ -243,6 +246,16 @@ static boolean isSystemStore(String storeName) {
243246

244247
List<Version> getVersions();
245248

249+
@JsonIgnore
250+
default IntSet getVersionNumbers() {
251+
List<Version> versions = getVersions();
252+
IntSet versionNumbers = new IntOpenHashSet(versions.size());
253+
for (Version version: versions) {
254+
versionNumbers.add(version.getNumber());
255+
}
256+
return versionNumbers;
257+
}
258+
246259
void setVersions(List<Version> versions);
247260

248261
void addVersion(Version version);

internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.linkedin.venice.meta;
22

33
import static org.testng.Assert.assertEquals;
4+
import static org.testng.Assert.assertFalse;
45
import static org.testng.Assert.assertNotNull;
56
import static org.testng.Assert.assertNull;
67
import static org.testng.Assert.assertThrows;
8+
import static org.testng.Assert.assertTrue;
79

810
import com.linkedin.venice.exceptions.StoreDisabledException;
911
import com.linkedin.venice.exceptions.StoreVersionNotFoundException;
@@ -14,8 +16,6 @@
1416
import java.util.List;
1517
import java.util.Set;
1618
import java.util.stream.Collectors;
17-
import org.apache.logging.log4j.LogManager;
18-
import org.apache.logging.log4j.Logger;
1919
import org.testng.Assert;
2020
import org.testng.annotations.Test;
2121

@@ -24,8 +24,6 @@
2424
* Test cases for Venice Store.
2525
*/
2626
public class TestZKStore {
27-
private static final Logger LOGGER = LogManager.getLogger(TestZKStore.class);
28-
2927
@Test
3028
public void testVersionsAreAddedInOrdered() {
3129
Store s = TestUtils.createTestStore("s1", "owner", System.currentTimeMillis());
@@ -313,44 +311,50 @@ public void testUseTheDeletedVersionNumber() {
313311
public void testAddVersion() {
314312
String storeName = Utils.getUniqueString("store");
315313
Store store = TestUtils.createTestStore(storeName, "owner", System.currentTimeMillis());
316-
assertNull(store.getVersion(2));
317-
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(2));
318-
assertNull(store.getVersion(5));
319-
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(5));
320-
assertNull(store.getVersion(6));
321-
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(6));
322-
assertEquals(store.getVersions().size(), 0);
314+
assertMissingVersion(store, 2, 5, 6);
315+
assertVersionCount(store, 0);
316+
323317
store.addVersion(new VersionImpl(storeName, 5));
324-
assertNull(store.getVersion(2));
325-
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(2));
326-
assertNotNull(store.getVersion(5));
327-
assertEquals(store.getVersionOrThrow(5).getNumber(), 5);
328-
assertNull(store.getVersion(6));
329-
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(6));
330-
assertEquals(store.getVersions().size(), 1);
318+
assertMissingVersion(store, 2, 6);
319+
assertPresentVersion(store, 5);
320+
assertVersionCount(store, 1);
331321
// largest used version is 5
332322
assertEquals(store.peekNextVersion().getNumber(), 6);
323+
333324
store.addVersion(new VersionImpl(storeName, 2));
334-
assertNotNull(store.getVersion(2));
335-
assertEquals(store.getVersionOrThrow(2).getNumber(), 2);
336-
assertNotNull(store.getVersion(5));
337-
assertEquals(store.getVersionOrThrow(5).getNumber(), 5);
338-
assertNull(store.getVersion(6));
339-
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(6));
340-
assertEquals(store.getVersions().size(), 2);
325+
assertMissingVersion(store, 6);
326+
assertPresentVersion(store, 2, 5);
327+
assertVersionCount(store, 2);
341328
// largest used version is still 5
342329
Assert.assertEquals(store.peekNextVersion().getNumber(), 6);
330+
343331
Version version = new VersionImpl(store.getName(), store.getLargestUsedVersionNumber() + 1, "pushJobId");
344332
Assert.assertEquals(version.getNumber(), 6);
345333
store.addVersion(version);
346334
Assert.assertEquals(store.peekNextVersion().getNumber(), 7);
347-
assertNotNull(store.getVersion(2));
348-
assertEquals(store.getVersionOrThrow(2).getNumber(), 2);
349-
assertNotNull(store.getVersion(5));
350-
assertEquals(store.getVersionOrThrow(5).getNumber(), 5);
351-
assertNotNull(store.getVersion(6));
352-
assertEquals(store.getVersionOrThrow(6).getNumber(), 6);
353-
assertEquals(store.getVersions().size(), 3);
335+
assertPresentVersion(store, 2, 5, 6);
336+
assertVersionCount(store, 3);
337+
}
338+
339+
private void assertMissingVersion(Store store, int... versionNumbers) {
340+
for (int versionNumber: versionNumbers) {
341+
assertNull(store.getVersion(versionNumber));
342+
assertThrows(StoreVersionNotFoundException.class, () -> store.getVersionOrThrow(versionNumber));
343+
assertFalse(store.getVersionNumbers().contains(versionNumber));
344+
}
345+
}
346+
347+
private void assertPresentVersion(Store store, int... versionNumbers) {
348+
for (int versionNumber: versionNumbers) {
349+
assertNotNull(store.getVersion(versionNumber));
350+
assertEquals(store.getVersionOrThrow(versionNumber).getNumber(), versionNumber);
351+
assertTrue(store.getVersionNumbers().contains(versionNumber));
352+
}
353+
}
354+
355+
private void assertVersionCount(Store store, int versionCount) {
356+
assertEquals(store.getVersions().size(), versionCount);
357+
assertEquals(store.getVersionNumbers().size(), versionCount);
354358
}
355359

356360
@Test

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/api/TestVeniceDispatcher.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.linkedin.venice.HttpConstants.VENICE_COMPRESSION_STRATEGY;
44
import static com.linkedin.venice.HttpConstants.VENICE_REQUEST_RCU;
55
import static com.linkedin.venice.HttpConstants.VENICE_SUPPORTED_COMPRESSION_STRATEGY;
6+
import static com.linkedin.venice.utils.TestUtils.getVenicePathParser;
67
import static org.mockito.Mockito.any;
78
import static org.mockito.Mockito.anyString;
89
import static org.mockito.Mockito.doAnswer;
@@ -26,6 +27,7 @@
2627
import com.linkedin.venice.integration.utils.ServiceFactory;
2728
import com.linkedin.venice.meta.Instance;
2829
import com.linkedin.venice.meta.LiveInstanceMonitor;
30+
import com.linkedin.venice.meta.NameRepository;
2931
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
3032
import com.linkedin.venice.read.RequestType;
3133
import com.linkedin.venice.router.VeniceRouterConfig;
@@ -63,6 +65,8 @@
6365

6466
//TODO: refactor Dispatcher to take a HttpClient Factory, so we don't need to spin up an HTTP server for these tests
6567
public class TestVeniceDispatcher {
68+
private final NameRepository nameRepository = new NameRepository();
69+
6670
@Test
6771
public void testErrorRetry() {
6872
VeniceDispatcher dispatcher = getMockDispatcher(false, false);
@@ -420,9 +424,11 @@ private void triggerResponse(
420424
return modifyingCompressor;
421425
})).when(compressorFactory).getCompressor(any());
422426

423-
doReturn(new VeniceResponseDecompressor(true, routerStats, mockRequest, "test_store", 1, compressorFactory))
424-
.when(mockPath)
425-
.getResponseDecompressor();
427+
VenicePathParser pathParser = getVenicePathParser(compressorFactory, true);
428+
429+
VeniceResponseDecompressor decompressor =
430+
pathParser.getDecompressor(this.nameRepository.getStoreVersionName("test_store", 1), mockRequest);
431+
doReturn(decompressor).when(mockPath).getResponseDecompressor();
426432

427433
AsyncPromise mockHostSelected = mock(AsyncPromise.class);
428434
AsyncPromise mockTimeoutFuture = mock(AsyncPromise.class);

internal/venice-test-common/src/main/java/com/linkedin/venice/utils/TestUtils.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
import static com.linkedin.venice.ConfigKeys.SERVER_FORKED_PROCESS_JVM_ARGUMENT_LIST;
66
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE;
77
import static com.linkedin.venice.utils.Utils.getUniqueString;
8+
import static org.mockito.ArgumentMatchers.any;
89
import static org.mockito.ArgumentMatchers.anyString;
910
import static org.mockito.Mockito.anyInt;
1011
import static org.mockito.Mockito.doReturn;
1112
import static org.mockito.Mockito.eq;
1213
import static org.mockito.Mockito.mock;
14+
import static org.mockito.Mockito.when;
1315
import static org.testng.Assert.assertEquals;
1416
import static org.testng.Assert.assertTrue;
1517

@@ -26,6 +28,7 @@
2628
import com.linkedin.davinci.store.AbstractStorageEngine;
2729
import com.linkedin.venice.ConfigKeys;
2830
import com.linkedin.venice.compression.CompressionStrategy;
31+
import com.linkedin.venice.compression.CompressorFactory;
2932
import com.linkedin.venice.compression.GzipCompressor;
3033
import com.linkedin.venice.compression.NoopCompressor;
3134
import com.linkedin.venice.compression.VeniceCompressor;
@@ -53,6 +56,7 @@
5356
import com.linkedin.venice.meta.HybridStoreConfigImpl;
5457
import com.linkedin.venice.meta.IngestionMode;
5558
import com.linkedin.venice.meta.Instance;
59+
import com.linkedin.venice.meta.NameRepository;
5660
import com.linkedin.venice.meta.OfflinePushStrategy;
5761
import com.linkedin.venice.meta.PartitionerConfig;
5862
import com.linkedin.venice.meta.PartitionerConfigImpl;
@@ -76,6 +80,12 @@
7680
import com.linkedin.venice.pubsub.api.PubSubTopicType;
7781
import com.linkedin.venice.pubsub.manager.TopicManagerRepository;
7882
import com.linkedin.venice.pushmonitor.ExecutionStatus;
83+
import com.linkedin.venice.router.VeniceRouterConfig;
84+
import com.linkedin.venice.router.api.VenicePartitionFinder;
85+
import com.linkedin.venice.router.api.VenicePathParser;
86+
import com.linkedin.venice.router.api.VeniceVersionFinder;
87+
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
88+
import com.linkedin.venice.router.stats.RouterStats;
7989
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
8090
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
8191
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
@@ -84,6 +94,7 @@
8494
import com.linkedin.venice.writer.VeniceWriter;
8595
import com.linkedin.venice.writer.VeniceWriterFactory;
8696
import com.linkedin.venice.writer.VeniceWriterOptions;
97+
import io.tehuti.metrics.MetricsRepository;
8798
import java.io.File;
8899
import java.io.IOException;
89100
import java.net.ServerSocket;
@@ -106,6 +117,7 @@
106117
import java.util.concurrent.ExecutionException;
107118
import java.util.concurrent.ExecutorService;
108119
import java.util.concurrent.Future;
120+
import java.util.concurrent.ScheduledExecutorService;
109121
import java.util.concurrent.ThreadLocalRandom;
110122
import java.util.concurrent.TimeUnit;
111123
import java.util.function.BooleanSupplier;
@@ -989,4 +1001,27 @@ public static String loadFileAsString(String fileName) {
9891001
return null;
9901002
}
9911003
}
1004+
1005+
public static VenicePathParser getVenicePathParser(CompressorFactory compressorFactory, boolean decompressOnClient) {
1006+
RouterStats stats = mock(RouterStats.class);
1007+
when(stats.getStatsByType(any())).thenReturn(mock(AggRouterHttpRequestStats.class));
1008+
ReadOnlyStoreRepository readOnlyStoreRepository = mock(ReadOnlyStoreRepository.class);
1009+
Store store = mock(Store.class);
1010+
when(store.getClientDecompressionEnabled()).thenReturn(decompressOnClient);
1011+
when(readOnlyStoreRepository.getStoreOrThrow(anyString())).thenReturn(store);
1012+
1013+
VeniceRouterConfig routerConfig = mock(VeniceRouterConfig.class);
1014+
when(routerConfig.isDecompressOnClient()).thenReturn(decompressOnClient);
1015+
1016+
return new VenicePathParser(
1017+
mock(VeniceVersionFinder.class),
1018+
mock(VenicePartitionFinder.class),
1019+
stats,
1020+
readOnlyStoreRepository,
1021+
routerConfig,
1022+
compressorFactory,
1023+
mock(MetricsRepository.class),
1024+
mock(ScheduledExecutorService.class),
1025+
new NameRepository());
1026+
}
9921027
}

services/venice-router/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ dependencies {
4646
exclude group: 'org.mockito', module: 'mockito-all' // this will introduce another different mockito-all version
4747
}
4848

49+
implementation libraries.fastUtil
50+
4951
implementation('org.apache.helix:helix-core:1.4.1:jdk8') {
5052
exclude group: 'org.apache.helix'
5153
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.linkedin.venice.router;
2+
3+
import java.util.TreeMap;
4+
5+
6+
/**
7+
* A facade for the {@link VeniceRouterConfig}, so that retry-related configs can be passed around without giving access
8+
* to the rest of the configs.
9+
*/
10+
public interface RouterRetryConfig {
11+
TreeMap<Integer, Integer> getLongTailRetryForBatchGetThresholdMs();
12+
13+
int getLongTailRetryForSingleGetThresholdMs();
14+
15+
int getLongTailRetryMaxRouteForMultiKeyReq();
16+
17+
int getSmartLongTailRetryAbortThresholdMs();
18+
19+
boolean isSmartLongTailRetryEnabled();
20+
}

0 commit comments

Comments
 (0)