Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove usage of cluster level setting for circuit breaker #2567

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Refactoring
* Small Refactor Post Lucene 10.0.1 upgrade [#2541](https://github.com/opensearch-project/k-NN/pull/2541)
* Refactor codec to leverage backwards_codecs [#2546](https://github.com/opensearch-project/k-NN/pull/2546)
* Remove usage of cluster level setting for circuit breaker [#2567](https://github.com/opensearch-project/k-NN/pull/2567)

## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.19...2.x)
### Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.knn.plugin.stats.KNNStats;
Expand All @@ -21,7 +22,7 @@ public class StatsIT extends AbstractRollingUpgradeTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
this.knnStats = new KNNStats();
this.knnStats = new KNNStats(null, () -> Version.CURRENT);
}

// Validate if all the KNN Stats metrics from old version are present in new version
Expand Down
89 changes: 8 additions & 81 deletions src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,18 @@

package org.opensearch.knn.index;

import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.plugin.stats.StatNames;
import org.opensearch.knn.plugin.transport.KNNStatsAction;
import org.opensearch.knn.plugin.transport.KNNStatsNodeResponse;
import org.opensearch.knn.plugin.transport.KNNStatsRequest;
import org.opensearch.knn.plugin.transport.KNNStatsResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import lombok.Getter;

/**
* Runs the circuit breaker logic and updates the settings
*/
@Getter
public class KNNCircuitBreaker {
private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";
public static int CB_TIME_INTERVAL = 2 * 60; // seconds

private static KNNCircuitBreaker INSTANCE;
private ThreadPool threadPool;
private ClusterService clusterService;
private Client client;

/**
 * Whether the circuit breaker is currently tripped.
 *
 * Declared {@code volatile} because the flag is written through the synchronized
 * {@link #setTripped(boolean)} but read through the accessor generated by the class-level
 * {@code @Getter}, which does not synchronize. Without {@code volatile} a reading thread is
 * not guaranteed to observe the most recent write.
 */
private volatile boolean isTripped = false;

private KNNCircuitBreaker() {}

Expand All @@ -44,68 +28,11 @@ public static synchronized KNNCircuitBreaker getInstance() {
}

/**
* SetInstance of Circuit Breaker
* Set the circuit breaker flag
*
* @param instance KNNCircuitBreaker instance
* @param isTripped true if circuit breaker is tripped, false otherwise
*/
public static synchronized void setInstance(KNNCircuitBreaker instance) {
INSTANCE = instance;
}

public void initialize(ThreadPool threadPool, ClusterService clusterService, Client client) {
this.threadPool = threadPool;
this.clusterService = clusterService;
this.client = client;
NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
Runnable runnable = () -> {
if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb();
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
* circuitBreakerLimitSizeKiloBytes);
/**
* Unset capacityReached flag if currentSizeBytes is less than circuitBreakerUnsetSizeBytes
*/
if (currentSizeKiloBytes <= circuitBreakerUnsetSizeKiloBytes) {
nativeMemoryCacheManager.setCacheCapacityReached(false);
}
}

// Leader node untriggers CB if all nodes have not reached their max capacity
if (KNNSettings.isCircuitBreakerTriggered() && clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
KNNStatsRequest knnStatsRequest = new KNNStatsRequest();
knnStatsRequest.addStat(StatNames.CACHE_CAPACITY_REACHED.getName());
knnStatsRequest.timeout(new TimeValue(1000 * 10)); // 10 second timeout

try {
KNNStatsResponse knnStatsResponse = client.execute(KNNStatsAction.INSTANCE, knnStatsRequest).get();
List<KNNStatsNodeResponse> nodeResponses = knnStatsResponse.getNodes();

List<String> nodesAtMaxCapacity = new ArrayList<>();
for (KNNStatsNodeResponse nodeResponse : nodeResponses) {
if ((Boolean) nodeResponse.getStatsMap().get(StatNames.CACHE_CAPACITY_REACHED.getName())) {
nodesAtMaxCapacity.add(nodeResponse.getNode().getId());
}
}

if (!nodesAtMaxCapacity.isEmpty()) {
logger.info(
"[KNN] knn.circuit_breaker.triggered stays set. Nodes at max cache capacity: "
+ String.join(",", nodesAtMaxCapacity)
+ "."
);
} else {
logger.info(
"[KNN] Cache capacity below 75% of the circuit breaker limit for all nodes."
+ " Unsetting knn.circuit_breaker.triggered flag."
);
KNNSettings.state().updateCircuitBreakerSettings(false);
}
} catch (Exception e) {
logger.error("[KNN] Exception getting stats: " + e);
}
}
};
this.threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);
public synchronized void setTripped(boolean isTripped) {
// Overwrites the current state unconditionally; the method is synchronized, so the write happens while holding this instance's monitor.
this.isTripped = isTripped;
}
}
49 changes: 4 additions & 45 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
Expand All @@ -28,7 +25,6 @@
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.monitor.os.OsProbe;
import org.opensearch.transport.client.Client;

