Skip to content

Commit

Permalink
add more it
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jan 27, 2025
1 parent b876a22 commit 4400f44
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1020,36 +1020,39 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
* creates flattened result index
* @param flattenedResultIndexAlias the flattened result index alias
* @param actionListener the action listener
* @throws IOException
*/
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener)
throws IOException {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<Void> actionListener) {
try {
String indexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
logger.info("Initializing flattened result index: {}", indexName);

CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);
CreateIndexRequest request = new CreateIndexRequest(indexName)
.mapping(getFlattenedResultMappings(), XContentType.JSON)
.settings(settings);

if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}
if (flattenedResultIndexAlias != null) {
request.alias(new Alias(flattenedResultIndexAlias));
}

choosePrimaryShards(request, false);
choosePrimaryShards(request, false);

adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(response);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
}));
adminClient.indices().create(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
logger.info("Successfully created flattened result index: {} with alias: {}", indexName, flattenedResultIndexAlias);
actionListener.onResponse(null);
} else {
String errorMsg = "Index creation not acknowledged for index: " + indexName;
logger.error(errorMsg);
actionListener.onFailure(new IllegalStateException(errorMsg));
}
}, exception -> {
logger.error("Failed to create flattened result index: {}", indexName, exception);
actionListener.onFailure(exception);
}));
} catch (IOException e) {
logger.error("Error while building mappings for flattened result index: {}", flattenedResultIndexAlias, e);
actionListener.onFailure(e);
}
}

public String getFlattenedResultIndexAlias(String indexOrAliasName, String configId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,20 @@ private void handleFlattenResultIndexMappingUpdate(ActionListener<T> listener) {
return;
}
if (config.getFlattenResultIndexMapping() != null && config.getFlattenResultIndexMapping()) {
setupIngestPipeline(id, listener);
String flattenedResultIndexAlias = timeSeriesIndices
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), config.getId());
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
timeSeriesIndices
.initFlattenedResultIndex(
flattenedResultIndexAlias,
ActionListener.wrap(initResponse -> setupIngestPipeline(config.getId(), ActionListener.wrap(pipelineResponse -> {
updateResultIndexSetting(
pipelineId,
flattenedResultIndexAlias,
ActionListener.wrap(updateResponse -> listener.onResponse(updateResponse), listener::onFailure)
);
}, listener::onFailure)), listener::onFailure)
);
} else {
String pipelineId = timeSeriesIndices.getFlattenResultIndexIngestPipelineId(config.getId());
client.admin().cluster().deletePipeline(new DeletePipelineRequest(pipelineId), new ActionListener<AcknowledgedResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void testUpdateAnomalyDetector_enableFlattenResultIndex_shouldCreatePipel
// create a detector with flatten result index disabled, shouldn't find related ingest pipeline
Response response = TestHelpers
.makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI, ImmutableMap.of(), TestHelpers.toHttpEntity(detector), null);
assertEquals("Create anomaly detector with flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response));
assertEquals("Create anomaly detector without flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
String expectedPipelineId = String.format(Locale.ROOT, "flatten_result_index_ingest_pipeline%s", id.toLowerCase(Locale.ROOT));
Expand Down

0 comments on commit 4400f44

Please sign in to comment.