Skip to content

Commit 20bd2e3

Browse files
committed
Revert "Implemented active parent controller check for all routes with VeniceParentControllerRegionStateHandler"
This reverts commit 1c03f5b.
1 parent a4209a9 commit 20bd2e3

19 files changed

+1155
-1270
lines changed

services/venice-controller/src/main/java/com/linkedin/venice/controller/ParentControllerRegionState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
/**
44
* Enum representing the state of the region where the parent controller resides.
5-
* i.e., Region dc-0 is ACTIVE while Region dc-1 is PASSIVE
6-
* This means that ParentController in dc-0 is serving requests while ParentController in dc-1 is rejecting requests
5+
* (i.e., Region dc-0 is ACTIVE while Region dc-1 is PASSIVE
6+
* This means that ParentController in dc-0 is serving requests while ParentController in dc-1 is rejecting requests)
77
*/
88
public enum ParentControllerRegionState {
99
/** The region is active, so the parent controller in the region is serving requests */

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminCommandExecutionRoutes.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public AdminCommandExecutionRoutes(boolean sslEnabled, Optional<DynamicAccessCon
2626
* @see Admin#getAdminCommandExecutionTracker(String)
2727
*/
2828
public Route getExecution(Admin admin) {
29-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
29+
return (request, response) -> {
3030
AdminCommandExecutionResponse responseObject = new AdminCommandExecutionResponse();
3131
response.type(HttpConstants.JSON);
3232
// This request should only hit the parent controller. If a PROD controller get this kind of request, a empty
@@ -50,15 +50,15 @@ public Route getExecution(Admin admin) {
5050
"Could not track execution in this controller. Make sure you send the command to a correct parent controller.");
5151
}
5252
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
53-
});
53+
};
5454
}
5555

5656
/**
5757
* No ACL check; any user is allowed to check last succeeded execution Id.
5858
* @see Admin#getLastSucceedExecutionId(String)
5959
*/
6060
public Route getLastSucceedExecutionId(Admin admin) {
61-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
61+
return (request, response) -> {
6262
LastSucceedExecutionIdResponse responseObject = new LastSucceedExecutionIdResponse();
6363
response.type(HttpConstants.JSON);
6464
AdminSparkServer.validateParams(request, LAST_SUCCEED_EXECUTION_ID.getParams(), admin);
@@ -71,6 +71,6 @@ public Route getLastSucceedExecutionId(Admin admin) {
7171
AdminSparkServer.handleError(e, request, response);
7272
}
7373
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
74-
});
74+
};
7575
}
7676
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public AdminTopicMetadataRoutes(boolean sslEnabled, Optional<DynamicAccessContro
3232
* @see Admin#getAdminTopicMetadata(String, Optional)
3333
*/
3434
public Route getAdminTopicMetadata(Admin admin) {
35-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
35+
return (request, response) -> {
3636
AdminTopicMetadataResponse responseObject = new AdminTopicMetadataResponse();
3737
response.type(HttpConstants.JSON);
3838
try {
@@ -57,14 +57,14 @@ public Route getAdminTopicMetadata(Admin admin) {
5757
AdminSparkServer.handleError(new VeniceException(e), request, response);
5858
}
5959
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
60-
});
60+
};
6161
}
6262

6363
/**
6464
* @see Admin#updateAdminTopicMetadata(String, long, Optional, Optional, Optional)
6565
*/
6666
public Route updateAdminTopicMetadata(Admin admin) {
67-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
67+
return (request, response) -> {
6868
ControllerResponse responseObject = new ControllerResponse();
6969
response.type(HttpConstants.JSON);
7070
try {
@@ -101,6 +101,6 @@ public Route updateAdminTopicMetadata(Admin admin) {
101101
AdminSparkServer.handleError(new VeniceException(e), request, response);
102102
}
103103
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
104-
});
104+
};
105105
}
106106
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterDiscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class ClusterDiscovery {
1515
* No ACL check; any user is allowed to discover cluster
1616
*/
1717
public static Route discoverCluster(Admin admin) {
18-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
18+
return (request, response) -> {
1919
D2ServiceDiscoveryResponse responseObject = new D2ServiceDiscoveryResponse();
2020
try {
2121
AdminSparkServer.validateParams(request, CLUSTER_DISCOVERY.getParams(), admin);
@@ -30,6 +30,6 @@ public static Route discoverCluster(Admin admin) {
3030
}
3131
response.type(HttpConstants.JSON);
3232
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
33-
});
33+
};
3434
}
3535
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterRoutes.java

