diff --git a/alerting/build.gradle b/alerting/build.gradle index efaff1b6f..86087e5f7 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -424,9 +424,5 @@ run { useCluster testClusters.integTest } -// Only apply jacoco test coverage if we are running a local single node cluster -if (!usingRemoteCluster && !usingMultiNode) { - apply from: '../build-tools/opensearchplugin-coverage.gradle' -} apply from: '../build-tools/pkgbuild.gradle' diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a80f33ae..f2c77a4ea 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -334,6 +334,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED, DestinationSettings.EMAIL_USERNAME, DestinationSettings.EMAIL_PASSWORD, DestinationSettings.ALLOW_LIST, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3b8e4dee7..f9cd5233d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -57,6 +58,7 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders import org.opensearch.percolator.PercolateQueryBuilderExt +import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder @@ -215,6 +217,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) } } + val fieldsToBeQueried = mutableSetOf() + + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) + } // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) @@ -226,7 +243,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { updatedIndexName, concreteIndexName, conflictingFields.toList(), - matchingDocIdsPerIndex?.get(concreteIndexName) + matchingDocIdsPerIndex?.get(concreteIndexName), + ArrayList(fieldsToBeQueried) ) if (matchingDocs.isNotEmpty()) { @@ -501,9 +519,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { publishFinding(monitor, monitorCtx, finding) - } catch (e: Exception) { - // suppress exception - logger.error("Optional finding callback failed", e) + } catch (_: Exception) { } return finding.id } @@ -605,7 +621,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { index: String, concreteIndex: String, conflictingFields: List, - docIds: List? = null + docIds: List? = null, + fieldsToBeQueried: List ): List> { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int val matchingDocs = mutableListOf>() @@ -622,7 +639,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo, maxSeqNo, null, - docIds + docIds, + fieldsToBeQueried ) if (hits.hits.isNotEmpty()) { @@ -642,7 +660,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo: Long?, maxSeqNo: Long, query: String?, - docIds: List? = null + docIds: List? = null, + fieldsToFetch: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -668,10 +687,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .size(10000) // fixme: make this configurable. ) .preference(Preference.PRIMARY_FIRST.type()) + + if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) { + logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}") + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { throw IOException("Failed to search shard: $shard") } + logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken = ") return response.hits } @@ -730,7 +758,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { conflictingFields: List ): List> { return hits.map { hit -> - val sourceMap = hit.sourceAsMap + val sourceMap = if (hit.hasSource()) { + hit.sourceAsMap + } else { + logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only") + constructSourceMapFromFieldsInHit(hit) + } transformDocumentFieldNames( sourceMap, @@ -750,6 +783,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } + private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap { + if (hit.fields == null) + return mutableMapOf() + val sourceMap: MutableMap = mutableMapOf() + for (field in hit.fields) { + if (field.value.values != null && field.value.values.isNotEmpty()) + if (field.value.values.size == 1) { + sourceMap[field.key] = field.value.values[0] + } else sourceMap[field.key] = field.value.values + } + return sourceMap + } + /** * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names * but different mappings & [fieldNameSuffixPattern] to field names which have unique names. diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index e23d44c5b..64189268a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -24,6 +24,14 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** + */ + val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting( + "plugins.alerting.monitor.doc_level_monitor_fetch_only_query_fields_enabled", + true, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index d2ecc0907..1bfea4ebc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -49,6 +49,9 @@ }, "fields": { "type": "text" + }, + "query_field_names": { + "type": "keyword" } } }, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 2a3527dd1..de2adee4c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -135,7 +135,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } fun `test execute monitor with custom alerts index`() { - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("test_field", "non_existent_field_name") + ) val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) val customAlertsIndex = "custom_alerts_index" diff --git a/core/build.gradle b/core/build.gradle index b1ecf7eac..7aeb8c284 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -19,7 +19,7 @@ dependencies { exclude group: 'com.google.guava' } implementation 'com.google.guava:guava:32.0.1-jre' - api "org.opensearch:common-utils:${common_utils_version}@jar" + api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar") implementation 'commons-validator:commons-validator:1.7' testImplementation "org.opensearch.test:framework:${opensearch_version}"