Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ protected void streamingBatchGet(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys,
StreamingCallback<K, V> callback) {
if (keys.size() == 1) {
streamingBatchGetForSingleKey(keys.iterator().next(), callback);
return;
}

multiKeyStreamingRequest(
requestContext,
RequestType.MULTI_GET_STREAMING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import com.linkedin.venice.client.store.streaming.VeniceResponseMapImpl;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -39,6 +41,37 @@ public CompletableFuture<V> get(K key) throws VeniceClientException {

protected abstract CompletableFuture<V> get(GetRequestContext<K> requestContext, K key) throws VeniceClientException;

/**
 * Serves a single-key batchGet/streamingBatchGet by issuing a single-GET request instead of a multi-get
 * streaming request, and adapts the single-GET result onto the streaming {@code callback}.
 *
 * NOTE: this changes the request type seen by the server from a (streaming) multi-get to a single-get. That
 * shifts server-side routing, read-quota accounting (single-get and batch-get quotas are tracked/throttled
 * separately) and per-request-type server metrics for these 1-key requests. It also means the request follows
 * the single-get long-tail retry configuration rather than the batch-get one (see
 * {@link RetriableAvroGenericStoreClient#get}); a store that enabled only batch-get long-tail retry will get no
 * long-tail retry on 1-key batch gets.
 *
 * Completion contract: once the single-GET future settles, {@code callback.onCompletion} is invoked exactly
 * once. It carries the (unwrapped) failure if the lookup failed, the exception thrown by
 * {@code callback.onRecordReceived} if delivering the record failed, and an empty {@link Optional} otherwise.
 *
 * @param key the only key of the batch-get request
 * @param callback the streaming callback that receives the record and the completion signal
 */
protected final void streamingBatchGetForSingleKey(K key, StreamingCallback<K, V> callback) {
  // The reroute is recorded against the streaming multi-get stats, before the lookup is issued.
  getClientConfig().getStats(RequestType.MULTI_GET_STREAMING).recordBatchGetRoutedToSingleGetRequest();
  get(key).whenComplete((value, throwable) -> {
    Optional<Exception> completionError;
    if (throwable != null) {
      completionError = Optional.of(toException(throwable));
    } else {
      try {
        callback.onRecordReceived(key, value);
        completionError = Optional.empty();
      } catch (Exception e) {
        // The stage returned by whenComplete is discarded, so an exception escaping from here would be lost and
        // onCompletion would never fire. Surface it through the callback instead.
        completionError = Optional.of(e);
      }
    }
    // Deliberately outside the try block so that onCompletion can never be invoked twice.
    callback.onCompletion(completionError);
  });
}

/**
 * Converts the failure reported by a completed future into an {@link Exception}.
 *
 * A {@link CompletionException} that carries a cause is replaced by that cause. The result is returned as-is
 * when it already is an {@link Exception}; anything else is wrapped in a {@link VeniceClientException}.
 */
private static Exception toException(Throwable throwable) {
  Throwable rootCause = throwable;
  if (throwable instanceof CompletionException) {
    Throwable cause = throwable.getCause();
    if (cause != null) {
      rootCause = cause;
    }
  }
  if (rootCause instanceof Exception) {
    return (Exception) rootCause;
  }
  return new VeniceClientException(rootCause);
}

@Override
public final CompletableFuture<Map<K, V>> batchGet(Set<K> keys) throws VeniceClientException {
// Since user has invoked batchGet directly, then we do not want to allow partial success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ protected void streamingBatchGet(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys,
StreamingCallback<K, V> callback) throws VeniceClientException {
if (keys.size() == 1) {
// Single-key batchGet is served via single-GET, so it follows the single-get long-tail retry config (this
// class's #get) rather than the batch-get retry threshold below. A store with only batch-get long-tail retry
// enabled gets no long-tail retry on 1-key batch gets. See #streamingBatchGetForSingleKey for details.
streamingBatchGetForSingleKey(keys.iterator().next(), callback);
return;
}

int longTailRetryThresholdForBatchGetInMicroSeconds =
getLongTailRetryThresholdForBatchGetInMicroSeconds(keys.size());
retryStreamingMultiKeyRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ public enum FastClientMetricEntity implements ModuleMetricEntityInterface {
setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_REQUEST_METHOD)
),

/**
 * Count of batch-get/streaming-batch-get requests that contained a single key and were therefore
 * served via a single-GET lookup. Declared as a {@code COUNTER} named
 * {@code request.batch_get_routed_to_single_get_count}, dimensioned by store name, cluster name and
 * request method.
 */
BATCH_GET_SINGLE_KEY_REROUTE_COUNT(
"request.batch_get_routed_to_single_get_count", MetricType.COUNTER, MetricUnit.NUMBER,
"Count of single-key batch-get requests served via a single-GET lookup",
setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_REQUEST_METHOD)
),

/**
* Metadata staleness watermark reported asynchronously in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class FastClientStats extends ClientStats {

private final Sensor leakedRequestCountSensor;
private volatile MetricEntityStateOneEnum<RejectionReason> rejectionRatio;
/** Counts batch-get requests that contained a single key and were therefore served via a single-GET lookup. */
private volatile MetricEntityStateBase batchGetRoutedToSingleGetRequestCount;

// OTel metrics
private volatile MetricEntityStateOneEnum<RequestRetryType> longTailRetry;
Expand Down Expand Up @@ -160,6 +162,15 @@ private void buildFastClientOtelStats() {
baseDimensionsMap,
getBaseAttributes());

this.batchGetRoutedToSingleGetRequestCount = MetricEntityStateBase.create(
FastClientMetricEntity.BATCH_GET_SINGLE_KEY_REROUTE_COUNT.getMetricEntity(),
otelRepository,
this::registerSensor,
FastClientTehutiMetricName.BATCH_GET_ROUTED_TO_SINGLE_GET_REQUEST_COUNT,
Collections.singletonList(new OccurrenceRate()),
baseDimensionsMap,
getBaseAttributes());

// OTel: fanout_size (MIN_MAX_COUNT_SUM_AGGREGATIONS) with dimensions: venice.store.name, venice.request.method,
// venice.request.fanout_type
this.retryFanoutSize = MetricEntityStateOneEnum.create(
Expand Down Expand Up @@ -270,6 +281,14 @@ public void recordRejectionRatio(double rejectionRatio) {
this.rejectionRatio.record(rejectionRatio, RejectionReason.THROTTLED_BY_LOAD_CONTROLLER);
}

/**
 * Bumps the "batch-get routed to single-get" counter by one. Intended to be called once per
 * batch-get/streaming-batch-get request that carried exactly one key and was therefore answered through a
 * single-GET lookup (see {@code InternalAvroStoreClient#streamingBatchGetForSingleKey}).
 */
public void recordBatchGetRoutedToSingleGetRequest() {
  final MetricEntityStateBase reroutedRequestCounter = this.batchGetRoutedToSingleGetRequestCount;
  reroutedRequestCounter.record(1);
}

/**
* This method is a utility method to build concise summaries useful in tests
* and for logging. It generates a single string for all metrics for a sensor
Expand Down Expand Up @@ -308,6 +327,7 @@ public List<Double> getMetricValues(String sensorName, String... stats) {
*/
public enum FastClientTehutiMetricName implements TehutiMetricNameEnum {
LONG_TAIL_RETRY_REQUEST, ERROR_RETRY_REQUEST, RETRY_REQUEST_WIN, METADATA_STALENESS_HIGH_WATERMARK_MS, FANOUT_SIZE,
RETRY_FANOUT_SIZE, NO_AVAILABLE_REPLICA_REQUEST_COUNT, REJECTED_REQUEST_COUNT_BY_LOAD_CONTROLLER, REJECTION_RATIO
RETRY_FANOUT_SIZE, NO_AVAILABLE_REPLICA_REQUEST_COUNT, REJECTED_REQUEST_COUNT_BY_LOAD_CONTROLLER, REJECTION_RATIO,
BATCH_GET_ROUTED_TO_SINGLE_GET_REQUEST_COUNT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,35 @@ public void testSimpleStreamingBatchGet() throws InterruptedException, Execution
validateMetrics(client, 1000, 1000, 0, 0);
}

/**
 * A streaming batch-get carrying exactly one key must be dispatched as a single-GET request, return the
 * requested value, and be counted by the "batch-get routed to single-get" metric.
 */
@Test(timeOut = TEST_TIMEOUT)
public void testSingleKeyStreamingBatchGetUsesSingleGet()
    throws InterruptedException, ExecutionException, TimeoutException {
  final String route = "https://host1.linkedin.com";

  // One key on one partition/route; the simulator expects a single-GET (not a multi-get) on that route.
  TestClientSimulator simulator = new TestClientSimulator();
  simulator.generateKeyValues(0, 1)
      .setLongTailRangeBasedRetryThresholdForBatchGetInMilliSeconds("1-:10000")
      .partitionKeys(1)
      .assignRouteToPartitions(route, 0)
      .expectSingleGetRequestWithKeyForPartitionOnRoute(1, 1, route, 0)
      .respondToRequestWithKeyValues(5, 1)
      .simulate();

  callStreamingBatchGetAndVerifyResults(
      simulator.getFastClient(),
      simulator.getRequestedKeyValues(),
      simulator.getSimulatorCompleteFuture());

  validateMetrics(simulator, 1, 1, 0, 0);

  // The single-key batch-get should be counted as routed to a single-GET lookup.
  Map<String, ? extends Metric> metrics = getStats(simulator.getClientConfig());
  String reroutedMetricName = "." + simulator.UNIT_TEST_STORE_NAME
      + "--multiget_streaming_batch_get_routed_to_single_get_request_count.OccurrenceRate";
  assertTrue(metrics.get(reroutedMetricName).value() > 0);
}

/**
* Error case: Timeout due to response taking a longer time than the simulator's allowed timeout.
*/
Expand All @@ -79,8 +108,10 @@ public void testSimpleStreamingBatchGettingTimeout()
throws InterruptedException, ExecutionException, TimeoutException {

TestClientSimulator client = new TestClientSimulator();
client.generateKeyValues(0, 2)
.partitionKeys(2)
// Use >=5 keys on a single partition so this stays a multi-get request (a 1-key batch-get is served via
// single-GET) while still exercising the batch-get timeout path on a single route.
client.generateKeyValues(0, 5)
.partitionKeys(1)
.assignRouteToPartitions("https://host1.linkedin.com", 0)
.expectRequestWithKeysForPartitionOnRoute(1, 1, "https://host1.linkedin.com", 0)
.respondToRequestWithKeyValues((int) CLIENT_TIME_OUT_IN_SECONDS * 2 * 1000, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.linkedin.venice.fastclient.transport.TransportClientResponseForRoute;
import com.linkedin.venice.fastclient.utils.ClientTestUtils;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.read.protocol.response.MultiGetResponseRecordV1;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
Expand Down Expand Up @@ -90,6 +91,14 @@ public class DispatchingAvroGenericStoreClientTest {
private static final Set<String> BATCH_GET_KEYS = new HashSet<>();
private static final Set<String> BATCH_GET_PARTIAL_KEYS_1 = new HashSet<>();
private static final Set<String> BATCH_GET_PARTIAL_KEYS_2 = new HashSet<>();
/**
* Multi-key (>=5 keys) batch-get requests whose keys all route to the same single replica as
* {@code test_key_1} / {@code test_key_2} respectively. Used by the route-blocking tests so that a single-route
* batch-get stays on the multi-get path (a 1-key batch-get is now served via single-GET) while still blocking
* exactly one replica.
*/
private static final Set<String> BATCH_GET_BLOCK_KEYS_REPLICA1 = new HashSet<>();
private static final Set<String> BATCH_GET_BLOCK_KEYS_REPLICA2 = new HashSet<>();
private static final Map<String, GenericRecord> BATCH_GET_VALUE_RESPONSE = new HashMap<>();
private static final RecordSerializer VALUE_SERIALIZER =
FastSerializerDeserializerFactory.getFastAvroGenericSerializer(STORE_VALUE_SCHEMA);
Expand Down Expand Up @@ -119,6 +128,8 @@ public void setUp() {
BATCH_GET_KEYS.add("test_key_2");
BATCH_GET_PARTIAL_KEYS_1.add("test_key_1");
BATCH_GET_PARTIAL_KEYS_2.add("test_key_2");
BATCH_GET_BLOCK_KEYS_REPLICA1.addAll(generateKeysForSamePartition("test_key_1", 5));
BATCH_GET_BLOCK_KEYS_REPLICA2.addAll(generateKeysForSamePartition("test_key_2", 5));
GenericRecord value1 = (GenericRecord) rrg.randomGeneric(STORE_VALUE_SCHEMA);
GenericRecord value2 = (GenericRecord) rrg.randomGeneric(STORE_VALUE_SCHEMA);
BATCH_GET_VALUE_RESPONSE.put("test_key_1", value1);
Expand All @@ -139,6 +150,28 @@ public void setUp() {
COMPUTE_REQUEST_VALUE_RESPONSE.put("test_key_2", projectionResultForKey2);
}

/**
 * Builds a set of {@code count} distinct keys that land on the same partition as {@code anchorKey}.
 *
 * Candidate keys have the form {@code anchorKey + "_block_" + n} for n = 0, 1, 2, ...; a candidate is kept only
 * when {@link DefaultVenicePartitioner} maps its FastAvro-serialized bytes to the anchor's partition, with the
 * partition count fixed at 2. The route-blocking tests use these keys to issue a multi-key batch-get that fans
 * out to a single replica (the test metadata has one replica per partition), since a 1-key batch-get is served
 * via single-GET.
 *
 * @param anchorKey key whose partition the generated keys must share
 * @param count number of keys to generate
 * @return the generated keys
 */
private static Set<String> generateKeysForSamePartition(String anchorKey, int count) {
  final int partitionCount = 2;
  RecordSerializer<Object> serializer =
      FastSerializerDeserializerFactory.getFastAvroGenericSerializer(AvroCompatibilityHelper.parse(KEY_SCHEMA));
  DefaultVenicePartitioner venicePartitioner = new DefaultVenicePartitioner();
  int anchorPartition = venicePartitioner.getPartitionId(serializer.serialize(anchorKey), partitionCount);

  Set<String> colocatedKeys = new HashSet<>();
  int suffix = 0;
  while (colocatedKeys.size() < count) {
    String candidateKey = anchorKey + "_block_" + suffix;
    suffix++;
    int candidatePartition = venicePartitioner.getPartitionId(serializer.serialize(candidateKey), partitionCount);
    if (candidatePartition == anchorPartition) {
      colocatedKeys.add(candidateKey);
    }
  }
  return colocatedKeys;
}

/**
 * Convenience overload: delegates to the three-argument {@code setUpClient} with every flag set to
 * {@code false}.
 */
private void setUpClient() throws InterruptedException {
setUpClient(false, false, false);
}
Expand Down Expand Up @@ -382,7 +415,8 @@ private void validateMultiGetMetrics(
RequestType requestType,
boolean noAvailableReplicas,
int numKeys,
double numBlockedReplicas) {
double numBlockedReplicas,
double expectedRequestKeyCountMax) {
validateMetrics(
null,
batchGetRequestContext,
Expand All @@ -391,7 +425,8 @@ private void validateMultiGetMetrics(
requestType,
noAvailableReplicas,
numKeys,
numBlockedReplicas);
numBlockedReplicas,
expectedRequestKeyCountMax);
}

private void validateComputeRequestMetrics(
Expand Down Expand Up @@ -453,13 +488,41 @@ private void validateMetrics(
boolean noAvailableReplicas,
int numKeys,
double numBlockedReplicas) {
// By default the largest request seen by this request type is the current request's key count.
validateMetrics(
getRequestContext,
batchGetRequestContext,
healthyRequest,
partialHealthyRequest,
requestType,
noAvailableReplicas,
numKeys,
numBlockedReplicas,
numKeys);
}

/**
* @param expectedRequestKeyCountMax the expected {@code request_key_count.Max} value. This is a cumulative max
* across all requests of this type in the test, so it can differ from {@code numKeys} when an earlier
* request (e.g. a route-blocking batch-get) had more keys than the request currently being validated.
*/
private void validateMetrics(
GetRequestContext getRequestContext,
BatchGetRequestContext batchGetRequestContext,
boolean healthyRequest,
boolean partialHealthyRequest,
RequestType requestType,
boolean noAvailableReplicas,
int numKeys,
double numBlockedReplicas,
double expectedRequestKeyCountMax) {
String metricPrefix = ClientTestUtils.getMetricPrefix(STORE_NAME, requestType);
boolean batchGet = requestType == RequestType.MULTI_GET || requestType == RequestType.MULTI_GET_STREAMING;
boolean computeRequest = requestType == RequestType.COMPUTE || requestType == RequestType.COMPUTE_STREAMING;

String routeMetricsPrefix = "." + STORE_NAME;
double successKeyCount;
double requestKeyCount = numKeys;
double requestKeyCount = expectedRequestKeyCountMax;
if (partialHealthyRequest) {
// batchGet and partialHealthyRequest: 1 request is unsuccessful
successKeyCount = numKeys - 1;
Expand Down Expand Up @@ -966,8 +1029,8 @@ public void testBatchGetWithTimeoutV4() throws IOException {
try {
setUpClient(false, false, true, true, routingLeakedRequestCleanupThresholdMS);
BatchGetRequestContext batchGetRequestContext =
new BatchGetRequestContext<>(BATCH_GET_PARTIAL_KEYS_2.size(), false);
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_2).get();
new BatchGetRequestContext<>(BATCH_GET_BLOCK_KEYS_REPLICA2.size(), false);
statsAvroGenericStoreClient.batchGet(batchGetRequestContext, BATCH_GET_BLOCK_KEYS_REPLICA2).get();
fail();
} catch (Exception e) {
assertTrue(e.getMessage().endsWith("At least one route did not complete"), e.getMessage());
Expand All @@ -988,7 +1051,8 @@ public void testBatchGetWithTimeoutV4() throws IOException {
() -> {
assertTrue(metrics.get("." + STORE_NAME + "--multiget_streaming_request.OccurrenceRate").value() > 0);
});
validateMultiGetMetrics(batchGetRequestContext, false, true, RequestType.MULTI_GET, true, 2, 1);
// The blocking (first) batch-get used 5 keys, so request_key_count.Max is 5 even though this request has 2.
validateMultiGetMetrics(batchGetRequestContext, false, true, RequestType.MULTI_GET, true, 2, 1, 5);
}
} finally {
tearDown();
Expand Down Expand Up @@ -1194,16 +1258,16 @@ public void testStreamingBatchGetToUnreachableClientV1()
try {
setUpClient(false, false, false, false, TimeUnit.SECONDS.toMillis(1));
BatchGetRequestContext batchGetRequestContext =
new BatchGetRequestContext<>(BATCH_GET_PARTIAL_KEYS_1.size(), true);
new BatchGetRequestContext<>(BATCH_GET_BLOCK_KEYS_REPLICA1.size(), true);
VeniceResponseMap<String, GenericRecord> response =
(VeniceResponseMap<String, GenericRecord>) statsAvroGenericStoreClient
.streamingBatchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_1)
.streamingBatchGet(batchGetRequestContext, BATCH_GET_BLOCK_KEYS_REPLICA1)
.get();
assertFalse(response.isFullResponse());
assertEquals(response.getTotalEntryCount(), 0);
// First batchGet fails with unreachable host after timeout and this adds the hosts
// as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1)
validateMultiGetMetrics(batchGetRequestContext, false, false, RequestType.MULTI_GET_STREAMING, false, 1);
validateMultiGetMetrics(batchGetRequestContext, false, false, RequestType.MULTI_GET_STREAMING, false, 5);

BatchGetRequestContext batchGetRequestContext2 = new BatchGetRequestContext<>(BATCH_GET_KEYS.size(), true);
VeniceResponseMap<String, GenericRecord> response2 =
Expand All @@ -1212,7 +1276,8 @@ public void testStreamingBatchGetToUnreachableClientV1()
.get();
assertFalse(response2.isFullResponse());
assertEquals(response2.getTotalEntryCount(), 0);
validateMultiGetMetrics(batchGetRequestContext2, false, false, RequestType.MULTI_GET_STREAMING, true, 2);
// The blocking (first) batch-get used 5 keys, so request_key_count.Max is 5 even though this request has 2.
validateMultiGetMetrics(batchGetRequestContext2, false, false, RequestType.MULTI_GET_STREAMING, true, 2, 2, 5);
} finally {
tearDown();
}
Expand All @@ -1230,16 +1295,16 @@ public void testStreamingBatchGetToUnreachableClientV2()
try {
setUpClient(false, false, true, true, TimeUnit.SECONDS.toMillis(1));
BatchGetRequestContext batchGetRequestContext =
new BatchGetRequestContext<>(BATCH_GET_PARTIAL_KEYS_2.size(), true);
new BatchGetRequestContext<>(BATCH_GET_BLOCK_KEYS_REPLICA2.size(), true);
VeniceResponseMap<String, GenericRecord> response =
(VeniceResponseMap<String, GenericRecord>) statsAvroGenericStoreClient
.streamingBatchGet(batchGetRequestContext, BATCH_GET_PARTIAL_KEYS_2)
.streamingBatchGet(batchGetRequestContext, BATCH_GET_BLOCK_KEYS_REPLICA2)
.get();
assertFalse(response.isFullResponse());
assertEquals(response.getTotalEntryCount(), 0);
// First batchGet fails with unreachable host after timeout and this adds the hosts
// as blocked due to setRoutingPendingRequestCounterInstanceBlockThreshold(1)
validateMultiGetMetrics(batchGetRequestContext, false, false, RequestType.MULTI_GET_STREAMING, false, 1);
validateMultiGetMetrics(batchGetRequestContext, false, false, RequestType.MULTI_GET_STREAMING, false, 5);
BatchGetRequestContext batchGetRequestContext2 = new BatchGetRequestContext<>(BATCH_GET_KEYS.size(), true);
CompletableFuture<VeniceResponseMap<String, GenericRecord>> future =
statsAvroGenericStoreClient.streamingBatchGet(batchGetRequestContext2, BATCH_GET_KEYS);
Expand Down
Loading