Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce node level circuit breaker settings for k-NN #2509

Merged
merged 9 commits into from
Feb 26, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
### Bug Fixes
### Infrastructure
* Removed JDK 11 and 17 version from CI runs [#1921](https://github.com/opensearch-project/k-NN/pull/1921)
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ integTest {
testClusters.integTest {
testDistribution = "ARCHIVE"

//Used for circuit breaker integration tests
setting 'node.attr.knn_cb_tier', 'integ'

// Optionally install security
if (System.getProperty("security.enabled") != null) {
configureSecurityPlugin(testClusters.integTest)
Expand All @@ -517,6 +520,7 @@ testClusters.integTest {
environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies")
}


// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
public class KNNCircuitBreaker {
private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";
public static int CB_TIME_INTERVAL = 2 * 60; // seconds

private static KNNCircuitBreaker INSTANCE;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli
Runnable runnable = () -> {
if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.getCircuitBreakerLimit().getKb();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb();
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
* circuitBreakerLimitSizeKiloBytes);
/**
Expand Down
155 changes: 143 additions & 12 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.knn.index;

import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -75,7 +77,8 @@ public class KNNSettings {
public static final String KNN_ALGO_PARAM_EF_SEARCH = "index.knn.algo_param.ef_search";
public static final String KNN_ALGO_PARAM_INDEX_THREAD_QTY = "knn.algo_param.index_thread_qty";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + ".";
public static final String KNN_VECTOR_STREAMING_MEMORY_LIMIT_IN_MB = "knn.vector_streaming_memory.limit";
public static final String KNN_CIRCUIT_BREAKER_TRIGGERED = "knn.circuit_breaker.triggered";
public static final String KNN_CACHE_ITEM_EXPIRY_ENABLED = "knn.cache.item.expiry.enabled";
Expand Down Expand Up @@ -402,17 +405,42 @@ public class KNNSettings {
* Weight circuit breaker settings
*/
put(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, Setting.boolSetting(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, true, NodeScope, Dynamic));

/**
* Group setting that manages node-level circuit breaker configurations based on node tiers.
* Settings under this group define memory limits for different node classifications.
* Validation of limit occurs before the setting is retrieved.
*/
put(
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX,
Setting.groupSetting(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX, settings -> {
settings.keySet()
.forEach(
(limit) -> parseknnMemoryCircuitBreakerValue(
settings.get(limit),
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran into an issue where, without the validation performed by parseknnMemoryCircuitBreakerValue specifically here, an invalid k-NN circuit breaker value would bring down the cluster with an uncaught exception when the same method was hit at line 633. There may be some exception handling earlier in the stack that allows the exception to be caught gracefully here but that doesn't exist later on; to maintain the same behavior, we validate the given values before trying to update the cache.

)
);
}, NodeScope, Dynamic)
);

/**
* Cluster-wide circuit breaker limit that serves as the default configuration.
* This setting is used when a node either:
* - Has no knn_cb_tier attribute defined
* - Has a tier that doesn't match any node-level configuration
* Default value: {@value KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT}
*/
put(
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT,
new Setting<>(
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT,
KNNSettings.KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT,
(s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT),
(s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT),
NodeScope,
Dynamic
)
);

/**
* Cache expiry time settings
*/
Expand All @@ -429,6 +457,8 @@ public class KNNSettings {

private ClusterService clusterService;
private Client client;
@Setter
private Optional<String> nodeCbAttribute;

private KNNSettings() {}

Expand All @@ -449,10 +479,8 @@ private void setSettingsUpdateConsumers() {
updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
);

builder.maxWeight(((ByteSizeValue) getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb());
if (updatedSettings.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)) {
builder.maxWeight(((ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).get(updatedSettings)).getKb());
}
// Recompute cache weight
builder.maxWeight(getUpdatedCircuitBreakerLimit(updatedSettings).getKb());

builder.isExpirationLimited(
updatedSettings.getAsBoolean(KNN_CACHE_ITEM_EXPIRY_ENABLED, getSettingValue(KNN_CACHE_ITEM_EXPIRY_ENABLED))
Expand Down Expand Up @@ -591,8 +619,106 @@ public static boolean isCircuitBreakerTriggered() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED);
}

public static ByteSizeValue getCircuitBreakerLimit() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT);
/**
 * Looks up the circuit breaker limit configured for this node's tier in the
 * currently applied settings.
 *
 * @return the raw limit string stored under this node's tier key, or null when the
 *         node has no tier attribute or no limit is configured for that tier
 */
private String getNodeCbLimit() {
    // Without a cached tier attribute there is no tier-specific entry to look up.
    if (!nodeCbAttribute.isPresent()) {
        return null;
    }
    final Settings tierLimits = KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX);
    final String tier = nodeCbAttribute.get();
    return tierLimits.get(tier);
}

/**
 * Looks up the circuit breaker limit for this node's tier in a set of updated settings.
 *
 * @param updatedSettings Settings object containing pending updates
 * @return the raw limit string found under this node's tier key in {@code updatedSettings},
 *         or null when the node has no tier attribute or the update has no entry for that tier
 */
private String getNodeCbLimit(Settings updatedSettings) {
    // Without a cached tier attribute the update cannot contain a limit for this node.
    if (!nodeCbAttribute.isPresent()) {
        return null;
    }
    final Settings updatedTierLimits = updatedSettings.getByPrefix(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX);
    return updatedTierLimits.get(nodeCbAttribute.get());
}

/**
 * Reads the cluster-level circuit breaker limit from the current settings.
 * This value is used in two situations:
 * 1. During node startup, before node attributes are available and a
 *    node-specific limit can be resolved
 * 2. As the fallback for nodes without a knn_cb_tier attribute, or whose tier
 *    does not match any configured node-level limit
 *
 * @return ByteSizeValue representing the cluster-wide circuit breaker limit
 */
public static ByteSizeValue getClusterCbLimit() {
    final ByteSizeValue clusterLimit = KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);
    return clusterLimit;
}

/**
 * Resolves the circuit breaker limit for this node from the existing configuration.
 * Precedence:
 * 1. The limit configured for this node's circuit breaker tier, if one exists
 * 2. The cluster-level limit otherwise
 *
 * @return ByteSizeValue representing the circuit breaker limit that applies to this node
 */
public ByteSizeValue getCircuitBreakerLimit() {
    // The tier-specific string may be null; the cluster-level value is then used as the default.
    final String tierLimit = getNodeCbLimit();
    final ByteSizeValue clusterLimit = getClusterCbLimit();
    return parseknnMemoryCircuitBreakerValue(tierLimit, clusterLimit, KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);
}

/**
* Determines if and how the circuit breaker limit should be updated for this node.
* Evaluates both node-specific and cluster-level updates in the updated settings,
* maintaining proper precedence:
* 1. Node-tier specific limit from updates (if available)
* 2. Appropriate fallback value based on node's current configuration
*
* @param updatedCbLimits Settings object containing pending circuit breaker updates
* @return ByteSizeValue representing the circuit breaker limit to apply: the node-tier
* limit from the update when present, otherwise the fallback from getFallbackCbLimitValue
*/
private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This updates the circuit breaker settings every time the cluster settings change. Should we only update when the node-specific tier is modified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I addressed this in my comment here. The cache gets rebuilt every time a dynamic cluster setting changes, so I don't see an option to lazily update it based on which setting got updated.

If we look at the current code the weight is already getting recomputed every time any dynamic setting gets refreshed. What I'm doing here is simplifying that logic to recompute the max weight to take into account node level attributes.

// Parse any updates, using appropriate fallback if no node-specific limit update exists
return parseknnMemoryCircuitBreakerValue(
getNodeCbLimit(updatedCbLimits),
getFallbackCbLimitValue(updatedCbLimits),
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
);
}

/**
 * Picks the limit to fall back on when an update carries no limit for this node's tier.
 * Rules:
 * 1. Node currently has no tier-specific limit (it uses the cluster-level limit):
 *    - take the cluster-level limit from the update when the update contains one
 *    - otherwise keep the current limit
 * 2. Node currently has a tier-specific limit:
 *    - keep the current limit; cluster-level updates are ignored
 *
 * This keeps a node on its tier-specific value instead of letting it slip back
 * to the cluster-level limit.
 *
 * @param updatedCbLimits Settings object containing pending updates
 * @return ByteSizeValue representing the appropriate fallback limit
 */
private ByteSizeValue getFallbackCbLimitValue(Settings updatedCbLimits) {
    final boolean usesClusterLimit = getNodeCbLimit() == null;

    // Keep the current limit (tier-specific or cluster-level) unless this node relies on
    // the cluster-level limit and the update actually changes it.
    if (!usesClusterLimit || !updatedCbLimits.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)) {
        return getCircuitBreakerLimit();
    }

    return (ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT).get(updatedCbLimits);
}

