Skip to content

Commit

Permalink
Introduce node level circuit breaker settings for k-NN (#2509)
Browse files Browse the repository at this point in the history
* Introduce node level circuit breaker settings for k-NN

Signed-off-by: Mark Wu <[email protected]>

* Cache node attribute and fix signoff

Signed-off-by: Mark Wu <[email protected]>

* Do not rebuild cache if value remain unchanged

Signed-off-by: Mark Wu <[email protected]>

* Use ClusterPlugin instead of listener

Signed-off-by: Mark Wu <[email protected]>

* Revert wildcard import

Signed-off-by: Mark Wu <[email protected]>

---------

Signed-off-by: Mark Wu <[email protected]>
  • Loading branch information
markwu-sde authored Feb 26, 2025
1 parent f4ffe80 commit d5b2982
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
### Bug Fixes
### Infrastructure
* Removed JDK 11 and 17 version from CI runs [#1921](https://github.com/opensearch-project/k-NN/pull/1921)
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ integTest {
testClusters.integTest {
testDistribution = "ARCHIVE"

//Used for circuit breaker integration tests
setting 'node.attr.knn_cb_tier', 'integ'

// Optionally install security
if (System.getProperty("security.enabled") != null) {
configureSecurityPlugin(testClusters.integTest)
Expand All @@ -517,6 +520,7 @@ testClusters.integTest {
environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies")
}


// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
public class KNNCircuitBreaker {
private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";
public static int CB_TIME_INTERVAL = 2 * 60; // seconds

private static KNNCircuitBreaker INSTANCE;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli
Runnable runnable = () -> {
if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.getCircuitBreakerLimit().getKb();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb();
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
* circuitBreakerLimitSizeKiloBytes);
/**
Expand Down
155 changes: 143 additions & 12 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.knn.index;

import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -75,7 +77,8 @@ public class KNNSettings {
public static final String KNN_ALGO_PARAM_EF_SEARCH = "index.knn.algo_param.ef_search";
public static final String KNN_ALGO_PARAM_INDEX_THREAD_QTY = "knn.algo_param.index_thread_qty";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + ".";
public static final String KNN_VECTOR_STREAMING_MEMORY_LIMIT_IN_MB = "knn.vector_streaming_memory.limit";
public static final String KNN_CIRCUIT_BREAKER_TRIGGERED = "knn.circuit_breaker.triggered";
public static final String KNN_CACHE_ITEM_EXPIRY_ENABLED = "knn.cache.item.expiry.enabled";
Expand Down Expand Up @@ -408,17 +411,42 @@ public class KNNSettings {
* Weight circuit breaker settings
*/
put(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, Setting.boolSetting(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, true, NodeScope, Dynamic));

/**
* Group setting that manages node-level circuit breaker configurations based on node tiers.
* Settings under this group define memory limits for different node classifications.
* Validation of limit occurs before the setting is retrieved.
*/
put(
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX,
Setting.groupSetting(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX, settings -> {
settings.keySet()
.forEach(
(limit) -> parseknnMemoryCircuitBreakerValue(
settings.get(limit),
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
)
);
}, NodeScope, Dynamic)
);

/**
* Cluster-wide circuit breaker limit that serves as the default configuration.
* This setting is used when a node either:
* - Has no knn_cb_tier attribute defined
* - Has a tier that doesn't match any node-level configuration
* Default value: {@value KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT}
*/
put(
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT,
new Setting<>(
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT,
KNNSettings.KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT,
(s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT),
(s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT),
NodeScope,
Dynamic
)
);

/**
* Cache expiry time settings
*/
Expand All @@ -435,6 +463,8 @@ public class KNNSettings {

private ClusterService clusterService;
private Client client;
@Setter
private Optional<String> nodeCbAttribute;

private KNNSettings() {}

Expand All @@ -455,10 +485,8 @@ private void setSettingsUpdateConsumers() {
updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
);

builder.maxWeight(((ByteSizeValue) getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb());
if (updatedSettings.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)) {
builder.maxWeight(((ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).get(updatedSettings)).getKb());
}
// Recompute cache weight
builder.maxWeight(getUpdatedCircuitBreakerLimit(updatedSettings).getKb());

builder.isExpirationLimited(
updatedSettings.getAsBoolean(KNN_CACHE_ITEM_EXPIRY_ENABLED, getSettingValue(KNN_CACHE_ITEM_EXPIRY_ENABLED))
Expand Down Expand Up @@ -597,8 +625,106 @@ public static boolean isCircuitBreakerTriggered() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED);
}

public static ByteSizeValue getCircuitBreakerLimit() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT);
/**
* Retrieves the node-specific circuit breaker limit based on the existing settings.
*
* @return String representation of the node-specific circuit breaker limit,
* or null if no node-specific limit is set or found
*/
private String getNodeCbLimit() {
if (nodeCbAttribute.isPresent()) {
Settings configuredNodeCbLimits = KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX);
return configuredNodeCbLimits.get(nodeCbAttribute.get());
}
return null;
}

/**
* Gets node-specific circuit breaker limit from updated settings.
*
* @param updatedSettings Settings object containing pending updates
* @return String representation of new limit if exists for this node's tier, null otherwise
*/
private String getNodeCbLimit(Settings updatedSettings) {
if (nodeCbAttribute.isPresent()) {
return updatedSettings.getByPrefix(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX).get(nodeCbAttribute.get());
}
return null;
}

/**
* Returns the cluster-level circuit breaker limit. Needed for initialization
* during startup when node attributes are not yet available through ClusterService.
* This limit serves two purposes:
* 1. As a temporary value during node startup before node-specific limits can be determined
* 2. As a fallback value for nodes that don't have a knn_cb_tier attribute or
* whose tier doesn't match any configured node-level limit
*
* @return ByteSizeValue representing the cluster-wide circuit breaker limit
*/
public static ByteSizeValue getClusterCbLimit() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);
}

/**
* Returns the circuit breaker limit for this node using existing configuration. The limit is determined by:
* 1. Node-specific limit based on the node's circuit breaker tier attribute, if configured
* 2. Cluster-level default limit if no node-specific configuration exists
*
* @return ByteSizeValue representing the circuit breaker limit, either as a percentage
* of available memory or as an absolute value
*/
public ByteSizeValue getCircuitBreakerLimit() {

return parseknnMemoryCircuitBreakerValue(getNodeCbLimit(), getClusterCbLimit(), KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);

}

/**
* Determines if and how the circuit breaker limit should be updated for this node.
* Evaluates both node-specific and cluster-level updates in the updated settings,
* maintaining proper precedence:
* 1. Node-tier specific limit from updates (if available)
* 2. Appropriate fallback value based on node's current configuration
*
* @param updatedCbLimits Settings object containing pending circuit breaker updates
* @return ByteSizeValue representing the new circuit breaker limit to apply,
* or null if no applicable updates found
*/
private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) {
// Parse any updates, using appropriate fallback if no node-specific limit update exists
return parseknnMemoryCircuitBreakerValue(
getNodeCbLimit(updatedCbLimits),
getFallbackCbLimitValue(updatedCbLimits),
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
);
}

/**
* Determines the appropriate fallback circuit breaker limit value.
* The fallback logic follows this hierarchy:
* 1. If node currently uses cluster-level limit:
* - Use updated cluster-level limit if available
* - Otherwise maintain current limit
* 2. If node uses tier-specific limit:
* - Maintain current limit (ignore cluster-level updates)
*
* This ensures nodes maintain their configuration hierarchy and don't
* inadvertently fall back to cluster-level limits when they should use
* tier-specific values.
*
* @param updatedCbLimits Settings object containing pending updates
* @return ByteSizeValue representing the appropriate fallback limit
*/
private ByteSizeValue getFallbackCbLimitValue(Settings updatedCbLimits) {
// Update cluster level limit if used
if (getNodeCbLimit() == null && updatedCbLimits.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)) {
return (ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT).get(updatedCbLimits);

}

// Otherwise maintain current limit (either tier-specific or cluster-level)
return getCircuitBreakerLimit();
}

