Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor settings management for plugin #1393

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Upgrade guava to 32.1.3 [#1319](https://github.com/opensearch-project/k-NN/pull/1319)
* Bump lucene codec to 99 [#1383](https://github.com/opensearch-project/k-NN/pull/1383)
### Refactoring
* Refactor settings management for plugin [#1393](https://github.com/opensearch-project/k-NN/pull/1393)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import static org.opensearch.knn.TestUtils.KNN_BWC_PREFIX;
import static org.opensearch.knn.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.knn.index.KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY;
import static org.opensearch.knn.index.codec.KNNCodecService.KNN_ALGO_PARAM_INDEX_THREAD_QTY;

public class RecallIT extends AbstractRestartUpgradeTestCase {
private static final String TEST_INDEX_RECALL_OLD = KNN_BWC_PREFIX + "test-index-recall-old";
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/knn/index/IndexUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.ValidationException;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.knn.index.mapper.LegacyFieldMapper;
import org.opensearch.knn.index.util.KNNEngine;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.indices.ModelMetadata;
Expand Down Expand Up @@ -242,7 +243,7 @@ public static Map<String, Object> getParametersAtLoading(SpaceType spaceType, KN
// For nmslib, we need to add the dynamic ef_search parameter that needs to be passed in when the
// hnsw graphs are loaded into memory
if (KNNEngine.NMSLIB.equals(knnEngine)) {
loadParameters.put(HNSW_ALGO_EF_SEARCH, KNNSettings.getEfSearchParam(indexName));
loadParameters.put(HNSW_ALGO_EF_SEARCH, LegacyFieldMapper.getEfSearchParam(indexName));
}

return Collections.unmodifiableMap(loadParameters);
Expand Down
133 changes: 91 additions & 42 deletions src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,95 @@

package org.opensearch.knn.index;

import org.opensearch.OpenSearchParseException;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.plugin.stats.StatNames;
import org.opensearch.knn.plugin.transport.KNNStatsAction;
import org.opensearch.knn.plugin.transport.KNNStatsNodeResponse;
import org.opensearch.knn.plugin.transport.KNNStatsRequest;
import org.opensearch.knn.plugin.transport.KNNStatsResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.monitor.os.OsProbe;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.NodeScope;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;

/**
* Runs the circuit breaker logic and updates the settings
*/
public class KNNCircuitBreaker {
public static final String KNN_MEMORY_CIRCUIT_BREAKER_ENABLED = "knn.memory.circuit_breaker.enabled";
public static final Setting<Boolean> KNN_MEMORY_CIRCUIT_BREAKER_ENABLED_SETTING = Setting.boolSetting(
KNN_MEMORY_CIRCUIT_BREAKER_ENABLED,
true,
NodeScope,
Dynamic
);
public static final String KNN_MEMORY_CIRCUIT_BREAKER_LIMIT = "knn.memory.circuit_breaker.limit";
public static final String KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT = "50%";
public static final Setting<ByteSizeValue> KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_SETTING = new Setting<>(
KNN_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNN_DEFAULT_MEMORY_CIRCUIT_BREAKER_LIMIT,
KNNCircuitBreaker::parseByteSizeValue,
NodeScope,
Dynamic
);

private static ByteSizeValue parseByteSizeValue(String sValue) {
if (sValue != null && sValue.endsWith("%")) {
final String percentAsString = sValue.substring(0, sValue.length() - 1);
try {
final double percent = Double.parseDouble(percentAsString);
if (percent < 0 || percent > 100) {
throw new OpenSearchParseException("percentage should be in [0-100], got [{}]", percentAsString);

Check warning on line 55 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L55

Added line #L55 was not covered by tests
}
long physicalMemoryInBytes = OsProbe.getInstance().getTotalPhysicalMemorySize();
if (physicalMemoryInBytes <= 0) {
throw new IllegalStateException("Physical memory size could not be determined");

Check warning on line 59 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L59

Added line #L59 was not covered by tests
}
long esJvmSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
long eligibleMemoryInBytes = physicalMemoryInBytes - esJvmSizeInBytes;
return new ByteSizeValue((long) ((percent / 100) * eligibleMemoryInBytes), ByteSizeUnit.BYTES);
} catch (NumberFormatException e) {
throw new OpenSearchParseException("failed to parse [{}] as a double", e, percentAsString);

Check warning on line 65 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L64-L65

Added lines #L64 - L65 were not covered by tests
}
} else {
return parseBytesSizeValue(sValue, KNN_MEMORY_CIRCUIT_BREAKER_LIMIT);
}
}

public static final String KNN_CIRCUIT_BREAKER_TRIGGERED = "knn.circuit_breaker.triggered";
public static final Setting<Boolean> KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING = Setting.boolSetting(
KNN_CIRCUIT_BREAKER_TRIGGERED,
false,
NodeScope,
Dynamic
);

public static final String KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE = "knn.circuit_breaker.unset.percentage";
public static final Integer KNN_DEFAULT_CIRCUIT_BREAKER_UNSET_PERCENTAGE = 75;
public static final Setting<Double> KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING = Setting.doubleSetting(
KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE,
KNN_DEFAULT_CIRCUIT_BREAKER_UNSET_PERCENTAGE,
0,
100,
NodeScope,
Dynamic
);

private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
public static int CB_TIME_INTERVAL = 2 * 60; // seconds

private static KNNCircuitBreaker INSTANCE;
private ThreadPool threadPool;
private ClusterService clusterService;
private Client client;

private KNNCircuitBreaker() {}

Expand All @@ -51,16 +113,15 @@
INSTANCE = instance;
}

public void initialize(ThreadPool threadPool, ClusterService clusterService, Client client) {
public void initialize(ThreadPool threadPool, ClusterService clusterService) {
this.threadPool = threadPool;
this.clusterService = clusterService;
this.client = client;
NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
Runnable runnable = () -> {
if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
long circuitBreakerLimitSizeKiloBytes = KNNSettings.getCircuitBreakerLimit().getKb();
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
long circuitBreakerLimitSizeKiloBytes = KNNCircuitBreakerUtil.instance().getCircuitBreakerLimit().getKb();
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNCircuitBreakerUtil.instance().getCircuitBreakerUnsetPercentage() / 100)

Check warning on line 124 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L123-L124

Added lines #L123 - L124 were not covered by tests
* circuitBreakerLimitSizeKiloBytes);
/**
* Unset capacityReached flag if currentSizeBytes is less than circuitBreakerUnsetSizeBytes
Expand All @@ -71,37 +132,25 @@
}

// Leader node untriggers CB if all nodes have not reached their max capacity
if (KNNSettings.isCircuitBreakerTriggered() && clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
KNNStatsRequest knnStatsRequest = new KNNStatsRequest();
knnStatsRequest.addStat(StatNames.CACHE_CAPACITY_REACHED.getName());
knnStatsRequest.timeout(new TimeValue(1000 * 10)); // 10 second timeout

if (KNNCircuitBreakerUtil.instance().isCircuitBreakerTriggered()
&& clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
List<String> nodesAtMaxCapacity;
try {
KNNStatsResponse knnStatsResponse = client.execute(KNNStatsAction.INSTANCE, knnStatsRequest).get();
List<KNNStatsNodeResponse> nodeResponses = knnStatsResponse.getNodes();

List<String> nodesAtMaxCapacity = new ArrayList<>();
for (KNNStatsNodeResponse nodeResponse : nodeResponses) {
if ((Boolean) nodeResponse.getStatsMap().get(StatNames.CACHE_CAPACITY_REACHED.getName())) {
nodesAtMaxCapacity.add(nodeResponse.getNode().getId());
}
}

if (!nodesAtMaxCapacity.isEmpty()) {
logger.info(
"[KNN] knn.circuit_breaker.triggered stays set. Nodes at max cache capacity: "
+ String.join(",", nodesAtMaxCapacity)
+ "."
);
} else {
logger.info(
"[KNN] Cache capacity below 75% of the circuit breaker limit for all nodes."
+ " Unsetting knn.circuit_breaker.triggered flag."
);
KNNSettings.state().updateCircuitBreakerSettings(false);
}
} catch (Exception e) {
logger.error("[KNN] Exception getting stats: " + e);
nodesAtMaxCapacity = KNNCircuitBreakerUtil.instance().getNodesAtMaxCapacity();
} catch (ExecutionException | InterruptedException e) {
logger.error("Unable to get knn stats and determine if any nodes are at capacity", e);
return;

Check warning on line 142 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L140-L142

Added lines #L140 - L142 were not covered by tests
}

if (!nodesAtMaxCapacity.isEmpty()) {
logger.info(

Check warning on line 146 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L146

Added line #L146 was not covered by tests
"[KNN] knn.circuit_breaker.triggered stays set. Nodes at max cache capacity: "
+ String.join(",", nodesAtMaxCapacity)

Check warning on line 148 in src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java#L148

Added line #L148 was not covered by tests
+ "."
);
} else {
logger.info("No nodes are at max cache capacity. Unsetting knn.circuit_breaker.triggered flag.");
KNNCircuitBreakerUtil.instance().updateCircuitBreakerSettings(false);
}
}
};
Expand Down
140 changes: 140 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index;

import lombok.extern.log4j.Log4j2;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.knn.plugin.stats.StatNames;
import org.opensearch.knn.plugin.transport.KNNStatsAction;
import org.opensearch.knn.plugin.transport.KNNStatsNodeResponse;
import org.opensearch.knn.plugin.transport.KNNStatsRequest;
import org.opensearch.knn.plugin.transport.KNNStatsResponse;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_CIRCUIT_BREAKER_TRIGGERED;
import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING;
import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING;
import static org.opensearch.knn.index.KNNCircuitBreaker.KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_SETTING;

/**
* Singleton utility class to interact with the circuit breaker.
*/
@Log4j2
public class KNNCircuitBreakerUtil {
private static KNNCircuitBreakerUtil instance;

private KNNCircuitBreakerUtil() {}

private Client client;

/**
* Return instance of the KNNCircuitBreakerUtil, must be initialized first for proper usage
* @return instance of KNNCircuitBreakerUtil
*/
public static synchronized KNNCircuitBreakerUtil instance() {
if (instance == null) {
instance = new KNNCircuitBreakerUtil();
}
return instance;
}

/**
*
* @param client client for interfacing with the cluster
*/
public void initialize(final Client client) {
this.client = Objects.requireNonNull(client, "client must not be null");
}

/**
* Get the nodes in cluster that have reached the maximum capacity
*
* @return list of IDs for nodes that have reached the max capacity
*/
public List<String> getNodesAtMaxCapacity() throws ExecutionException, InterruptedException {
KNNStatsRequest knnStatsRequest = new KNNStatsRequest();
knnStatsRequest.addStat(StatNames.CACHE_CAPACITY_REACHED.getName());
knnStatsRequest.timeout(new TimeValue(1000 * 10)); // 10 second timeout

KNNStatsResponse knnStatsResponse = client.execute(KNNStatsAction.INSTANCE, knnStatsRequest).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since initialize is outside object creation, can you add non-null check for client?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

List<KNNStatsNodeResponse> nodeResponses = knnStatsResponse.getNodes();

List<String> nodesAtMaxCapacity = new ArrayList<>();
for (KNNStatsNodeResponse nodeResponse : nodeResponses) {
if ((Boolean) nodeResponse.getStatsMap().get(StatNames.CACHE_CAPACITY_REACHED.getName())) {
nodesAtMaxCapacity.add(nodeResponse.getNode().getId());

Check warning on line 78 in src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java#L78

Added line #L78 was not covered by tests
}
}

return nodesAtMaxCapacity;
}

/**
* Updates knn.circuit_breaker.triggered setting to true/false
* @param flag true/false
*/
public synchronized void updateCircuitBreakerSettings(boolean flag) {
ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest();
Settings circuitBreakerSettings = Settings.builder().put(KNN_CIRCUIT_BREAKER_TRIGGERED, flag).build();
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener<>() {
@Override
public void onResponse(ClusterUpdateSettingsResponse clusterUpdateSettingsResponse) {
log.debug(
"Cluster setting {}, acknowledged: {} ",
clusterUpdateSettingsRequest.persistentSettings(),
clusterUpdateSettingsResponse.isAcknowledged()
);
}

@Override
public void onFailure(Exception e) {
log.info(

Check warning on line 105 in src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java#L105

Added line #L105 was not covered by tests
"Exception while updating circuit breaker setting {} to {}",
clusterUpdateSettingsRequest.persistentSettings(),
e.getMessage()

Check warning on line 108 in src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java#L107-L108

Added lines #L107 - L108 were not covered by tests
);
}

Check warning on line 110 in src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java#L110

Added line #L110 was not covered by tests
});
}

/**
* Get if circuit breaker is triggered or not
*
* @return If circuit breaker is enabled for the cluster
*/
public boolean isCircuitBreakerTriggered() {
return KNNClusterUtil.instance().getClusterSetting(KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING);
}

/**
* Get value of limit for circuit breaker
*
* @return Value of circuit breaker limit as ByteSizeValue
*/
public ByteSizeValue getCircuitBreakerLimit() {
return KNNClusterUtil.instance().getClusterSetting(KNN_MEMORY_CIRCUIT_BREAKER_LIMIT_SETTING);
}

/**
* Get value of circuit breaker unset percentage setting
*
* @return Value as double of unset circuit breaker percentage
*/
public double getCircuitBreakerUnsetPercentage() {
return KNNClusterUtil.instance().getClusterSetting(KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING);

Check warning on line 138 in src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/knn/index/KNNCircuitBreakerUtil.java#L138

Added line #L138 was not covered by tests
}
}
23 changes: 23 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNClusterUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;

/**
* Class abstracts information related to underlying OpenSearch cluster
Expand All @@ -34,6 +36,7 @@ public static synchronized KNNClusterUtil instance() {

/**
* Initializes instance of cluster context by injecting dependencies
*
* @param clusterService
*/
public void initialize(final ClusterService clusterService) {
Expand All @@ -55,4 +58,24 @@ public Version getClusterMinVersion() {
return Version.CURRENT;
}
}

/**
* Get setting value for the cluster. Return default if not set.
*
* @param <T> Setting type
* @return T setting value or default
*/
public <T> T getClusterSetting(Setting<T> setting) {
return clusterService.getClusterSettings().get(setting);
}

/**
* Get index metadata for a particular index
*
* @param indexName Name of the index
* @return IndexMetadata for the given index
*/
public IndexMetadata getIndexMetadata(String indexName) {
return clusterService.state().getMetadata().index(indexName);
}
}
Loading
Loading