Skip to content

Commit

Permalink
test5
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jun 9, 2024
1 parent 051acf0 commit 2eb8a82
Showing 1 changed file with 103 additions and 18 deletions.
121 changes: 103 additions & 18 deletions src/main/java/org/opensearch/timeseries/indices/IndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.timeseries.indices;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.forecast.constant.ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

import java.io.IOException;
Expand Down Expand Up @@ -51,6 +52,8 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
Expand All @@ -66,9 +69,12 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser.Token;
import org.opensearch.forecast.indices.ForecastIndex;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -283,7 +289,7 @@ protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenInd
);
}

protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRetentionPeriod) {
protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRetentionPeriod, Integer customResultIndexTtl) {
Set<String> candidates = new HashSet<String>();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest()
Expand All @@ -296,9 +302,11 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
adminClient.cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
String latestToDelete = null;
long latest = Long.MIN_VALUE;
long customTtlMillis = (customResultIndexTtl != null) ? customResultIndexTtl * 24 * 60 * 60 * 1000L : Long.MAX_VALUE;
for (IndexMetadata indexMetaData : clusterStateResponse.getState().metadata().indices().values()) {
long creationTime = indexMetaData.getCreationDate();
if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis()) {
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis() || indexAgeMillis > customTtlMillis) {
String indexName = indexMetaData.getIndex().getName();
candidates.add(indexName);
if (latest < creationTime) {
Expand Down Expand Up @@ -1077,7 +1085,7 @@ public void onClusterManager() {

// schedule the next rollover for approx MAX_AGE later
scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), TimeValue.timeValueMinutes(5), executorName());
} catch (Exception e) {
// This should be run on cluster startup
logger.error("Error rollover result indices. " + "Can't rollover result until clusterManager node is restarted.", e);
Expand All @@ -1100,6 +1108,8 @@ protected void rescheduleRollover() {
if (scheduledRollover != null) {
scheduledRollover.cancel();
}
System.out.println(5);

scheduledRollover = threadPool
.scheduleWithFixedDelay(() -> rolloverAndDeleteHistoryIndex(), historyRolloverPeriod, executorName());
}
Expand Down Expand Up @@ -1234,35 +1244,110 @@ protected void rolloverAndDeleteHistoryIndex(
String rolloverIndexPattern,
IndexType resultIndex
) {
if (!doesDefaultResultIndexExist()) {
return;
System.out.println("resultIndexAlias: " + resultIndexAlias);
System.out.println("allResultIndicesPattern: " + allResultIndicesPattern);
System.out.println("rolloverIndexPattern: " + rolloverIndexPattern);
System.out.println("resultIndex: " + resultIndex.getIndexName());

// build rollover request for default result index
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest(resultIndexAlias, rolloverIndexPattern);

// get config files that have custom result index alias to perform rollover on
getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> {
if (candidateResultAliases == null || candidateResultAliases.isEmpty()) {
// no custom result index alias found
if (!doesDefaultResultIndexExist()) {
// no default result index found either
return;
}
// perform rollover and delete on default result index
proceedWithDefaultRolloverAndDelete(
resultIndexAlias,
defaultResultIndexRolloverRequest,
allResultIndicesPattern,
resultIndex
);
logger.info("Candidate custom result indices are empty.");
return;
}

System.out.println("size: " + candidateResultAliases.size());

// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));

}, e -> {
logger.error("Failed to get configs with custom result index alias.", e);
// perform rollover and delete on default result index if getting error on getting custom result index alias
proceedWithDefaultRolloverAndDelete(resultIndexAlias, defaultResultIndexRolloverRequest, allResultIndicesPattern, resultIndex);
}));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
System.out.println("detector name: " + config.getName());
System.out.println("custom index name: " + config.getCustomResultIndexOrAlias());
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueMinutes(10));

// rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
if (config.getCustomResultIndexMinSize() != null) {
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB));
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
resultIndex,
config.getCustomResultIndexTTL()
);
}

private void proceedWithDefaultRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rolloverRequest,
String allResultIndicesPattern,
IndexType resultIndex
) {
proceedWithRolloverAndDelete(resultIndexAlias, rolloverRequest, allResultIndicesPattern, resultIndex, null);
}

private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rolloverIndexPattern) {
RolloverRequest rollOverRequest = new RolloverRequest(resultIndexAlias, null);
CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);

choosePrimaryShards(createRequest, true);

rollOverRequest.addMaxIndexDocsCondition(historyMaxDocs * getNumberOfPrimaryShards());

return rollOverRequest;
}

private void proceedWithRolloverAndDelete(
String resultIndexAlias,
RolloverRequest rollOverRequest,
String allResultIndicesPattern,
IndexType resultIndex,
Integer customResultIndexTtl
) {
adminClient.indices().rolloverIndex(rollOverRequest, ActionListener.wrap(response -> {
if (!response.isRolledOver()) {
logger.warn("{} not rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
} else {
IndexState indexStatetate = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexStatetate.mappingUpToDate = true;
IndexState indexState = indexStates.computeIfAbsent(resultIndex, k -> new IndexState(k.getMapping()));
indexState.mappingUpToDate = true;
logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus());
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod);
deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod, customResultIndexTtl);
}
}, exception -> {
// e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001,
// we cannot roll over twice in the same day as the index with the same name exists. We will get
// resource_already_exists_exception.
logger.error("Fail to roll over result index", exception);
}));
}, exception -> { logger.error("Fail to roll over result index", exception); }));
}

protected void initResultIndexDirectly(
Expand Down

0 comments on commit 2eb8a82

Please sign in to comment.