Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions internal/venice-common/src/main/proto/VeniceReadService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ service VeniceReadService {
rpc get (VeniceClientRequest) returns (VeniceServerResponse) {}
rpc batchGet(VeniceClientRequest) returns (VeniceServerResponse) {}
rpc countByValue(CountByValueRequest) returns (CountByValueResponse) {}
rpc getMetadata(VeniceMetadataRequest) returns (VeniceMetadataResponse) {}
}

message VeniceClientRequest {
Expand Down Expand Up @@ -47,4 +48,17 @@ message CountByValueResponse {

// Per-value occurrence counts returned as part of a countByValue response.
message ValueCount {
  // Maps each distinct value (rendered as a string) to its occurrence count.
  map<string, int32> valueToCounts = 1;
}

// Request for the getMetadata RPC; identifies the store whose metadata is being fetched.
message VeniceMetadataRequest {
  // Name of the Venice store to fetch metadata for.
  string storeName = 1;
}

// Response for the getMetadata RPC. Errors are reported in-band via errorCode/errorMessage;
// the RPC itself completes normally in both the success and error paths.
message VeniceMetadataResponse {
  // Avro-serialized MetadataResponseRecord bytes
  bytes metadata = 1;
  // Schema version of the MetadataResponseRecord Avro protocol
  int32 responseSchemaId = 2;
  // Populated by the server from VeniceReadResponseStatus constants (OK on success,
  // BAD_REQUEST / INTERNAL_ERROR on failure).
  uint32 errorCode = 3;
  // Human-readable error detail; only set when errorCode indicates a failure.
  string errorMessage = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.metadata.response.MetadataResponseRecord;
import com.linkedin.venice.protocols.VeniceMetadataRequest;
import com.linkedin.venice.protocols.VeniceMetadataResponse;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc;
import com.linkedin.venice.response.VeniceReadResponseStatus;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
Expand All @@ -37,14 +41,19 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -394,6 +403,163 @@ public void testVeniceServerWithD2(boolean https) throws Exception {
}
}

/**
* Verifies feature parity between the HTTP {@code /metadata/{storeName}} endpoint and the gRPC
* {@code getMetadata} RPC by sending both requests to each server and comparing the raw Avro response bytes.
*/
@Test
public void testGrpcMetadataFetchRequest() throws ExecutionException, InterruptedException, IOException {
Utils.thisIsLocalhost();
int servers = 3;
int replicationFactor = 2;

VeniceClusterCreateOptions options = new VeniceClusterCreateOptions.Builder().numberOfControllers(1)
.numberOfServers(servers)
.numberOfRouters(0)
.replicationFactor(replicationFactor)
.enableGrpc(true)
.build();
try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(options);
CloseableHttpAsyncClient httpClient =
HttpClientUtils.getMinimalHttpClient(1, 1, Optional.of(SslUtils.getVeniceLocalSslFactory()))) {

HelixAdmin admin = new ZKHelixAdmin(cluster.getZk().getAddress());

HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(cluster.getClusterName())
.build();
Map<String, String> clusterProperties = new HashMap<String, String>() {
{
put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/zone/instance");
put(ClusterConfig.ClusterConfigProperty.TOPOLOGY_AWARE_ENABLED.name(), "TRUE");
put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), "zone");
}
};

admin.setConfig(configScope, clusterProperties);

for (int i = 0; i < servers; i++) {
VeniceServerWrapper server = cluster.getVeniceServers().get(i);
String instanceName = server.getHost() + "_" + server.getPort();
String domain = "zone=zone_" + (char) (i % replicationFactor + 65) + ",instance=" + instanceName;

InstanceConfig instanceConfig = new InstanceConfig(instanceName);
instanceConfig.setDomain(domain);
instanceConfig.setHostName(server.getHost());
instanceConfig.setPort(String.valueOf(server.getPort()));

admin.setInstanceConfig(cluster.getClusterName(), instanceName, instanceConfig);
}

String storeName = cluster.createStore(1);
cluster.useControllerClient(
controllerClient -> Assert.assertFalse(
controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true))
.isError()));

httpClient.start();

