Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Make Helix rebalance preferences and capacity keys configurable for the controller #1475

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class ConfigConstants {
public static final long DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS =
TimeUnit.MINUTES.toSeconds(10);

public static final String CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY = "cluster_resource_weight";

/**
* End of controller config default value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2398,4 +2398,40 @@ private ConfigKeys() {
* Default: {@value com.linkedin.venice.meta.NameRepository#DEFAULT_MAXIMUM_ENTRY_COUNT}
*/
public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count";

/**
* Specifies the value to use for Helix's rebalance preference for evenness when using Waged.
* Default is -1 which will use Helix's default.
*/
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS =
"controller.helix.rebalance.preference.evenness";

/**
* Specifies the value to use for Helix's rebalance preference for less movement when using Waged.
* Default is -1 which will use Helix's default.
*/
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT =
"controller.helix.rebalance.preference.less.movement";

/**
* Specifies the value to use for Helix's rebalance preference for force baseline convergence when using Waged.
* This should always be turned off, so it doesn't overpower other constraints.
* Default is -1 which will use Helix's default.
*/
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE =
"controller.helix.rebalance.preference.force.baseline.converge";

/**
* Specifies the capacity a controller instance can handle.
* The weight of each Helix resource is determined by {@link ConfigKeys#CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT}.
* Default is -1 which indicates disabled.
*/
public static final String CONTROLLER_HELIX_INSTANCE_CAPACITY = "controller.helix.instance.capacity";

/**
* Specifies the weight of each Helix resource.
* The maximum weight per instance is determined by {@link ConfigKeys#CONTROLLER_HELIX_INSTANCE_CAPACITY}.
* Default is -1 which indicates disabled.
*/
public static final String CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT = "controller.helix.default.instance.capacity";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigConstants.CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY;
import static com.linkedin.venice.utils.TestUtils.assertCommand;
import static com.linkedin.venice.utils.TestUtils.shutdownExecutor;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
Expand All @@ -16,6 +17,8 @@
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.integration.utils.HelixAsAServiceWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
Expand All @@ -31,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -40,6 +44,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.PropertyKey;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
Expand Down Expand Up @@ -255,6 +260,65 @@ public void testTransitionToHAASControllerAsStorageClusterLeader() {
}
}

@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testRebalancePreferenceAndCapacityKeys() {
try (VeniceClusterWrapper venice = ServiceFactory.getVeniceCluster(0, 0, 0, 1);
HelixAsAServiceWrapper helixAsAServiceWrapper = startAndWaitForHAASToBeAvailable(venice.getZk().getAddress())) {
String controllerClusterName = "venice-controllers";

int helixRebalancePreferenceEvenness = 10;
int helixRebalancePreferenceLessMovement = 2;
int helixRebalancePreferenceForceBaselineConverge = 1;
int helixInstanceCapacity = 1000;
int helixResourceCapacityWeight = 10;

Properties clusterProperties = (Properties) enableControllerAndStorageClusterHAASProperties.clone();
clusterProperties
.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS, helixRebalancePreferenceEvenness);
clusterProperties
.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT, helixRebalancePreferenceLessMovement);
clusterProperties.put(
ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE,
helixRebalancePreferenceForceBaselineConverge);
clusterProperties.put(ConfigKeys.CONTROLLER_HELIX_INSTANCE_CAPACITY, helixInstanceCapacity);
clusterProperties.put(ConfigKeys.CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT, helixResourceCapacityWeight);

VeniceControllerWrapper controllerWrapper = venice.addVeniceController(clusterProperties);

VeniceHelixAdmin veniceHelixAdmin = controllerWrapper.getVeniceHelixAdmin();

SafeHelixManager helixManager = veniceHelixAdmin.getHelixManager();
SafeHelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
PropertyKey.Builder propertyKeyBuilder = new PropertyKey.Builder(controllerClusterName);
ClusterConfig clusterConfig = helixDataAccessor.getProperty(propertyKeyBuilder.clusterConfig());

Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> globalRebalancePreference =
clusterConfig.getGlobalRebalancePreference();
assertEquals(
(int) globalRebalancePreference.get(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS),
helixRebalancePreferenceEvenness);
assertEquals(
(int) globalRebalancePreference.get(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT),
helixRebalancePreferenceLessMovement);
assertEquals(
(int) globalRebalancePreference.get(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE),
helixRebalancePreferenceForceBaselineConverge);

