diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 4d9e3818b..83a1a01bd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -349,6 +349,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED, DestinationSettings.EMAIL_USERNAME, DestinationSettings.EMAIL_PASSWORD, DestinationSettings.ALLOW_LIST, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index de63ab727..c8a6bb47c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -26,8 +26,6 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext -import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT -import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -236,6 +234,28 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } } + val fieldsToBeQueried = mutableSetOf() + if (monitorCtx.fetchOnlyQueryFieldNames) { + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) + } + if (fieldsToBeQueried.isNotEmpty()) + logger.debug( + "Monitor ${monitor.id} Querying only fields " + + "${fieldsToBeQueried.joinToString()} instead of entire _source of documents" + ) + } + // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) @@ -252,6 +272,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries, updatedIndexNames, concreteIndicesSeenSoFar, + ArrayList(fieldsToBeQueried) ) } } @@ -686,6 +707,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries: MutableMap>, monitorInputIndices: List, concreteIndices: List, + fieldsToBeQueried: List, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -700,8 +722,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { shard, prevSeqNo, maxSeqNo, - null, - docIds + docIds, + fieldsToBeQueried ) val startTime = System.currentTimeMillis() transformedDocs.addAll( @@ -792,8 +814,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String?, docIds: List? = null, + fieldsToFetch: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -801,10 +823,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (query != null) { - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) - } - if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } @@ -818,6 +836,14 @@ class DocumentLevelMonitorRunner : MonitorRunner() { .query(boolQueryBuilder) .size(10000) ) + .preference(Preference.PRIMARY_FIRST.type()) + + if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) { + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { logger.error("Failed search shard. Response: $response") @@ -908,7 +934,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ): List> { return hits.mapNotNull(fun(hit: SearchHit): Pair? { try { - val sourceMap = hit.sourceAsMap + val sourceMap = if (hit.hasSource()) { + hit.sourceAsMap + } else { + constructSourceMapFromFieldsInHit(hit) + } transformDocumentFieldNames( sourceMap, conflictingFields, @@ -929,6 +959,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() { }) } + private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap { + if (hit.fields == null) + return mutableMapOf() + val sourceMap: MutableMap = mutableMapOf() + for (field in hit.fields) { + if (field.value.values != null && field.value.values.isNotEmpty()) + if (field.value.values.size == 1) { + sourceMap[field.key] = field.value.values[0] + } else sourceMap[field.key] = field.value.values + } + return sourceMap + } + /** * Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names * but different mappings & [fieldNameSuffixPattern] to field names which have unique names. @@ -986,7 +1029,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * */ private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { - var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes @@ -994,7 +1037,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { - var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory return numDocs >= maxNumDocsThreshold } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 2e72af40b..043ae88d4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -50,5 +50,9 @@ data class MonitorRunnerExecutionContext( @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, @Volatile var indexTimeout: TimeValue? = null, - @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + @Volatile var fetchOnlyQueryFieldNames: Boolean = true, + @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, + @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = + AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 7e91ca842..85f79a961 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -26,11 +26,14 @@ import org.opensearch.alerting.script.TriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings @@ -182,6 +185,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.findingsIndexBatchSize = it } + monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) { + monitorCtx.fetchOnlyQueryFieldNames = it + } + + monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) { + monitorCtx.percQueryMaxNumDocsInMemory = it + } + + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = + PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) { + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index d106db95d..6f822006c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -18,6 +18,8 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 + const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 + const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -45,7 +47,17 @@ class AlertingSettings { */ val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", - 300000, 1000, + DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** + * Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries. + * Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards. + */ + val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting( + "plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled", + true, Setting.Property.NodeScope, Setting.Property.Dynamic ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index c4106bef4..7c1b1c15c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.common.xcontent.json.JsonXContent @@ -236,6 +237,152 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Alert saved for test monitor", 0, alerts.size) } + fun `test dryrun execute monitor with queryFieldNames set up with correct field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field")) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("wrong_field") + ) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(0, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("wrong_field") + ) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + fun `test execute monitor returns search result with dryrun`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 3ddffa73b..132bb3933 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -365,6 +365,204 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size) } + fun `test all fields fetched and submitted to percolate query when one of the queries doesn't have queryFieldNames`() { + // doesn't have query field names so even if other queries pass the wrong fields to query, findings will get generated on matching docs + val docQuery1 = DocLevelQuery( + query = "source.ip.v6.v1:12345", + name = "3", + fields = listOf() + ) + val docQuery2 = DocLevelQuery( + query = "source.ip.v6.v2:16645", + name = "4", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery3 = DocLevelQuery( + query = "source.ip.v4.v0:120", + name = "5", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery4 = + DocLevelQuery( + query = "alias.some.fff:\"us-west-2\"", + name = "6", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery5 = DocLevelQuery( + query = "message:\"This is an error from IAD region\"", + name = "7", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1"), + fields = listOf() + ) + val docQuery6 = + DocLevelQuery( + query = "type.subtype:\"some subtype\"", + name = "8", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery7 = + DocLevelQuery( + query = "supertype.type:\"some type\"", + name = "9", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters + val testDoc = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match all 7 queries", 7, findings[0].docLevelQueries.size) + } + + fun `test percolate query failure when queryFieldNames has alias`() { + // doesn't have query field names so even if other queries pass the wrong fields to query, findings will get generated on matching docs + val docQuery1 = DocLevelQuery( + query = "source.ip.v6.v1:12345", + name = "3", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery2 = DocLevelQuery( + query = "source.ip.v6.v2:16645", + name = "4", + fields = listOf(), + queryFieldNames = listOf("source.ip.v6.v2") + ) + val docQuery3 = DocLevelQuery( + query = "source.ip.v4.v0:120", + name = "5", + fields = listOf(), + queryFieldNames = listOf("source.ip.v6.v4") + ) + val docQuery4 = + DocLevelQuery( + query = "alias.some.fff:\"us-west-2\"", + name = "6", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff") + ) + val docQuery5 = DocLevelQuery( + query = "message:\"This is an error from IAD region\"", + name = "7", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1"), + fields = listOf() + ) + val docQuery6 = + DocLevelQuery( + query = "type.subtype:\"some subtype\"", + name = "8", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery7 = + DocLevelQuery( + query = "supertype.type:\"some type\"", + name = "9", + fields = listOf(), + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters + val testDoc = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertTrue(getAlertsResponse.alerts[0].state.toString().equals(Alert.State.ERROR.toString())) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + } + fun `test execute monitor with custom query index`() { val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4")