Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ service StoreGrpcService {
rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {}
rpc validateStoreDeleted(ValidateStoreDeletedGrpcRequest) returns (ValidateStoreDeletedGrpcResponse);
rpc listStores(ListStoresGrpcRequest) returns (ListStoresGrpcResponse);
rpc enableStore(EnableStoreGrpcRequest) returns (EnableStoreGrpcResponse);
}

message CreateStoreGrpcRequest {
Expand Down Expand Up @@ -82,4 +83,21 @@ message ListStoresGrpcRequest {
message ListStoresGrpcResponse {
string clusterName = 1;
repeated string storeNames = 2;
}

enum StoreEnableOperation {
STORE_ENABLE_OPERATION_UNSPECIFIED = 0;
STORE_ENABLE_OPERATION_READ = 1;
STORE_ENABLE_OPERATION_WRITE = 2;
STORE_ENABLE_OPERATION_READ_WRITE = 3;
}

message EnableStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
StoreEnableOperation operation = 2;
bool status = 3;
}

message EnableStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest;
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.GetValueSchemaGrpcRequest;
import com.linkedin.venice.protocols.controller.GetValueSchemaGrpcResponse;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
import com.linkedin.venice.protocols.controller.SchemaGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.StoreMigrationCheckGrpcRequest;
import com.linkedin.venice.protocols.controller.StoreMigrationCheckGrpcResponse;
Expand Down Expand Up @@ -386,6 +389,59 @@ public void testGetValueSchemaGrpcEndpoint() {
assertEquals(exception.getStatus().getCode(), io.grpc.Status.Code.INVALID_ARGUMENT);
}

@Test(timeOut = TIMEOUT_MS)
public void testEnableStoreGrpcEndpoint() {
String storeName = Utils.getUniqueString("test_enable_store");
String controllerGrpcUrl = veniceCluster.getLeaderVeniceController().getControllerGrpcUrl();
ManagedChannel channel = Grpc.newChannelBuilder(controllerGrpcUrl, InsecureChannelCredentials.create()).build();
StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub storeBlockingStub = StoreGrpcServiceGrpc.newBlockingStub(channel);

// Step 1: Create a store first
ClusterStoreGrpcInfo storeGrpcInfo = ClusterStoreGrpcInfo.newBuilder()
.setClusterName(veniceCluster.getClusterName())
.setStoreName(storeName)
.build();
CreateStoreGrpcRequest createStoreRequest = CreateStoreGrpcRequest.newBuilder()
.setStoreInfo(storeGrpcInfo)
.setOwner("owner")
.setKeySchema(DEFAULT_KEY_SCHEMA)
.setValueSchema("\"string\"")
.build();
CreateStoreGrpcResponse createResponse = storeBlockingStub.createStore(createStoreRequest);
assertNotNull(createResponse, "Create store response should not be null");

// Step 2: Test enable store read operation
EnableStoreGrpcRequest enableReadRequest = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeGrpcInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
.setStatus(false)
.build();
EnableStoreGrpcResponse enableReadResponse = storeBlockingStub.enableStore(enableReadRequest);
assertNotNull(enableReadResponse, "Enable store response should not be null");
assertEquals(enableReadResponse.getStoreInfo().getClusterName(), veniceCluster.getClusterName());
assertEquals(enableReadResponse.getStoreInfo().getStoreName(), storeName);

// Step 3: Test enable store write operation
EnableStoreGrpcRequest enableWriteRequest = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeGrpcInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_WRITE)
.setStatus(false)
.build();
EnableStoreGrpcResponse enableWriteResponse = storeBlockingStub.enableStore(enableWriteRequest);
assertNotNull(enableWriteResponse, "Enable store response should not be null");
assertEquals(enableWriteResponse.getStoreInfo().getStoreName(), storeName);

// Step 4: Test enable store read-write operation
EnableStoreGrpcRequest enableReadWriteRequest = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeGrpcInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ_WRITE)
.setStatus(true)
.build();
EnableStoreGrpcResponse enableReadWriteResponse = storeBlockingStub.enableStore(enableReadWriteRequest);
assertNotNull(enableReadWriteResponse, "Enable store response should not be null");
assertEquals(enableReadWriteResponse.getStoreInfo().getStoreName(), storeName);
}

private static class MockDynamicAccessController extends NoOpDynamicAccessController {
private final Set<String> resourcesInAllowList = ConcurrentHashMap.newKeySet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
Expand Down Expand Up @@ -147,4 +149,25 @@ public void listStores(ListStoresGrpcRequest grpcRequest, StreamObserver<ListSto
clusterName,
null);
}