List<String> instanceCapacityKeys = clusterConfig.getInstanceCapacityKeys();
assertEquals(instanceCapacityKeys.size(), 1);

Map<String, Integer> defaultInstanceCapacityMap = clusterConfig.getDefaultInstanceCapacityMap();
assertEquals(
(int) defaultInstanceCapacityMap.get(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY),
helixInstanceCapacity);

Map<String, Integer> defaultPartitionWeightMap = clusterConfig.getDefaultPartitionWeightMap();
assertEquals(
(int) defaultPartitionWeightMap.get(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY),
helixResourceCapacityWeight);
}
}

private static class InitTask implements Callable<Void> {
private final HelixAdminClient client;
private final HashMap<String, String> helixClusterProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_PACKAGE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_SOURCES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_PROVIDER;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_INSTANCE_CAPACITY;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_JETTY_CONFIG_OVERRIDE_PREFIX;
Expand Down Expand Up @@ -545,6 +550,12 @@ public class VeniceControllerClusterConfig {
private Set<PushJobCheckpoints> pushJobUserErrorCheckpoints;
private boolean isHybridStorePartitionCountUpdateEnabled;

private final int helixRebalancePreferenceEvenness;
private final int helixRebalancePreferenceLessMovement;
private final int helixRebalancePreferenceForceBaselineConverge;
private final int helixInstanceCapacity;
private final int helixResourceCapacityWeight;

public VeniceControllerClusterConfig(VeniceProperties props) {
this.props = props;
this.clusterName = props.getString(CLUSTER_NAME);
Expand Down Expand Up @@ -996,6 +1007,13 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.pushJobUserErrorCheckpoints = parsePushJobUserErrorCheckpoints(props);
this.isHybridStorePartitionCountUpdateEnabled =
props.getBoolean(ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, false);

this.helixRebalancePreferenceEvenness = props.getInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS, -1);
this.helixRebalancePreferenceLessMovement = props.getInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT, -1);
this.helixRebalancePreferenceForceBaselineConverge =
props.getInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_FORCE_BASELINE_CONVERGE, -1);
this.helixInstanceCapacity = props.getInt(CONTROLLER_HELIX_INSTANCE_CAPACITY, -1);
this.helixResourceCapacityWeight = props.getInt(CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT, -1);
kvargha marked this conversation as resolved.
Show resolved Hide resolved
}

