Skip to content

Commit

Permalink
address the comments for pr 1366
Browse files Browse the repository at this point in the history
  • Loading branch information
riysaxen-amzn committed Jan 9, 2024
1 parent a865512 commit 86d45f8
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 22 deletions.
11 changes: 0 additions & 11 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.math.max

/** Service that handles CRUD operations for alerts */
class AlertService(
Expand Down Expand Up @@ -877,16 +876,6 @@ class AlertService(
return searchResponse
}

fun getCancelAfterTimeInterval(): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes
if (givenInterval == -1L) {
return givenInterval
}
return max(givenInterval, ALERTS_SEARCH_TIMEOUT.minutes)
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.common.unit.TimeValue
Expand Down Expand Up @@ -411,8 +412,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
sr.source().query(queryBuilder)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
getCancelAfterTimeInterval()
)
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
Expand Down Expand Up @@ -582,10 +583,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(QueryBuilders.matchAllQuery())
.size(1)
)
request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
)
request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(getCancelAfterTimeInterval())

val response: SearchResponse = client.suspendUntil { client.search(request, it) }

Expand Down Expand Up @@ -675,10 +673,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.preference(Preference.PRIMARY_FIRST.type())

request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
getCancelAfterTimeInterval()
)

val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
Expand Down Expand Up @@ -720,8 +716,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
getCancelAfterTimeInterval()
)

response = monitorCtx.client!!.suspendUntil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.settings.DestinationSettings
Expand All @@ -18,6 +20,7 @@ import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down Expand Up @@ -163,6 +166,16 @@ inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
}
}

fun getCancelAfterTimeInterval(): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes
if (givenInterval == -1L) {
return givenInterval
}
return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes)
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
Expand Down

0 comments on commit 86d45f8

Please sign in to comment.