List<ManagedChannel> channels = new ArrayList<>();
try {
for (int i = 0; i < servers; i++) {
VeniceServerWrapper server = cluster.getVeniceServers().get(i);

// --- HTTP request ---
HttpGet httpRequest = new HttpGet(
"http://" + server.getAddress() + "/" + QueryAction.METADATA.toString().toLowerCase() + "/" + storeName);
HttpResponse httpResponse = httpClient.execute(httpRequest, null).get();
Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), HttpStatus.SC_OK);

byte[] httpBody;
try (InputStream bodyStream = httpResponse.getEntity().getContent()) {
httpBody = IOUtils.toByteArray(bodyStream);
}

// --- gRPC request ---
ManagedChannel channel =
Grpc.newChannelBuilder(server.getGrpcAddress(), InsecureChannelCredentials.create()).build();
channels.add(channel);

VeniceReadServiceGrpc.VeniceReadServiceBlockingStub blockingStub =
VeniceReadServiceGrpc.newBlockingStub(channel);
VeniceMetadataResponse grpcResponse =
blockingStub.getMetadata(VeniceMetadataRequest.newBuilder().setStoreName(storeName).build());

Assert.assertEquals(
grpcResponse.getErrorCode(),
VeniceReadResponseStatus.OK,
"gRPC metadata request failed for server " + i + ": " + grpcResponse.getErrorMessage());

byte[] grpcBody = grpcResponse.getMetadata().toByteArray();

// --- Verify HTTP and gRPC return identical Avro bytes ---
Assert.assertEquals(grpcBody, httpBody, "HTTP and gRPC metadata responses differ for server " + i);

// Deserialize and verify content
RecordDeserializer<MetadataResponseRecord> deserializer =
SerializerDeserializerFactory.getAvroGenericDeserializer(MetadataResponseRecord.SCHEMA$);
GenericRecord metadataResponse = deserializer.deserialize(grpcBody);

Comment on lines +495 to +502
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion compares raw Avro-serialized bytes between two separate metadata requests (HTTP vs gRPC). The response includes multiple map fields that are populated with HashMap in ServerReadMetadataRepository; Avro map entry ordering is iteration-order dependent and not guaranteed to be stable across calls. This can make the test flaky even when the decoded records are equivalent; consider deserializing both bodies and comparing the resulting MetadataResponseRecord/GenericRecord contents (or comparing a normalized form) instead of raw bytes.

Suggested change
// --- Verify HTTP and gRPC return identical Avro bytes ---
Assert.assertEquals(grpcBody, httpBody, "HTTP and gRPC metadata responses differ for server " + i);
// Deserialize and verify content
RecordDeserializer<MetadataResponseRecord> deserializer =
SerializerDeserializerFactory.getAvroGenericDeserializer(MetadataResponseRecord.SCHEMA$);
GenericRecord metadataResponse = deserializer.deserialize(grpcBody);
// --- Verify HTTP and gRPC return identical Avro content (ignoring map entry order) ---
RecordDeserializer<MetadataResponseRecord> deserializer =
SerializerDeserializerFactory.getAvroGenericDeserializer(MetadataResponseRecord.SCHEMA$);
GenericRecord grpcMetadataResponse = deserializer.deserialize(grpcBody);
GenericRecord httpMetadataResponse = deserializer.deserialize(httpBody);
Assert.assertEquals(
grpcMetadataResponse,
httpMetadataResponse,
"HTTP and gRPC metadata responses differ for server " + i);
// Deserialize and verify content using the gRPC response
GenericRecord metadataResponse = grpcMetadataResponse;

Copilot uses AI. Check for mistakes.
Assert.assertEquals(
((HashMap<Utf8, Utf8>) metadataResponse.get("keySchema")).get(new Utf8("1")),
new Utf8("\"int\""));

// Verify the property that no replicas of the same partition are in the same helix group
Map<Utf8, Integer> helixGroupInfo = (HashMap<Utf8, Integer>) metadataResponse.get("helixGroupInfo");
Map<Utf8, Collection<Utf8>> routingInfo =
(HashMap<Utf8, Collection<Utf8>>) metadataResponse.get("routingInfo");