public VeniceProperties getProps() {
Expand Down Expand Up @@ -1833,4 +1851,24 @@ static Set<PushJobCheckpoints> parsePushJobUserErrorCheckpoints(VeniceProperties
public Set<PushJobCheckpoints> getPushJobUserErrorCheckpoints() {
return pushJobUserErrorCheckpoints;
}

public int getHelixRebalancePreferenceEvenness() {
return helixRebalancePreferenceEvenness;
}

public int getHelixRebalancePreferenceLessMovement() {
return helixRebalancePreferenceLessMovement;
}

public int getHelixRebalancePreferenceForceBaselineConverge() {
return helixRebalancePreferenceForceBaselineConverge;
}

public int getHelixInstanceCapacity() {
return helixInstanceCapacity;
}

public int getHelixResourceCapacityWeight() {
return helixResourceCapacityWeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8835,6 +8835,10 @@ public PubSubTopicRepository getPubSubTopicRepository() {
return pubSubTopicRepository;
}

protected SafeHelixManager getHelixManager() {
kvargha marked this conversation as resolved.
Show resolved Hide resolved
return helixManager;
}

String getPushJobStatusStoreClusterName() {
return pushJobStatusStoreClusterName;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigConstants.CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceRetriableException;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.stats.ZkClientStatusStats;
import com.linkedin.venice.utils.RetryUtils;
import io.tehuti.metrics.MetricsRepository;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
Expand All @@ -21,12 +25,10 @@
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -43,6 +45,7 @@ public class ZkHelixAdminClient implements HelixAdminClient {
private static final String CONTROLLER_HAAS_ZK_CLIENT_NAME = "controller-zk-client-for-haas-admin";

private final HelixAdmin helixAdmin;
private final ConfigAccessor helixConfigAccessor;
private final VeniceControllerClusterConfig commonConfig;
private final VeniceControllerMultiClusterConfig multiClusterConfigs;
private final String haasSuperClusterName;
Expand All @@ -65,6 +68,7 @@ public ZkHelixAdminClient(
throw new VeniceException("Failed to connect to ZK within " + ZkClient.DEFAULT_CONNECTION_TIMEOUT + " ms!");
}
helixAdmin = new ZKHelixAdmin(helixAdminZkClient);
helixConfigAccessor = new ConfigAccessor(helixAdminZkClient);
}

/**
Expand Down Expand Up @@ -98,6 +102,52 @@ public void createVeniceControllerCluster() {
// Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constrains on CRUSH alg to
// choose proper instance to hold the replica.
clusterConfig.setTopologyAwareEnabled(false);
clusterConfig.setPersistBestPossibleAssignment(true);

// We want to prioritize evenness over less movement when it comes to resource assignment, because the cost
// of rebalancing for the controller is cheap as it is stateless.
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> globalRebalancePreference = new HashMap<>();

if (commonConfig.getHelixRebalancePreferenceEvenness() > -1
&& commonConfig.getHelixRebalancePreferenceLessMovement() > -1) {
// EVENNESS and LESS_MOVEMENT need to be defined together
globalRebalancePreference.put(
ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS,
commonConfig.getHelixRebalancePreferenceEvenness());
globalRebalancePreference.put(
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT,
commonConfig.getHelixRebalancePreferenceLessMovement());
}

if (commonConfig.getHelixRebalancePreferenceForceBaselineConverge() > -1) {
globalRebalancePreference.put(
ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE,
commonConfig.getHelixRebalancePreferenceForceBaselineConverge());
}

if (!globalRebalancePreference.isEmpty()) {
clusterConfig.setGlobalRebalancePreference(globalRebalancePreference);
}

if (commonConfig.getHelixInstanceCapacity() > -1 && commonConfig.getHelixResourceCapacityWeight() > -1) {
List<String> instanceCapacityKeys = new ArrayList<>();
instanceCapacityKeys.add(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY);
kvargha marked this conversation as resolved.
Show resolved Hide resolved
clusterConfig.setInstanceCapacityKeys(instanceCapacityKeys);

// This is how much capacity a participant can take. The Helix documentation recommends setting this to a high
// value to avoid rebalance failures. The primary goal of setting this is to enable a constraint that takes
// the current top-state distribution into account when rebalancing.
Map<String, Integer> defaultInstanceCapacityMap = new HashMap<>();
defaultInstanceCapacityMap
.put(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY, commonConfig.getHelixInstanceCapacity());
clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap);

// This is how much weight each resource in a cluster has
Map<String, Integer> defaultPartitionWeightMap = new HashMap<>();
defaultPartitionWeightMap
.put(CONTROLLER_DEFAULT_HELIX_RESOURCE_CAPACITY_KEY, commonConfig.getHelixResourceCapacityWeight());
clusterConfig.setDefaultPartitionWeightMap(defaultPartitionWeightMap);
}

updateClusterConfigs(controllerClusterName, clusterConfig);
helixAdmin.addStateModelDef(controllerClusterName, LeaderStandbySMD.name, LeaderStandbySMD.build());
Expand Down Expand Up @@ -215,21 +265,15 @@ public void addClusterToGrandCluster(String clusterName) {
*/
@Override
public void updateClusterConfigs(String clusterName, ClusterConfig clusterConfig) {
HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
Map<String, String> helixClusterProperties = new HashMap<>(clusterConfig.getRecord().getSimpleFields());
helixAdmin.setConfig(configScope, helixClusterProperties);
helixConfigAccessor.setClusterConfig(clusterName, clusterConfig);
}

/**
* @see HelixAdminClient#updateRESTConfigs(String, RESTConfig)
*/
@Override
public void updateRESTConfigs(String clusterName, RESTConfig restConfig) {
HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.REST).forCluster(clusterName).build();
Map<String, String> helixRestProperties = new HashMap<>(restConfig.getRecord().getSimpleFields());
helixAdmin.setConfig(configScope, helixRestProperties);
helixConfigAccessor.setRESTConfig(clusterName, restConfig);
}
kvargha marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand Down
Loading
Loading