Skip to content

Commit

Permalink
Cache node attribute and fix signoff
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Wu <[email protected]>
  • Loading branch information
markwu-sde committed Feb 12, 2025
1 parent 7061074 commit e5f652b
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 56 deletions.
18 changes: 16 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* Runs the circuit breaker logic and updates the settings
*/
public class KNNCircuitBreaker {
private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";
public static int CB_TIME_INTERVAL = 2 * 60; // seconds

private static KNNCircuitBreaker INSTANCE;
Expand Down Expand Up @@ -112,8 +114,20 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
// Node is fully initialized, rebuild the cache to fetch node-specific limits
nativeMemoryCacheManager.rebuildCache();
// Attempt to fetch a cb tier from node attributes and cache the result.
// Get this node's circuit breaker tier attribute
Optional<String> tierAttribute = Optional.ofNullable(
clusterService.localNode().getAttributes().get(KNN_CIRCUIT_BREAKER_TIER)
);
if (tierAttribute.isPresent()) {
// Only rebuild the cache if the attribute was present
logger.info(
"[KNN] Node specific circuit breaker " + tierAttribute.get() + " classification found. Rebuilding the cache."
);
KNNSettings.state().setNodeCbAttribute(tierAttribute);
nativeMemoryCacheManager.rebuildCache();
}

}
});
}
Expand Down
90 changes: 39 additions & 51 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.knn.index;

import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -117,7 +119,6 @@ public class KNNSettings {
public static final Integer KNN_MAX_MODEL_CACHE_SIZE_LIMIT_PERCENTAGE = 25; // Model cache limit cannot exceed 25% of the JVM heap
public static final String KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT = "50%";
public static final String KNN_DEFAULT_VECTOR_STREAMING_MEMORY_LIMIT_PCT = "1%";
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";

public static final Integer ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_DEFAULT_VALUE = -1;
public static final Integer KNN_DEFAULT_QUANTIZATION_STATE_CACHE_SIZE_LIMIT_PERCENTAGE = 5; // By default, set aside 5% of the JVM for
Expand Down Expand Up @@ -439,6 +440,8 @@ public class KNNSettings {

private ClusterService clusterService;
private Client client;
@Setter
private Optional<String> nodeCbAttribute;

private KNNSettings() {}

Expand All @@ -459,8 +462,7 @@ private void setSettingsUpdateConsumers() {
updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
);

// If any cb related limits have changed at the cluster or node level, we need to check if the max weight of the cache needs to
// be updated as well.
// Recompute cache weight
builder.maxWeight(getUpdatedCircuitBreakerLimit(updatedSettings).getKb());

builder.isExpirationLimited(
Expand Down Expand Up @@ -591,14 +593,30 @@ public static boolean isCircuitBreakerTriggered() {
}

/**
* Retrieves all available circuit breaker limit configurations.
* This group setting includes Node-specific circuit breaker limits
* It does not include the cluster-wide default limit, which is set separately.
*
* @return Settings object containing all node-level circuit breaker limit configurations
*/
public static Settings getAllAvailableNodeCbLimits() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX);
* Retrieves the node-specific circuit breaker limit based on the existing settings.
*
* @return String representation of the node-specific circuit breaker limit,
* or null if no node-specific limit is set or found
*/
private String getNodeCbLimit() {
if (nodeCbAttribute.isPresent()) {
Settings configuredNodeCbLimits = KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX);
return configuredNodeCbLimits.get(nodeCbAttribute.get());
}
return null;
}

/**
* Gets node-specific circuit breaker limit from updated settings.
*
* @param updatedSettings Settings object containing pending updates
* @return String representation of new limit if exists for this node's tier, null otherwise
*/
private String getNodeCbLimit(Settings updatedSettings) {
if (nodeCbAttribute.isPresent()) {
return updatedSettings.getByPrefix(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX).get(nodeCbAttribute.get());
}
return null;
}

/**
Expand All @@ -611,12 +629,12 @@ public static Settings getAllAvailableNodeCbLimits() {
*
* @return ByteSizeValue representing the cluster-wide circuit breaker limit
*/
public static ByteSizeValue getClusterLevelCircuitBreakerLimit() {
public static ByteSizeValue getClusterCbLimit() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);
}