Lines changed: 55 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -31,75 +31,68 @@ public ClusterRoutes(boolean sslEnabled, Optional<DynamicAccessController> acces
3131
* @see Admin#updateClusterConfig(String, UpdateClusterConfigQueryParams)
3232
*/
3333
public Route updateClusterConfig(Admin admin) {
34-
return new VeniceParentControllerRegionStateHandler(
35-
admin,
36-
new VeniceRouteHandler<ControllerResponse>(ControllerResponse.class) {
37-
@Override
38-
public void internalHandle(Request request, ControllerResponse veniceResponse) {
39-
// Only allow allowlist users to run this command
40-
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
41-
return;
42-
}
43-
AdminSparkServer.validateParams(request, UPDATE_CLUSTER_CONFIG.getParams(), admin);
44-
String clusterName = request.queryParams(CLUSTER);
34+
return new VeniceRouteHandler<ControllerResponse>(ControllerResponse.class) {
35+
@Override
36+
public void internalHandle(Request request, ControllerResponse veniceResponse) {
37+
// Only allow allowlist users to run this command
38+
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
39+
return;
40+
}
41+
AdminSparkServer.validateParams(request, UPDATE_CLUSTER_CONFIG.getParams(), admin);
42+
String clusterName = request.queryParams(CLUSTER);
4543

46-
veniceResponse.setCluster(clusterName);
44+
veniceResponse.setCluster(clusterName);
4745

48-
Map<String, String> params =
49-
Utils.extractQueryParamsFromRequest(request.queryMap().toMap(), veniceResponse);
46+
Map<String, String> params = Utils.extractQueryParamsFromRequest(request.queryMap().toMap(), veniceResponse);
5047

51-
try {
52-
admin.updateClusterConfig(clusterName, new UpdateClusterConfigQueryParams(params));
53-
} catch (Exception e) {
54-
veniceResponse.setError(
55-
"Failed when updating configs for cluster: " + clusterName + ". Exception type: "
56-
+ e.getClass().toString() + ". Detailed message = " + e.getMessage());
57-
}
58-
}
59-
});
48+
try {
49+
admin.updateClusterConfig(clusterName, new UpdateClusterConfigQueryParams(params));
50+
} catch (Exception e) {
51+
veniceResponse.setError(
52+
"Failed when updating configs for cluster: " + clusterName + ". Exception type: "
53+
+ e.getClass().toString() + ". Detailed message = " + e.getMessage());
54+
}
55+
}
56+
};
6057
}
6158

6259
/**
6360
* No ACL check; any user is allowed to check whether store migration is allowed for a specific cluster.
6461
* @see Admin#isStoreMigrationAllowed(String)
6562
*/
6663
public Route isStoreMigrationAllowed(Admin admin) {
67-
return new VeniceParentControllerRegionStateHandler(
68-
admin,
69-
new VeniceRouteHandler<StoreMigrationResponse>(StoreMigrationResponse.class) {
70-
@Override
71-
public void internalHandle(Request request, StoreMigrationResponse veniceResponse) {
72-
AdminSparkServer.validateParams(request, STORE_MIGRATION_ALLOWED.getParams(), admin);
73-
String clusterName = request.queryParams(CLUSTER);
74-
veniceResponse.setCluster(clusterName);
75-
veniceResponse.setStoreMigrationAllowed(admin.isStoreMigrationAllowed(clusterName));
76-
}
77-
});
64+
return new VeniceRouteHandler<StoreMigrationResponse>(StoreMigrationResponse.class) {
65+
@Override
66+
public void internalHandle(Request request, StoreMigrationResponse veniceResponse) {
67+
AdminSparkServer.validateParams(request, STORE_MIGRATION_ALLOWED.getParams(), admin);
68+
String clusterName = request.queryParams(CLUSTER);
69+
veniceResponse.setCluster(clusterName);
70+
veniceResponse.setStoreMigrationAllowed(admin.isStoreMigrationAllowed(clusterName));
71+
}
72+
};
7873
}
7974

