Skip to content

Commit b860c7a

Browse files
pthirunclaude
andcommitted
[Controller] Add gRPC support for enableStore API
Add gRPC support for the enableStore API while maintaining backward compatibility with the existing HTTP endpoint. Changes: - Add enableStore RPC to StoreGrpcService.proto with StoreEnableOperation enum - Add handler method to StoreRequestHandler - Add gRPC service implementation with ACL checks - Update HTTP route to delegate to shared handler - Add comprehensive unit and integration tests Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 1225a93 commit b860c7a

File tree

7 files changed

+339
-8
lines changed

7 files changed

+339
-8
lines changed

internal/venice-common/src/main/proto/controller/StoreGrpcService.proto

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ service StoreGrpcService {
1414
rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {}
1515
rpc validateStoreDeleted(ValidateStoreDeletedGrpcRequest) returns (ValidateStoreDeletedGrpcResponse);
1616
rpc listStores(ListStoresGrpcRequest) returns (ListStoresGrpcResponse);
17+
rpc enableStore(EnableStoreGrpcRequest) returns (EnableStoreGrpcResponse);
1718
}
1819

1920
message CreateStoreGrpcRequest {
@@ -82,4 +83,21 @@ message ListStoresGrpcRequest {
8283
message ListStoresGrpcResponse {
8384
string clusterName = 1;
8485
repeated string storeNames = 2;
86+
}
87+
88+
enum StoreEnableOperation {
89+
STORE_ENABLE_OPERATION_UNSPECIFIED = 0;
90+
STORE_ENABLE_OPERATION_READ = 1;
91+
STORE_ENABLE_OPERATION_WRITE = 2;
92+
STORE_ENABLE_OPERATION_READ_WRITE = 3;
93+
}
94+
95+
message EnableStoreGrpcRequest {
96+
ClusterStoreGrpcInfo storeInfo = 1;
97+
StoreEnableOperation operation = 2;
98+
bool status = 3;
99+
}
100+
101+
message EnableStoreGrpcResponse {
102+
ClusterStoreGrpcInfo storeInfo = 1;
85103
}

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestControllerGrpcEndpoints.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
2020
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest;
2121
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse;
22+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
23+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
2224
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest;
2325
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse;
2426
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
2527
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
28+
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
2629
import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc;
2730
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcRequest;
2831
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcResponse;
@@ -316,6 +319,59 @@ public void testListStoresGrpcEndpoint() {
316319
}
317320
}
318321

322+
@Test(timeOut = TIMEOUT_MS)
323+
public void testEnableStoreGrpcEndpoint() {
324+
String storeName = Utils.getUniqueString("test_enable_store");
325+
String controllerGrpcUrl = veniceCluster.getLeaderVeniceController().getControllerGrpcUrl();
326+
ManagedChannel channel = Grpc.newChannelBuilder(controllerGrpcUrl, InsecureChannelCredentials.create()).build();
327+
StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub storeBlockingStub = StoreGrpcServiceGrpc.newBlockingStub(channel);
328+
329+
// Step 1: Create a store first
330+
ClusterStoreGrpcInfo storeGrpcInfo = ClusterStoreGrpcInfo.newBuilder()
331+
.setClusterName(veniceCluster.getClusterName())
332+
.setStoreName(storeName)
333+
.build();
334+
CreateStoreGrpcRequest createStoreRequest = CreateStoreGrpcRequest.newBuilder()
335+
.setStoreInfo(storeGrpcInfo)
336+
.setOwner("owner")
337+
.setKeySchema(DEFAULT_KEY_SCHEMA)
338+
.setValueSchema("\"string\"")
339+
.build();
340+
CreateStoreGrpcResponse createResponse = storeBlockingStub.createStore(createStoreRequest);
341+
assertNotNull(createResponse, "Create store response should not be null");
342+
343+
// Step 2: Test enable store read operation
344+
EnableStoreGrpcRequest enableReadRequest = EnableStoreGrpcRequest.newBuilder()
345+
.setStoreInfo(storeGrpcInfo)
346+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
347+
.setStatus(false)
348+
.build();
349+
EnableStoreGrpcResponse enableReadResponse = storeBlockingStub.enableStore(enableReadRequest);
350+
assertNotNull(enableReadResponse, "Enable store response should not be null");
351+
assertEquals(enableReadResponse.getStoreInfo().getClusterName(), veniceCluster.getClusterName());
352+
assertEquals(enableReadResponse.getStoreInfo().getStoreName(), storeName);
353+
354+
// Step 3: Test enable store write operation
355+
EnableStoreGrpcRequest enableWriteRequest = EnableStoreGrpcRequest.newBuilder()
356+
.setStoreInfo(storeGrpcInfo)
357+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_WRITE)
358+
.setStatus(false)
359+
.build();
360+
EnableStoreGrpcResponse enableWriteResponse = storeBlockingStub.enableStore(enableWriteRequest);
361+
assertNotNull(enableWriteResponse, "Enable store response should not be null");
362+
assertEquals(enableWriteResponse.getStoreInfo().getStoreName(), storeName);
363+
364+
// Step 4: Test enable store read-write operation
365+
EnableStoreGrpcRequest enableReadWriteRequest = EnableStoreGrpcRequest.newBuilder()
366+
.setStoreInfo(storeGrpcInfo)
367+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ_WRITE)
368+
.setStatus(true)
369+
.build();
370+
EnableStoreGrpcResponse enableReadWriteResponse = storeBlockingStub.enableStore(enableReadWriteRequest);
371+
assertNotNull(enableReadWriteResponse, "Enable store response should not be null");
372+
assertEquals(enableReadWriteResponse.getStoreInfo().getStoreName(), storeName);
373+
}
374+
319375
private static class MockDynamicAccessController extends NoOpDynamicAccessController {
320376
private final Set<String> resourcesInAllowList = ConcurrentHashMap.newKeySet();
321377

services/venice-controller/src/main/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImpl.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
1313
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
1414
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
15+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
16+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
1517
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
1618
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
1719
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
@@ -147,4 +149,25 @@ public void listStores(ListStoresGrpcRequest grpcRequest, StreamObserver<ListSto
147149
clusterName,
148150
null);
149151
}
152+
153+
/**
154+
* Enable or disable store read/write abilities.
155+
* ACL check required; only allowlist users can enable/disable stores.
156+
*/
157+
@Override
158+
public void enableStore(
159+
EnableStoreGrpcRequest grpcRequest,
160+
StreamObserver<EnableStoreGrpcResponse> responseObserver) {
161+
LOGGER.debug("Received enableStore with args: {}", grpcRequest);
162+
String clusterName = grpcRequest.getStoreInfo().getClusterName();
163+
String storeName = grpcRequest.getStoreInfo().getStoreName();
164+
handleRequest(StoreGrpcServiceGrpc.getEnableStoreMethod(), () -> {
165+
if (!isAllowListUser(accessManager, storeName, Context.current())) {
166+
throw new VeniceUnauthorizedAccessException(
167+
ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX + StoreGrpcServiceGrpc.getEnableStoreMethod().getFullMethodName()
168+
+ " on resource: " + storeName);
169+
}
170+
return storeRequestHandler.enableStore(grpcRequest);
171+
}, responseObserver, clusterName, storeName);
172+
}
150173
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoreRequestHandler.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
1212
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
1313
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
14+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
15+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
1416
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
1517
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
1618
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
1719
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
20+
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
1821
import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest;
1922
import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcResponse;
2023
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcRequest;
@@ -255,4 +258,46 @@ public ListStoresGrpcResponse listStores(ListStoresGrpcRequest request) {
255258
LOGGER.info("Found {} stores in cluster: {}", selectedStoreNames.size(), clusterName);
256259
return ListStoresGrpcResponse.newBuilder().setClusterName(clusterName).addAllStoreNames(selectedStoreNames).build();
257260
}
261+
262+
/**
263+
* Enable or disable store read/write abilities.
264+
* @param request the request containing cluster, store name, operation type, and status
265+
* @return response with cluster and store info
266+
*/
267+
public EnableStoreGrpcResponse enableStore(EnableStoreGrpcRequest request) {
268+
ClusterStoreGrpcInfo storeInfo = request.getStoreInfo();
269+
ControllerRequestParamValidator.validateClusterStoreInfo(storeInfo);
270+
String clusterName = storeInfo.getClusterName();
271+
String storeName = storeInfo.getStoreName();
272+
StoreEnableOperation operation = request.getOperation();
273+
boolean status = request.getStatus();
274+
275+
if (operation == StoreEnableOperation.STORE_ENABLE_OPERATION_UNSPECIFIED) {
276+
throw new IllegalArgumentException("Operation type is required for enable store");
277+
}
278+
279+
LOGGER.info(
280+
"Enable store for store: {} in cluster: {} with operation: {} and status: {}",
281+
storeName,
282+
clusterName,
283+
operation,
284+
status);
285+
286+
switch (operation) {
287+
case STORE_ENABLE_OPERATION_READ:
288+
admin.setStoreReadability(clusterName, storeName, status);
289+
break;
290+
case STORE_ENABLE_OPERATION_WRITE:
291+
admin.setStoreWriteability(clusterName, storeName, status);
292+
break;
293+
case STORE_ENABLE_OPERATION_READ_WRITE:
294+
admin.setStoreReadWriteability(clusterName, storeName, status);
295+
break;
296+
default:
297+
throw new VeniceException("Invalid operation type: " + operation);
298+
}
299+
300+
LOGGER.info("Successfully updated store: {} in cluster: {} with operation: {}", storeName, clusterName, operation);
301+
return EnableStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build();
302+
}
258303
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,11 @@
109109
import com.linkedin.venice.meta.StoreInfo;
110110
import com.linkedin.venice.meta.Version;
111111
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
112+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
113+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
112114
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
113115
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
116+
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
114117
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcRequest;
115118
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcResponse;
116119
import com.linkedin.venice.pubsub.PubSubTopicRepository;
@@ -747,23 +750,38 @@ public void internalHandle(Request request, ControllerResponse veniceResponse) {
747750
return;
748751
}
749752
AdminSparkServer.validateParams(request, ENABLE_STORE.getParams(), admin);
750-
String cluster = request.queryParams(CLUSTER);
753+
String clusterName = request.queryParams(CLUSTER);
751754
String storeName = request.queryParams(NAME);
752755
String operation = request.queryParams(OPERATION);
753756
boolean status = Utils.parseBooleanOrThrow(request.queryParams(STATUS), "storeAccessStatus");
754757

