Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doc level monitor optimization: Fetch only relevant fields instead of the entire source. #1386

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,5 @@ run {
useCluster testClusters.integTest
}

// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: '../build-tools/opensearchplugin-coverage.gradle'
}

apply from: '../build-tools/pkgbuild.gradle'
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT,
LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE,
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED,
DestinationSettings.EMAIL_USERNAME,
DestinationSettings.EMAIL_PASSWORD,
DestinationSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -57,6 +58,7 @@ import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
Expand Down Expand Up @@ -215,6 +217,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}
val fieldsToBeQueried = mutableSetOf<String>()

for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
logger.debug(
"Monitor ${monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
Expand All @@ -226,7 +243,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName)
matchingDocIdsPerIndex?.get(concreteIndexName),
ArrayList(fieldsToBeQueried)
)

if (matchingDocs.isNotEmpty()) {
Expand Down Expand Up @@ -501,9 +519,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
publishFinding(monitor, monitorCtx, finding)
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
} catch (_: Exception) {
}
return finding.id
}
Expand Down Expand Up @@ -605,7 +621,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
index: String,
concreteIndex: String,
conflictingFields: List<String>,
docIds: List<String>? = null
docIds: List<String>? = null,
fieldsToBeQueried: List<String>
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
Expand All @@ -622,7 +639,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
fieldsToBeQueried
)

if (hits.hits.isNotEmpty()) {
Expand All @@ -642,7 +660,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null
docIds: List<String>? = null,
fieldsToFetch: List<String>,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand All @@ -668,10 +687,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())

if (DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) && fieldsToFetch.isNotEmpty()) {
logger.error("PERF_DEBUG: Query field names: ${fieldsToFetch.joinToString() }}")
request.source().fetchSource(false)
for (field in fieldsToFetch) {
request.source().fetchField(field)
}
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
}
logger.error("Monitor ${monitorCtx.client} PERF_DEBUG: Percolate query time taken = ")
return response.hits
}

Expand Down Expand Up @@ -730,7 +758,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
conflictingFields: List<String>
): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap
val sourceMap = if (hit.hasSource()) {
hit.sourceAsMap
} else {
logger.error("PERF_DEBUG:Building percolate query source docs from relevant fields only")
constructSourceMapFromFieldsInHit(hit)
}

transformDocumentFieldNames(
sourceMap,
Expand All @@ -750,6 +783,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
if (hit.fields == null)
return mutableMapOf()
val sourceMap: MutableMap<String, Any> = mutableMapOf()
for (field in hit.fields) {
if (field.value.values != null && field.value.values.isNotEmpty())
if (field.value.values.size == 1) {
sourceMap[field.key] = field.value.values[0]
} else sourceMap[field.key] = field.value.values
}
return sourceMap
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
*/
val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
"plugins.alerting.monitor.doc_level_monitor_fetch_only_query_fields_enabled",
true,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val INPUT_TIMEOUT = Setting.positiveTimeSetting(
"plugins.alerting.input_timeout",
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
},
"fields": {
"type": "text"
},
"query_field_names": {
"type": "keyword"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

fun `test execute monitor with custom alerts index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("test_field", "non_existent_field_name")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies {
exclude group: 'com.google.guava'
}
implementation 'com.google.guava:guava:32.0.1-jre'
api "org.opensearch:common-utils:${common_utils_version}@jar"
api files("/Users/snistala/Documents/opensearch/common-utils/build/libs/common-utils-3.0.0.0-SNAPSHOT.jar")
implementation 'commons-validator:commons-validator:1.7'

testImplementation "org.opensearch.test:framework:${opensearch_version}"
Expand Down
Loading