public static double getCircuitBreakerUnsetPercentage() {
Expand Down Expand Up @@ -660,10 +786,15 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
/**
 * Stores the client and cluster service, resets the cached node circuit breaker
 * tier attribute to empty, and then calls setSettingsUpdateConsumers().
 *
 * @param client client to store on this instance
 * @param clusterService cluster service to store on this instance
 */
public void initialize(Client client, ClusterService clusterService) {
    // No tier is known yet; the setter replaces this value later.
    this.nodeCbAttribute = Optional.empty();
    this.clusterService = clusterService;
    this.client = client;
    // Must run last: it is invoked only after the fields above are assigned.
    setSettingsUpdateConsumers();
}

/**
 * Parses a circuit breaker limit value with no default by delegating to the
 * three-argument overload with a null default value.
 *
 * @param sValue raw setting value to parse
 * @param settingName name of the setting the value belongs to
 * @return the result of the three-argument overload
 */
public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, String settingName) {
    final ByteSizeValue noDefault = null;
    return parseknnMemoryCircuitBreakerValue(sValue, noDefault, settingName);
}

public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, ByteSizeValue defaultValue, String settingName) {
settingName = Objects.requireNonNull(settingName);
if (sValue != null && sValue.endsWith("%")) {
final String percentAsString = sValue.substring(0, sValue.length() - 1);
Expand All @@ -683,7 +814,7 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Str
throw new OpenSearchParseException("failed to parse [{}] as a double", e, percentAsString);
}
} else {
return parseBytesSizeValue(sValue, settingName);
return parseBytesSizeValue(sValue, defaultValue, settingName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,20 @@ public static synchronized NativeMemoryCacheManager getInstance() {
return INSTANCE;
}

/**
* Initialize NativeMemoryCacheManager with configurations.
* Note: maxWeight is initially set to the cluster-level circuit breaker limit
* because node attributes are not yet available during startup. Once the
* ClusterService is fully bootstrapped, the circuit breaker will update this
* value to use any node-specific limits based on the node's circuit_breaker_tier
* attribute if configured.
*/
private void initialize() {
initialize(
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
.maxWeight(KNNSettings.getCircuitBreakerLimit().getKb())
// Initially use cluster-level limit; will be updated later during cache refresh if node-specific limit exists
.maxWeight(KNNSettings.getClusterCbLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -127,7 +136,7 @@ public synchronized void rebuildCache() {
rebuildCache(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire cache is rebuilt every time a node joins, even if the circuit breaker limit remains unchanged. Can we do this lazily, so that it is rebuilt only if the limit actually changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand here the cache gets rebuilt in 3 distinct places within the code:

  1. On initialization
  2. On a dynamic settings update
  3. On indexModule

For option 1, when a node joins, I assumed this to be part of the initialization. In that case, my thought was that we need to recompute the value of the circuit breaker limit, since the attributes belonging to that node may correspond to a different circuit breaker tier.

As for option 2, existing behavior within k-NN dictates that whenever a dynamic setting gets updated, the cache gets rebuilt. This applies to all settings that are dynamically configured. The configuration system within k-NN throws away no-ops in the updated settings, so each call to this option is guaranteed to have at least one dynamic configuration that has changed. A good callout here is that the cache is rebuilt every time one of these settings has changed (regardless of whether the limit has changed), but I see changing that as modifying existing k-NN behavior, which I'd like to keep outside the scope of my PR. The only change I am making here is to how we compute the weight of the cache in the event that it needs to be rebuilt.

The behavior with option 3 doesn't have a relation with our change I believe.

NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
.maxWeight(KNNSettings.getCircuitBreakerLimit().getKb())
.maxWeight(KNNSettings.state().getCircuitBreakerLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -460,7 +469,7 @@ private void onRemoval(RemovalNotification<String, NativeMemoryAllocation> remov
}

private Float getSizeAsPercentage(long size) {
long cbLimit = KNNSettings.getCircuitBreakerLimit().getKb();
long cbLimit = KNNSettings.state().getCircuitBreakerLimit().getKb();
if (cbLimit == 0) {
return 0.0F;
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.cluster.NamedDiff;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.opensearch.knn.training.TrainingJobClusterStateListener;
import org.opensearch.knn.training.TrainingJobRunner;
import org.opensearch.knn.training.VectorReader;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.ExtensiblePlugin;
Expand Down Expand Up @@ -116,6 +118,7 @@
import static org.opensearch.knn.common.KNNConstants.KNN_THREAD_POOL_PREFIX;
import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_NAME;
import static org.opensearch.knn.common.KNNConstants.TRAIN_THREAD_POOL;
import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_CIRCUIT_BREAKER_TIER;

/**
* Entry point for the KNN plugin where we define mapper for knn_vector type
Expand Down Expand Up @@ -153,6 +156,7 @@ public class KNNPlugin extends Plugin
SearchPlugin,
ActionPlugin,
EnginePlugin,
ClusterPlugin,
ScriptPlugin,
ExtensiblePlugin,
SystemIndexPlugin {
Expand Down Expand Up @@ -362,4 +366,22 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
}

/**
 * Reads the circuit breaker tier attribute of the started node and, when it is set,
 * caches it in KNNSettings and rebuilds the native memory cache if the resulting
 * limit differs from the cache's current maximum size.
 *
 * @param localNode the local node whose attributes are inspected
 */
@Override
public void onNodeStarted(DiscoveryNode localNode) {
    // Attempt to fetch a cb tier from node attributes; nothing to do when it is absent.
    final String tier = localNode.getAttributes().get(KNN_CIRCUIT_BREAKER_TIER);
    if (tier == null) {
        return;
    }

    // Cache the tier so later limit lookups can resolve the tier-specific setting.
    KNNSettings.state().setNodeCbAttribute(Optional.of(tier));

    // Only rebuild the cache if the weight has actually changed
    final long tierAwareLimitKb = KNNSettings.state().getCircuitBreakerLimit().getKb();
    if (tierAwareLimitKb != NativeMemoryCacheManager.getInstance().getMaxCacheSizeInKilobytes()) {
        NativeMemoryCacheManager.getInstance().rebuildCache();
    }
}

}
Loading
Loading