755-
veniceResponse.setCluster(cluster);
756-
veniceResponse.setName(storeName);
757-
758+
// Convert operation string to enum
759+
StoreEnableOperation grpcOperation;
758760
if (operation.equals(READ_OPERATION)) {
759-
admin.setStoreReadability(cluster, storeName, status);
760-
} else if ((operation.equals(WRITE_OPERATION))) {
761-
admin.setStoreWriteability(cluster, storeName, status);
761+
grpcOperation = StoreEnableOperation.STORE_ENABLE_OPERATION_READ;
762+
} else if (operation.equals(WRITE_OPERATION)) {
763+
grpcOperation = StoreEnableOperation.STORE_ENABLE_OPERATION_WRITE;
762764
} else if (operation.equals(READ_WRITE_OPERATION)) {
763-
admin.setStoreReadWriteability(cluster, storeName, status);
765+
grpcOperation = StoreEnableOperation.STORE_ENABLE_OPERATION_READ_WRITE;
764766
} else {
765767
throw new VeniceException(OPERATION + " parameter:" + operation + " is invalid.");
766768
}
769+
770+
// Build gRPC request
771+
ClusterStoreGrpcInfo storeInfo =
772+
ClusterStoreGrpcInfo.newBuilder().setClusterName(clusterName).setStoreName(storeName).build();
773+
EnableStoreGrpcRequest grpcRequest = EnableStoreGrpcRequest.newBuilder()
774+
.setStoreInfo(storeInfo)
775+
.setOperation(grpcOperation)
776+
.setStatus(status)
777+
.build();
778+
779+
// Call handler
780+
EnableStoreGrpcResponse grpcResponse = storeRequestHandler.enableStore(grpcRequest);
781+
782+
// Map response back to HTTP
783+
veniceResponse.setCluster(grpcResponse.getStoreInfo().getClusterName());
784+
veniceResponse.setName(grpcResponse.getStoreInfo().getStoreName());
767785
}
768786
};
769787
}

