Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testRejectTooManyRequest() throws IOException {

if (outbound instanceof FullHttpResponse) {
FullHttpResponse httpResponse = (FullHttpResponse) outbound;
if (httpResponse.status().code() == TOO_MANY_REQUESTS) {
if (httpResponse.status().code() == TOO_MANY_REQUESTS.getCode()) {
foundTooManyRequestsResponse = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public VeniceGrpcStreamObserver(CompletableFuture<TransportClientResponse> respo

@Override
public void onNext(VeniceServerResponse value) {
if (value.getErrorCode() != VeniceReadResponseStatus.OK) {
if (value.getErrorCode() != VeniceReadResponseStatus.OK.getCode()) {
handleResponseError(value);
return;
}
Expand Down Expand Up @@ -288,21 +288,16 @@ void handleResponseError(VeniceServerResponse response) {
String errorMessage = response.getErrorMessage();
Exception exception;

switch (statusCode) {
case VeniceReadResponseStatus.BAD_REQUEST:
exception = new VeniceClientHttpException(errorMessage, statusCode);
break;
case VeniceReadResponseStatus.TOO_MANY_REQUESTS:
exception = new VeniceClientRateExceededException(errorMessage);
break;
case VeniceReadResponseStatus.KEY_NOT_FOUND:
exception = null;
break;
default:
exception = new VeniceClientException(
String
.format("An unexpected error occurred with status code: %d, message: %s", statusCode, errorMessage));
break;
VeniceReadResponseStatus responseStatus = VeniceReadResponseStatus.fromCode(statusCode);
if (responseStatus == VeniceReadResponseStatus.BAD_REQUEST) {
exception = new VeniceClientHttpException(errorMessage, statusCode);
} else if (responseStatus == VeniceReadResponseStatus.TOO_MANY_REQUESTS) {
exception = new VeniceClientRateExceededException(errorMessage);
} else if (responseStatus == VeniceReadResponseStatus.KEY_NOT_FOUND) {
exception = null;
} else {
exception = new VeniceClientException(
String.format("An unexpected error occurred with status code: %d, message: %s", statusCode, errorMessage));
}

if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.venice.grpc;

import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.linkedin.venice.acl.handler.AccessResult;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.exceptions.VeniceException;
Expand All @@ -12,6 +14,7 @@
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.TlsChannelCredentials;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
Expand Down Expand Up @@ -102,6 +105,24 @@ private static KeyStore loadStore(String path, char[] password, String type)
return keyStore;
}

/** Wraps a byte array into a {@link ByteString} without copying. */
public static ByteString toByteString(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return ByteString.EMPTY;
}
return UnsafeByteOperations.unsafeWrap(bytes);
}
Comment on lines +108 to +114
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toByteString(byte[]) uses UnsafeByteOperations.unsafeWrap(bytes), which aliases the caller-provided array. If the caller reuses/mutates that array (e.g., pooled buffers), the serialized gRPC payload can be corrupted and violate ByteString immutability expectations. Consider either copying (e.g., ByteString.copyFrom) or clearly documenting/enforcing a contract that the input byte[] must never be modified after passing it here (and potentially renaming the method to make the unsafe semantics explicit).

Copilot uses AI. Check for mistakes.

/** Reads readable bytes from a {@link ByteBuf} and wraps them into a {@link ByteString}. */
public static ByteString toByteString(ByteBuf buf) {
if (buf == null || buf.readableBytes() == 0) {
return ByteString.EMPTY;
}
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
return UnsafeByteOperations.unsafeWrap(bytes);
Comment on lines +116 to +123
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description mentions adding a “zero-copy” toByteString(ByteBuf) helper, but the current implementation always allocates a new byte[] and copies from the ByteBuf. Either update the PR/docs to reflect that this is a copying conversion, or implement an actual zero-copy path (only when safe) with clearly defined buffer lifetime / reference-counting expectations.

Copilot uses AI. Check for mistakes.
}

public static ChannelCredentials buildChannelCredentials(SSLFactory sslFactory) {
// TODO: Evaluate if this needs to fail instead since it depends on plain text support on server
if (sslFactory == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
package com.linkedin.venice.listener.response;

import com.linkedin.venice.response.VeniceReadResponseStatus;
import io.netty.handler.codec.http.HttpResponseStatus;


/**
* Created by mwise on 3/11/16.
*/
/** A response object carrying an HTTP status and optional Venice read response status. */
public class HttpShortcutResponse {
private final String message;
private final HttpResponseStatus status;
private final VeniceReadResponseStatus veniceReadResponseStatus;

private boolean misroutedStoreVersion = false;

public HttpShortcutResponse(String message, HttpResponseStatus status) {
this.message = message;
this.status = status;
this(message, status, null);
}

public HttpShortcutResponse(HttpResponseStatus status) {
this("", status);
this("", status, null);
}

public HttpShortcutResponse(String message, HttpResponseStatus status, VeniceReadResponseStatus readResponseStatus) {
this.message = message;
this.status = status;
this.veniceReadResponseStatus = readResponseStatus;
}

public String getMessage() {
Expand All @@ -29,6 +34,10 @@ public HttpResponseStatus getStatus() {
return status;
}

public VeniceReadResponseStatus getVeniceReadResponseStatus() {
return veniceReadResponseStatus;
}

public boolean isMisroutedStoreVersion() {
return misroutedStoreVersion;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,41 @@
package com.linkedin.venice.response;

import java.util.HashMap;
import java.util.Map;


/**
* Enumeration of response status codes for Venice read requests.
* <p>
* **Positive values** correspond to standard HTTP status codes and can be used directly in HTTP responses.
* **Negative values** represent custom Venice-specific error codes.
* <p>
* For example, a status code of `200` indicates a successful read, while a status code of `-100` might indicate a specific Venice-related error.
* Positive values correspond to standard HTTP status codes and can be used directly in HTTP responses.
* Negative values represent custom Venice-specific error codes.
*/
public class VeniceReadResponseStatus {
public static final int KEY_NOT_FOUND = -420;

public static final int OK = 200;
public static final int BAD_REQUEST = 400;
public static final int INTERNAL_ERROR = 500;
public static final int TOO_MANY_REQUESTS = 429;
public static final int SERVICE_UNAVAILABLE = 503;
public enum VeniceReadResponseStatus {
KEY_NOT_FOUND(-420), OK(200), BAD_REQUEST(400), METHOD_NOT_ALLOWED(405), REQUEST_TIMEOUT(408),
MISROUTED_STORE_VERSION(410), TOO_MANY_REQUESTS(429), INTERNAL_ERROR(500), SERVICE_UNAVAILABLE(503);

private final int code;

private static final Map<Integer, VeniceReadResponseStatus> CODE_MAP = new HashMap<>();

static {
for (VeniceReadResponseStatus status: values()) {
CODE_MAP.put(status.code, status);
}
Comment on lines +19 to +24
Copy link

Copilot AI Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CODE_MAP is a mutable HashMap. Even though it’s private, accidental future mutation would break fromCode() determinism. Consider initializing into a local map and wrapping with Collections.unmodifiableMap(...) (or using Map.ofEntries(...)) to make the reverse-lookup table immutable.

Copilot uses AI. Check for mistakes.
}

VeniceReadResponseStatus(int code) {
this.code = code;
}

public int getCode() {
return code;
}

/**
* Returns the {@link VeniceReadResponseStatus} for the given integer code, or {@code null} if no match is found.
*/
public static VeniceReadResponseStatus fromCode(int code) {
return CODE_MAP.get(code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

import com.google.protobuf.ByteString;
import com.linkedin.venice.acl.handler.AccessResult;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.exceptions.VeniceException;
Expand All @@ -19,6 +21,8 @@
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.TlsChannelCredentials;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManager;
Expand Down Expand Up @@ -205,4 +209,43 @@ public void testExtractGrpcClientCertWithEmptyPeerCertificates() throws SSLPeerU
// Verify the exception is thrown
assertNotNull(thrownException);
}

@Test
public void testToByteStringFromByteArray() {
byte[] data = new byte[] { 1, 2, 3, 4, 5 };
ByteString result = GrpcUtils.toByteString(data);
assertEquals(result.size(), 5);
assertEquals(result.toByteArray(), data);
}

@Test
public void testToByteStringFromNullByteArray() {
assertSame(GrpcUtils.toByteString((byte[]) null), ByteString.EMPTY);
}

@Test
public void testToByteStringFromEmptyByteArray() {
assertSame(GrpcUtils.toByteString(new byte[0]), ByteString.EMPTY);
}

@Test
public void testToByteStringFromByteBuf() {
byte[] data = new byte[] { 10, 20, 30 };
ByteBuf buf = Unpooled.wrappedBuffer(data);
ByteString result = GrpcUtils.toByteString(buf);
assertEquals(result.size(), 3);
assertEquals(result.toByteArray(), data);
buf.release();
}

@Test
public void testToByteStringFromNullByteBuf() {
assertSame(GrpcUtils.toByteString((ByteBuf) null), ByteString.EMPTY);
}

@Test
public void testToByteStringFromEmptyByteBuf() {
ByteBuf buf = Unpooled.EMPTY_BUFFER;
assertSame(GrpcUtils.toByteString(buf), ByteString.EMPTY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.linkedin.venice.response;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.util.HashSet;
import java.util.Set;
import org.testng.annotations.Test;


public class VeniceReadResponseStatusTest {
@Test
public void testGetCode() {
assertEquals(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode(), -420);
assertEquals(VeniceReadResponseStatus.OK.getCode(), 200);
assertEquals(VeniceReadResponseStatus.BAD_REQUEST.getCode(), 400);
assertEquals(VeniceReadResponseStatus.METHOD_NOT_ALLOWED.getCode(), 405);
assertEquals(VeniceReadResponseStatus.REQUEST_TIMEOUT.getCode(), 408);
assertEquals(VeniceReadResponseStatus.MISROUTED_STORE_VERSION.getCode(), 410);
assertEquals(VeniceReadResponseStatus.TOO_MANY_REQUESTS.getCode(), 429);
assertEquals(VeniceReadResponseStatus.INTERNAL_ERROR.getCode(), 500);
assertEquals(VeniceReadResponseStatus.SERVICE_UNAVAILABLE.getCode(), 503);
}

@Test
public void testFromCode() {
for (VeniceReadResponseStatus status: VeniceReadResponseStatus.values()) {
VeniceReadResponseStatus resolved = VeniceReadResponseStatus.fromCode(status.getCode());
assertNotNull(resolved, "fromCode should resolve " + status.name());
assertEquals(resolved, status);
}
}

@Test
public void testFromCodeWithUnknownCode() {
assertNull(VeniceReadResponseStatus.fromCode(999));
assertNull(VeniceReadResponseStatus.fromCode(0));
assertNull(VeniceReadResponseStatus.fromCode(-1));
}

@Test
public void testFromCodeWithNegativeCode() {
VeniceReadResponseStatus status = VeniceReadResponseStatus.fromCode(-420);
assertNotNull(status);
assertEquals(status, VeniceReadResponseStatus.KEY_NOT_FOUND);
}

@Test
public void testAllCodesAreUnique() {
Set<Integer> codes = new HashSet<>();
for (VeniceReadResponseStatus status: VeniceReadResponseStatus.values()) {
boolean added = codes.add(status.getCode());
assertEquals(added, true, "Duplicate code found: " + status.getCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void batchGet(VeniceClientRequest request, StreamObserver<VeniceServerRes

private void handleRequest(VeniceClientRequest request, StreamObserver<VeniceServerResponse> responseObserver) {
VeniceServerResponse.Builder responseBuilder =
VeniceServerResponse.newBuilder().setErrorCode(VeniceReadResponseStatus.OK);
VeniceServerResponse.newBuilder().setErrorCode(VeniceReadResponseStatus.OK.getCode());
GrpcRequestContext ctx = new GrpcRequestContext(request, responseBuilder, responseObserver);
requestProcessor.process(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void processRequest(GrpcRequestContext ctx) {
ctx.setError();
statsContext.setResponseStatus(NOT_FOUND);
veniceServerResponseBuilder.setData(ByteString.EMPTY);
veniceServerResponseBuilder.setErrorCode(VeniceReadResponseStatus.KEY_NOT_FOUND);
veniceServerResponseBuilder.setErrorCode(VeniceReadResponseStatus.KEY_NOT_FOUND.getCode());
veniceServerResponseBuilder.setErrorMessage("Key not found");
invokeNextHandler(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public void processRequest(GrpcRequestContext context) {
context.setError();
if (result == ReadQuotaEnforcementHandler.QuotaEnforcementResult.BAD_REQUEST) {
context.getVeniceServerResponseBuilder()
.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST)
.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST.getCode())
.setErrorMessage(INVALID_REQUEST_RESOURCE_MSG + request.getResourceName());
} else if (result == ReadQuotaEnforcementHandler.QuotaEnforcementResult.REJECTED) {
context.getVeniceServerResponseBuilder()
.setErrorCode(VeniceReadResponseStatus.TOO_MANY_REQUESTS)
.setErrorCode(VeniceReadResponseStatus.TOO_MANY_REQUESTS.getCode())
.setErrorMessage("");
} else if (result == ReadQuotaEnforcementHandler.QuotaEnforcementResult.OVER_CAPACITY) {
context.getVeniceServerResponseBuilder()
.setErrorCode(VeniceReadResponseStatus.SERVICE_UNAVAILABLE)
.setErrorCode(VeniceReadResponseStatus.SERVICE_UNAVAILABLE.getCode())
.setErrorMessage(SERVER_OVER_CAPACITY_MSG);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ public void processRequest(GrpcRequestContext ctx) {
default:
ctx.setError();
ctx.getVeniceServerResponseBuilder()
.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST)
.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST.getCode())
.setErrorMessage("Unknown request type: " + request.getRequestType());
}
} catch (VeniceNoStoreException e) {
ctx.setError();
ctx.getVeniceServerResponseBuilder()
.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST)
.setErrorCode(VeniceReadResponseStatus.BAD_REQUEST.getCode())
.setErrorMessage("No storage exists for: " + e.getStoreName());
} catch (Exception e) {
ctx.setError();
ctx.getVeniceServerResponseBuilder()
.setErrorCode(VeniceReadResponseStatus.INTERNAL_ERROR)
.setErrorCode(VeniceReadResponseStatus.INTERNAL_ERROR.getCode())
.setErrorMessage(String.format("Internal Error: %s", e.getMessage()));
}

Expand Down
Loading
Loading