8075
/**
8176
* @see Admin#wipeCluster(String, String, Optional, Optional)
8277
*/
8378
public Route wipeCluster(Admin admin) {
84-
return new VeniceParentControllerRegionStateHandler(
85-
admin,
86-
new VeniceRouteHandler<ControllerResponse>(ControllerResponse.class) {
87-
@Override
88-
public void internalHandle(Request request, ControllerResponse veniceResponse) {
89-
// Only allow allowlist users to run this command
90-
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
91-
return;
92-
}
93-
AdminSparkServer.validateParams(request, WIPE_CLUSTER.getParams(), admin);
94-
String cluster = request.queryParams(CLUSTER);
95-
String fabric = request.queryParams(FABRIC);
96-
Optional<String> storeName = Optional.ofNullable(request.queryParams(NAME));
97-
Optional<Integer> versionNum = Optional.ofNullable(request.queryParams(VERSION)).map(Integer::parseInt);
98-
veniceResponse.setCluster(cluster);
99-
storeName.ifPresent(veniceResponse::setName);
100-
admin.wipeCluster(cluster, fabric, storeName, versionNum);
101-
}
102-
});
79+
return new VeniceRouteHandler<ControllerResponse>(ControllerResponse.class) {
80+
@Override
81+
public void internalHandle(Request request, ControllerResponse veniceResponse) {
82+
// Only allow allowlist users to run this command
83+
if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) {
84+
return;
85+
}
86+
AdminSparkServer.validateParams(request, WIPE_CLUSTER.getParams(), admin);
87+
String cluster = request.queryParams(CLUSTER);
88+
String fabric = request.queryParams(FABRIC);
89+
Optional<String> storeName = Optional.ofNullable(request.queryParams(NAME));
90+
Optional<Integer> versionNum = Optional.ofNullable(request.queryParams(VERSION)).map(Integer::parseInt);
91+
veniceResponse.setCluster(cluster);
92+
storeName.ifPresent(veniceResponse::setName);
93+
admin.wipeCluster(cluster, fabric, storeName, versionNum);
94+
}
95+
};
10396
}
10497