/**
* Returns the circuit breaker limit for this node. The limit is determined by:
* Returns the circuit breaker limit for this node using existing configuration. The limit is determined by:
* 1. Node-specific limit based on the node's circuit breaker tier attribute, if configured
* 2. Cluster-level default limit if no node-specific configuration exists
*
Expand All @@ -625,37 +643,8 @@ public static ByteSizeValue getClusterLevelCircuitBreakerLimit() {
*/
public ByteSizeValue getCircuitBreakerLimit() {

// Attempt to fetch node-level limits if present - otherwise fall back to the cluster-level value
return parseknnMemoryCircuitBreakerValue(
getKnnCircuitBreakerLimitForNode(getAllAvailableNodeCbLimits()),
getClusterLevelCircuitBreakerLimit(),
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX
);
}

/**
* Retrieves the circuit breaker limit configuration for this node's tier.
* The method:
* 1. Gets the node's knn_cb_tier attribute (if it exists)
* 2. Looks up the corresponding limit configuration
*
* For example, if a node has attribute knn_cb_tier: "high",
* this will return the limit configured for the "high" tier.
*
* @return String representing the circuit breaker limit for this node's tier,
* null if either:
* - Node has no knn_cb_tier attribute
* - No configuration exists for the node's tier
*/
private String getKnnCircuitBreakerLimitForNode(Settings settings) {
// Get this node's circuit breaker tier attribute
String tierAttribute = clusterService.localNode().getAttributes().get(KNN_CIRCUIT_BREAKER_TIER);

if (tierAttribute == null) {
return null;
}
return parseknnMemoryCircuitBreakerValue(getNodeCbLimit(), getClusterCbLimit(), KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT);

return settings.get(tierAttribute);
}

/**
Expand All @@ -672,9 +661,9 @@ private String getKnnCircuitBreakerLimitForNode(Settings settings) {
private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) {
// Parse any updates, using appropriate fallback if no node-specific limit update exists
return parseknnMemoryCircuitBreakerValue(
getKnnCircuitBreakerLimitForNode(updatedCbLimits.getByPrefix(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX)),
getNodeCbLimit(updatedCbLimits),
getFallbackCbLimitValue(updatedCbLimits),
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX
KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
);
}

Expand All @@ -695,12 +684,10 @@ private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) {
* @return ByteSizeValue representing the appropriate fallback limit
*/
private ByteSizeValue getFallbackCbLimitValue(Settings updatedCbLimits) {
// Check if node is currently using cluster-level limit
if (getKnnCircuitBreakerLimitForNode(getAllAvailableNodeCbLimits()) == null) {
// If there's an updated cluster-level limit, use it
if (updatedCbLimits.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)) {
return (ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT).get(updatedCbLimits);
}
// Update cluster level limit if used
if (getNodeCbLimit() == null && updatedCbLimits.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)) {
return (ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT).get(updatedCbLimits);

}

// Otherwise maintain current limit (either tier-specific or cluster-level)
Expand Down Expand Up @@ -772,6 +759,7 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
this.nodeCbAttribute = Optional.empty();
setSettingsUpdateConsumers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private void initialize() {
NativeMemoryCacheManagerDto.builder()
.isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))
// Initially use cluster-level limit; will be updated later during cache refresh if node-specific limit exists
.maxWeight(KNNSettings.getClusterLevelCircuitBreakerLimit().getKb())
.maxWeight(KNNSettings.getClusterCbLimit().getKb())
.isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))
.expiryTimeInMin(
((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testGetIndexGraphCount() throws ExecutionException, IOException {

public void testGetMaxCacheSizeInKB() {
NativeMemoryCacheManager nativeMemoryCacheManager = new NativeMemoryCacheManager();
assertEquals(KNNSettings.getClusterLevelCircuitBreakerLimit().getKb(), nativeMemoryCacheManager.getMaxCacheSizeInKilobytes());
assertEquals(KNNSettings.getClusterCbLimit().getKb(), nativeMemoryCacheManager.getMaxCacheSizeInKilobytes());
nativeMemoryCacheManager.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public static void setUpClass() throws Exception {
CIRCUIT_BREAKER_LIMIT_100KB,
KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT
);
knnSettingsMockedStatic.when(KNNSettings::getClusterLevelCircuitBreakerLimit).thenReturn(v);
knnSettingsMockedStatic.when(KNNSettings::getClusterCbLimit).thenReturn(v);
knnSettingsMockedStatic.when(KNNSettings::state).thenReturn(knnSettings);
knnSettingsMockedStatic.when(KNNSettings::isKNNPluginEnabled).thenReturn(true);
ByteSizeValue cacheSize = ByteSizeValue.parseBytesSizeValue("1024kb", QUANTIZATION_STATE_CACHE_SIZE_LIMIT); // Setting 1MB as an
Expand Down

0 comments on commit e5f652b

Please sign in to comment.