for (Map.Entry<Utf8, Collection<Utf8>> entry: routingInfo.entrySet()) {
Set<Integer> zonesSeen = new HashSet<>();
for (Utf8 instance: entry.getValue()) {
Assert.assertFalse(
zonesSeen.contains(helixGroupInfo.get(instance)),
instance + " is in the same helix zone as another replica of the partition");
zonesSeen.add(helixGroupInfo.get(instance));
}
}
}
} finally {
for (ManagedChannel channel: channels) {
channel.shutdownNow();
}
}
}
}

/**
 * Verifies that the gRPC {@code getMetadata} RPC surfaces an in-band error when the store has
 * storage-node read quota disabled, which is the default for a freshly created store.
 */
@Test
public void testGrpcMetadataFetchRequestWithQuotaDisabled() {
  Utils.thisIsLocalhost();
  VeniceClusterCreateOptions clusterOptions = new VeniceClusterCreateOptions.Builder().numberOfControllers(1)
      .numberOfServers(1)
      .numberOfRouters(0)
      .enableGrpc(true)
      .build();
  try (VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(clusterOptions)) {
    String storeName = cluster.createStore(1);
    // A new store has StorageNodeReadQuotaEnabled=false, so the metadata endpoint should return an error

    VeniceServerWrapper targetServer = cluster.getVeniceServers().get(0);
    ManagedChannel grpcChannel =
        Grpc.newChannelBuilder(targetServer.getGrpcAddress(), InsecureChannelCredentials.create()).build();
    try {
      VeniceMetadataRequest metadataRequest = VeniceMetadataRequest.newBuilder().setStoreName(storeName).build();
      VeniceMetadataResponse metadataResponse =
          VeniceReadServiceGrpc.newBlockingStub(grpcChannel).getMetadata(metadataRequest);

      Assert.assertEquals(metadataResponse.getErrorCode(), VeniceReadResponseStatus.BAD_REQUEST);
      Assert.assertFalse(metadataResponse.getErrorMessage().isEmpty(), "Error message should not be empty");
    } finally {
      grpcChannel.shutdownNow();
    }
  }
}

