Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce node level circuit breaker settings for k-NN #2509

Merged
merged 9 commits into from
Feb 26, 2025
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ integTest {
testClusters.integTest {
testDistribution = "ARCHIVE"

//Used for circuit breaker integration tests
setting 'node.attr.knn_cb_tier', 'integ'

// Optionally install security
if (System.getProperty("security.enabled") != null) {
configureSecurityPlugin(testClusters.integTest)
Expand All @@ -517,6 +520,7 @@ testClusters.integTest {
environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies")
}


// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.knn.index;

import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.plugin.stats.StatNames;
import org.opensearch.knn.plugin.transport.KNNStatsAction;
Expand All @@ -20,12 +21,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* Runs the circuit breaker logic and updates the settings
*/
public class KNNCircuitBreaker {
private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";
public static int CB_TIME_INTERVAL = 2 * 60; // seconds

private static KNNCircuitBreaker INSTANCE;
Expand Down Expand Up @@ -59,7 +62,7 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli
Runnable runnable = () -> {
if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.getCircuitBreakerLimit().getKb();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb();
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
* circuitBreakerLimitSizeKiloBytes);
/**
Expand Down Expand Up @@ -106,5 +109,26 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli
}
};
this.threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);

// Update when node is fully joined
clusterService.addLifecycleListener(new LifecycleListener() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Node attributes aren't available during the time the cache manager computes its size on initialization. With the additional dependency on node attributes to the circuit breaker we may need to check if the circuit breaker needs to be updated after the node has bootstrapped.

I only see 1 example of a listener being attached in here but attached the listener on KnnCircuitBreaker initialization due needing to refresh the cache to recompute the size and caching the attribute value. If there's any feedback on this that'd be appreciated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach. I think it makes sense.

@Override
public void afterStart() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really node attribute (knn_cb_tier) on node start
Instead, this should fetch attributes dynamically whenever needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I had the attribute fetched dynamically to compute the circuit breaker value whenever we need it. The circuit breaker value is really only used by the CacheManager and I had mentioned earlier that on cache initialization we don't have access to node attributes yet and so there were 2 thoughts that I had with this approach -

  1. Not having fetching this attribute on node start means that the cache will continue to maintain the cluster level size until the cache is rebuilt again with a updated value from the attribute. Not doing this on node start would mean this process would have to be happening during some sort of dynamic update of the cache (through a settings update or something else) in which we'll have to pass in the cache and have it rebuild. My idea here is that we set the correct cache size as soon as the node starts to ensure that there's no discrepancy between what users are expecting and what the cache size actually is. Since the circuit breaker here already interacts with the NativeCacheManager I thought it'd be the logical place to insert

  2. Another issue is related to the comment I posted. I want to avoid having to reference cluster state configurations during different parts of the lifecycle and just have the value persist through the entirety.

Operating under the assumption that node attributes are considered to be statically defined as of now fetching the value on node start seems to be easier here but also open to suggestions on how we can go about doing this dynamically.

// Attempt to fetch a cb tier from node attributes and cache the result.
// Get this node's circuit breaker tier attribute
Optional<String> tierAttribute = Optional.ofNullable(
clusterService.localNode().getAttributes().get(KNN_CIRCUIT_BREAKER_TIER)
);
if (tierAttribute.isPresent()) {
// Only rebuild the cache if the attribute was present
logger.info(
"[KNN] Node specific circuit breaker " + tierAttribute.get() + " classification found. Rebuilding the cache."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"[KNN] Node specific circuit breaker " + tierAttribute.get() + " classification found. Rebuilding the cache."
"[KNN] Node specific circuit breaker {} classification found. Rebuilding the cache.", tierAttribute.get());

);
KNNSettings.state().setNodeCbAttribute(tierAttribute);
nativeMemoryCacheManager.rebuildCache();
}

}
});
}
}
155 changes: 143 additions & 12 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.knn.index;

import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -75,7 +77,8 @@ public class KNNSettings {
public static final String KNN_ALGO_PARAM_EF_SEARCH = "index.knn.algo_param.ef_search";
public static final String KNN_ALGO_PARAM_INDEX_THREAD_QTY = "knn.algo_param.index_thread_qty";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = "knn.memory.circuit_breaker.limit.";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + ".";

public static final String KNN_VECTOR_STREAMING_MEMORY_LIMIT_IN_MB = "knn.vector_streaming_memory.limit";
public static final String KNN_CIRCUIT_BREAKER_TRIGGERED = "knn.circuit_breaker.triggered";
public static final String KNN_CACHE_ITEM_EXPIRY_ENABLED = "knn.cache.item.expiry.enabled";
Expand Down Expand Up @@ -402,17 +405,42 @@ public class KNNSettings {
* Weight circuit breaker settings
*/
put(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, Setting.boolSetting(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, true, NodeScope, Dynamic));

/**
* Group setting that manages node-level circuit breaker configurations based on node tiers.
* Settings under this group define memory limits for different node classifications.
* Validation of limit occurs before the setting is retrieved.
*/
put(
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX,
Setting.groupSetting(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX, settings -> {
settings.keySet()
.forEach(
(limit) -> parseknnMemoryCircuitBreakerValue(
settings.get(limit),
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran into an issue where not having the validation being performed in parseKnnMemoryCircuitBreakerValue specifically here would result in dropping the cluster on a uncaught exception if the same method was hit in line 633 with a invalid knn cb value. There may be some exception handling happening earlier in the stack that allows an exception to be caught gracefully here doesn't exist later on but to maintain the same behavior we validate the given values before trying to update the cache.

)
);
}, NodeScope, Dynamic)
);

/**
* Cluster-wide circuit breaker limit that serves as the default configuration.
* This setting is used when a node either:
* - Has no knn_cb_tier attribute defined
* - Has a tier that doesn't match any node-level configuration
* Default value: {@value KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT}
*/
put(
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT,
new Setting<>(
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT,
KNNSettings.KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT,
(s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT),
(s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT),
NodeScope,
Dynamic
)
);

/**
* Cache expiry time settings
*/
Expand All @@ -429,6 +457,8 @@ public class KNNSettings {

private ClusterService clusterService;
private Client client;
@Setter
private Optional<String> nodeCbAttribute;

private KNNSettings() {}

Expand All @@ -449,10 +479,8 @@ private void setSettingsUpdateConsumers() {
updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
);

builder.maxWeight(((ByteSizeValue) getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb());
if (updatedSettings.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)) {
builder.maxWeight(((ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).get(updatedSettings)).getKb());
}
// Recompute cache weight
builder.maxWeight(getUpdatedCircuitBreakerLimit(updatedSettings).getKb());

builder.isExpirationLimited(
updatedSettings.getAsBoolean(KNN_CACHE_ITEM_EXPIRY_ENABLED, getSettingValue(KNN_CACHE_ITEM_EXPIRY_ENABLED))
Expand Down Expand Up @@ -591,8 +619,106 @@ public static boolean isCircuitBreakerTriggered() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED);
}

public static ByteSizeValue getCircuitBreakerLimit() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT);
/**
* Retrieves the node-specific circuit breaker limit based on the existing settings.
*
* @return String representation of the node-specific circuit breaker limit,
* or null if no node-specific limit is set or found
*/
private String getNodeCbLimit() {
if (nodeCbAttribute.isPresent()) {
Settings configuredNodeCbLimits = KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX);
return configuredNodeCbLimits.get(nodeCbAttribute.get());
}
return null;
}

/**
* Gets node-specific circuit breaker limit from updated settings.
*
* @param updatedSettings Settings object containing pending updates
* @return String representation of new limit if exists for this node's tier, null otherwise
*/
private String getNodeCbLimit(Settings updatedSettings) {
if (nodeCbAttribute.isPresent()) {
return updatedSettings.getByPrefix(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX).get(nodeCbAttribute.get());
}
return null;
}

/**
* Returns the cluster-level circuit breaker limit. Needed for initialization
* during startup when node attributes are not yet available through ClusterService.
* This limit serves two purposes:
* 1. As a temporary value during node startup before node-specific limits can be determined
* 2. As a fallback value for nodes that don't have a knn_cb_tier attribute or
* whose tier doesn't match any configured node-level limit
*
* @return ByteSizeValue representing the cluster-wide circuit breaker limit
*/
public static ByteSizeValue getClusterCbLimit() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);
}

