[fc][test] clean up useStreamingBatchGetAsDefault and associated tests (#814)

Clean up the useStreamingBatchGetAsDefault config, which enabled using the streaming backend for the batchGet API, along with its related code and tests, so that the streaming-backed batchGet is the only supported behavior.
m-nagarajan authored Jan 5, 2024
1 parent 338ba65 commit d84a3b7
Showing 14 changed files with 196 additions and 523 deletions.
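
For readers skimming the diff, a minimal sketch of what this cleanup means for fast-client callers: the builder setter below is the one deleted in this commit, and after the change there is nothing batchGet-specific left to configure because batchGet always uses the streaming backend. The import path, class name, and the omitted required inputs are illustrative assumptions, not part of this diff.

import com.linkedin.venice.fastclient.ClientConfig; // assumed package for the fast-client ClientConfig

public class BatchGetConfigSketch {
  @SuppressWarnings("rawtypes")
  public static void buildConfigs() {
    // Before this commit: streaming-backed batchGet was opt-in via the flag removed below.
    ClientConfig.ClientConfigBuilder legacyBuilder = new ClientConfig.ClientConfigBuilder()
        // ...store name, r2/d2 clients and other required inputs omitted in this sketch...
        .setUseStreamingBatchGetAsDefault(true); // setter removed by this commit

    // After this commit: no flag exists; the same builder simply drops the call above.
    ClientConfig.ClientConfigBuilder builder = new ClientConfig.ClientConfigBuilder();
    // ...same required inputs as before...
  }
}

This matches the intent recorded in the Javadoc removed below: once the streaming implementation was validated, the single-get-based batchGet support was meant to be dropped.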
@@ -76,14 +76,6 @@ public class ClientConfig<K, V, T extends SpecificRecord> {
private final StoreMetadataFetchMode storeMetadataFetchMode;
private final D2Client d2Client;
private final String clusterDiscoveryD2Service;
/**
* The choice of implementation for batch get: single get or streamingBatchget. The first version of batchGet in
* FC used single get in a loop to support a customer request for two-key batch-get. This config allows switching
* between the two implementations. The current default is single get based batchGet, but once the streamingBatchget
* is validated, the default should be changed to streamingBatchget based batchGet, or probably remove the single
* get based batchGet support.
*/
private final boolean useStreamingBatchGetAsDefault;
private final boolean useGrpc;
/**
* This is a temporary solution to support gRPC with Venice, we will replace this with retrieving information about
@@ -126,7 +118,6 @@ private ClientConfig(
StoreMetadataFetchMode storeMetadataFetchMode,
D2Client d2Client,
String clusterDiscoveryD2Service,
boolean useStreamingBatchGetAsDefault,
boolean useGrpc,
GrpcClientConfig grpcClientConfig,
boolean projectionFieldValidation) {
@@ -256,12 +247,6 @@ private ClientConfig(
&& this.storeMetadataFetchMode != StoreMetadataFetchMode.SERVER_BASED_METADATA) {
throw new VeniceClientException("Helix assisted routing is only available with server based metadata enabled");
}
this.useStreamingBatchGetAsDefault = useStreamingBatchGetAsDefault;
if (this.useStreamingBatchGetAsDefault) {
LOGGER.info("Batch get will use streaming batch get implementation");
} else {
LOGGER.warn("Deprecated: Batch get will use single get implementation");
}

this.useGrpc = useGrpc;
this.grpcClientConfig = grpcClientConfig;
@@ -394,10 +379,6 @@ public String getClusterDiscoveryD2Service() {
return this.clusterDiscoveryD2Service;
}

public boolean useStreamingBatchGetAsDefault() {
return this.useStreamingBatchGetAsDefault;
}

public boolean useGrpc() {
return useGrpc;
}
@@ -459,7 +440,6 @@ public static class ClientConfigBuilder<K, V, T extends SpecificRecord> {
private StoreMetadataFetchMode storeMetadataFetchMode = StoreMetadataFetchMode.DA_VINCI_CLIENT_BASED_METADATA;
private D2Client d2Client;
private String clusterDiscoveryD2Service;
private boolean useStreamingBatchGetAsDefault = false;
private boolean useGrpc = false;
private GrpcClientConfig grpcClientConfig = null;

@@ -632,11 +612,6 @@ public ClientConfigBuilder<K, V, T> setClusterDiscoveryD2Service(String clusterD
return this;
}

public ClientConfigBuilder<K, V, T> setUseStreamingBatchGetAsDefault(boolean useStreamingBatchGetAsDefault) {
this.useStreamingBatchGetAsDefault = useStreamingBatchGetAsDefault;
return this;
}

public ClientConfigBuilder<K, V, T> setUseGrpc(boolean useGrpc) {
this.useGrpc = useGrpc;
return this;
@@ -684,7 +659,6 @@ public ClientConfigBuilder<K, V, T> clone() {
.setStoreMetadataFetchMode(storeMetadataFetchMode)
.setD2Client(d2Client)
.setClusterDiscoveryD2Service(clusterDiscoveryD2Service)
.setUseStreamingBatchGetAsDefault(useStreamingBatchGetAsDefault)
.setUseGrpc(useGrpc)
.setGrpcClientConfig(grpcClientConfig)
.setProjectionFieldValidationEnabled(projectionFieldValidation);
@@ -723,7 +697,6 @@ public ClientConfig<K, V, T> build() {
storeMetadataFetchMode,
d2Client,
clusterDiscoveryD2Service,
useStreamingBatchGetAsDefault,
useGrpc,
grpcClientConfig,
projectionFieldValidation);
@@ -8,11 +8,7 @@
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import com.linkedin.venice.client.store.streaming.VeniceResponseMapImpl;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
@@ -51,14 +47,6 @@ public final CompletableFuture<Map<K, V>> batchGet(Set<K> keys) throws VeniceCli

protected CompletableFuture<Map<K, V>> batchGet(BatchGetRequestContext<K, V> requestContext, Set<K> keys)
throws VeniceClientException {
return getClientConfig().useStreamingBatchGetAsDefault()
? batchGetUsingStreamingBatchGet(requestContext, keys)
: batchGetUsingSingleGet(keys);
}

private CompletableFuture<Map<K, V>> batchGetUsingStreamingBatchGet(
BatchGetRequestContext<K, V> requestContext,
Set<K> keys) throws VeniceClientException {
CompletableFuture<Map<K, V>> resultFuture = new CompletableFuture<>();
CompletableFuture<VeniceResponseMap<K, V>> streamingResultFuture = streamingBatchGet(requestContext, keys);

@@ -75,51 +63,6 @@ private CompletableFuture<Map<K, V>> batchGetUsingStreamingBatchGet(
return resultFuture;
}

/**
* Leverage single-get implementation here:
* 1. Looping through all keys and call get() for each of the keys
* 2. Collect the replies and pass it to the caller
*
* Transient change to support {@link ClientConfig#useStreamingBatchGetAsDefault()}
*/
private CompletableFuture<Map<K, V>> batchGetUsingSingleGet(Set<K> keys) throws VeniceClientException {
int maxAllowedKeyCntInBatchGetReq = getClientConfig().getMaxAllowedKeyCntInBatchGetReq();
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
int keyCnt = keys.size();
if (keyCnt > maxAllowedKeyCntInBatchGetReq) {
throw new VeniceKeyCountLimitException(
getStoreName(),
RequestType.MULTI_GET,
keyCnt,
maxAllowedKeyCntInBatchGetReq);
}
CompletableFuture<Map<K, V>> resultFuture = new CompletableFuture<>();
Map<K, CompletableFuture<V>> valueFutures = new HashMap<>();
keys.forEach(k -> {
valueFutures.put(k, (get(new GetRequestContext(true), k)));
});
CompletableFuture.allOf(valueFutures.values().toArray(new CompletableFuture[0]))
.whenComplete(((aVoid, throwable) -> {
if (throwable != null) {
resultFuture.completeExceptionally(throwable);
}
Map<K, V> resultMap = new HashMap<>();
valueFutures.forEach((k, f) -> {
try {
resultMap.put(k, f.get());
} catch (Exception e) {
resultFuture.completeExceptionally(
new VeniceClientException("Failed to complete future for key: " + k.toString(), e));
}
});
resultFuture.complete(resultMap);
}));

return resultFuture;
}

@Override
public final void streamingBatchGet(Set<K> keys, StreamingCallback<K, V> callback) throws VeniceClientException {
streamingBatchGet(new BatchGetRequestContext<>(keys.size(), true), keys, callback);
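
The batchGet entry point above now always goes through the streaming implementation, but the middle of its surviving body is collapsed in this hunk ("@@ -75,51 +63,6"), so the step that turns the streaming result into the plain map future is not visible. Below is a minimal, hypothetical sketch of that bridging logic, assuming VeniceResponseMap.isFullResponse() gates completion and that a partial response is surfaced as a VeniceClientException; the exception's import path and message are assumptions, not taken from this commit.

import com.linkedin.venice.client.exceptions.VeniceClientException; // assumed import path
import com.linkedin.venice.client.store.streaming.VeniceResponseMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class StreamingBatchGetBridgeSketch {
  // Hypothetical helper: adapt the streaming batchGet future into the plain
  // CompletableFuture<Map<K, V>> returned by the non-streaming batchGet API.
  static <K, V> CompletableFuture<Map<K, V>> toBatchGetFuture(
      CompletableFuture<VeniceResponseMap<K, V>> streamingResultFuture) {
    CompletableFuture<Map<K, V>> resultFuture = new CompletableFuture<>();
    streamingResultFuture.whenComplete((response, throwable) -> {
      if (throwable != null) {
        // Propagate failures from the streaming path unchanged.
        resultFuture.completeExceptionally(throwable);
      } else if (!response.isFullResponse()) {
        // Assumed: a partial response fails the all-or-nothing batchGet contract.
        resultFuture.completeExceptionally(
            new VeniceClientException("Received partial response for batchGet"));
      } else {
        // VeniceResponseMap is a Map<K, V>, so it can complete the future directly.
        resultFuture.complete(response);
      }
    });
    return resultFuture;
  }
}

Keeping this adaptation inside batchGet, rather than behind the old useStreamingBatchGetAsDefault fork, is what lets the commit delete batchGetUsingSingleGet and its key-count guard entirely.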
@@ -87,19 +87,4 @@ public void testLongTailRetryWithDualRead() {
.setLongTailRetryThresholdForSingleGetInMicroSeconds(1000)
.build();
}

/** Setting useStreamingBatchGetAsDefault, no exceptions thrown */
@Test
public void testUseStreamingBatchGetAsDefault() {
ClientConfig.ClientConfigBuilder clientConfigBuilder = getClientConfigWithMinimumRequiredInputs();
ClientConfig clientConfig = clientConfigBuilder.build();

if (clientConfig.useStreamingBatchGetAsDefault()) {
// should be disabled by default
throw new VeniceClientException("Batch get using streaming batch get should be disabled by default");
}

clientConfigBuilder.setUseStreamingBatchGetAsDefault(true);
clientConfigBuilder.build();
}
}
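
With the flag gone there is no toggle left for the removed test above to exercise. Purely as an illustration (not part of this diff), a replacement-style check in the same test class could reduce to verifying that the minimum-required-inputs config still builds, reusing the getClientConfigWithMinimumRequiredInputs() helper that the deleted test relied on:

  /** Hypothetical sketch; the method name is illustrative. */
  @Test
  public void testBuildWithMinimumRequiredInputs() {
    ClientConfig.ClientConfigBuilder clientConfigBuilder = getClientConfigWithMinimumRequiredInputs();
    // Building must not throw now that batchGet is always streaming-backed.
    clientConfigBuilder.build();
  }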