10598
/**
@@ -108,16 +101,14 @@ public void internalHandle(Request request, ControllerResponse veniceResponse) {
108101
* @see Admin#cleanupInstanceCustomizedStates(String)
109102
*/
110103
public Route cleanupInstanceCustomizedStates(Admin admin) {
111-
return new VeniceParentControllerRegionStateHandler(
112-
admin,
113-
new VeniceRouteHandler<MultiStoreTopicsResponse>(MultiStoreTopicsResponse.class) {
114-
@Override
115-
public void internalHandle(Request request, MultiStoreTopicsResponse veniceResponse) {
116-
AdminSparkServer.validateParams(request, CLEANUP_INSTANCE_CUSTOMIZED_STATES.getParams(), admin);
117-
String clusterName = request.queryParams(CLUSTER);
118-
veniceResponse.setCluster(clusterName);
119-
veniceResponse.setTopics(admin.cleanupInstanceCustomizedStates(clusterName));
120-
}
121-
});
104+
return new VeniceRouteHandler<MultiStoreTopicsResponse>(MultiStoreTopicsResponse.class) {
105+
@Override
106+
public void internalHandle(Request request, MultiStoreTopicsResponse veniceResponse) {
107+
AdminSparkServer.validateParams(request, CLEANUP_INSTANCE_CUSTOMIZED_STATES.getParams(), admin);
108+
String clusterName = request.queryParams(CLUSTER);
109+
veniceResponse.setCluster(clusterName);
110+
veniceResponse.setTopics(admin.cleanupInstanceCustomizedStates(clusterName));
111+
}
112+
};
122113
}
123114
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ControllerRoutes.java

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public ControllerRoutes(
4747
* @see Admin#getLeaderController(String)
4848
*/
4949
public Route getLeaderController(Admin admin) {
50-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
50+
return (request, response) -> {
5151
LeaderControllerResponse responseObject = new LeaderControllerResponse();
5252
try {
5353
AdminSparkServer.validateParams(request, LEADER_CONTROLLER.getParams(), admin);
@@ -65,7 +65,7 @@ public Route getLeaderController(Admin admin) {
6565
}
6666
response.type(HttpConstants.JSON);
6767
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
68-
});
68+
};
6969
}
7070

7171
/**
@@ -74,30 +74,28 @@ public Route getLeaderController(Admin admin) {
7474
* @see Admin#getChildControllerD2ServiceName(String)
7575
*/
7676
public Route getChildControllers(Admin admin) {
77-
return new VeniceParentControllerRegionStateHandler(
78-
admin,
79-
new VeniceRouteHandler<ChildAwareResponse>(ChildAwareResponse.class) {
80-
@Override
81-
public void internalHandle(Request request, ChildAwareResponse veniceResponse) {
82-
AdminSparkServer.validateParams(request, LIST_CHILD_CLUSTERS.getParams(), admin);
83-
String clusterName = request.queryParams(CLUSTER);
84-
85-
veniceResponse.setCluster(clusterName);
86-
87-
if (admin.isParent()) {
88-
veniceResponse.setChildDataCenterControllerUrlMap(admin.getChildDataCenterControllerUrlMap(clusterName));
89-
veniceResponse.setChildDataCenterControllerD2Map(admin.getChildDataCenterControllerD2Map(clusterName));
90-
veniceResponse.setD2ServiceName(admin.getChildControllerD2ServiceName(clusterName));
91-
}
92-
}
93-
});
77+
return new VeniceRouteHandler<ChildAwareResponse>(ChildAwareResponse.class) {
78+
@Override
79+
public void internalHandle(Request request, ChildAwareResponse veniceResponse) {
80+
AdminSparkServer.validateParams(request, LIST_CHILD_CLUSTERS.getParams(), admin);
81+
String clusterName = request.queryParams(CLUSTER);
82+
83+
veniceResponse.setCluster(clusterName);
84+
85+
if (admin.isParent()) {
86+
veniceResponse.setChildDataCenterControllerUrlMap(admin.getChildDataCenterControllerUrlMap(clusterName));
87+
veniceResponse.setChildDataCenterControllerD2Map(admin.getChildDataCenterControllerD2Map(clusterName));
88+
veniceResponse.setD2ServiceName(admin.getChildControllerD2ServiceName(clusterName));
89+
}
90+
}
91+
};
9492
}
9593