import java.security.InvalidParameterException;
import java.util.Arrays;
Expand Down Expand Up @@ -317,7 +313,8 @@ public class KNNSettings {
KNN_CIRCUIT_BREAKER_TRIGGERED,
false,
NodeScope,
Dynamic
Dynamic,
Setting.Property.Deprecated
);

public static final Setting<Double> KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING = Setting.doubleSetting(
Expand Down Expand Up @@ -473,8 +470,8 @@ public class KNNSettings {
private final static Map<String, Setting<?>> FEATURE_FLAGS = getFeatureFlags().stream()
.collect(toUnmodifiableMap(Setting::getKey, Function.identity()));

@Setter
private ClusterService clusterService;
private Client client;
@Setter
private Optional<String> nodeCbAttribute;

Expand Down Expand Up @@ -638,10 +635,6 @@ public static boolean isKNNPluginEnabled() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_PLUGIN_ENABLED);
}

public static boolean isCircuitBreakerTriggered() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED);
}

/**
* Retrieves the node-specific circuit breaker limit based on the existing settings.
*
Expand Down Expand Up @@ -806,8 +799,7 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
.getAsBoolean(KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED, false);
}

public void initialize(Client client, ClusterService clusterService) {
this.client = client;
public void initialize(ClusterService clusterService) {
this.clusterService = clusterService;
this.nodeCbAttribute = Optional.empty();
setSettingsUpdateConsumers();
Expand Down Expand Up @@ -841,35 +833,6 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Byt
}
}

/**
* Updates knn.circuit_breaker.triggered setting to true/false
* @param flag true/false
*/
public synchronized void updateCircuitBreakerSettings(boolean flag) {
ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest();
Settings circuitBreakerSettings = Settings.builder().put(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED, flag).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener<ClusterUpdateSettingsResponse>() {
@Override
public void onResponse(ClusterUpdateSettingsResponse clusterUpdateSettingsResponse) {
logger.debug(
"Cluster setting {}, acknowledged: {} ",
clusterUpdateSettingsRequest.persistentSettings(),
clusterUpdateSettingsResponse.isAcknowledged()
);
}

@Override
public void onFailure(Exception e) {
logger.info(
"Exception while updating circuit breaker setting {} to {}",
clusterUpdateSettingsRequest.persistentSettings(),
e.getMessage()
);
}
});
}

public static ByteSizeValue getVectorStreamingMemoryLimit() {
return KNNSettings.state().getSettingValue(KNN_VECTOR_STREAMING_MEMORY_LIMIT_IN_MB);
}
Expand All @@ -888,10 +851,6 @@ public static int getEfSearchParam(String index) {
);
}

public void setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
}

