diff --git a/CHANGELOG.md b/CHANGELOG.md index 70ec3ae510..7070498883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD) ### Features ### Enhancements +* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509) ### Bug Fixes ### Infrastructure * Removed JDK 11 and 17 version from CI runs [#1921](https://github.com/opensearch-project/k-NN/pull/1921) diff --git a/build.gradle b/build.gradle index b5f7158471..81d104d768 100644 --- a/build.gradle +++ b/build.gradle @@ -506,6 +506,9 @@ integTest { testClusters.integTest { testDistribution = "ARCHIVE" + //Used for circuit breaker integration tests + setting 'node.attr.knn_cb_tier', 'integ' + // Optionally install security if (System.getProperty("security.enabled") != null) { configureSecurityPlugin(testClusters.integTest) @@ -517,6 +520,7 @@ testClusters.integTest { environment('PATH', System.getenv('PATH') + ";$rootDir/jni/build/release" + ";$rootDir/src/main/resources/windowsDependencies") } + // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 if (_numNodes > 1) numberOfNodes = _numNodes // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore diff --git a/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java b/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java index fbb025c973..4829777be0 100644 --- a/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java +++ b/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java @@ -26,6 +26,7 @@ */ public class KNNCircuitBreaker { private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class); + public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier"; public static int CB_TIME_INTERVAL = 2 * 60; // seconds private static KNNCircuitBreaker INSTANCE; @@ -59,7 +60,7 @@ public void initialize(ThreadPool threadPool, ClusterService clusterService, Cli Runnable runnable = () -> { if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) { long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes(); - long circuitBreakerLimitSizeKiloBytes = KNNSettings.getCircuitBreakerLimit().getKb(); + long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb(); long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100) * circuitBreakerLimitSizeKiloBytes); /** diff --git a/src/main/java/org/opensearch/knn/index/KNNSettings.java b/src/main/java/org/opensearch/knn/index/KNNSettings.java index c33f3ea63c..95bb7099b5 100644 --- a/src/main/java/org/opensearch/knn/index/KNNSettings.java +++ b/src/main/java/org/opensearch/knn/index/KNNSettings.java @@ -5,6 +5,7 @@ package org.opensearch.knn.index; +import lombok.Setter; import lombok.extern.log4j.Log4j2; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -75,7 +77,8 @@ public class KNNSettings { public static final String KNN_ALGO_PARAM_EF_SEARCH = "index.knn.algo_param.ef_search"; public static final String KNN_ALGO_PARAM_INDEX_THREAD_QTY = "knn.algo_param.index_thread_qty"; public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled"; - public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT = "knn.memory.circuit_breaker.limit"; + public static final String KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT = "knn.memory.circuit_breaker.limit"; + public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX = KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + "."; public static final String KNN_VECTOR_STREAMING_MEMORY_LIMIT_IN_MB = "knn.vector_streaming_memory.limit"; public static final String KNN_CIRCUIT_BREAKER_TRIGGERED = "knn.circuit_breaker.triggered"; public static final String KNN_CACHE_ITEM_EXPIRY_ENABLED = "knn.cache.item.expiry.enabled"; @@ -402,17 +405,42 @@ public class KNNSettings { * Weight circuit breaker settings */ put(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, Setting.boolSetting(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, true, NodeScope, Dynamic)); + + /** + * Group setting that manages node-level circuit breaker configurations based on node tiers. + * Settings under this group define memory limits for different node classifications. + * Validation of limit occurs before the setting is retrieved. + */ put( - KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, + KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX, + Setting.groupSetting(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX, settings -> { + settings.keySet() + .forEach( + (limit) -> parseknnMemoryCircuitBreakerValue( + settings.get(limit), + KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + ) + ); + }, NodeScope, Dynamic) + ); + + /** + * Cluster-wide circuit breaker limit that serves as the default configuration. + * This setting is used when a node either: + * - Has no knn_cb_tier attribute defined + * - Has a tier that doesn't match any node-level configuration + * Default value: {@value KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT} + */ + put( + KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT, new Setting<>( - KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, + KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT, KNNSettings.KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT, - (s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT), + (s) -> parseknnMemoryCircuitBreakerValue(s, KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT), NodeScope, Dynamic ) ); - /** * Cache expiry time settings */ @@ -429,6 +457,8 @@ public class KNNSettings { private ClusterService clusterService; private Client client; + @Setter + private Optional nodeCbAttribute; private KNNSettings() {} @@ -449,10 +479,8 @@ private void setSettingsUpdateConsumers() { updatedSettings.getAsBoolean(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED, getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) ); - builder.maxWeight(((ByteSizeValue) getSettingValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb()); - if (updatedSettings.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)) { - builder.maxWeight(((ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).get(updatedSettings)).getKb()); - } + // Recompute cache weight + builder.maxWeight(getUpdatedCircuitBreakerLimit(updatedSettings).getKb()); builder.isExpirationLimited( updatedSettings.getAsBoolean(KNN_CACHE_ITEM_EXPIRY_ENABLED, getSettingValue(KNN_CACHE_ITEM_EXPIRY_ENABLED)) @@ -591,8 +619,106 @@ public static boolean isCircuitBreakerTriggered() { return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED); } - public static ByteSizeValue getCircuitBreakerLimit() { - return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT); + /** + * Retrieves the node-specific circuit breaker limit based on the existing settings. + * + * @return String representation of the node-specific circuit breaker limit, + * or null if no node-specific limit is set or found + */ + private String getNodeCbLimit() { + if (nodeCbAttribute.isPresent()) { + Settings configuredNodeCbLimits = KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX); + return configuredNodeCbLimits.get(nodeCbAttribute.get()); + } + return null; + } + + /** + * Gets node-specific circuit breaker limit from updated settings. + * + * @param updatedSettings Settings object containing pending updates + * @return String representation of new limit if exists for this node's tier, null otherwise + */ + private String getNodeCbLimit(Settings updatedSettings) { + if (nodeCbAttribute.isPresent()) { + return updatedSettings.getByPrefix(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_PREFIX).get(nodeCbAttribute.get()); + } + return null; + } + + /** + * Returns the cluster-level circuit breaker limit. Needed for initialization + * during startup when node attributes are not yet available through ClusterService. + * This limit serves two purposes: + * 1. As a temporary value during node startup before node-specific limits can be determined + * 2. As a fallback value for nodes that don't have a knn_cb_tier attribute or + * whose tier doesn't match any configured node-level limit + * + * @return ByteSizeValue representing the cluster-wide circuit breaker limit + */ + public static ByteSizeValue getClusterCbLimit() { + return KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT); + } + + /** + * Returns the circuit breaker limit for this node using existing configuration. The limit is determined by: + * 1. Node-specific limit based on the node's circuit breaker tier attribute, if configured + * 2. Cluster-level default limit if no node-specific configuration exists + * + * @return ByteSizeValue representing the circuit breaker limit, either as a percentage + * of available memory or as an absolute value + */ + public ByteSizeValue getCircuitBreakerLimit() { + + return parseknnMemoryCircuitBreakerValue(getNodeCbLimit(), getClusterCbLimit(), KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT); + + } + + /** + * Determines if and how the circuit breaker limit should be updated for this node. + * Evaluates both node-specific and cluster-level updates in the updated settings, + * maintaining proper precedence: + * 1. Node-tier specific limit from updates (if available) + * 2. Appropriate fallback value based on node's current configuration + * + * @param updatedCbLimits Settings object containing pending circuit breaker updates + * @return ByteSizeValue representing the new circuit breaker limit to apply, + * or null if no applicable updates found + */ + private ByteSizeValue getUpdatedCircuitBreakerLimit(Settings updatedCbLimits) { + // Parse any updates, using appropriate fallback if no node-specific limit update exists + return parseknnMemoryCircuitBreakerValue( + getNodeCbLimit(updatedCbLimits), + getFallbackCbLimitValue(updatedCbLimits), + KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT + ); + } + + /** + * Determines the appropriate fallback circuit breaker limit value. + * The fallback logic follows this hierarchy: + * 1. If node currently uses cluster-level limit: + * - Use updated cluster-level limit if available + * - Otherwise maintain current limit + * 2. If node uses tier-specific limit: + * - Maintain current limit (ignore cluster-level updates) + * + * This ensures nodes maintain their configuration hierarchy and don't + * inadvertently fall back to cluster-level limits when they should use + * tier-specific values. + * + * @param updatedCbLimits Settings object containing pending updates + * @return ByteSizeValue representing the appropriate fallback limit + */ + private ByteSizeValue getFallbackCbLimitValue(Settings updatedCbLimits) { + // Update cluster level limit if used + if (getNodeCbLimit() == null && updatedCbLimits.hasValue(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)) { + return (ByteSizeValue) getSetting(KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT).get(updatedCbLimits); + + } + + // Otherwise maintain current limit (either tier-specific or cluster-level) + return getCircuitBreakerLimit(); } public static double getCircuitBreakerUnsetPercentage() { @@ -660,10 +786,15 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind public void initialize(Client client, ClusterService clusterService) { this.client = client; this.clusterService = clusterService; + this.nodeCbAttribute = Optional.empty(); setSettingsUpdateConsumers(); } public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, String settingName) { + return parseknnMemoryCircuitBreakerValue(sValue, null, settingName); + } + + public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, ByteSizeValue defaultValue, String settingName) { settingName = Objects.requireNonNull(settingName); if (sValue != null && sValue.endsWith("%")) { final String percentAsString = sValue.substring(0, sValue.length() - 1); @@ -683,7 +814,7 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Str throw new OpenSearchParseException("failed to parse [{}] as a double", e, percentAsString); } } else { - return parseBytesSizeValue(sValue, settingName); + return parseBytesSizeValue(sValue, defaultValue, settingName); } } diff --git a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java index 76e94ee665..cc7edbf276 100644 --- a/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java +++ b/src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java @@ -81,11 +81,20 @@ public static synchronized NativeMemoryCacheManager getInstance() { return INSTANCE; } + /** + * Initialize NativeMemoryCacheManager with configurations. + * Note: maxWeight is initially set to the cluster-level circuit breaker limit + * because node attributes are not yet available during startup. Once the + * ClusterService is fully bootstrapped, the circuit breaker will update this + * value to use any node-specific limits based on the node's circuit_breaker_tier + * attribute if configured. + */ private void initialize() { initialize( NativeMemoryCacheManagerDto.builder() .isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) - .maxWeight(KNNSettings.getCircuitBreakerLimit().getKb()) + // Initially use cluster-level limit; will be updated later during cache refresh if node-specific limit exists + .maxWeight(KNNSettings.getClusterCbLimit().getKb()) .isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) .expiryTimeInMin( ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes() @@ -127,7 +136,7 @@ public synchronized void rebuildCache() { rebuildCache( NativeMemoryCacheManagerDto.builder() .isWeightLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED)) - .maxWeight(KNNSettings.getCircuitBreakerLimit().getKb()) + .maxWeight(KNNSettings.state().getCircuitBreakerLimit().getKb()) .isExpirationLimited(KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED)) .expiryTimeInMin( ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES)).getMinutes() @@ -460,7 +469,7 @@ private void onRemoval(RemovalNotification remov } private Float getSizeAsPercentage(long size) { - long cbLimit = KNNSettings.getCircuitBreakerLimit().getKb(); + long cbLimit = KNNSettings.state().getCircuitBreakerLimit().getKb(); if (cbLimit == 0) { return 0.0F; } diff --git a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java index a2bebf6a12..e8aa580f47 100644 --- a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java +++ b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java @@ -10,6 +10,7 @@ import org.opensearch.cluster.NamedDiff; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -82,6 +83,7 @@ import org.opensearch.knn.training.TrainingJobClusterStateListener; import org.opensearch.knn.training.TrainingJobRunner; import org.opensearch.knn.training.VectorReader; +import org.opensearch.plugins.ClusterPlugin; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.ExtensiblePlugin; @@ -116,6 +118,7 @@ import static org.opensearch.knn.common.KNNConstants.KNN_THREAD_POOL_PREFIX; import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_NAME; import static org.opensearch.knn.common.KNNConstants.TRAIN_THREAD_POOL; +import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_CIRCUIT_BREAKER_TIER; /** * Entry point for the KNN plugin where we define mapper for knn_vector type @@ -153,6 +156,7 @@ public class KNNPlugin extends Plugin SearchPlugin, ActionPlugin, EnginePlugin, + ClusterPlugin, ScriptPlugin, ExtensiblePlugin, SystemIndexPlugin { @@ -362,4 +366,22 @@ public Collection getSystemIndexDescriptors(Settings sett public Optional getConcurrentSearchRequestDeciderFactory() { return Optional.of(new KNNConcurrentSearchRequestDecider.Factory()); } + + @Override + public void onNodeStarted(DiscoveryNode localNode) { + // Attempt to fetch a cb tier from node attributes and cache the result. + // Get this node's circuit breaker tier attribute + Optional tierAttribute = Optional.ofNullable(localNode.getAttributes().get(KNN_CIRCUIT_BREAKER_TIER)); + if (tierAttribute.isPresent()) { + KNNSettings.state().setNodeCbAttribute(tierAttribute); + + // Only rebuild the cache if the weight has actually changed + if (KNNSettings.state().getCircuitBreakerLimit().getKb() != NativeMemoryCacheManager.getInstance() + .getMaxCacheSizeInKilobytes()) { + NativeMemoryCacheManager.getInstance().rebuildCache(); + } + } + + } + } diff --git a/src/test/java/org/opensearch/knn/index/KNNCircuitBreakerIT.java b/src/test/java/org/opensearch/knn/index/KNNCircuitBreakerIT.java index 385491cd98..b9e0118113 100644 --- a/src/test/java/org/opensearch/knn/index/KNNCircuitBreakerIT.java +++ b/src/test/java/org/opensearch/knn/index/KNNCircuitBreakerIT.java @@ -5,13 +5,17 @@ package org.opensearch.knn.index; +import org.junit.Assert; import org.opensearch.knn.KNNRestTestCase; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.opensearch.client.Response; import org.opensearch.common.settings.Settings; import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.knn.plugin.stats.StatNames; +import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import static org.opensearch.knn.index.KNNCircuitBreaker.CB_TIME_INTERVAL; @@ -21,20 +25,79 @@ */ public class KNNCircuitBreakerIT extends KNNRestTestCase { private static final Integer ALWAYS_BUILD_GRAPH = 0; + private static final String INDEX_1 = INDEX_NAME + "1"; + private static final String INDEX_2 = INDEX_NAME + "2"; /** - * To trip the circuit breaker, we will create two indices and index documents. Each index will be small enough so - * that individually they fit into the cache, but together they do not. To prevent Lucene conditions where - * multiple segments may or may not be created, we will force merge each index into a single segment before - * searching. + * Base setup for all circuit breaker tests. + * Creates two indices with enough documents to consume ~2kb of memory when loaded. */ - private void tripCb() throws Exception { - // Make sure that Cb is intially not tripped + private void setupIndices() throws Exception { + generateCbLoad(INDEX_1, INDEX_2); + + // Verify initial state assertFalse(isCbTripped()); + } - // Set circuit breaker limit to 1 KB + /** + * Tests circuit breaker behavior with only cluster-level limit configured. + * Expected behavior: + * 1. Set cluster limit to 1kb + * 2. Load indices consuming 2kb + * 3. Circuit breaker should trip + */ + private void testClusterLevelCircuitBreaker() throws Exception { + // Set cluster-level limit to 1kb (half of what indices require) updateClusterSettings("knn.memory.circuit_breaker.limit", "1kb"); + // Load indices into cache + search(INDEX_1, INDEX_2); + + // Verify circuit breaker tripped + Thread.sleep(5 * 1000); + assertTrue(isCbTripped()); + } + + /** + * Tests circuit breaker behavior with only node-level limit configured. + * Expected behavior: + * 1. Set cluster limit high (10kb) to ensure it doesn't interfere + * 2. Set node limit to 1kb + * 3. Load indices consuming 2kb + * 4. Circuit breaker should trip because node limit (1kb) < memory needed (2kb) + */ + private void testNodeLevelCircuitBreaker() throws Exception { + + // Set cluster-level limit high to ensure it doesn't interfere + updateClusterSettings("knn.memory.circuit_breaker.limit", "10kb"); + + // Set node-level limit to 1kb (half of what indices require) + updateClusterSettings("knn.memory.circuit_breaker.limit.integ", "1kb"); + + // Load indices into cache + search(INDEX_1, INDEX_2); + + // Verify circuit breaker tripped with 1kb node limit + Thread.sleep(5 * 1000); + assertTrue(isCbTripped()); + + // Increase node limit to 4kb - should untrip CB and show 50% usage + updateClusterSettings("knn.memory.circuit_breaker.limit.integ", "4kb"); + + // Load indices again + search(INDEX_1, INDEX_2); + + // Verify CB untripped and correct memory usage + Thread.sleep(5 * 1000); + verifyCbUntrips(); + + // The contents of the cache should take about 2kb with the current test. + // This could change in the future depending on the cache library and other factors + // Verify value is greater than what the percentage would be if the cluster level circuit breaker was in play with 2kb + Assert.assertTrue(getGraphMemoryPercentage() > 20.0); + } + + private void generateCbLoad(String indexName1, String indexName2) throws Exception { // Create index with 1 primary and numNodes-1 replicas so that the data will be on every node in the cluster int numNodes = Integer.parseInt(System.getProperty("cluster.number_of_nodes", "1")); Settings settings = Settings.builder() @@ -44,9 +107,6 @@ private void tripCb() throws Exception { .put(KNNSettings.INDEX_KNN_ADVANCED_APPROXIMATE_THRESHOLD, ALWAYS_BUILD_GRAPH) .build(); - String indexName1 = INDEX_NAME + "1"; - String indexName2 = INDEX_NAME + "2"; - createKnnIndex(indexName1, settings, createKnnIndexMapping(FIELD_NAME, 2)); createKnnIndex(indexName2, settings, createKnnIndexMapping(FIELD_NAME, 2)); @@ -60,7 +120,9 @@ private void tripCb() throws Exception { forceMergeKnnIndex(indexName1); forceMergeKnnIndex(indexName2); + } + private void search(String indexName1, String indexName2) throws IOException { // Execute search on both indices - will cause eviction float[] qvector = { 1.9f, 2.4f }; int k = 10; @@ -70,10 +132,16 @@ private void tripCb() throws Exception { searchKNNIndex(indexName1, new KNNQueryBuilder(FIELD_NAME, qvector, k), k); searchKNNIndex(indexName2, new KNNQueryBuilder(FIELD_NAME, qvector, k), k); } + } - // Give cluster 5 seconds to update settings and then assert that Cb get triggered - Thread.sleep(5 * 1000); // seconds - assertTrue(isCbTripped()); + private double getGraphMemoryPercentage() throws Exception { + Response response = getKnnStats( + Collections.emptyList(), + Collections.singletonList(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName()) + ); + String responseBody = EntityUtils.toString(response.getEntity()); + List> nodeStatsResponse = parseNodeStatsResponse(responseBody); + return Double.parseDouble(nodeStatsResponse.getFirst().get(StatNames.GRAPH_MEMORY_USAGE_PERCENTAGE.getName()).toString()); } public boolean isCbTripped() throws Exception { @@ -84,12 +152,17 @@ public boolean isCbTripped() throws Exception { } public void testCbTripped() throws Exception { - tripCb(); + setupIndices(); + testClusterLevelCircuitBreaker(); + testNodeLevelCircuitBreaker(); } - public void testCbUntrips() throws Exception { - updateClusterSettings("knn.circuit_breaker.triggered", "true"); - assertTrue(isCbTripped()); + public void verifyCbUntrips() throws Exception { + + if (!isCbTripped()) { + updateClusterSettings("knn.circuit_breaker.triggered", "true"); + + } int backOffInterval = 5; // seconds for (int i = 0; i < CB_TIME_INTERVAL; i += backOffInterval) { diff --git a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java index 961164a766..0a01703ab0 100644 --- a/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java +++ b/src/test/java/org/opensearch/knn/index/KNNSettingsTests.java @@ -42,13 +42,13 @@ public class KNNSettingsTests extends KNNTestCase { public void testGetSettingValueFromConfig() { long expectedKNNCircuitBreakerLimit = 13; Node mockNode = createMockNode( - Map.of(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT, "\"" + expectedKNNCircuitBreakerLimit + "kb\"") + Map.of(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT, "\"" + expectedKNNCircuitBreakerLimit + "kb\"") ); mockNode.start(); ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class); KNNSettings.state().setClusterService(clusterService); long actualKNNCircuitBreakerLimit = ((ByteSizeValue) KNNSettings.state() - .getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb(); + .getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)).getKb(); mockNode.close(); assertEquals(expectedKNNCircuitBreakerLimit, actualKNNCircuitBreakerLimit); } @@ -60,11 +60,11 @@ public void testGetSettingValueDefault() { ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class); KNNSettings.state().setClusterService(clusterService); long actualKNNCircuitBreakerLimit = ((ByteSizeValue) KNNSettings.state() - .getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT)).getKb(); + .getSettingValue(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT)).getKb(); mockNode.close(); assertEquals( - ((ByteSizeValue) KNNSettings.dynamicCacheSettings.get(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT).getDefault(Settings.EMPTY)) - .getKb(), + ((ByteSizeValue) KNNSettings.dynamicCacheSettings.get(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT) + .getDefault(Settings.EMPTY)).getKb(), actualKNNCircuitBreakerLimit ); diff --git a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java index 5cdedf11b6..45ccf0e9d0 100644 --- a/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java +++ b/src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java @@ -294,7 +294,7 @@ public void testGetIndexGraphCount() throws ExecutionException, IOException { public void testGetMaxCacheSizeInKB() { NativeMemoryCacheManager nativeMemoryCacheManager = new NativeMemoryCacheManager(); - assertEquals(KNNSettings.getCircuitBreakerLimit().getKb(), nativeMemoryCacheManager.getMaxCacheSizeInKilobytes()); + assertEquals(KNNSettings.getClusterCbLimit().getKb(), nativeMemoryCacheManager.getMaxCacheSizeInKilobytes()); nativeMemoryCacheManager.close(); } diff --git a/src/test/java/org/opensearch/knn/index/query/KNNWeightTests.java b/src/test/java/org/opensearch/knn/index/query/KNNWeightTests.java index 8dbddd276a..591c4b1d01 100644 --- a/src/test/java/org/opensearch/knn/index/query/KNNWeightTests.java +++ b/src/test/java/org/opensearch/knn/index/query/KNNWeightTests.java @@ -137,15 +137,15 @@ public static void setUpClass() throws Exception { final KNNSettings knnSettings = mock(KNNSettings.class); knnSettingsMockedStatic = mockStatic(KNNSettings.class); when(knnSettings.getSettingValue(eq(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_ENABLED))).thenReturn(true); - when(knnSettings.getSettingValue(eq(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT))).thenReturn(CIRCUIT_BREAKER_LIMIT_100KB); + when(knnSettings.getSettingValue(eq(KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT))).thenReturn(CIRCUIT_BREAKER_LIMIT_100KB); when(knnSettings.getSettingValue(eq(KNNSettings.KNN_CACHE_ITEM_EXPIRY_ENABLED))).thenReturn(false); when(knnSettings.getSettingValue(eq(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES))).thenReturn(TimeValue.timeValueMinutes(10)); final ByteSizeValue v = ByteSizeValue.parseBytesSizeValue( CIRCUIT_BREAKER_LIMIT_100KB, - KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT + KNNSettings.KNN_MEMORY_CIRCUIT_BREAKER_CLUSTER_LIMIT ); - knnSettingsMockedStatic.when(KNNSettings::getCircuitBreakerLimit).thenReturn(v); + knnSettingsMockedStatic.when(KNNSettings::getClusterCbLimit).thenReturn(v); knnSettingsMockedStatic.when(KNNSettings::state).thenReturn(knnSettings); knnSettingsMockedStatic.when(KNNSettings::isKNNPluginEnabled).thenReturn(true); ByteSizeValue cacheSize = ByteSizeValue.parseBytesSizeValue("1024kb", QUANTIZATION_STATE_CACHE_SIZE_LIMIT); // Setting 1MB as an