9694
/**
9795
* @see TopicManager#updateTopicCompactionPolicy(PubSubTopic, boolean)
9896
*/
9997
public Route updateKafkaTopicLogCompaction(Admin admin) {
100-
return new VeniceParentControllerRegionStateHandler(admin, updateKafkaTopicConfig(admin, adminRequest -> {
98+
return updateKafkaTopicConfig(admin, adminRequest -> {
10199
AdminSparkServer.validateParams(adminRequest, UPDATE_KAFKA_TOPIC_LOG_COMPACTION.getParams(), admin);
102100
PubSubTopic topicName = pubSubTopicRepository.getTopic(adminRequest.queryParams(TOPIC));
103101
boolean kafkaTopicLogCompactionEnabled = Utils.parseBooleanFromString(
@@ -106,14 +104,14 @@ public Route updateKafkaTopicLogCompaction(Admin admin) {
106104

107105
TopicManager topicManager = admin.getTopicManager();
108106
topicManager.updateTopicCompactionPolicy(topicName, kafkaTopicLogCompactionEnabled);
109-
}));
107+
});
110108
}
111109

112110
/**
113111
* No ACL check; any user is allowed to check topic configs.
114112
*/
115113
public Route getKafkaTopicConfigs(Admin admin) {
116-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
114+
return (request, response) -> {
117115
PubSubTopicConfigResponse responseObject = new PubSubTopicConfigResponse();
118116
response.type(HttpConstants.JSON);
119117

@@ -129,37 +127,37 @@ public Route getKafkaTopicConfigs(Admin admin) {
129127
}
130128
response.type(HttpConstants.JSON);
131129
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
132-
});
130+
};
133131
}
134132

135133
/**
136134
* @see TopicManager#updateTopicRetention(PubSubTopic, long)
137135
*/
138136
public Route updateKafkaTopicRetention(Admin admin) {
139-
return new VeniceParentControllerRegionStateHandler(admin, updateKafkaTopicConfig(admin, adminRequest -> {
137+
return updateKafkaTopicConfig(admin, adminRequest -> {
140138
AdminSparkServer.validateParams(adminRequest, UPDATE_KAFKA_TOPIC_RETENTION.getParams(), admin);
141139
PubSubTopic topicName = pubSubTopicRepository.getTopic(adminRequest.queryParams(TOPIC));
142140
long kafkaTopicRetentionIsMs =
143141
Utils.parseLongFromString(adminRequest.queryParams(KAFKA_TOPIC_RETENTION_IN_MS), KAFKA_TOPIC_RETENTION_IN_MS);
144142
TopicManager topicManager = admin.getTopicManager();
145143
topicManager.updateTopicRetention(topicName, kafkaTopicRetentionIsMs);
146-
}));
144+
});
147145
}
148146

149147
public Route updateKafkaTopicMinInSyncReplica(Admin admin) {
150-
return new VeniceParentControllerRegionStateHandler(admin, updateKafkaTopicConfig(admin, adminRequest -> {
148+
return updateKafkaTopicConfig(admin, adminRequest -> {
151149
AdminSparkServer.validateParams(adminRequest, UPDATE_KAFKA_TOPIC_MIN_IN_SYNC_REPLICA.getParams(), admin);
152150
PubSubTopic topicName = pubSubTopicRepository.getTopic(adminRequest.queryParams(TOPIC));
153151
int kafkaTopicMinISR = Utils.parseIntFromString(
154152
adminRequest.queryParams(KAFKA_TOPIC_MIN_IN_SYNC_REPLICA),
155153
KAFKA_TOPIC_MIN_IN_SYNC_REPLICA);
156154
TopicManager topicManager = admin.getTopicManager();
157155
topicManager.updateTopicMinInSyncReplica(topicName, kafkaTopicMinISR);
158-
}));
156+
});
159157
}
160158

161159
private Route updateKafkaTopicConfig(Admin admin, UpdateTopicConfigFunction updateTopicConfigFunction) {
162-
return new VeniceParentControllerRegionStateHandler(admin, (request, response) -> {
160+
return (request, response) -> {
163161
ControllerResponse responseObject = new ControllerResponse();
164162
response.type(HttpConstants.JSON);
165163
// Only allow allowlist users to run this command
@@ -178,7 +176,7 @@ private Route updateKafkaTopicConfig(Admin admin, UpdateTopicConfigFunction upda
178176
}
179177
response.type(HttpConstants.JSON);
180178
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);
181-
});
179+
};
182180
}
183181

184182
@FunctionalInterface

0 commit comments

Comments
 (0)