static class SpaceTypeValidator implements Setting.Validator<String> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.common.settings.Settings;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.KnnCircuitBreakerException;
import org.opensearch.knn.index.SpaceType;
Expand Down Expand Up @@ -114,9 +115,9 @@ public static int getExpectedVectorLength(final KNNVectorFieldType knnVectorFiel
* Validate if the circuit breaker is triggered
*/
static void validateIfCircuitBreakerIsNotTriggered() {
if (KNNSettings.isCircuitBreakerTriggered()) {
if (KNNCircuitBreaker.getInstance().isTripped()) {
throw new KnnCircuitBreakerException(
"Parsing the created knn vector fields prior to indexing has failed as the circuit breaker triggered. This indicates that the cluster is low on memory resources and cannot index more documents at the moment. Check _plugins/_knn/stats for the circuit breaker status."
"Parsing the created knn vector fields prior to indexing has failed as the circuit breaker triggered. This indicates that the node is low on memory resources and cannot index more documents at the moment. Check _plugins/_knn/stats for the circuit breaker status."
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.plugin.stats.StatNames;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class NativeMemoryCacheManager implements Closeable {
private static NativeMemoryCacheManager INSTANCE;
@Setter
private static ThreadPool threadPool;
public static int CB_TIME_INTERVAL = 20; // seconds

private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
Expand Down Expand Up @@ -121,9 +123,12 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
cacheCapacityReached = new AtomicBoolean(false);
accessRecencyQueue = new ConcurrentLinkedDeque<>();
cache = cacheBuilder.build();
// Set to false when initialized. This ensures that we don't have to wait for the maintenance job to reset the flag.
KNNCircuitBreaker.getInstance().setTripped(false);

if (threadPool != null) {
startMaintenance(cache);
circuitBreakerUpdater();
} else {
logger.warn("ThreadPool is null during NativeMemoryCacheManager initialization. Maintenance will not start.");
}
Expand Down Expand Up @@ -413,24 +418,6 @@ public void invalidateAll() {
cache.invalidateAll();
}

/**
* Returns whether or not the capacity of the cache has been reached
*
* @return Boolean of whether cache limit has been reached
*/
public Boolean isCacheCapacityReached() {
return cacheCapacityReached.get();
}

/**
* Sets cache capacity reached
*
* @param value Boolean value to set cache Capacity Reached to
*/
public void setCacheCapacityReached(Boolean value) {
cacheCapacityReached.set(value);
}

/**
* Get the stats of all of the OpenSearch indices currently loaded into the cache
*
Expand Down Expand Up @@ -461,8 +448,7 @@ private void onRemoval(RemovalNotification<String, NativeMemoryAllocation> remov
nativeMemoryAllocation.close();

if (RemovalCause.SIZE == removalNotification.getCause()) {
KNNSettings.state().updateCircuitBreakerSettings(true);
setCacheCapacityReached(true);
KNNCircuitBreaker.getInstance().setTripped(true);
}

logger.debug("[KNN] Cache evicted. Key {}, Reason: {}", removalNotification.getKey(), removalNotification.getCause());
Expand Down Expand Up @@ -500,4 +486,21 @@ private void startMaintenance(Cache<String, NativeMemoryAllocation> cacheInstanc

maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
}

/**
 * Schedules a recurring task that releases the k-NN circuit breaker.
 *
 * The task runs on the GENERIC thread pool with a fixed delay of CB_TIME_INTERVAL seconds.
 * On each run, if the breaker is tripped, it compares the current cache size (in kilobytes)
 * against the unset threshold, computed as the circuit breaker unset percentage of the
 * circuit breaker limit, and untrips the breaker when the cache size is at or below it.
 * The task never trips the breaker; it only clears it.
 */
private void circuitBreakerUpdater() {
Runnable runnable = () -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the previous approach was directly in the codepath. Are we moving to runnable tasks because we want a mechanism to unset the CB?

I would rather have it as a side effect than a constant monitor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was actually in the KNNCircuitBreaker class: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L60-L109. I moved it to the cache manager because it removes the circular dependency between the 2. However, the logic remains consistent

if (KNNCircuitBreaker.getInstance().isTripped()) {
long currentSizeKiloBytes = getCacheSizeInKilobytes();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb();
// Unset threshold in kilobytes: the configured unset percentage applied to the limit, truncated to a long.
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
* circuitBreakerLimitSizeKiloBytes);

// Untrip the circuit breaker once the current cache size (KB) is at or below the unset threshold (KB)
if (currentSizeKiloBytes <= circuitBreakerUnsetSizeKiloBytes) {
KNNCircuitBreaker.getInstance().setTripped(false);
}
}
};
// NOTE(review): the handle returned by scheduleWithFixedDelay is discarded here, whereas startMaintenance
// keeps its handle in maintenanceTask. Confirm this task is cancelled on close and is not scheduled a
// second time if the cache manager is initialized again.
threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);
}
}
Loading
Loading