@Test
public void testStartServerWithSystemSchemaInitialization() {
Properties controllerProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public ListenerService(
grpcExecutor = createThreadPool(serverConfig.getGrpcWorkerThreadCount(), "GrpcWorkerThread", nettyBacklogSize);

VeniceGrpcServerConfig.Builder grpcServerBuilder = new VeniceGrpcServerConfig.Builder().setPort(grpcPort)
.addService(new VeniceReadServiceImpl(requestProcessor))
.addService(new VeniceReadServiceImpl(requestProcessor, readMetadataRetriever))
.setExecutor(grpcExecutor)
.setInterceptors(interceptors);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
package com.linkedin.venice.listener.grpc;

import com.google.protobuf.ByteString;
import com.linkedin.davinci.listener.response.MetadataResponse;
import com.linkedin.davinci.storage.ReadMetadataRetriever;
import com.linkedin.venice.listener.grpc.handlers.VeniceServerGrpcRequestProcessor;
import com.linkedin.venice.protocols.VeniceClientRequest;
import com.linkedin.venice.protocols.VeniceMetadataRequest;
import com.linkedin.venice.protocols.VeniceMetadataResponse;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc;
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.response.VeniceReadResponseStatus;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class VeniceReadServiceImpl extends VeniceReadServiceGrpc.VeniceReadServiceImplBase {
private static final Logger LOGGER = LogManager.getLogger(VeniceReadServiceImpl.class);
private static final String GRPC_METADATA_ERROR_PREFIX = "GRPC_METADATA_ERROR:";
private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
RedundantExceptionFilter.getRedundantExceptionFilter();

private final VeniceServerGrpcRequestProcessor requestProcessor;
private final ReadMetadataRetriever readMetadataRetriever;

/**
 * @param requestProcessor pipeline that handles the data-path (get/batchGet) requests
 * @param readMetadataRetriever source of store metadata backing the {@code getMetadata} RPC
 */
public VeniceReadServiceImpl(
    VeniceServerGrpcRequestProcessor requestProcessor,
    ReadMetadataRetriever readMetadataRetriever) {
  this.requestProcessor = requestProcessor;
  this.readMetadataRetriever = readMetadataRetriever;
}

@Override
Expand All @@ -29,6 +43,44 @@ public void batchGet(VeniceClientRequest request, StreamObserver<VeniceServerRes
handleRequest(request, responseObserver);
}

@Override
public void getMetadata(VeniceMetadataRequest request, StreamObserver<VeniceMetadataResponse> responseObserver) {
VeniceMetadataResponse.Builder responseBuilder = VeniceMetadataResponse.newBuilder();
try {
String storeName = request.getStoreName();
MetadataResponse metadataResponse = readMetadataRetriever.getMetadata(storeName);

Comment on lines +46 to +52
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gRPC server interceptors currently assume every request message is a VeniceClientRequest (e.g., ServerAclHandler and ServerStoreAclHandler cast ReqT to VeniceClientRequest in onMessage). Since this RPC uses VeniceMetadataRequest, enabling SSL/ACL interceptors will likely trigger a ClassCastException before this method executes. Please update the interceptors to handle/skip VeniceMetadataRequest (e.g., instanceof guard + allowlist), or register method-specific interceptors so non-storage RPCs aren't intercepted by VeniceClientRequest-only logic.

Copilot uses AI. Check for mistakes.
if (metadataResponse.isError()) {
String errorMessage = metadataResponse.getMessage() != null ? metadataResponse.getMessage() : "Unknown error";
responseBuilder.setErrorCode(VeniceReadResponseStatus.INTERNAL_ERROR).setErrorMessage(errorMessage);
} else {
ByteBuf body = metadataResponse.getResponseBody();
// nioBuffer() is zero-copy for heap-backed ByteBufs (which MetadataResponse produces via
// Unpooled.wrappedBuffer). copyFrom() performs a single copy into ByteString's internal array.
// If this becomes a bottleneck, UnsafeByteOperations.unsafeWrap(body.nioBuffer()) can eliminate
// that copy entirely, but it bypasses ByteString's immutability guarantee — the caller must
// ensure the underlying buffer is not modified or released while the ByteString is in use.
responseBuilder.setMetadata(ByteString.copyFrom(body.nioBuffer()))
.setResponseSchemaId(metadataResponse.getResponseSchemaIdHeader())
.setErrorCode(VeniceReadResponseStatus.OK);
}
} catch (UnsupportedOperationException e) {
// This happens when storageNodeReadQuotaEnabled is false for the store. The metadata endpoint is
// designed for the fast client, which requires read quota enforcement on storage nodes.
responseBuilder.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST).setErrorMessage(e.getMessage());
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The quota-disabled path maps UnsupportedOperationException to VeniceReadResponseStatus.BAD_REQUEST (400), but the HTTP /metadata/{storeName} endpoint currently returns 403 FORBIDDEN for the same condition (see StorageReadRequestHandler catching UnsupportedOperationException and writing a FORBIDDEN response). If the goal is parity with HTTP semantics, consider returning 403 here as well (and ideally adding a shared constant), or update the parity claim/tests to reflect the intentional difference.

Suggested change
responseBuilder.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST).setErrorMessage(e.getMessage());
responseBuilder.setErrorCode(VeniceReadResponseStatus.FORBIDDEN).setErrorMessage(e.getMessage());

Copilot uses AI. Check for mistakes.
} catch (Exception e) {
String filterKey = GRPC_METADATA_ERROR_PREFIX + request.getStoreName();
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(filterKey)) {
LOGGER.error("Error handling gRPC metadata request for store: {}", request.getStoreName(), e);
}
responseBuilder.setErrorCode(VeniceReadResponseStatus.INTERNAL_ERROR)
.setErrorMessage("Internal error: " + e.getMessage());
}

responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}

private void handleRequest(VeniceClientRequest request, StreamObserver<VeniceServerResponse> responseObserver) {
VeniceServerResponse.Builder responseBuilder =
VeniceServerResponse.newBuilder().setErrorCode(VeniceReadResponseStatus.OK);
Expand Down
Loading
Loading