/**
* Returns the circuit breaker limit for this node using existing configuration. The limit is determined by:
* 1. Node-specific limit based on the node's circuit breaker tier attribute, if configured
* 2. Cluster-level default limit if no node-specific configuration exists
*
* @return ByteSizeValue representing the circuit breaker limit, either as a percentage
* of available memory or as an absolute value
*/
public ByteSizeValue getCircuitBreakerLimit() {

return parseknnMemoryCircuitBreakerValue(getNodeCbLimit(), getClusterCbLimit(), KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);

}

/**
* Determines if and how the circuit breaker limit should be updated for this node.
* Evaluates both node-specific and cluster-level updates in the updated settings,
* maintaining proper precedence:
* 1. Node-tier specific limit from updates (if available)
* 2. Appropriate fallback value based on node's current configuration
*
* @param updatedCbLimits Settings object containing pending circuit breaker updates
* @return ByteSizeValue representing the new circuit breaker limit to apply,
* or null if no applicable updates found
*/
private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updates circuit breaker settings every time the cluster settings change, should we only update when the node-specific tier is modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I had addressed this in my comment here. The cache gets rebuilt every time the a dynamic cluster setting changes so I don't see an option to lazily update the based off what setting got updated.

If we look at the current code the weight is already getting recomputed every time any dynamic setting gets refreshed. What I'm doing here is simplifying that logic to recompute the max weight to take into account node level attributes.