public static double getCircuitBreakerUnsetPercentage() {
Expand Down Expand Up @@ -666,10 +792,15 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.nodeCbAttribute = Optional.empty();
setSettingsUpdateConsumers();
}

public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, String settingName) {
return parseknnMemoryCircuitBreakerValue(sValue, null, settingName);
}

public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, ByteSizeValue defaultValue, String settingName) {
settingName = Objects.requireNonNull(settingName);
if (sValue != null && sValue.endsWith("%")) {
final String percentAsString = sValue.substring(0, sValue.length() - 1);
Expand All @@ -689,7 +820,7 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Str
throw new OpenSearchParseException("failed to parse [{}] as a double", e, percentAsString);
}
} else {
return parseBytesSizeValue(sValue, settingName);
return parseBytesSizeValue(sValue, defaultValue, settingName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,20 @@ public static synchronized NativeMemoryCacheManager getInstance() {
return INSTANCE;
}

/**
* Initialize NativeMemoryCacheManager with configurations.
* Note: maxWeight is initially set to the cluster-level circuit breaker limit
* because node attributes are not yet available during startup. Once the
* ClusterService is fully bootstrapped, the circuit breaker will update this
* value to use any node-specific limits based on the node's circuit_breaker_tier
* attribute if configured.
*/
private void initialize() {
initialize(
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
.maxWeight(KNNSettings.getCircuitBreakerLimit().getKb())
// Initially use cluster-level limit; will be updated later during cache refresh if node-specific limit exists
.maxWeight(KNNSettings.getClusterCbLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -127,7 +136,7 @@ public synchronized void rebuildCache() {
rebuildCache(
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
.maxWeight(KNNSettings.getCircuitBreakerLimit().getKb())
.maxWeight(KNNSettings.state().getCircuitBreakerLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -460,7 +469,7 @@ private void onRemoval(RemovalNotification<String, NativeMemoryAllocation> remov
}

private Float getSizeAsPercentage(long size) {
long cbLimit = KNNSettings.getCircuitBreakerLimit().getKb();
long cbLimit = KNNSettings.state().getCircuitBreakerLimit().getKb();
if (cbLimit == 0) {
return 0.0F;
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.cluster.NamedDiff;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.opensearch.knn.training.TrainingJobClusterStateListener;
import org.opensearch.knn.training.TrainingJobRunner;
import org.opensearch.knn.training.VectorReader;
import org.opensearch.plugins.ClusterPlugin;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.ExtensiblePlugin;
Expand Down Expand Up @@ -116,6 +118,7 @@
import static org.opensearch.knn.common.KNNConstants.KNN_THREAD_POOL_PREFIX;
import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_NAME;
import static org.opensearch.knn.common.KNNConstants.TRAIN_THREAD_POOL;
import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_CIRCUIT_BREAKER_TIER;

/**
* Entry point for the KNN plugin where we define mapper for knn_vector type
Expand Down Expand Up @@ -153,6 +156,7 @@ public class KNNPlugin extends Plugin
SearchPlugin,
ActionPlugin,
EnginePlugin,
ClusterPlugin,
ScriptPlugin,
ExtensiblePlugin,
SystemIndexPlugin {
Expand Down Expand Up @@ -362,4 +366,22 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
// Attempt to fetch a cb tier from node attributes and cache the result.
// Get this node's circuit breaker tier attribute
Optional<String> tierAttribute = Optional.ofNullable(localNode.getAttributes().get(KNN_CIRCUIT_BREAKER_TIER));
if (tierAttribute.isPresent()) {
KNNSettings.state().setNodeCbAttribute(tierAttribute);

// Only rebuild the cache if the weight has actually changed
if (KNNSettings.state().getCircuitBreakerLimit().getKb() != NativeMemoryCacheManager.getInstance()
.getMaxCacheSizeInKilobytes()) {
NativeMemoryCacheManager.getInstance().rebuildCache();
}
}

}

}
Loading

0 comments on commit d5b2982

Please sign in to comment.