Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ public void internalHandle(Request request, StoreMigrationResponse veniceRespons
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
return;
}
if (!admin.isParent()) {
veniceResponse.setError("Store migration is only allowed from a parent controller!");
veniceResponse.setErrorType(ErrorType.BAD_REQUEST);
return;
}
AdminSparkServer.validateParams(request, MIGRATE_STORE.getParams(), admin);
String srcClusterName = request.queryParams(CLUSTER);
String destClusterName = request.queryParams(CLUSTER_DEST);
Expand Down Expand Up @@ -406,6 +411,11 @@ public void internalHandle(Request request, StoreMigrationResponse veniceRespons
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
return;
}
if (!admin.isParent()) {
veniceResponse.setError("Complete migration is only allowed from a parent controller!");
veniceResponse.setErrorType(ErrorType.BAD_REQUEST);
return;
}
AdminSparkServer.validateParams(request, COMPLETE_MIGRATION.getParams(), admin);
String srcClusterName = request.queryParams(CLUSTER);
String destClusterName = request.queryParams(CLUSTER_DEST);
Expand Down Expand Up @@ -448,6 +458,11 @@ public void internalHandle(Request request, StoreMigrationResponse veniceRespons
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
return;
}
if (!admin.isParent()) {
veniceResponse.setError("Abort migration is only allowed from a parent controller!");
veniceResponse.setErrorType(ErrorType.BAD_REQUEST);
return;
}
AdminSparkServer.validateParams(request, ABORT_MIGRATION.getParams(), admin);
String srcClusterName = request.queryParams(CLUSTER);
String destClusterName = request.queryParams(CLUSTER_DEST);
Expand Down Expand Up @@ -484,6 +499,11 @@ public void internalHandle(Request request, StoreMigrationResponse storeMigratio
if (!checkIsAllowListUser(request, storeMigrationResponse, () -> isAllowListUser(request))) {
return;
}
if (!admin.isParent()) {
storeMigrationResponse.setError("Auto store migration is only allowed from a parent controller!");
storeMigrationResponse.setErrorType(ErrorType.BAD_REQUEST);
return;
}
AdminSparkServer.validateParams(request, MIGRATE_STORE.getParams(), admin);
String srcClusterName = request.queryParams(CLUSTER);
String destClusterName = request.queryParams(CLUSTER_DEST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void testDeleteStore() throws Exception {
@Test
public void testMigrateStore() throws Exception {
Admin mockAdmin = mock(VeniceParentHelixAdmin.class, RETURNS_DEEP_STUBS);
doReturn(true).when(mockAdmin).isParent();
String DEST_CLUSTER = "dest_cluster";
when(mockAdmin.discoverCluster(TEST_STORE_NAME)).thenReturn(TEST_CLUSTER);
Request request = mock(Request.class);
Expand Down Expand Up @@ -358,6 +359,7 @@ public void testRepushStoreWithErrorResponse() throws Exception {
@Test
public void testAutoMigrateStore() throws Exception {
Admin mockAdmin = mock(VeniceParentHelixAdmin.class, RETURNS_DEEP_STUBS);
doReturn(true).when(mockAdmin).isParent();
String DEST_CLUSTER = "dest_cluster";
when(mockAdmin.discoverCluster(TEST_STORE_NAME)).thenReturn(TEST_CLUSTER);
Request request = mock(Request.class);
Expand Down Expand Up @@ -678,4 +680,87 @@ public void testGetAllStoresWithFilters() throws Exception {
Assert.assertEquals(response.getStores().length, 1);
Assert.assertEquals(response.getStores()[0], "user-store");
}

@Test
public void testMigrateStoreBlockedOnChildController() throws Exception {
Admin mockAdmin = mock(VeniceHelixAdmin.class, RETURNS_DEEP_STUBS);
doReturn(false).when(mockAdmin).isParent();

Request request = mock(Request.class);
doReturn(LEADER_CONTROLLER.getPath()).when(request).pathInfo();
doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER));
doReturn("dest_cluster").when(request).queryParams(eq(ControllerApiConstants.CLUSTER_DEST));
doReturn(TEST_STORE_NAME).when(request).queryParams(eq(ControllerApiConstants.NAME));

Route migrateStoreRoute = new StoresRoutes(false, Optional.empty(), pubSubTopicRepository).migrateStore(mockAdmin);
StoreMigrationResponse migrationResponse = ObjectMapperFactory.getInstance()
.readValue(migrateStoreRoute.handle(request, mock(Response.class)).toString(), StoreMigrationResponse.class);
Assert.assertTrue(migrationResponse.isError());
Assert.assertEquals(migrationResponse.getErrorType(), ErrorType.BAD_REQUEST);
Assert.assertTrue(migrationResponse.getError().contains("parent controller"));
}

@Test
public void testCompleteMigrationBlockedOnChildController() throws Exception {
Admin mockAdmin = mock(VeniceHelixAdmin.class, RETURNS_DEEP_STUBS);
doReturn(false).when(mockAdmin).isParent();

Request request = mock(Request.class);
doReturn(LEADER_CONTROLLER.getPath()).when(request).pathInfo();
doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER));
doReturn("dest_cluster").when(request).queryParams(eq(ControllerApiConstants.CLUSTER_DEST));
doReturn(TEST_STORE_NAME).when(request).queryParams(eq(ControllerApiConstants.NAME));

Route completeMigrationRoute =
new StoresRoutes(false, Optional.empty(), pubSubTopicRepository).completeMigration(mockAdmin);
StoreMigrationResponse migrationResponse = ObjectMapperFactory.getInstance()
.readValue(
completeMigrationRoute.handle(request, mock(Response.class)).toString(),
StoreMigrationResponse.class);
Assert.assertTrue(migrationResponse.isError());
Assert.assertEquals(migrationResponse.getErrorType(), ErrorType.BAD_REQUEST);
Assert.assertTrue(migrationResponse.getError().contains("parent controller"));
}

@Test
public void testAbortMigrationBlockedOnChildController() throws Exception {
Admin mockAdmin = mock(VeniceHelixAdmin.class, RETURNS_DEEP_STUBS);
doReturn(false).when(mockAdmin).isParent();

Request request = mock(Request.class);
doReturn(LEADER_CONTROLLER.getPath()).when(request).pathInfo();
doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER));
doReturn("dest_cluster").when(request).queryParams(eq(ControllerApiConstants.CLUSTER_DEST));
doReturn(TEST_STORE_NAME).when(request).queryParams(eq(ControllerApiConstants.NAME));

Route abortMigrationRoute =
new StoresRoutes(false, Optional.empty(), pubSubTopicRepository).abortMigration(mockAdmin);
StoreMigrationResponse migrationResponse = ObjectMapperFactory.getInstance()
.readValue(abortMigrationRoute.handle(request, mock(Response.class)).toString(), StoreMigrationResponse.class);
Assert.assertTrue(migrationResponse.isError());
Assert.assertEquals(migrationResponse.getErrorType(), ErrorType.BAD_REQUEST);
Assert.assertTrue(migrationResponse.getError().contains("parent controller"));
}

@Test
public void testAutoMigrateStoreBlockedOnChildController() throws Exception {
Admin mockAdmin = mock(VeniceHelixAdmin.class, RETURNS_DEEP_STUBS);
doReturn(false).when(mockAdmin).isParent();

Request request = mock(Request.class);
doReturn(LEADER_CONTROLLER.getPath()).when(request).pathInfo();
doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER));
doReturn("dest_cluster").when(request).queryParams(eq(ControllerApiConstants.CLUSTER_DEST));
doReturn(TEST_STORE_NAME).when(request).queryParams(eq(ControllerApiConstants.STORE_NAME));

Route autoMigrateStoreRoute =
new StoresRoutes(false, Optional.empty(), pubSubTopicRepository).autoMigrateStore(mockAdmin);
StoreMigrationResponse migrationResponse = ObjectMapperFactory.getInstance()
.readValue(
autoMigrateStoreRoute.handle(request, mock(Response.class)).toString(),
StoreMigrationResponse.class);
Assert.assertTrue(migrationResponse.isError());
Assert.assertEquals(migrationResponse.getErrorType(), ErrorType.BAD_REQUEST);
Assert.assertTrue(migrationResponse.getError().contains("parent controller"));
}
}
Loading