/**
* Enable or disable store read/write abilities.
* ACL check required; only allowlist users can enable/disable stores.
*/
@Override
public void enableStore(
EnableStoreGrpcRequest grpcRequest,
StreamObserver<EnableStoreGrpcResponse> responseObserver) {
LOGGER.debug("Received enableStore with args: {}", grpcRequest);
String clusterName = grpcRequest.getStoreInfo().getClusterName();
String storeName = grpcRequest.getStoreInfo().getStoreName();
handleRequest(StoreGrpcServiceGrpc.getEnableStoreMethod(), () -> {
if (!isAllowListUser(accessManager, storeName, Context.current())) {
throw new VeniceUnauthorizedAccessException(
ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX + StoreGrpcServiceGrpc.getEnableStoreMethod().getFullMethodName()
+ " on resource: " + storeName);
}
return storeRequestHandler.enableStore(grpcRequest);
}, responseObserver, clusterName, storeName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcRequest;
Expand Down Expand Up @@ -255,4 +258,46 @@ public ListStoresGrpcResponse listStores(ListStoresGrpcRequest request) {
LOGGER.info("Found {} stores in cluster: {}", selectedStoreNames.size(), clusterName);
return ListStoresGrpcResponse.newBuilder().setClusterName(clusterName).addAllStoreNames(selectedStoreNames).build();
}

/**
* Enable or disable store read/write abilities.
* @param request the request containing cluster, store name, operation type, and status
* @return response with cluster and store info
*/
public EnableStoreGrpcResponse enableStore(EnableStoreGrpcRequest request) {
ClusterStoreGrpcInfo storeInfo = request.getStoreInfo();
ControllerRequestParamValidator.validateClusterStoreInfo(storeInfo);
String clusterName = storeInfo.getClusterName();
String storeName = storeInfo.getStoreName();
StoreEnableOperation operation = request.getOperation();
boolean status = request.getStatus();

if (operation == StoreEnableOperation.STORE_ENABLE_OPERATION_UNSPECIFIED) {
throw new IllegalArgumentException("Operation type is required for enable store");
}

LOGGER.info(
"Enable store for store: {} in cluster: {} with operation: {} and status: {}",
storeName,
clusterName,
operation,
status);

switch (operation) {
case STORE_ENABLE_OPERATION_READ:
admin.setStoreReadability(clusterName, storeName, status);
break;
case STORE_ENABLE_OPERATION_WRITE:
admin.setStoreWriteability(clusterName, storeName, status);
break;
case STORE_ENABLE_OPERATION_READ_WRITE:
admin.setStoreReadWriteability(clusterName, storeName, status);
break;
default:
throw new VeniceException("Invalid operation type: " + operation);
}

LOGGER.info("Successfully updated store: {} in cluster: {} with operation: {}", storeName, clusterName, operation);
return EnableStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcRequest;
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcResponse;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
Expand Down Expand Up @@ -747,23 +750,38 @@ public void internalHandle(Request request, ControllerResponse veniceResponse) {
return;
}
AdminSparkServer.validateParams(request, ENABLE_STORE.getParams(), admin);
String cluster = request.queryParams(CLUSTER);
String clusterName = request.queryParams(CLUSTER);
String storeName = request.queryParams(NAME);
String operation = request.queryParams(OPERATION);
boolean status = Utils.parseBooleanOrThrow(request.queryParams(STATUS), "storeAccessStatus");

veniceResponse.setCluster(cluster);
veniceResponse.setName(storeName);

// Convert operation string to enum
StoreEnableOperation grpcOperation;
if (operation.equals(READ_OPERATION)) {
admin.setStoreReadability(cluster, storeName, status);
} else if ((operation.equals(WRITE_OPERATION))) {
admin.setStoreWriteability(cluster, storeName, status);
grpcOperation = StoreEnableOperation.STORE_ENABLE_OPERATION_READ;
} else if (operation.equals(WRITE_OPERATION)) {
grpcOperation = StoreEnableOperation.STORE_ENABLE_OPERATION_WRITE;
} else if (operation.equals(READ_WRITE_OPERATION)) {
admin.setStoreReadWriteability(cluster, storeName, status);
grpcOperation = StoreEnableOperation.STORE_ENABLE_OPERATION_READ_WRITE;
} else {
throw new VeniceException(OPERATION + " parameter:" + operation + " is invalid.");
}

// Build gRPC request
ClusterStoreGrpcInfo storeInfo =
ClusterStoreGrpcInfo.newBuilder().setClusterName(clusterName).setStoreName(storeName).build();
EnableStoreGrpcRequest grpcRequest = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeInfo)
.setOperation(grpcOperation)
.setStatus(status)
.build();

// Call handler
EnableStoreGrpcResponse grpcResponse = storeRequestHandler.enableStore(grpcRequest);

// Map response back to HTTP
veniceResponse.setCluster(grpcResponse.getStoreInfo().getClusterName());
veniceResponse.setName(grpcResponse.getStoreInfo().getStoreName());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.EnableStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
import com.linkedin.venice.protocols.controller.ResourceCleanupCheckGrpcResponse;
import com.linkedin.venice.protocols.controller.StoreEnableOperation;
import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub;
import com.linkedin.venice.protocols.controller.UpdateAclForStoreGrpcRequest;
Expand Down Expand Up @@ -445,4 +448,93 @@ public void testListStoresWithFilters() {
assertEquals(actualResponse.getClusterName(), TEST_CLUSTER, "Cluster name should match");
assertEquals(actualResponse.getStoreNamesCount(), 1, "Should have 1 store after filtering");
}

@Test
public void testEnableStoreReturnsSuccessfulResponse() {
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true);
ClusterStoreGrpcInfo storeInfo =
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
.setStatus(true)
.build();
EnableStoreGrpcResponse expectedResponse = EnableStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo).build();
when(storeRequestHandler.enableStore(any(EnableStoreGrpcRequest.class))).thenReturn(expectedResponse);

EnableStoreGrpcResponse actualResponse = blockingStub.enableStore(request);

assertNotNull(actualResponse, "Response should not be null");
assertEquals(actualResponse.getStoreInfo().getClusterName(), TEST_CLUSTER, "Cluster name should match");
assertEquals(actualResponse.getStoreInfo().getStoreName(), TEST_STORE, "Store name should match");
}

