Skip to content

Commit

Permalink
Resolving merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
bonytoni committed Jul 26, 2024
1 parent df1e78b commit 5c3f6a3
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,17 @@ private ConfigKeys() {
/** Whether current controller is parent or not */
public static final String CONTROLLER_PARENT_MODE = "controller.parent.mode";

/**
* This config specifies the state of the region of the parent controller.
*
* The region can be in one of the following states:
* ACTIVE: the parent controller in the region is serving requests.
* PASSIVE: the parent controller in the region is rejecting requests.
*
* By default, this is set to ACTIVE.
*/
public static final String CONTROLLER_PARENT_REGION_STATE = "controller.parent.region.state";

/**
* This config is used to control how many errored topics we are going to keep in parent cluster.
* This is mostly used to investigate the Kafka missing message issue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,13 @@ void updateRoutersClusterConfig(
*/
boolean isParent();

/**
* Return the state of the region of the parent controller.
* @return {@link ParentControllerRegionState#ACTIVE} which means that the parent controller in the region is serving requests.
* Otherwise, return {@link ParentControllerRegionState#PASSIVE}
*/
ParentControllerRegionState getParentControllerRegionState();

/**
* Get child datacenter to child controller url mapping.
* @return A map of child datacenter -> child controller url
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.venice.controller;

/**
* Enum representing the state of the region where the parent controller resides.
* (i.e., Region dc-0 is ACTIVE while Region dc-1 is PASSIVE
* This means that ParentController in dc-0 is serving requests while ParentController in dc-1 is rejecting requests)
*/
public enum ParentControllerRegionState {
/** The region is active, so the parent controller in the region is serving requests */
ACTIVE,
/**
* The region is passive, so the parent controller in the region is rejecting requests.
* This region is ready to take over if components in the currently active region fails
*/
PASSIVE
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_NAME;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_MODE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_REGION_STATE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_SYSTEM_STORE_HEARTBEAT_CHECK_WAIT_TIME_SECONDS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_SYSTEM_STORE_REPAIR_CHECK_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_PARENT_SYSTEM_STORE_REPAIR_RETRY_COUNT;
Expand Down Expand Up @@ -157,6 +158,7 @@
import static com.linkedin.venice.SSLConfig.DEFAULT_CONTROLLER_SSL_ENABLED;
import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.DEFAULT_SSL_FACTORY_CLASS_NAME;
import static com.linkedin.venice.controller.ParentControllerRegionState.ACTIVE;
import static com.linkedin.venice.pubsub.PubSubConstants.DEFAULT_KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE;
import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB;
Expand Down Expand Up @@ -221,6 +223,7 @@ public class VeniceControllerClusterConfig {
private final String controllerClusterZkAddress;
private final boolean multiRegion;
private final boolean parent;
private final ParentControllerRegionState parentControllerRegionState;
private final Map<String, String> childDataCenterControllerUrlMap;
private final String d2ServiceName;
private final String clusterDiscoveryD2ServiceName;
Expand Down Expand Up @@ -631,6 +634,8 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.controllerClusterReplica = props.getInt(CONTROLLER_CLUSTER_REPLICA, 3);
this.controllerClusterZkAddress = props.getString(CONTROLLER_CLUSTER_ZK_ADDRESSS, getZkAddress());
this.parent = props.getBoolean(CONTROLLER_PARENT_MODE, false);
this.parentControllerRegionState =
ParentControllerRegionState.valueOf(props.getString(CONTROLLER_PARENT_REGION_STATE, ACTIVE.name()));

if (childDatacenters.isEmpty()) {
this.childDataCenterControllerUrlMap = Collections.emptyMap();
Expand Down Expand Up @@ -1162,6 +1167,10 @@ public boolean isParent() {
return parent;
}

public ParentControllerRegionState getParentControllerRegionState() {
return parentControllerRegionState;
}

public long getDeprecatedJobTopicRetentionMs() {
return deprecatedJobTopicRetentionMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public boolean isParent() {
return getCommonConfig().isParent();
}

public ParentControllerRegionState getParentControllerRegionState() {
return getCommonConfig().getParentControllerRegionState();
}

public String getControllerName() {
return getCommonConfig().getControllerName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7553,6 +7553,14 @@ public boolean isParent() {
return multiClusterConfigs.isParent();
}

/**
* @see Admin#getParentControllerRegionState()
*/
@Override
public ParentControllerRegionState getParentControllerRegionState() {
return multiClusterConfigs.getParentControllerRegionState();
}

/**
* @see Admin#getChildDataCenterControllerUrlMap(String)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4871,6 +4871,14 @@ public boolean isParent() {
return getVeniceHelixAdmin().isParent();
}

/**
* @see Admin#getParentControllerRegionState()
*/
@Override
public ParentControllerRegionState getParentControllerRegionState() {
return getVeniceHelixAdmin().getParentControllerRegionState();
}

/**
* @see Admin#getChildDataCenterControllerUrlMap(String)
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.venice.controller.server;

import static com.linkedin.venice.controller.ParentControllerRegionState.ACTIVE;

import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.ParentControllerRegionState;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.exceptions.ErrorType;
import spark.Request;
import spark.Response;
import spark.Route;


/**
* Handler for checking the state of the region of the parent controller to handle requests in its region.
*/
public class VeniceParentControllerRegionStateHandler implements Route {
public static final String ACTIVE_CHECK_FAILURE_WARN_MESSAGE_PREFIX =
"Only active parent controllers are allowed to run ";
private Admin admin;
private Route route;

public VeniceParentControllerRegionStateHandler(Admin admin, Route route) {
this.admin = admin;
this.route = route;
}

@Override
public Object handle(Request request, Response response) throws Exception {
ParentControllerRegionState parentControllerRegionState = admin.getParentControllerRegionState();
boolean isParent = admin.isParent();
if (isParent && parentControllerRegionState != ACTIVE) {
ControllerResponse responseObject = new ControllerResponse();
response.type(HttpConstants.JSON);
response.status(HttpConstants.SC_MISDIRECTED_REQUEST);
responseObject.setError(ACTIVE_CHECK_FAILURE_WARN_MESSAGE_PREFIX + request.url());
responseObject.setErrorType(ErrorType.INCORRECT_CONTROLLER);
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
}
return route.handle(request, response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.linkedin.venice.controller.server;

import static com.linkedin.venice.controller.ParentControllerRegionState.ACTIVE;
import static com.linkedin.venice.controller.ParentControllerRegionState.PASSIVE;
import static com.linkedin.venice.controller.server.VeniceParentControllerRegionStateHandler.ACTIVE_CHECK_FAILURE_WARN_MESSAGE_PREFIX;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.ExceptionType;
import com.linkedin.venice.utils.ObjectMapperFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
import spark.Request;
import spark.Response;
import spark.Route;


public class TestVeniceParentControllerRegionStateHandler {
private Logger logger = LogManager.getLogger(TestVeniceParentControllerRegionStateHandler.class);
private Admin admin = mock(Admin.class);
private Request request = mock(Request.class);
private Response response = mock(Response.class);
private VeniceRouteHandler<ControllerResponse> veniceRouteHandler =
new VeniceRouteHandler<ControllerResponse>(ControllerResponse.class) {
@Override
public void internalHandle(Request request, ControllerResponse veniceResponse) {
logger.info("VeniceParentControllerRegionStateHandler is working");
veniceResponse.setName("working test response");
veniceResponse.setCluster("working test cluster");
}
};

@Test
public void testActiveParent() throws Exception {
reset(admin, request, response);
when(admin.getParentControllerRegionState()).thenReturn(ACTIVE);
when(admin.isParent()).thenReturn(true);
Assert.assertEquals(admin.getParentControllerRegionState(), ACTIVE);
Assert.assertTrue(admin.isParent());
verifySuccessResponse(response);
}

@Test
public void testPassiveParent() throws Exception {
reset(admin, request, response);
when(admin.getParentControllerRegionState()).thenReturn(PASSIVE);
when(admin.isParent()).thenReturn(true);
Assert.assertEquals(admin.getParentControllerRegionState(), PASSIVE);
Assert.assertTrue(admin.isParent());
verifyErrorResponse(response);
}

@Test
public void testActiveChild() throws Exception {
reset(admin, request, response);
when(admin.getParentControllerRegionState()).thenReturn(ACTIVE);
when(admin.isParent()).thenReturn(false);
Assert.assertEquals(admin.getParentControllerRegionState(), ACTIVE);
Assert.assertFalse(admin.isParent());
verifySuccessResponse(response);
}

@Test
public void testPassiveChild() throws Exception {
reset(admin, request, response);
when(admin.getParentControllerRegionState()).thenReturn(PASSIVE);
when(admin.isParent()).thenReturn(false);
Assert.assertEquals(admin.getParentControllerRegionState(), PASSIVE);
Assert.assertFalse(admin.isParent());
verifySuccessResponse(response);
}

private void verifySuccessResponse(Response response) throws Exception {
Route route = new VeniceParentControllerRegionStateHandler(admin, veniceRouteHandler);
String veniceResponseStr = route.handle(request, response).toString();
verify(response, never()).status(HttpConstants.SC_MISDIRECTED_REQUEST);
ControllerResponse veniceResponse =
ObjectMapperFactory.getInstance().readValue(veniceResponseStr, ControllerResponse.class);
Assert.assertEquals(veniceResponse.getName(), "working test response");
Assert.assertEquals(veniceResponse.getCluster(), "working test cluster");
}

private void verifyErrorResponse(Response response) throws Exception {
Route route = new VeniceParentControllerRegionStateHandler(admin, veniceRouteHandler);
String veniceResponseStr = route.handle(request, response).toString();
ControllerResponse veniceResponse =
ObjectMapperFactory.getInstance().readValue(veniceResponseStr, ControllerResponse.class);
verify(response).status(HttpConstants.SC_MISDIRECTED_REQUEST);
Assert.assertTrue(veniceResponse.getError().startsWith(ACTIVE_CHECK_FAILURE_WARN_MESSAGE_PREFIX));
Assert.assertEquals(veniceResponse.getErrorType(), ErrorType.INCORRECT_CONTROLLER);
Assert.assertEquals(veniceResponse.getExceptionType(), ExceptionType.INCORRECT_CONTROLLER);
}
}

0 comments on commit 5c3f6a3

Please sign in to comment.