// Parse any updates, using appropriate fallback if no node-specific limit update exists
return parseknnMemoryCircuitBreakerValue(
getNodeCbLimit(updatedCbLimits),
getFallbackCbLimitValue(updatedCbLimits),
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
);
}

/**
* Determines the appropriate fallback circuit breaker limit value.
* The fallback logic follows this hierarchy:
* 1. If node currently uses cluster-level limit:
* - Use updated cluster-level limit if available
* - Otherwise maintain current limit
* 2. If node uses tier-specific limit:
* - Maintain current limit (ignore cluster-level updates)
*
* This ensures nodes maintain their configuration hierarchy and don't
* inadvertently fall back to cluster-level limits when they should use
* tier-specific values.
*
* @param updatedCbLimits Settings object containing pending updates
* @return ByteSizeValue representing the appropriate fallback limit
*/
private ByteSizeValue getFallbackCbLimitValue(Settings updatedCbLimits) {
// Update cluster level limit if used
if (getNodeCbLimit() == null && updatedCbLimits.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)) {
return (ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT).get(updatedCbLimits);

}

// Otherwise maintain current limit (either tier-specific or cluster-level)
return getCircuitBreakerLimit();
}

public static double getCircuitBreakerUnsetPercentage() {
Expand Down Expand Up @@ -660,10 +786,15 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.nodeCbAttribute = Optional.empty();
setSettingsUpdateConsumers();
}

public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, String settingName) {
return parseknnMemoryCircuitBreakerValue(sValue, null, settingName);
}

public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, ByteSizeValue defaultValue, String settingName) {
settingName = Objects.requireNonNull(settingName);
if (sValue != null && sValue.endsWith("%")) {
final String percentAsString = sValue.substring(0, sValue.length() - 1);
Expand All @@ -683,7 +814,7 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Str
throw new OpenSearchParseException("failed to parse [{}] as a double", e, percentAsString);
}
} else {
return parseBytesSizeValue(sValue, settingName);
return parseBytesSizeValue(sValue, defaultValue, settingName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,20 @@ public static synchronized NativeMemoryCacheManager getInstance() {
return INSTANCE;
}

/**
* Initialize NativeMemoryCacheManager with configurations.
* Note: maxWeight is initially set to the cluster-level circuit breaker limit
* because node attributes are not yet available during startup. Once the
* ClusterService is fully bootstrapped, the circuit breaker will update this
* value to use any node-specific limits based on the node's circuit_breaker_tier
* attribute if configured.
*/
private void initialize() {
initialize(
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
.maxWeight(KNNSettings.getCircuitBreakerLimit().getKb())
// Initially use cluster-level limit; will be updated later during cache refresh if node-specific limit exists
.maxWeight(KNNSettings.getClusterCbLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -127,7 +136,7 @@ public synchronized void rebuildCache() {
rebuildCache(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire cache is rebuilt every time a node joins, even if the circuit breaker limit remains unchanged. Can we lazily do this that updates only if the limit actually changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand here the cache gets rebuilt in 3 distinct places within the code:

  1. On initialization
  2. On a dynamic settings update
  3. On indexModule

For option 1, when a node joins I assumed this to be a part of the initialization. In that case my thought was that we need to recompute the value of the circuit breaker limit since each the attributes belonging to that node may correspond to a different circuit breaker tier.

As for option 2, existing behavior within k-nn dictates that whenever a dynamic setting gets updated the cache gets rebuilt. This applies to all settings that are dynamically configured. The configuration system within k-nn will throw away no-ops in the updated settings and so each call to this option will guarantee to have one of the dynamically configurations to have changed. A good callout here is that the cache is being rebuilt every time one of these settings have changed (regardless of if the limit has changed) but I see this as modifying existing k-nn behavior which I'd like to minimize outside the scope of my PR. The only change I am making here is modifying how we compute the weight of the cache in the event where it needs to be rebuilt.

The behavior with option 3 doesn't have a relation with our change I believe.

NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
.maxWeight(KNNSettings.getCircuitBreakerLimit().getKb())
.maxWeight(KNNSettings.state().getCircuitBreakerLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down Expand Up @@ -460,7 +469,7 @@ private void onRemoval(RemovalNotification<String, NativeMemoryAllocation> remov
}

private Float getSizeAsPercentage(long size) {
long cbLimit = KNNSettings.getCircuitBreakerLimit().getKb();
long cbLimit = KNNSettings.state().getCircuitBreakerLimit().getKb();
if (cbLimit == 0) {
return 0.0F;
}
Expand Down
Loading
Loading