@Test
public void testEnableStoreReturnsErrorResponse() {
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true);
ClusterStoreGrpcInfo storeInfo =
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
.setStatus(true)
.build();
when(storeRequestHandler.enableStore(any(EnableStoreGrpcRequest.class)))
.thenThrow(new VeniceException("Failed to enable store"));

StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.enableStore(request));

assertNotNull(e.getStatus(), "Status should not be null");
assertEquals(e.getStatus().getCode(), Status.INTERNAL.getCode());
VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e);
assertNotNull(errorInfo, "Error info should not be null");
assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.GENERAL_ERROR);
assertTrue(errorInfo.getErrorMessage().contains("Failed to enable store"));
}

@Test
public void testEnableStoreReturnsPermissionDenied() {
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(false);
ClusterStoreGrpcInfo storeInfo =
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_READ)
.setStatus(true)
.build();

StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.enableStore(request));

assertNotNull(e.getStatus(), "Status should not be null");
assertEquals(e.getStatus().getCode(), Status.PERMISSION_DENIED.getCode());
VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e);
assertNotNull(errorInfo, "Error info should not be null");
assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.UNAUTHORIZED);
assertTrue(
errorInfo.getErrorMessage().contains("Only admin users are allowed to run"),
"Actual: " + errorInfo.getErrorMessage());
}

@Test
public void testEnableStoreReturnsBadRequestForInvalidArgument() {
when(controllerAccessManager.isAllowListUser(anyString(), any())).thenReturn(true);
ClusterStoreGrpcInfo storeInfo =
ClusterStoreGrpcInfo.newBuilder().setClusterName(TEST_CLUSTER).setStoreName(TEST_STORE).build();
EnableStoreGrpcRequest request = EnableStoreGrpcRequest.newBuilder()
.setStoreInfo(storeInfo)
.setOperation(StoreEnableOperation.STORE_ENABLE_OPERATION_UNSPECIFIED)
.setStatus(true)
.build();
when(storeRequestHandler.enableStore(any(EnableStoreGrpcRequest.class)))
.thenThrow(new IllegalArgumentException("Operation type is required for enable store"));

StatusRuntimeException e = expectThrows(StatusRuntimeException.class, () -> blockingStub.enableStore(request));

assertNotNull(e.getStatus(), "Status should not be null");
assertEquals(e.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
VeniceControllerGrpcErrorInfo errorInfo = GrpcRequestResponseConverter.parseControllerGrpcError(e);
assertNotNull(errorInfo, "Error info should not be null");
assertEquals(errorInfo.getErrorType(), ControllerGrpcErrorType.BAD_REQUEST);
assertTrue(errorInfo.getErrorMessage().contains("Operation type is required for enable store"));
}
}
Loading
Loading