Skip to content

Commit

Permalink
add UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jun 9, 2024
1 parent d5ba7f5 commit fac3bbd
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
long indexAgeMillis = Instant.now().toEpochMilli() - creationTime;
if (indexAgeMillis > historyRetentionPeriod.millis()) {
String indexName = indexMetaData.getIndex().getName();
System.out.println("indexName: " + indexName);
candidates.add(indexName);
if (latest < creationTime) {
latest = creationTime;
Expand All @@ -318,7 +317,7 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
}
if (candidates.size() > 1) {
// delete all indices except the last one because the last one may contain docs newer than the retention period
//candidates.remove(latestToDelete);
candidates.remove(latestToDelete);
String[] toDelete = candidates.toArray(Strings.EMPTY_ARRAY);
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(toDelete);
adminClient.indices().delete(deleteIndexRequest, ActionListener.wrap(deleteIndexResponse -> {
Expand Down Expand Up @@ -1285,9 +1284,7 @@ private void handleCustomResultIndex(Config config, IndexType resultIndex) {

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueMinutes(1));

// rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
}
if (config.getCustomResultIndexMinSize() != null) {
rolloverRequest.addMaxIndexSizeCondition(new ByteSizeValue(config.getCustomResultIndexMinSize(), ByteSizeUnit.MB));
Expand Down Expand Up @@ -1339,8 +1336,7 @@ private void proceedWithRolloverAndDelete(
if (resultIndexAlias.startsWith(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX) || resultIndexAlias.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
// handle custom result index deletion
if (customResultIndexTtl != null) {
// deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueMinutes(1));
deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));

}
} else {
Expand Down
170 changes: 168 additions & 2 deletions src/test/java/org/opensearch/ad/indices/RolloverTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,100 @@
package org.opensearch.ad.indices;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.timeseries.TestHelpers.createSearchResponse;
import static org.opensearch.timeseries.constant.CommonName.CONFIG_INDEX;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.rollover.Condition;
import org.opensearch.action.admin.indices.rollover.MaxAgeCondition;
import org.opensearch.action.admin.indices.rollover.MaxDocsCondition;
import org.opensearch.action.admin.indices.rollover.MaxSizeCondition;
import org.opensearch.action.admin.indices.rollover.RolloverRequest;
import org.opensearch.action.admin.indices.rollover.RolloverResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.mock.model.MockSimpleLog;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.recycler.Recycler;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.function.BiCheckedFunction;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class RolloverTests extends AbstractTimeSeriesTest {
private ADIndexManagement adIndices;
private IndicesAdminClient indicesClient;
private ClusterAdminClient clusterAdminClient;
private Client client;
private ClusterName clusterName;
private ClusterState clusterState;
private ClusterService clusterService;
private NamedXContentRegistry namedXContentRegistry;
private long defaultMaxDocs;
private int numberOfNodes;

@Override
public void setUp() throws Exception {
super.setUp();
Client client = mock(Client.class);
client = mock(Client.class);
indicesClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
clusterService = mock(ClusterService.class);
Expand Down Expand Up @@ -98,14 +138,16 @@ public void setUp() throws Exception {
numberOfNodes = 2;
when(nodeFilter.getNumberOfEligibleDataNodes()).thenReturn(numberOfNodes);

namedXContentRegistry = TestHelpers.xContentRegistry();

adIndices = new ADIndexManagement(
client,
clusterService,
threadPool,
settings,
nodeFilter,
TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
NamedXContentRegistry.EMPTY
namedXContentRegistry
);

clusterAdminClient = mock(ClusterAdminClient.class);
Expand Down Expand Up @@ -248,4 +290,128 @@ public void testRetryingDelete() {
// 1 group delete, 1 separate retry for each index to delete
verify(indicesClient, times(2)).delete(any(), any());
}

public void testNoCustomResultIndexFound_RolloverDefaultResultIndex_shouldSucceed() {
setUpGetConfigs_withNoCustomResultIndexAlias();
setUpRolloverSuccess();

adIndices.rolloverAndDeleteHistoryIndex();
verify(indicesClient, times(1)).rolloverIndex(any(), any());
verify(client, times(1)).search(any(), any());
}

public void testCustomResultIndexFound_RolloverCustomResultIndex_withConditions_shouldSucceed() throws IOException {
setUpGetConfigs_withCustomResultIndexAlias();
setUpRolloverSuccessForCustomIndex();

adIndices.rolloverAndDeleteHistoryIndex();

verify(indicesClient, times(1)).rolloverIndex(any(), any());
verify(client, times(1)).search(any(), any());
}

private void setUpGetConfigs_withNoCustomResultIndexAlias() {
Metadata.Builder metaBuilder = Metadata
.builder()
.put(indexMeta(".opendistro-anomaly-detectors", 1L, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS), true);
clusterState = ClusterState.builder(clusterName).metadata(metaBuilder.build()).build();
when(clusterService.state()).thenReturn(clusterState);

String detectorString = "{\"name\":\"AhtYYGWTgqkzairTchcs\",\"description\":\"iIiAVPMyFgnFlEniLbMyfJxyoGvJAl\","
+ "\"time_field\":\"HmdFH\",\"indices\":[\"ffsBF\"],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":"
+ "{\"field\":\"value\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"window_delay\":"
+ "{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}},\"shingle_size\":8,\"schema_version\":-512063255,"
+ "\"feature_attributes\":[{\"feature_id\":\"OTYJs\",\"feature_name\":\"eYYCM\",\"feature_enabled\":false,"
+ "\"aggregation_query\":{\"XzewX\":{\"value_count\":{\"field\":\"ok\"}}}}],\"recency_emphasis\":3342,"
+ "\"history\":62,\"last_update_time\":1717192049845,\"category_field\":[\"Tcqcb\"],\"customResultIndexOrAlias\":"
+ "\"\",\"imputation_option\":{\"method\":\"FIXED_VALUES\",\"defaultFill\""
+ ":[],\"integerSensitive\":false},\"suggested_seasonality\":64,\"detection_interval\":{\"period\":"
+ "{\"interval\":5,\"unit\":\"Minutes\"}},\"detector_type\":\"MULTI_ENTITY\",\"rules\":[]}";

doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
SearchHit config = SearchHit.fromXContent(TestHelpers.parser(detectorString));
SearchHits searchHits = new SearchHits(new SearchHit[] { config }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse response = new InternalSearchResponse(
searchHits,
InternalAggregations.EMPTY,
null,
null,
false,
null,
1
);
SearchResponse searchResponse = new SearchResponse(
response,
null,
1,
1,
0,
100,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
listener.onResponse(searchResponse);
return null;
}).when(client).search(any(), any());
}

private void setUpRolloverSuccessForCustomIndex() {
doAnswer(invocation -> {
RolloverRequest request = invocation.getArgument(0);
@SuppressWarnings("unchecked")
ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArgument(1);

assertEquals("opensearch-ad-plugin-result-", request.indices()[0]);
Map<String, Condition<?>> conditions = request.getConditions();
assertEquals(2, conditions.size());
assertEquals(new MaxAgeCondition(TimeValue.timeValueDays(7)), conditions.get(MaxAgeCondition.NAME));
assertEquals(new MaxSizeCondition(new ByteSizeValue(51200, ByteSizeUnit.MB)), conditions.get(MaxSizeCondition.NAME));

CreateIndexRequest createIndexRequest = request.getCreateIndexRequest();
assertEquals("<opensearch-ad-plugin-result--history-{now/d}-1>", createIndexRequest.index());
assertTrue(createIndexRequest.mappings().contains("data_start_time"));
listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true));
return null;
}).when(indicesClient).rolloverIndex(any(), any());
}

private void setUpGetConfigs_withCustomResultIndexAlias() throws IOException {
IndexMetadata defaultResultIndex = IndexMetadata.builder(".opendistro-anomaly-detectors")
.settings(settings(Version.CURRENT))
.putAlias(AliasMetadata.builder(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS).writeIndex(true).build())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
IndexMetadata customResultIndex = IndexMetadata.builder("opensearch-ad-plugin-result-test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetadata.builder(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX).writeIndex(true).build())
.build();

clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().put(defaultResultIndex, false).put(customResultIndex, false).build())
.build();

when(clusterService.state()).thenReturn(clusterState);

String detectorStringWithCustomResultIndex = "{\"name\":\"todagtCMkwpcaedpyYUM\",\"description\":\"ClrcaMpuLfeDSlVduRcKlqPZyqWDBf\"," +
"\"time_field\":\"dJRwh\",\"indices\":[\"eIrgWMqAED\"],\"feature_attributes\":[{\"feature_id\":\"lxYRN\"," +
"\"feature_name\":\"eqSeU\",\"feature_enabled\":true,\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}]," +
"\"detection_interval\":{\"period\":{\"interval\":425,\"unit\":\"Minutes\"}}," +
"\"window_delay\":{\"period\":{\"interval\":973,\"unit\":\"Minutes\"}},\"shingle_size\":4,\"schema_version\":-1203962153," +
"\"ui_metadata\":{\"JbAaV\":{\"feature_id\":\"rIFjS\",\"feature_name\":\"QXCmS\",\"feature_enabled\":false," +
"\"aggregation_query\":{\"aa\":{\"value_count\":{\"field\":\"ok\"}}}}},\"last_update_time\":1568396089028," +
"\"result_index\":\"opensearch-ad-plugin-result-\",\"result_index_min_size\":51200,\"result_index_min_age\":7}";

AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorStringWithCustomResultIndex), "id", 1L, null, null);

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) args[1];
listener.onResponse(createSearchResponse(parsedDetector));
return null;
}).when(client).search(any(), any());
}
}

0 comments on commit fac3bbd

Please sign in to comment.