Skip to content

Commit

Permalink
optimize sequence number calculation and reduce search requests in do…
Browse files Browse the repository at this point in the history
…c level monitor execution (#1445)

* optimize sequence number calculation and reduce search requests by n where n is number of shards being queried in the executino

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix tests

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* optimize check indices and execute to query only write index of aliases and datastreams during monitor creation

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix test

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add javadoc

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add tests to verify seq_no calculation

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep authored and jowg-amazon committed Mar 14, 2024
1 parent 3b3fe5a commit ea67888
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
Expand Down Expand Up @@ -328,6 +329,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
Expand All @@ -30,7 +30,6 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
Expand Down Expand Up @@ -59,6 +58,8 @@ import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indices.IndexClosedException
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
Expand Down Expand Up @@ -207,7 +208,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
val indexUpdatedRunContext = initializeNewLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
Expand Down Expand Up @@ -255,25 +256,29 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
docExecutionContext,
val indexExecutionContext = IndexExecutionContext(
queries,
indexLastRunContext,
indexUpdatedRunContext,
updatedIndexName,
concreteIndexName,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)

fetchShardDataAndMaybeExecutePercolateQueries(
monitor,
monitorCtx,
indexExecutionContext,
monitorMetadata,
inputRunResults,
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
}
}
}
/* if all indices are covered still in-memory docs size limit is not breached we would need to submit
Expand Down Expand Up @@ -615,7 +620,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private suspend fun updateLastRunContext(
private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
Expand All @@ -624,8 +629,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
}
return updatedLastRunContext
}
Expand Down Expand Up @@ -657,33 +661,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
if (response.hits.hits.isEmpty())
return -1L

return response.hits.hits[0].seqNo
}

private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
Expand All @@ -697,51 +674,79 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
indexName: String,
concreteIndexName: String,
conflictingFields: List<String>,
docIds: List<String>? = null,
indexExecutionCtx: IndexExecutionContext,
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
updateLastRunContext: (String, String) -> Unit
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

val hits: SearchHits = searchShard(
monitorCtx,
concreteIndexName,
shard,
prevSeqNo,
maxSeqNo,
docIds,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexName,
concreteIndexName,
monitor.id,
conflictingFields,
val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitorCtx,
indexExecutionCtx.concreteIndexName,
shard,
from,
to,
indexExecutionCtx.docIds,
fieldsToBeQueried,
)
)
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
if (hits.hits.isEmpty()) {
if (to == Long.MAX_VALUE) {
updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs
}
break
}
if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed
updateLastRunContext(shard, hits.hits[0].seqNo.toString())
}
val leastSeqNoFromHits = hits.hits.last().seqNo
to = leastSeqNoFromHits - 1
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
transformSearchHitsAndReconstructDocs(
hits,
indexExecutionCtx.indexName,
indexExecutionCtx.concreteIndexName,
monitor.id,
indexExecutionCtx.conflictingFields,
)
)
if (
transformedDocs.isNotEmpty() &&
shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
) {
performPercolateQueryAndResetCounters(
monitorCtx,
monitor,
monitorMetadata,
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries,
)
}
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
}
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
" Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
"Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
"Error: ${e.message}",
e
)
if (e is IndexClosedException) {
throw e
}
}
if (
transformedDocs.isNotEmpty() &&
Expand Down Expand Up @@ -833,8 +838,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(boolQueryBuilder)
.size(10000)
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

Expand All @@ -846,7 +853,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTakenStat += response.took.millis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ data class MonitorRunnerExecutionContext(
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
Expand Down Expand Up @@ -202,6 +203,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it
}

monitorCtx.docLevelMonitorShardFetchSize =
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings
.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) {
monitorCtx.docLevelMonitorShardFetchSize = it
}

return this
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.commons.alerting.model.DocLevelQuery

/** DTO that contains all the necessary context for fetching data from shard and performing percolate queries */
data class IndexExecutionContext(
val queries: List<DocLevelQuery>,
val lastRunContext: MutableMap<String, Any>,
val updatedLastRunContext: MutableMap<String, Any>,
val indexName: String,
val concreteIndexName: String,
val conflictingFields: List<String>,
val docIds: List<String>? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class AlertingSettings {
const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000
const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000
const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10
const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000

val ALERTING_MAX_MONITORS = Setting.intSetting(
"plugins.alerting.monitor.max_monitors",
Expand All @@ -39,6 +40,16 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Purely a setting used to verify seq_no calculation
*/
val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting(
"plugins.alerting.monitor.doc_level_monitor_shard_fetch_size",
DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
1,
10000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,15 @@ class TransportIndexMonitorAction @Inject constructor(
else (it as DocLevelMonitorInput).indices
indices.addAll(inputIndices)
}
val searchRequest = SearchRequest().indices(*indices.toTypedArray())
val updatedIndices = indices.map { index ->
if (IndexUtils.isAlias(index, clusterService.state()) || IndexUtils.isDataStream(index, clusterService.state())) {
val metadata = clusterService.state().metadata.indicesLookup[index]?.writeIndex
metadata?.index?.name ?: index
} else {
index
}
}
val searchRequest = SearchRequest().indices(*updatedIndices.toTypedArray())
.source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery()))
client.search(
searchRequest,
Expand Down
Loading

0 comments on commit ea67888

Please sign in to comment.