services/venice-controller/src/test/java/com/linkedin/venice/controller/grpc/server/StoreGrpcServiceImplTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@
2323
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
2424
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
2525
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
26+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
27+
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
2628
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
2729
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
2830
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
2931
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
3032
import com.linkedin.venice.protocols.controller.ResourceCleanupCheckGrpcResponse;
33+
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
3134
import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc;
3235
import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub;
3336
import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest;
@@ -445,4 +448,93 @@ public void testListStoresWithFilters() {
445448
assertEquals(actualResponse.getClusterName(), TEST_CLUSTER, "Cluster name should match");
446449
assertEquals(actualResponse.getStoreNamesCount(), 1, "Should have 1 store after filtering");
447450
}
451+
452+
@Test
453+
public void testEnableStoreReturnsSuccessfulResponse() {
454+
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true);
455+
ClusterStoreGrpcInfo storeInfo =
456+
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
457+
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
458+
.setStoreInfo(storeInfo)
459+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
460+
.setStatus(true)
461+
.build();
462+
EnableStoreGrpcResponse expectedResponse = EnableStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build();
463+
when(storeRequestHandler.enableStore(any(EnableStoreGrpcRequest.class))).thenReturn(expectedResponse);
464+
465+
EnableStoreGrpcResponse actualResponse = blockingStub.enableStore(request);
466+
467+
assertNotNull(actualResponse, "Response should not be null");
468+
assertEquals(actualResponse.getStoreInfo().getClusterName(), TEST_CLUSTER, "Cluster name should match");
469+
assertEquals(actualResponse.getStoreInfo().getStoreName(), TEST_STORE, "Store name should match");
470+
}
471+
472+
@Test
473+
public void testEnableStoreReturnsErrorResponse() {
474+
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true);
475+
ClusterStoreGrpcInfo storeInfo =
476+
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
477+
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
478+
.setStoreInfo(storeInfo)
479+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
480+
.setStatus(true)
481+
.build();
482+
when(storeRequestHandler.enableStore(any(EnableStoreGrpcRequest.class)))
483+
.thenThrow(new VeniceException("Failed to enable store"));
484+
485+
StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.enableStore(request));
486+
487+
assertNotNull(e.getStatus(), "Status should not be null");
488+
assertEquals(e.getStatus().getCode(), Status.INTERNAL.getCode());
489+
VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e);
490+
assertNotNull(errorInfo, "Error info should not be null");
491+
assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR);
492+
assertTrue(errorInfo.getErrorMessage().contains("Failed to enable store"));
493+
}
494+
495+
@Test
496+
public void testEnableStoreReturnsPermissionDenied() {
497+
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(false);
498+
ClusterStoreGrpcInfo storeInfo =
499+
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
500+
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
501+
.setStoreInfo(storeInfo)
502+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
503+
.setStatus(true)
504+
.build();
505+
506+
StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.enableStore(request));
507+
508+
assertNotNull(e.getStatus(), "Status should not be null");
509+
assertEquals(e.getStatus().getCode(), Status.PERMISSION_DENIED.getCode());
510+
VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e);
511+
assertNotNull(errorInfo, "Error info should not be null");
512+
assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.UNAUTHORIZED);
513+
assertTrue(
514+
errorInfo.getErrorMessage().contains("Only admin users are allowed to run"),
515+
"Actual: " + errorInfo.getErrorMessage());
516+
}
517+
518+
@Test
519+
public void testEnableStoreReturnsBadRequestForInvalidArgument() {
520+
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true);
521+
ClusterStoreGrpcInfo storeInfo =
522+
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
523+
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
524+
.setStoreInfo(storeInfo)
525+
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_UNSPECIFIED)
526+
.setStatus(true)
527+
.build();
528+
when(storeRequestHandler.enableStore(any(EnableStoreGrpcRequest.class)))
529+
.thenThrow(new IllegalArgumentException("Operation type is required for enable store"));
530+
531+
StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.enableStore(request));
532+
533+
assertNotNull(e.getStatus(), "Status should not be null");
534+
assertEquals(e.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
535+
VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e);
536+
assertNotNull(errorInfo, "Error info should not be null");
537+
assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.BAD_REQUEST);
538+
assertTrue(errorInfo.getErrorMessage().contains("Operation type is required for enable store"));
539+
}
448540
}

0 commit comments

Comments
 (0)