Skip to content

Commit 03840d1

Browse files
shards assignment to local Node when fanout flag is disabled (#1749) (#1756)
* shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * shards assignment to local Node when fanout flag is disabled Signed-off-by: Riya Saxena <[email protected]> * tests fix Signed-off-by: Riya Saxena <[email protected]> * tests fix Signed-off-by: Riya Saxena <[email protected]> --------- Signed-off-by: Riya Saxena <[email protected]> (cherry picked from commit fee62b5)
1 parent ed9c4d9 commit 03840d1

File tree

3 files changed

+77
-3
lines changed

3 files changed

+77
-3
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
239239
shards.remove("index")
240240
shards.remove("shards_count")
241241

242-
val nodeMap = getNodes(monitorCtx)
242+
/**
243+
* if fanout flag is disabled and force assign all shards to local node
244+
* thus effectively making the fan-out a single node operation.
245+
* This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules
246+
**/
247+
val localNode = monitorCtx.clusterService!!.localNode()
248+
val nodeMap: Map<String, DiscoveryNode> = if (docLevelMonitorInput?.fanoutEnabled == true) {
249+
getNodes(monitorCtx)
250+
} else {
251+
logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}")
252+
mapOf(localNode.id to localNode)
253+
}
254+
243255
val nodeShardAssignments = distributeShards(
244256
monitorCtx,
245257
nodeMap.keys.toList(),

alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2791,6 +2791,68 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
27912791
}
27922792
}
27932793

2794+
fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() {
2795+
val aliasName = "test-alias"
2796+
createIndexAlias(
2797+
aliasName,
2798+
"""
2799+
"properties" : {
2800+
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
2801+
"test_field" : { "type" : "keyword" },
2802+
"number" : { "type" : "keyword" }
2803+
}
2804+
""".trimIndent(),
2805+
"\"index.number_of_shards\": 7"
2806+
)
2807+
2808+
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
2809+
val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false)
2810+
2811+
val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
2812+
val monitor = createMonitor(
2813+
randomDocumentLevelMonitor(
2814+
inputs = listOf(docLevelInput),
2815+
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))),
2816+
enabled = true,
2817+
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
2818+
)
2819+
)
2820+
2821+
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
2822+
val testDoc = """{
2823+
"@timestamp": "$testTime",
2824+
"message" : "This is an error from IAD region",
2825+
"test_strict_date_time" : "$testTime",
2826+
"test_field" : "us-west-2"
2827+
}"""
2828+
indexDoc(aliasName, "1", testDoc)
2829+
indexDoc(aliasName, "2", testDoc)
2830+
indexDoc(aliasName, "4", testDoc)
2831+
indexDoc(aliasName, "5", testDoc)
2832+
indexDoc(aliasName, "6", testDoc)
2833+
indexDoc(aliasName, "7", testDoc)
2834+
OpenSearchTestCase.waitUntil(
2835+
{ searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 },
2836+
2,
2837+
TimeUnit.MINUTES
2838+
)
2839+
2840+
rolloverDatastream(aliasName)
2841+
indexDoc(aliasName, "11", testDoc)
2842+
indexDoc(aliasName, "12", testDoc)
2843+
indexDoc(aliasName, "14", testDoc)
2844+
indexDoc(aliasName, "15", testDoc)
2845+
indexDoc(aliasName, "16", testDoc)
2846+
indexDoc(aliasName, "17", testDoc)
2847+
OpenSearchTestCase.waitUntil(
2848+
{ searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 },
2849+
2,
2850+
TimeUnit.MINUTES
2851+
)
2852+
2853+
deleteDataStream(aliasName)
2854+
}
2855+
27942856
fun `test execute monitor generates alerts and findings with renewable locks`() {
27952857
val testIndex = createTestIndex()
27962858
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))

sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void onFailure(Exception e) {
136136
};
137137
} else if (runMonitorParam.equals("multiple")) {
138138
SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello",
139-
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of()))));
139+
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true));
140140
BytesStreamOutput out1 = new BytesStreamOutput();
141141
input2.writeTo(out1);
142142
BytesReference input1Serialized1 = out1.bytes();
@@ -220,7 +220,7 @@ public void onFailure(Exception e) {
220220
sampleRemoteDocLevelMonitorInput.writeTo(out2);
221221
BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes();
222222

223-
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList());
223+
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true);
224224
RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput);
225225

226226
Monitor remoteDocLevelMonitor = new Monitor(

0 commit comments

Comments
 (0)