Skip to content

Commit a6285e9

Browse files
opensearch-trigger-bot[bot]github-actions[bot]Nishtha Mehrotra
authored
Force create last run context in monitor workflow metadata when workflow is re-enabled (#1778) (#1783)
* Force create last run context in monitor worflow metadata when workflow is re-enabled * Force creation of re-enabled doc level monitors * Updated IT --------- (cherry picked from commit e2e0164) Signed-off-by: Nishtha Mehrotra <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Nishtha Mehrotra <[email protected]>
1 parent 03840d1 commit a6285e9

File tree

4 files changed

+224
-4
lines changed

4 files changed

+224
-4
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,15 @@ object MonitorMetadataService :
139139
monitor: Monitor,
140140
createWithRunContext: Boolean = true,
141141
skipIndex: Boolean = false,
142-
workflowMetadataId: String? = null
142+
workflowMetadataId: String? = null,
143+
forceCreateLastRunContext: Boolean = false
143144
): Pair<MonitorMetadata, Boolean> {
144145
try {
145146
val created = true
146-
val metadata = getMetadata(monitor, workflowMetadataId)
147+
var metadata = getMetadata(monitor, workflowMetadataId)
148+
if (forceCreateLastRunContext) {
149+
metadata = metadata?.copy(lastRunContext = createUpdatedRunContext(monitor))
150+
}
147151
return if (metadata != null) {
148152
metadata to !created
149153
} else {
@@ -159,6 +163,20 @@ object MonitorMetadataService :
159163
}
160164
}
161165

166+
private suspend fun createUpdatedRunContext(
167+
monitor: Monitor
168+
): Map<String, MutableMap<String, Any>> {
169+
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
170+
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
171+
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
172+
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
173+
else null
174+
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
175+
createFullRunContext(monitorIndex)
176+
else emptyMap()
177+
return runContext
178+
}
179+
162180
suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? {
163181
try {
164182
val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId)

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,8 +709,21 @@ class TransportIndexMonitorAction @Inject constructor(
709709
)
710710
return
711711
}
712+
var isDocLevelMonitorRestarted = false
713+
// Force re-creation of last run context if monitor is of type standard doc-level/threat-intel
714+
// And monitor is re-enabled
715+
if (request.monitor.enabled && !currentMonitor.enabled &&
716+
request.monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
717+
) {
718+
isDocLevelMonitorRestarted = true
719+
}
720+
712721
var updatedMetadata: MonitorMetadata
713-
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
722+
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(
723+
request.monitor,
724+
forceCreateLastRunContext = isDocLevelMonitorRestarted
725+
)
726+
714727
// Recreate runContext if metadata exists
715728
// Delete and insert all queries from/to queryIndex
716729

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,10 +551,17 @@ class TransportIndexWorkflowAction @Inject constructor(
551551
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)
552552

553553
for (monitor in monitors) {
554+
var isWorkflowRestarted = false
555+
556+
if (request.workflow.enabled && !currentWorkflow.enabled) {
557+
isWorkflowRestarted = true
558+
}
559+
554560
val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
555561
monitor = monitor,
556562
createWithRunContext = true,
557-
workflowMetadataId = workflowMetadata.id
563+
workflowMetadataId = workflowMetadata.id,
564+
forceCreateLastRunContext = isWorkflowRestarted
558565
)
559566
if (!created &&
560567
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR

alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6351,4 +6351,186 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
63516351
}
63526352
}
63536353
}
6354+
6355+
fun `test execute workflow when monitor is disabled and re-enabled`() {
6356+
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
6357+
6358+
val index1 = "index_123"
6359+
createIndex(index1, Settings.EMPTY)
6360+
val q1 = DocLevelQuery(query = "properties:\"abcd\"", name = "1", fields = listOf())
6361+
6362+
val docLevelInput = DocLevelMonitorInput(
6363+
"description",
6364+
listOf(index1),
6365+
listOf(q1)
6366+
)
6367+
6368+
val customQueryIndex = "custom_alerts_index"
6369+
6370+
val monitor = randomDocumentLevelMonitor(
6371+
inputs = listOf(docLevelInput),
6372+
triggers = listOf(trigger),
6373+
dataSources = DataSources(
6374+
queryIndex = customQueryIndex
6375+
)
6376+
)
6377+
6378+
val monitorResponse = createMonitor(monitor)!!
6379+
6380+
val workflowRequest = randomWorkflow(
6381+
monitorIds = listOf(monitorResponse.id)
6382+
)
6383+
val workflowResponse = upsertWorkflow(workflowRequest)!!
6384+
val workflowId = workflowResponse.id
6385+
val getWorkflowResponse = getWorkflowById(id = workflowResponse.id)
6386+
6387+
assertNotNull(getWorkflowResponse)
6388+
assertEquals(workflowId, getWorkflowResponse.id)
6389+
6390+
// Verify that monitor workflow metadata exists
6391+
assertNotNull(searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata"))
6392+
6393+
val testDoc1 = """{
6394+
"properties": "abcd"
6395+
}"""
6396+
indexDoc(index1, "1", testDoc1)
6397+
indexDoc(index1, "2", testDoc1)
6398+
indexDoc(index1, "3", testDoc1)
6399+
6400+
// Run workflow
6401+
var executeWorkflowResponse = executeWorkflow(workflowRequest, workflowId, false)
6402+
Assert.assertNotNull(executeWorkflowResponse)
6403+
var findings = searchFindings(monitorResponse.id)
6404+
assertEquals(3, findings.size)
6405+
6406+
// Verify that monitor workflow metadata is updated with lastRunContext
6407+
var monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
6408+
val lastRunContextBeforeDisable = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
6409+
assertEquals(2, lastRunContextBeforeDisable?.get("0"))
6410+
6411+
// Disable workflow
6412+
val disabledWorkflowRequest = randomWorkflow(
6413+
monitorIds = listOf(monitorResponse.id),
6414+
id = workflowId,
6415+
enabled = false
6416+
)
6417+
upsertWorkflow(disabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId)
6418+
6419+
// Index doc. Since workflow is disabled, monitor workflow metadata shouldn't be updated
6420+
indexDoc(index1, "4", testDoc1)
6421+
6422+
// re-enable workflow
6423+
val enabledWorkflowRequest = randomWorkflow(
6424+
monitorIds = listOf(monitorResponse.id),
6425+
id = workflowId,
6426+
enabled = true
6427+
)
6428+
upsertWorkflow(enabledWorkflowRequest, method = RestRequest.Method.PUT, id = workflowId)
6429+
6430+
// Assert no new findings generated after workflow is re-enabled
6431+
executeWorkflowResponse = executeWorkflow(workflowRequest, workflowId, false)
6432+
Assert.assertNotNull(executeWorkflowResponse)
6433+
findings = searchFindings(monitorResponse.id)
6434+
assertEquals(3, findings.size)
6435+
6436+
// Verify that monitor workflow metadata exists
6437+
// Since workflow is re-enabled, last run context should be updated with latest sequence number
6438+
monitorWokflowMetadata = searchMonitorMetadata("${workflowResponse.id}-metadata-${monitorResponse.id}-metadata")
6439+
assertNotNull(monitorWokflowMetadata)
6440+
val lastRunContext = (monitorWokflowMetadata?.lastRunContext?.get(index1) as? Map<String, Any>)
6441+
assertEquals(3, lastRunContext?.get("0"))
6442+
}
6443+
6444+
fun `test doc level monitor when it is disabled and re-enabled`() {
6445+
// Setup doc level monitor
6446+
val docQuery = DocLevelQuery(query = "eventType:\"login\"", name = "3", fields = listOf())
6447+
6448+
val docLevelInput = DocLevelMonitorInput(
6449+
"description", listOf(index), listOf(docQuery)
6450+
)
6451+
val customFindingsIndex = "custom_findings_index"
6452+
val customFindingsIndexPattern = "custom_findings_index-1"
6453+
val customQueryIndex = "custom_alerts_index"
6454+
var monitor = randomDocumentLevelMonitor(
6455+
inputs = listOf(docLevelInput),
6456+
triggers = listOf(),
6457+
dataSources = DataSources(
6458+
queryIndex = customQueryIndex,
6459+
findingsIndex = customFindingsIndex,
6460+
findingsIndexPattern = customFindingsIndexPattern
6461+
)
6462+
)
6463+
val monitorResponse = createMonitor(monitor)
6464+
assertFalse(monitorResponse?.id.isNullOrEmpty())
6465+
6466+
val testDoc = """{
6467+
"eventType" : "login"
6468+
}"""
6469+
indexDoc(index, "1", testDoc)
6470+
6471+
monitor = monitorResponse!!.monitor
6472+
val id = monitorResponse.id
6473+
6474+
// Execute monitor
6475+
var executeMonitorResponse = executeMonitor(monitor, id, false)
6476+
Assert.assertNotNull(executeMonitorResponse)
6477+
6478+
// Assert findings generated and last run context in monitor metadata is updated
6479+
var findings = searchFindings(id, customFindingsIndex)
6480+
assertEquals(1, findings.size)
6481+
6482+
var monitorMetadata = searchMonitorMetadata("${monitorResponse.id}-metadata")
6483+
val lastRunContextBeforeDisable = (monitorMetadata?.lastRunContext?.get(index) as? Map<String, Any>)
6484+
assertEquals(0, lastRunContextBeforeDisable?.get("0"))
6485+
6486+
// Disable monitor
6487+
var updateMonitorResponse = updateMonitor(
6488+
monitor.copy(
6489+
id = monitorResponse.id,
6490+
dataSources = DataSources(
6491+
queryIndex = customQueryIndex,
6492+
),
6493+
enabled = false,
6494+
enabledTime = null
6495+
),
6496+
monitorResponse.id
6497+
)
6498+
Assert.assertNotNull(updateMonitorResponse)
6499+
6500+
// Index doc. Since monitor is disabled, monitor workflow metadata shouldn't be updated
6501+
indexDoc(index, "2", testDoc)
6502+
indexDoc(index, "3", testDoc)
6503+
indexDoc(index, "4", testDoc)
6504+
6505+
executeMonitorResponse = executeMonitor(monitor, id, false)
6506+
Assert.assertNotNull(executeMonitorResponse)
6507+
6508+
// Assert no new findings since monitor was disabled
6509+
findings = searchFindings(id, customFindingsIndex)
6510+
assertEquals(1, findings.size)
6511+
6512+
// re-enable monitor
6513+
updateMonitorResponse = updateMonitor(
6514+
monitor.copy(
6515+
id = monitorResponse.id,
6516+
dataSources = DataSources(
6517+
queryIndex = customQueryIndex,
6518+
),
6519+
enabled = true,
6520+
enabledTime = Instant.now().truncatedTo(ChronoUnit.MILLIS)
6521+
),
6522+
monitorResponse.id
6523+
)
6524+
Assert.assertNotNull(updateMonitorResponse)
6525+
executeMonitorResponse = executeMonitor(monitor, id, false)
6526+
Assert.assertNotNull(executeMonitorResponse)
6527+
6528+
// Assert no new findings since monitor didnt run
6529+
findings = searchFindings(id, customFindingsIndex)
6530+
assertEquals(1, findings.size)
6531+
// Assert last run context in monitor metadata updated on enabling it, with no new findings generated
6532+
monitorMetadata = searchMonitorMetadata("${monitorResponse.id}-metadata")
6533+
val lastRunContextAfterEnable = (monitorMetadata?.lastRunContext?.get(index) as? Map<String, Any>)
6534+
assertEquals(3, lastRunContextAfterEnable?.get("0"))
6535+
}
63546536
}

0 commit comments

Comments
 (0)