diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index e0dc4625e..7f1251e3d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext +import org.opensearch.alerting.util.CommentsUtils import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.MAX_SEARCH_SIZE import org.opensearch.alerting.util.getBucketKeysHash @@ -157,7 +158,7 @@ class AlertService( workflorwRunContext: WorkflowRunContext? ): Alert? { val currentTime = Instant.now() - val currentAlert = ctx.alert + val currentAlert = ctx.alertContext?.alert val updatedActionExecutionResults = mutableListOf() val currentActionIds = mutableSetOf() @@ -686,6 +687,8 @@ class AlertService( val alertsIndex = dataSources.alertsIndex val alertsHistoryIndex = dataSources.alertsHistoryIndex + val commentIdsToDelete = mutableListOf() + var requestsToRetry = alerts.flatMap { alert -> // We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts. // In the rare event that a user acknowledges an alert between when it's read and when it's written @@ -732,13 +735,22 @@ class AlertService( listOfNotNull>( DeleteRequest(alertsIndex, alert.id) .routing(routingId), - // Only add completed alert to history index if history is enabled if (alertIndices.isAlertHistoryEnabled()) { + // Only add completed alert to history index if history is enabled IndexRequest(alertsHistoryIndex) .routing(routingId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) .id(alert.id) - } else null + } else { + // Otherwise, prepare the Alert's comments for deletion, and don't include + // a request to index the Alert to an Alert history index. + // The delete request can't be added to the list of DocWriteRequests because + // Comments are stored in aliased history indices, not a concrete Comments + // index like Alerts. A DeleteBy request will be used to delete Comments, instead + // of a regular Delete request + commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id))) + null + } ) } } @@ -758,6 +770,9 @@ class AlertService( throw ExceptionsHelper.convertToOpenSearchException(retryCause) } } + + // delete all the comments of any Alerts that were deleted + CommentsUtils.deleteComments(client, commentIdsToDelete) } /** diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a6e0bea3..bb25c0bf1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -16,6 +16,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction import org.opensearch.alerting.action.SearchEmailAccountAction import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.comments.CommentsIndices import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction +import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction import org.opensearch.alerting.resthandler.RestDeleteMonitorAction import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction import org.opensearch.alerting.resthandler.RestExecuteMonitorAction @@ -40,8 +42,10 @@ import org.opensearch.alerting.resthandler.RestGetMonitorAction import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction import org.opensearch.alerting.resthandler.RestGetWorkflowAction import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction +import org.opensearch.alerting.resthandler.RestIndexAlertingCommentAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction import org.opensearch.alerting.resthandler.RestIndexWorkflowAction +import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction @@ -54,6 +58,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction +import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction import org.opensearch.alerting.transport.TransportDeleteWorkflowAction import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction @@ -68,8 +73,10 @@ import org.opensearch.alerting.transport.TransportGetMonitorAction import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction import org.opensearch.alerting.transport.TransportGetWorkflowAction import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction +import org.opensearch.alerting.transport.TransportIndexAlertingCommentAction import org.opensearch.alerting.transport.TransportIndexMonitorAction import org.opensearch.alerting.transport.TransportIndexWorkflowAction +import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction @@ -158,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups" @JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings" + @JvmField val COMMENTS_BASE_URI = "/_plugins/_alerting/comments" @JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow") } @@ -166,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var scheduler: JobScheduler lateinit var sweeper: JobSweeper lateinit var scheduledJobIndices: ScheduledJobIndices + lateinit var commentsIndices: CommentsIndices lateinit var docLevelMonitorQueries: DocLevelMonitorQueries lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices @@ -203,6 +212,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetWorkflowAction(), RestDeleteWorkflowAction(), RestGetRemoteIndexesAction(), + RestIndexAlertingCommentAction(), + RestSearchAlertingCommentAction(), + RestDeleteAlertingCommentAction(), ) } @@ -229,6 +241,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.INDEX_COMMENT_ACTION_TYPE, TransportIndexAlertingCommentAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.SEARCH_COMMENTS_ACTION_TYPE, TransportSearchAlertingCommentAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java), ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java), ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java) @@ -287,6 +302,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) + commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService) docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) @@ -315,6 +331,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R scheduler, runner, scheduledJobIndices, + commentsIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService, @@ -389,7 +406,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE, - AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED + AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED, + AlertingSettings.ALERTING_COMMENTS_ENABLED, + AlertingSettings.COMMENTS_HISTORY_MAX_DOCS, + AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE, + AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD, + AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD, + AlertingSettings.COMMENTS_MAX_CONTENT_SIZE, + AlertingSettings.MAX_COMMENTS_PER_ALERT, + AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 7161ed764..0ef5e0abd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -23,6 +23,8 @@ import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.CommentsUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash @@ -36,6 +38,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.Comment import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.SearchInput @@ -274,6 +277,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { // to alertsToUpdate to ensure the Alert doc is updated at the end in either case completedAlertsToUpdate.addAll(completedAlerts) + // retrieve max Comments per Alert notification setting + val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION) + // All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop val triggerCtx = triggerContexts[trigger.id]!! val triggerResult = triggerResults[trigger.id]!! @@ -291,9 +297,18 @@ object BucketLevelMonitorRunner : MonitorRunner() { if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) { for (alertCategory in actionExecutionScope.actionableAlerts) { val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf() + val alertsToExecuteActionsForIds = alertsToExecuteActionsFor.map { it.id } + val allAlertsComments = CommentsUtils.getCommentsForAlertNotification( + monitorCtx.client!!, + alertsToExecuteActionsForIds, + maxComments + ) for (alert in alertsToExecuteActionsFor) { - val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert) - else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs) + val alertContext = if (alertCategory != AlertCategory.NEW) { + AlertContext(alert = alert, comments = allAlertsComments[alert.id]) + } else { + getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs, allAlertsComments[alert.id]) + } val actionCtx = getActionContextForAlertCategory( alertCategory, @@ -329,12 +344,28 @@ object BucketLevelMonitorRunner : MonitorRunner() { continue } + val alertsToExecuteActionsForIds = dedupedAlerts.map { it.id } + .plus(newAlerts.map { it.id }) + .plus(completedAlerts.map { it.id }) + val allAlertsComments = CommentsUtils.getCommentsForAlertNotification( + monitorCtx.client!!, + alertsToExecuteActionsForIds, + maxComments + ) val actionCtx = triggerCtx.copy( - dedupedAlerts = dedupedAlerts, + dedupedAlerts = dedupedAlerts.map { + AlertContext(alert = it, comments = allAlertsComments[it.id]) + }, newAlerts = newAlerts.map { - getAlertContext(alert = it, alertSampleDocs = alertSampleDocs) + getAlertContext( + alert = it, + alertSampleDocs = alertSampleDocs, + alertComments = allAlertsComments[it.id] + ) + }, + completedAlerts = completedAlerts.map { + AlertContext(alert = it, comments = allAlertsComments[it.id]) }, - completedAlerts = completedAlerts, error = monitorResult.error ?: triggerResult.error ) val actionResult = this.runAction(action, actionCtx, monitorCtx, monitor, dryrun) @@ -537,17 +568,18 @@ object BucketLevelMonitorRunner : MonitorRunner() { ): BucketLevelTriggerExecutionContext { return when (alertCategory) { AlertCategory.DEDUPED -> - ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) + ctx.copy(dedupedAlerts = listOf(alertContext), newAlerts = emptyList(), completedAlerts = emptyList(), error = error) AlertCategory.NEW -> ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error) AlertCategory.COMPLETED -> - ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error) + ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext), error = error) } } private fun getAlertContext( alert: Alert, - alertSampleDocs: Map>>> + alertSampleDocs: Map>>>, + alertComments: List? ): AlertContext { val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash() val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey) @@ -561,7 +593,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { alert.monitorId, alert.executionId ) - AlertContext(alert = alert, sampleDocs = listOf()) + AlertContext(alert = alert, sampleDocs = listOf(), comments = alertComments) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 69adc7ef7..4ead29364 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -55,7 +55,7 @@ abstract class MonitorRunner { dryrun: Boolean ): ActionRunResult { return try { - if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alert)) { + if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alertContext?.alert)) { return ActionRunResult(action.id, action.name, mapOf(), true, null, null) } val actionOutput = mutableMapOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index b975af728..b00caec7d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -6,12 +6,14 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.CommentsUtils import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.commons.alerting.model.Alert @@ -65,9 +67,20 @@ object QueryLevelMonitorRunner : MonitorRunner() { val updatedAlerts = mutableListOf() val triggerResults = mutableMapOf() + + val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION) + val alertsToExecuteActionsForIds = currentAlerts.mapNotNull { it.value }.map { it.id } + val allAlertsComments = CommentsUtils.getCommentsForAlertNotification( + monitorCtx.client!!, + alertsToExecuteActionsForIds, + maxComments + ) for (trigger in monitor.triggers) { val currentAlert = currentAlerts[trigger] - val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) + val currentAlertContext = currentAlert?.let { + AlertContext(alert = currentAlert, comments = allAlertsComments[currentAlert.id]) + } + val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlertContext) val triggerResult = when (monitor.monitorType) { Monitor.MonitorType.QUERY_LEVEL_MONITOR -> monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) @@ -80,7 +93,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) } else -> - throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.") + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.") } triggerResults[trigger.id] = triggerResult diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index 21ba32475..9262961b4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -53,7 +53,7 @@ class TriggerService(val scriptService: ScriptService) { ): Boolean { if (workflowRunContext?.auditDelegateMonitorAlerts == true) return false // Suppress actions if the current alert is acknowledged and there are no errors. - val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null + val suppress = ctx.alertContext?.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null return result.triggered && !suppress } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index ae5b53bd0..c4dd5b3b1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting.alerts +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.ResourceAlreadyExistsException @@ -19,6 +22,8 @@ import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_HISTORY_WRITE_INDEX @@ -37,6 +42,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.CommentsUtils import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.ClusterChangedEvent @@ -45,13 +51,23 @@ import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.threadpool.Scheduler.Cancellable import org.opensearch.threadpool.ThreadPool import java.time.Instant +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + /** * Class to manage the creation and rollover of alert indices and alert history indices. In progress alerts are stored * in [ALERT_INDEX]. Completed alerts are written to [ALERT_HISTORY_WRITE_INDEX] which is an alias that points at the @@ -489,9 +505,14 @@ class AlertIndices( object : ActionListener { override fun onResponse(clusterStateResponse: ClusterStateResponse) { if (clusterStateResponse.state.metadata.indices.isNotEmpty()) { - val indicesToDelete = getIndicesToDelete(clusterStateResponse) - logger.info("Deleting old $tag indices viz $indicesToDelete") - deleteAllOldHistoryIndices(indicesToDelete) + scope.launch { + val indicesToDelete = getIndicesToDelete(clusterStateResponse) + logger.info("Deleting old $tag indices viz $indicesToDelete") + if (indices == ALERT_HISTORY_ALL) { + deleteAlertComments(indicesToDelete) + } + deleteAllOldHistoryIndices(indicesToDelete) + } } else { logger.info("No Old $tag Indices to delete") } @@ -585,4 +606,38 @@ class AlertIndices( ) } } + + private suspend fun deleteAlertComments(alertHistoryIndicesToDelete: List) { + alertHistoryIndicesToDelete.forEach { alertHistoryIndex -> + val alertIDs = getAlertIDsFromAlertHistoryIndex(alertHistoryIndex) + val commentIDsToDelete = CommentsUtils.getCommentIDsByAlertIDs(client, alertIDs) + CommentsUtils.deleteComments(client, commentIDsToDelete) + } + } + + private suspend fun getAlertIDsFromAlertHistoryIndex(indexName: String): List { + val queryBuilder = QueryBuilders.matchAllQuery() + val searchSourceBuilder = SearchSourceBuilder() + .query(queryBuilder) + .version(true) + + val searchRequest = SearchRequest() + .indices(indexName) + .source(searchSourceBuilder) + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val alertIDs = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val alert = Alert.parse(xcp, hit.id, hit.version) + alert.id + } + + return alertIDs.distinct() + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/comments/CommentsIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/comments/CommentsIndices.kt new file mode 100644 index 000000000..6bbeee933 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/comments/CommentsIndices.kt @@ -0,0 +1,402 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.comments + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.state.ClusterStateResponse +import org.opensearch.action.admin.indices.alias.Alias +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.rollover.RolloverRequest +import org.opensearch.action.admin.indices.rollover.RolloverResponse +import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.IndexUtils +import org.opensearch.client.Client +import org.opensearch.cluster.ClusterChangedEvent +import org.opensearch.cluster.ClusterStateListener +import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.action.ActionListener +import org.opensearch.threadpool.Scheduler +import org.opensearch.threadpool.ThreadPool +import java.time.Instant + +/** + * Initialize the OpenSearch components required to run comments. + * + */ +class CommentsIndices( + settings: Settings, + private val client: Client, + private val threadPool: ThreadPool, + private val clusterService: ClusterService +) : ClusterStateListener { + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.COMMENTS_HISTORY_MAX_DOCS) { commentsHistoryMaxDocs = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE) { + commentsHistoryMaxAge = it + } + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD) { + commentsHistoryRolloverPeriod = it + rescheduleCommentsRollover() + } + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD) { + commentsHistoryRetentionPeriod = it + } + } + + companion object { + /** The alias of the index in which to write comments finding */ + const val COMMENTS_HISTORY_WRITE_INDEX = ".opensearch-alerting-comments-history-write" + + /** The index name pattern referring to all comments history indices */ + const val COMMENTS_HISTORY_ALL = ".opensearch-alerting-comments-history*" + + /** The index name pattern to create comments history indices */ + const val COMMENTS_HISTORY_INDEX_PATTERN = "<.opensearch-alerting-comments-history-{now/d}-1>" + + /** The index name pattern to query all comments, history and current comments. */ + const val ALL_COMMENTS_INDEX_PATTERN = ".opensearch-alerting-comments*" + + @JvmStatic + fun commentsMapping() = + CommentsIndices::class.java.getResource("alerting_comments.json").readText() + + private val logger = LogManager.getLogger(AlertIndices::class.java) + } + + @Volatile private var commentsHistoryMaxDocs = AlertingSettings.COMMENTS_HISTORY_MAX_DOCS.get(settings) + + @Volatile private var commentsHistoryMaxAge = AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE.get(settings) + + @Volatile private var commentsHistoryRolloverPeriod = AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD.get(settings) + + @Volatile private var commentsHistoryRetentionPeriod = AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD.get(settings) + + @Volatile private var isClusterManager = false + + // for JobsMonitor to report + var lastRolloverTime: TimeValue? = null + + private var commentsHistoryIndexInitialized: Boolean = false + + private var scheduledCommentsRollover: Scheduler.Cancellable? = null + + /** + * Initialize the indices required for Alerting comments. + * First check if the index exists, and if not create the index with the provided callback listeners. + * + * @param actionListener A callback listener for the index creation call. Generally in the form of onSuccess, onFailure + */ + + fun onManager() { + try { + // try to rollover immediately as we might be restarting the cluster + rolloverCommentsHistoryIndex() + // schedule the next rollover for approx MAX_AGE later + scheduledCommentsRollover = threadPool + .scheduleWithFixedDelay({ rolloverAndDeleteCommentsHistoryIndices() }, commentsHistoryRolloverPeriod, executorName()) + } catch (e: Exception) { + // This should be run on cluster startup + logger.error( + "Error creating comments indices. Comments can't be recorded until master node is restarted.", + e + ) + } + } + + fun offManager() { + scheduledCommentsRollover?.cancel() + } + + private fun executorName(): String { + return ThreadPool.Names.MANAGEMENT + } + + override fun clusterChanged(event: ClusterChangedEvent) { + // Instead of using a LocalNodeClusterManagerListener to track master changes, this service will + // track them here to avoid conditions where master listener events run after other + // listeners that depend on what happened in the master listener + if (this.isClusterManager != event.localNodeClusterManager()) { + this.isClusterManager = event.localNodeClusterManager() + if (this.isClusterManager) { + onManager() + } else { + offManager() + } + } + + // if the indexes have been deleted they need to be reinitialized + commentsHistoryIndexInitialized = event.state().metadata().hasAlias(COMMENTS_HISTORY_WRITE_INDEX) + } + + private fun rescheduleCommentsRollover() { + if (clusterService.state().nodes.isLocalNodeElectedMaster) { + scheduledCommentsRollover?.cancel() + scheduledCommentsRollover = threadPool + .scheduleWithFixedDelay({ rolloverAndDeleteCommentsHistoryIndices() }, commentsHistoryRolloverPeriod, executorName()) + } + } + + fun isCommentsHistoryInitialized(): Boolean { + return clusterService.state().metadata.hasAlias(COMMENTS_HISTORY_WRITE_INDEX) + } + + suspend fun createOrUpdateInitialCommentsHistoryIndex() { + if (!isCommentsHistoryInitialized()) { + commentsHistoryIndexInitialized = createIndex(COMMENTS_HISTORY_INDEX_PATTERN, commentsMapping(), COMMENTS_HISTORY_WRITE_INDEX) + if (commentsHistoryIndexInitialized) + IndexUtils.lastUpdatedCommentsHistoryIndex = IndexUtils.getIndexNameWithAlias( + clusterService.state(), + COMMENTS_HISTORY_WRITE_INDEX + ) + } else { + updateIndexMapping(COMMENTS_HISTORY_WRITE_INDEX, commentsMapping(), true) + } + commentsHistoryIndexInitialized + } + + private fun rolloverAndDeleteCommentsHistoryIndices() { + rolloverCommentsHistoryIndex() + deleteOldIndices("comments", COMMENTS_HISTORY_ALL) + } + + private fun rolloverCommentsHistoryIndex() { + rolloverIndex( + commentsHistoryIndexInitialized, + COMMENTS_HISTORY_WRITE_INDEX, + COMMENTS_HISTORY_INDEX_PATTERN, + commentsMapping(), + commentsHistoryMaxDocs, + commentsHistoryMaxAge, + COMMENTS_HISTORY_WRITE_INDEX + ) + } + + // TODO: Everything below is boilerplate util functions straight from AlertIndices.kt + /* + Depending on whether comments system indices will be component-specific or + component-agnostic, may need to either merge CommentsIndices.kt into AlertIndices.kt, + or factor these out into IndexUtils.kt for both AlertIndices.kt and CommentsIndices.kt + to use + */ + + private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List { + val indicesToDelete = mutableListOf() + for (entry in clusterStateResponse.state.metadata.indices) { + val indexMetaData = entry.value + getHistoryIndexToDelete(indexMetaData, commentsHistoryRetentionPeriod.millis, COMMENTS_HISTORY_WRITE_INDEX, true) + ?.let { indicesToDelete.add(it) } + } + return indicesToDelete + } + + private fun getHistoryIndexToDelete( + indexMetadata: IndexMetadata, + retentionPeriodMillis: Long, + writeIndex: String, + historyEnabled: Boolean + ): String? { + val creationTime = indexMetadata.creationDate + if ((Instant.now().toEpochMilli() - creationTime) > retentionPeriodMillis) { + val alias = indexMetadata.aliases.entries.firstOrNull { writeIndex == it.value.alias } + if (alias != null) { + if (historyEnabled) { + // If the index has the write alias and history is enabled, don't delete the index + return null + } else if (writeIndex == COMMENTS_HISTORY_WRITE_INDEX) { + // Otherwise reset commentsHistoryIndexInitialized since index will be deleted + commentsHistoryIndexInitialized = false + } + } + + return indexMetadata.index.name + } + return null + } + + private fun deleteAllOldHistoryIndices(indicesToDelete: List) { + if (indicesToDelete.isNotEmpty()) { + val deleteIndexRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray()) + client.admin().indices().delete( + deleteIndexRequest, + object : ActionListener { + override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) { + if (!deleteIndicesResponse.isAcknowledged) { + logger.error( + "Could not delete one or more comments history indices: $indicesToDelete." + + "Retrying one by one." + ) + deleteOldHistoryIndex(indicesToDelete) + } + } + override fun onFailure(e: Exception) { + logger.error("Delete for comments History Indices $indicesToDelete Failed. Retrying one By one.") + deleteOldHistoryIndex(indicesToDelete) + } + } + ) + } + } + + private fun deleteOldHistoryIndex(indicesToDelete: List) { + for (index in indicesToDelete) { + val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray()) + client.admin().indices().delete( + singleDeleteRequest, + object : ActionListener { + override fun onResponse(acknowledgedResponse: AcknowledgedResponse?) { + if (acknowledgedResponse != null) { + if (!acknowledgedResponse.isAcknowledged) { + logger.error("Could not delete one or more comments history indices: $index") + } + } + } + override fun onFailure(e: Exception) { + logger.debug("Exception ${e.message} while deleting the index $index") + } + } + ) + } + } + + private suspend fun createIndex(index: String, schemaMapping: String, alias: String? = null): Boolean { + // This should be a fast check of local cluster state. Should be exceedingly rare that the local cluster + // state does not contain the index and multiple nodes concurrently try to create the index. + // If it does happen that error is handled we catch the ResourceAlreadyExistsException + val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil { + exists(IndicesExistsRequest(index).local(true), it) + } + if (existsResponse.isExists) return true + + logger.debug("index: [$index] schema mappings: [$schemaMapping]") + val request = CreateIndexRequest(index) + .mapping(schemaMapping) + .settings(Settings.builder().put("index.hidden", true).build()) + + if (alias != null) request.alias(Alias(alias)) + return try { + val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } + createIndexResponse.isAcknowledged + } catch (t: Exception) { + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { + true + } else { + throw t + } + } + } + + private suspend fun updateIndexMapping(index: String, mapping: String, alias: Boolean = false) { + val clusterState = clusterService.state() + var targetIndex = index + if (alias) { + targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index) + } + + if (targetIndex == IndexUtils.lastUpdatedCommentsHistoryIndex + ) { + return + } + + val putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex) + .source(mapping, XContentType.JSON) + val updateResponse: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingRequest, it) } + if (updateResponse.isAcknowledged) { + logger.info("Index mapping of $targetIndex is updated") + setIndexUpdateFlag(index, targetIndex) + } else { + logger.info("Failed to update index mapping of $targetIndex") + } + } + + private fun setIndexUpdateFlag(index: String, targetIndex: String) { + when (index) { + COMMENTS_HISTORY_WRITE_INDEX -> IndexUtils.lastUpdatedCommentsHistoryIndex = targetIndex + } + } + + private fun rolloverIndex( + initialized: Boolean, + index: String, + pattern: String, + map: String, + docsCondition: Long, + ageCondition: TimeValue, + writeIndex: String + ) { + logger.info("in rolloverIndex, initialize: $initialized") + if (!initialized) { + return + } + + logger.info("sending rollover request") + // We have to pass null for newIndexName in order to get Elastic to increment the index count. + val request = RolloverRequest(index, null) + request.createIndexRequest.index(pattern) + .mapping(map) + .settings(Settings.builder().put("index.hidden", true).build()) + request.addMaxIndexDocsCondition(docsCondition) + request.addMaxIndexAgeCondition(ageCondition) + client.admin().indices().rolloverIndex( + request, + object : ActionListener { + override fun onResponse(response: RolloverResponse) { + if (!response.isRolledOver) { + logger.info("$writeIndex not rolled over. Conditions were: ${response.conditionStatus}") + } else { + logger.info("$writeIndex rolled over. Conditions were: ${response.conditionStatus}") + lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis()) + } + } + override fun onFailure(e: Exception) { + logger.error("$writeIndex not roll over failed.") + } + } + ) + } + + private fun deleteOldIndices(tag: String, indices: String) { + val clusterStateRequest = ClusterStateRequest() + .clear() + .indices(indices) + .metadata(true) + .local(true) + .indicesOptions(IndicesOptions.strictExpand()) + client.admin().cluster().state( + clusterStateRequest, + object : ActionListener { + override fun onResponse(clusterStateResponse: ClusterStateResponse) { + if (clusterStateResponse.state.metadata.indices.isNotEmpty()) { + val indicesToDelete = getIndicesToDelete(clusterStateResponse) + logger.info("Deleting old $tag indices viz $indicesToDelete") + deleteAllOldHistoryIndices(indicesToDelete) + } else { + logger.info("No Old $tag Indices to delete") + } + } + override fun onFailure(e: Exception) { + logger.error("Error fetching cluster state") + } + } + ) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt index f981691c8..12fa1e0e0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertContext.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting.model import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.Comment import org.opensearch.commons.alerting.model.DocLevelQuery /** @@ -15,7 +16,8 @@ import org.opensearch.commons.alerting.model.DocLevelQuery data class AlertContext( val alert: Alert, val associatedQueries: List? = null, - val sampleDocs: List>? = null + val sampleDocs: List>? = null, + val comments: List? = null ) { fun asTemplateArg(): Map { val queriesContext = associatedQueries?.map { @@ -26,10 +28,20 @@ data class AlertContext( ) } + val commentsContext = comments?.map { + mapOf( + Comment.COMMENT_CREATED_TIME_FIELD to it.createdTime, + Comment.COMMENT_LAST_UPDATED_TIME_FIELD to it.lastUpdatedTime, + Comment.COMMENT_CONTENT_FIELD to it.content, + Comment.COMMENT_USER_FIELD to it.user?.name + ) + } + // Compile the custom context fields. val customContextFields = mapOf( ASSOCIATED_QUERIES_FIELD to queriesContext, - SAMPLE_DOCS_FIELD to sampleDocs + SAMPLE_DOCS_FIELD to sampleDocs, + COMMENTS_FIELD to commentsContext ) // Get the alert template args @@ -45,5 +57,6 @@ data class AlertContext( companion object { const val ASSOCIATED_QUERIES_FIELD = "associated_queries" const val SAMPLE_DOCS_FIELD = "sample_documents" + const val COMMENTS_FIELD = "comments" } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteAlertingCommentAction.kt new file mode 100644 index 000000000..024b79970 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteAlertingCommentAction.kt @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteCommentRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +private val log: Logger = LogManager.getLogger(RestDeleteMonitorAction::class.java) + +/** + * Rest handlers to create and update comments. + */ +class RestDeleteAlertingCommentAction : BaseRestHandler() { + + override fun getName(): String { + return "delete_alerting_comment_action" + } + + override fun routes(): List { + return listOf( + Route( + RestRequest.Method.DELETE, + "${AlertingPlugin.COMMENTS_BASE_URI}/{id}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.info("${request.method()} ${AlertingPlugin.COMMENTS_BASE_URI}/{id}") + val commentId = request.param("id") + val deleteMonitorRequest = DeleteCommentRequest(commentId) + return RestChannelConsumer { channel -> + client.execute(AlertingActions.DELETE_COMMENT_ACTION_TYPE, deleteMonitorRequest, RestToXContentListener(channel)) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexAlertingCommentAction.kt new file mode 100644 index 000000000..3edd64579 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexAlertingCommentAction.kt @@ -0,0 +1,112 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IF_PRIMARY_TERM +import org.opensearch.alerting.util.IF_SEQ_NO +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexCommentRequest +import org.opensearch.commons.alerting.action.IndexCommentResponse +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.Comment +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestChannel +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestResponse +import org.opensearch.rest.action.RestResponseListener +import java.io.IOException + +private val log = LogManager.getLogger(RestIndexMonitorAction::class.java) + +/** + * Rest handlers to create and update alerting comments. + */ +class RestIndexAlertingCommentAction : BaseRestHandler() { + + override fun getName(): String { + return "index_alerting_comment_action" + } + + override fun routes(): List { + return listOf( + Route( + RestRequest.Method.POST, + "${AlertingPlugin.COMMENTS_BASE_URI}/{id}" + ), + Route( + RestRequest.Method.PUT, + "${AlertingPlugin.COMMENTS_BASE_URI}/{id}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.info("${request.method()} ${AlertingPlugin.COMMENTS_BASE_URI}") + + val id = request.param( + "id", + if (request.method() == RestRequest.Method.POST) Alert.NO_ID else Comment.NO_ID + ) + if (request.method() == RestRequest.Method.POST && Alert.NO_ID == id) { + throw AlertingException.wrap(IllegalArgumentException("Missing alert ID")) + } else if (request.method() == RestRequest.Method.PUT && Comment.NO_ID == id) { + throw AlertingException.wrap(IllegalArgumentException("Missing comment ID")) + } + + val alertId = if (request.method() == RestRequest.Method.POST) id else Alert.NO_ID + val commentId = if (request.method() == RestRequest.Method.PUT) id else Comment.NO_ID + + val content = request.contentParser().map()[Comment.COMMENT_CONTENT_FIELD] as String? + if (content.isNullOrEmpty()) { + throw AlertingException.wrap(IllegalArgumentException("Missing comment content")) + } + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + + val indexCommentRequest = IndexCommentRequest( + alertId, + "alert", + commentId, + seqNo, + primaryTerm, + request.method(), + content + ) + + return RestChannelConsumer { channel -> + client.execute(AlertingActions.INDEX_COMMENT_ACTION_TYPE, indexCommentRequest, indexCommentResponse(channel, request.method())) + } + } + + private fun indexCommentResponse(channel: RestChannel, restMethod: RestRequest.Method): + RestResponseListener { + return object : RestResponseListener(channel) { + @Throws(Exception::class) + override fun buildResponse(response: IndexCommentResponse): RestResponse { + var returnStatus = RestStatus.CREATED + if (restMethod == RestRequest.Method.PUT) + returnStatus = RestStatus.OK + + val restResponse = BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)) + if (returnStatus == RestStatus.CREATED) { + val location = "${AlertingPlugin.COMMENTS_BASE_URI}/${response.id}" + restResponse.addHeader("Location", location) + } + return restResponse + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchAlertingCommentAction.kt new file mode 100644 index 000000000..821d6639e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchAlertingCommentAction.kt @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN +import org.opensearch.alerting.util.context +import org.opensearch.client.node.NodeClient +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory.jsonBuilder +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.SearchCommentRequest +import org.opensearch.commons.alerting.model.Comment +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestChannel +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestResponse +import org.opensearch.rest.action.RestResponseListener +import org.opensearch.search.builder.SearchSourceBuilder +import java.io.IOException + +private val log = LogManager.getLogger(RestIndexMonitorAction::class.java) + +/** + * Rest handler to search alerting comments. + */ +class RestSearchAlertingCommentAction() : BaseRestHandler() { + + override fun getName(): String { + return "search_alerting_comments_action" + } + + override fun routes(): List { + return listOf( + Route( + RestRequest.Method.GET, + "${AlertingPlugin.COMMENTS_BASE_URI}/_search" + ), + Route( + RestRequest.Method.POST, + "${AlertingPlugin.COMMENTS_BASE_URI}/_search" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.info("${request.method()} ${AlertingPlugin.COMMENTS_BASE_URI}/_search") + + val searchSourceBuilder = SearchSourceBuilder() + searchSourceBuilder.parseXContent(request.contentOrSourceParamParser()) + searchSourceBuilder.fetchSource(context(request)) + + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(ALL_COMMENTS_INDEX_PATTERN) + + val searchCommentRequest = SearchCommentRequest(searchRequest) + return RestChannelConsumer { channel -> + client.execute(AlertingActions.SEARCH_COMMENTS_ACTION_TYPE, searchCommentRequest, searchCommentResponse(channel)) + } + } + + private fun searchCommentResponse(channel: RestChannel): RestResponseListener { + return object : RestResponseListener(channel) { + @Throws(Exception::class) + override fun buildResponse(response: SearchResponse): RestResponse { + if (response.isTimedOut) { + return BytesRestResponse(RestStatus.REQUEST_TIMEOUT, response.toString()) + } + + // Swallow exception and return response as is + try { + for (hit in response.hits) { + XContentType.JSON.xContent().createParser( + channel.request().xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + hit.sourceAsString + ).use { hitsParser -> + hitsParser.nextToken() + val comment = Comment.parse(hitsParser, hit.id) + val xcb = comment.toXContent(jsonBuilder(), EMPTY_PARAMS) + hit.sourceRef(BytesReference.bytes(xcb)) + } + } + } catch (e: Exception) { + log.error("The comment parsing failed. Will return response as is.") + } + return BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), EMPTY_PARAMS)) + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt index 1bf51678e..78f9a5755 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt @@ -127,7 +127,7 @@ class RestSearchMonitorAction( } } } catch (e: Exception) { - log.info("The monitor parsing failed. Will return response as is.") + log.error("The monitor parsing failed. Will return response as is.") } return BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), EMPTY_PARAMS)) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt index 597ff5b3e..40250d088 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/BucketLevelTriggerExecutionContext.kt @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.MonitorRunResult -import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.Monitor import java.time.Instant @@ -22,9 +21,9 @@ data class BucketLevelTriggerExecutionContext( override val results: List>, override val periodStart: Instant, override val periodEnd: Instant, - val dedupedAlerts: List = listOf(), + val dedupedAlerts: List = listOf(), val newAlerts: List = listOf(), - val completedAlerts: List = listOf(), + val completedAlerts: List = listOf(), override val error: Exception? = null ) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { @@ -32,9 +31,9 @@ data class BucketLevelTriggerExecutionContext( monitor: Monitor, trigger: BucketLevelTrigger, monitorRunResult: MonitorRunResult, - dedupedAlerts: List = listOf(), + dedupedAlerts: List = listOf(), newAlerts: List = listOf(), - completedAlerts: List = listOf() + completedAlerts: List = listOf() ) : this( monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, monitorRunResult.periodEnd, dedupedAlerts, newAlerts, completedAlerts, monitorRunResult.scriptContextError(trigger) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt index 729aa18d0..f24895818 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/script/QueryLevelTriggerExecutionContext.kt @@ -5,9 +5,9 @@ package org.opensearch.alerting.script +import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult -import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import java.time.Instant @@ -18,7 +18,7 @@ data class QueryLevelTriggerExecutionContext( override val results: List>, override val periodStart: Instant, override val periodEnd: Instant, - val alert: Alert? = null, + val alertContext: AlertContext? = null, override val error: Exception? = null ) : TriggerExecutionContext(monitor, results, periodStart, periodEnd, error) { @@ -26,15 +26,10 @@ data class QueryLevelTriggerExecutionContext( monitor: Monitor, trigger: QueryLevelTrigger, monitorRunResult: MonitorRunResult, - alert: Alert? = null + alertContext: AlertContext? = null ) : this( - monitor, - trigger, - monitorRunResult.inputResults.results, - monitorRunResult.periodStart, - monitorRunResult.periodEnd, - alert, - monitorRunResult.scriptContextError(trigger) + monitor, trigger, monitorRunResult.inputResults.results, monitorRunResult.periodStart, monitorRunResult.periodEnd, + alertContext, monitorRunResult.scriptContextError(trigger) ) /** @@ -44,7 +39,7 @@ data class QueryLevelTriggerExecutionContext( override fun asTemplateArg(): Map { val tempArg = super.asTemplateArg().toMutableMap() tempArg["trigger"] = trigger.asTemplateArg() - tempArg["alert"] = alert?.asTemplateArg() + tempArg["alert"] = alertContext?.asTemplateArg() // map "alert" templateArg field to AlertContext wrapper instead of Alert object return tempArg } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 7a2a8aa48..cc7df33ae 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -222,5 +222,57 @@ class AlertingSettings { Int.MAX_VALUE, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val ALERTING_COMMENTS_ENABLED = Setting.boolSetting( + "plugins.alerting.comments_enabled", + false, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val COMMENTS_HISTORY_MAX_DOCS = Setting.longSetting( + "plugins.alerting.comments_history_max_docs", + 1000L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val COMMENTS_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( + "plugins.alerting.comments_history_max_age", + TimeValue(30, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val COMMENTS_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting( + "plugins.alerting.comments_history_rollover_period", + TimeValue(12, TimeUnit.HOURS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val COMMENTS_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( + "plugins.alerting.comments_history_retention_period", + TimeValue(60, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val COMMENTS_MAX_CONTENT_SIZE = Setting.longSetting( + "plugins.alerting.max_comment_character_length", + 2000L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val MAX_COMMENTS_PER_ALERT = Setting.longSetting( + "plugins.alerting.max_comments_per_alert", + 500L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val MAX_COMMENTS_PER_NOTIFICATION = Setting.intSetting( + "plugins.alerting.max_comments_per_notification", + 3, + 0, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt new file mode 100644 index 000000000..87e6a03a1 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt @@ -0,0 +1,189 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionRequest +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteCommentRequest +import org.opensearch.commons.alerting.action.DeleteCommentResponse +import org.opensearch.commons.alerting.model.Comment +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) +private val log = LogManager.getLogger(TransportDeleteAlertingCommentAction::class.java) + +class TransportDeleteAlertingCommentAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val clusterService: ClusterService, + settings: Settings, + val xContentRegistry: NamedXContentRegistry +) : HandledTransportAction( + AlertingActions.DELETE_COMMENT_ACTION_NAME, transportService, actionFilters, ::DeleteCommentRequest +), + SecureTransportAction { + + @Volatile private var alertingCommentsEnabled = AlertingSettings.ALERTING_COMMENTS_ENABLED.get(settings) + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.ALERTING_COMMENTS_ENABLED) { + alertingCommentsEnabled = it + } + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener) { + // validate feature flag enabled + if (!alertingCommentsEnabled) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Comments for Alerting is currently disabled", RestStatus.FORBIDDEN), + ) + ) + return + } + + val transformedRequest = request as? DeleteCommentRequest + ?: recreateObject(request) { DeleteCommentRequest(it) } + + val user = readUserFromThreadContext(client) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + scope.launch { + DeleteCommentHandler( + client, + actionListener, + user, + transformedRequest.commentId + ).resolveUserAndStart() + } + } + + inner class DeleteCommentHandler( + private val client: Client, + private val actionListener: ActionListener, + private val user: User?, + private val commentId: String + ) { + + private var sourceIndex: String? = null + + suspend fun resolveUserAndStart() { + try { + val comment = getComment() ?: return + + if (sourceIndex == null) { + actionListener.onFailure( + AlertingException( + "Could not resolve the index the given Comment came from", + RestStatus.INTERNAL_SERVER_ERROR, + IllegalStateException() + ) + ) + } + + // if user is null because security plugin is not installed, anyone can delete any comment + // otherwise, only allow comment deletion if the deletion requester is the same as the comment's author + // or if the user is Admin + val canDelete = user == null || user.name == comment.user?.name || isAdmin(user) + + val deleteRequest = DeleteRequest(sourceIndex, commentId) + + if (canDelete) { + log.debug("Deleting the comment with id ${deleteRequest.id()}") + val deleteResponse = client.suspendUntil { delete(deleteRequest, it) } + actionListener.onResponse(DeleteCommentResponse(deleteResponse.id)) + } else { + actionListener.onFailure( + AlertingException("Not allowed to delete this comment!", RestStatus.FORBIDDEN, IllegalStateException()) + ) + } + } catch (t: Exception) { + log.error("Failed to delete comment $commentId", t) + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private suspend fun getComment(): Comment? { + val queryBuilder = QueryBuilders + .boolQuery() + .must(QueryBuilders.termsQuery("_id", commentId)) + val searchSourceBuilder = + SearchSourceBuilder() + .version(true) + .seqNoAndPrimaryTerm(true) + .query(queryBuilder) + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(ALL_COMMENTS_INDEX_PATTERN) + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val comments = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val comment = Comment.parse(xcp, hit.id) + sourceIndex = hit.index + comment + } + + if (comments.isEmpty()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND), + ), + ) + return null + } else if (comments.size > 1) { + actionListener.onFailure( + AlertingException.wrap(IllegalStateException("Multiple comments were found with the same ID")), + ) + return null + } + + return comments[0] + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 2e0a3811e..559b1d8c1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -576,7 +576,7 @@ class TransportDocLevelMonitorFanOutAction dryrun: Boolean ): ActionRunResult { return try { - if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alert)) { + if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alertContext?.alert)) { return ActionRunResult(action.id, action.name, mapOf(), true, null, null) } val actionOutput = mutableMapOf() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt new file mode 100644 index 000000000..f26fa3528 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexAlertingCommentAction.kt @@ -0,0 +1,388 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionRequest +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.comments.CommentsIndices +import org.opensearch.alerting.comments.CommentsIndices.Companion.COMMENTS_HISTORY_WRITE_INDEX +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_COMMENTS_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.COMMENTS_MAX_CONTENT_SIZE +import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT +import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_COMMENTS_PER_ALERT +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.CommentsUtils +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexCommentRequest +import org.opensearch.commons.alerting.action.IndexCommentResponse +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.Comment +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.common.io.stream.NamedWriteableRegistry +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestRequest +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.lang.IllegalArgumentException +import java.time.Instant + +private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportIndexAlertingCommentAction +@Inject +constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val commentsIndices: CommentsIndices, + val clusterService: ClusterService, + val settings: Settings, + val xContentRegistry: NamedXContentRegistry, + val namedWriteableRegistry: NamedWriteableRegistry, +) : HandledTransportAction( + AlertingActions.INDEX_COMMENT_ACTION_NAME, + transportService, + actionFilters, + ::IndexCommentRequest, +), + SecureTransportAction { + + @Volatile private var alertingCommentsEnabled = ALERTING_COMMENTS_ENABLED.get(settings) + @Volatile private var commentsMaxContentSize = COMMENTS_MAX_CONTENT_SIZE.get(settings) + @Volatile private var maxCommentsPerAlert = MAX_COMMENTS_PER_ALERT.get(settings) + @Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings) + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERTING_COMMENTS_ENABLED) { alertingCommentsEnabled = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(COMMENTS_MAX_CONTENT_SIZE) { commentsMaxContentSize = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_COMMENTS_PER_ALERT) { maxCommentsPerAlert = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_TIMEOUT) { indexTimeout = it } + listenFilterBySettingChange(clusterService) + } + + override fun doExecute( + task: Task, + request: ActionRequest, + actionListener: ActionListener, + ) { + // validate feature flag enabled + if (!alertingCommentsEnabled) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Comments for Alerting is currently disabled", RestStatus.FORBIDDEN), + ) + ) + return + } + + val transformedRequest = + request as? IndexCommentRequest + ?: recreateObject(request, namedWriteableRegistry) { + IndexCommentRequest(it) + } + + // validate comment content size + if (transformedRequest.content.length > commentsMaxContentSize) { + actionListener.onFailure( + AlertingException.wrap( + IllegalArgumentException("Comment content exceeds max length of $commentsMaxContentSize characters"), + ) + ) + return + } + + // validate the request is for the correct entity type + if (transformedRequest.entityType != "alert") { + actionListener.onFailure( + AlertingException.wrap( + IllegalArgumentException( + "Index comment request is for wrong entity type, expected alert, got ${transformedRequest.entityType}" + ) + ) + ) + return + } + + val user = readUserFromThreadContext(client) + + client.threadPool().threadContext.stashContext().use { + scope.launch { + IndexCommentHandler(client, actionListener, transformedRequest, user).start() + } + } + } + + inner class IndexCommentHandler( + private val client: Client, + private val actionListener: ActionListener, + private val request: IndexCommentRequest, + private val user: User?, + ) { + suspend fun start() { + commentsIndices.createOrUpdateInitialCommentsHistoryIndex() + if (request.method == RestRequest.Method.PUT) { + updateComment() + } else { + indexComment() + } + } + + private suspend fun indexComment() { + val alert = getAlert() ?: return + + val numCommentsOnThisAlert = CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)).size + if (numCommentsOnThisAlert >= maxCommentsPerAlert) { + actionListener.onFailure( + AlertingException.wrap( + IllegalArgumentException( + "This request would create more than the allowed number of Comments" + + "for this Alert: $maxCommentsPerAlert" + ) + ) + ) + return + } + + log.debug("checking user permissions in index comment") + checkUserPermissionsWithResource(user, alert.monitorUser, actionListener, "monitor", alert.monitorId) + + val comment = Comment( + entityId = request.entityId, + entityType = request.entityType, + content = request.content, + createdTime = Instant.now(), + user = user + ) + + val indexRequest = + IndexRequest(COMMENTS_HISTORY_WRITE_INDEX) + .source(comment.toXContentWithUser(XContentFactory.jsonBuilder())) + .setIfSeqNo(request.seqNo) + .setIfPrimaryTerm(request.primaryTerm) + .timeout(indexTimeout) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + + log.debug("Creating new comment: ${comment.toXContentWithUser(XContentFactory.jsonBuilder())}") + + try { + val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } + val failureReasons = checkShardsFailure(indexResponse) + if (failureReasons != null) { + actionListener.onFailure( + AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())), + ) + return + } + + actionListener.onResponse( + IndexCommentResponse(indexResponse.id, indexResponse.seqNo, indexResponse.primaryTerm, comment) + ) + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private suspend fun updateComment() { + val currentComment = getComment() ?: return + + // check that the user has permissions to edit the comment. user can edit comment if + // - user is Admin + // - user is the author of the comment + if (user != null && !isAdmin(user) && user.name != currentComment.user?.name) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Comment can only be edited by Admin or author of comment", + RestStatus.FORBIDDEN, + ), + ), + ) + return + } + + // retains everything from the original comment except content and lastUpdatedTime + val requestComment = currentComment.copy(content = request.content, lastUpdatedTime = Instant.now()) + + val indexRequest = + IndexRequest(COMMENTS_HISTORY_WRITE_INDEX) + .source(requestComment.toXContentWithUser(XContentFactory.jsonBuilder())) + .id(requestComment.id) + .setIfSeqNo(request.seqNo) + .setIfPrimaryTerm(request.primaryTerm) + .timeout(indexTimeout) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + + log.debug( + "Updating comment, ${currentComment.id}, from: " + + "${currentComment.content} to: " + + requestComment.content, + ) + + try { + val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } + val failureReasons = checkShardsFailure(indexResponse) + if (failureReasons != null) { + actionListener.onFailure( + AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())), + ) + return + } + + actionListener.onResponse( + IndexCommentResponse( + indexResponse.id, + indexResponse.seqNo, + indexResponse.primaryTerm, + requestComment, + ), + ) + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private suspend fun getAlert(): Alert? { + // need to validate the existence of the Alert that user is trying to add Comment to. + // Also need to check if user has permissions to add a Comment to the passed in Alert. To do this, + // we retrieve the Alert to get its associated monitor user, and use that to + // check if they have permissions to the Monitor that generated the Alert + val queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("_id", listOf(request.entityId))) + val searchSourceBuilder = + SearchSourceBuilder() + .version(true) + .seqNoAndPrimaryTerm(true) + .query(queryBuilder) + + // search all alerts, since user might want to create a comment + // on a completed alert + val searchRequest = + SearchRequest() + .indices(AlertIndices.ALL_ALERT_INDEX_PATTERN) + .source(searchSourceBuilder) + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val alerts = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val alert = Alert.parse(xcp, hit.id, hit.version) + alert + } + + if (alerts.isEmpty()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Alert not found", RestStatus.NOT_FOUND), + ) + ) + return null + } else if (alerts.size > 1) { + actionListener.onFailure( + AlertingException.wrap(IllegalStateException("Multiple alerts were found with the same ID")), + ) + return null + } + + return alerts[0] + } + + private suspend fun getComment(): Comment? { + // need to validate the existence of the Alert that user is trying to add Comment to. + // Also need to check if user has permissions to add a Comment to the passed in Alert. To do this, + // we retrieve the Alert to get its associated monitor user, and use that to + // check if they have permissions to the Monitor that generated the Alert + val queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("_id", listOf(request.commentId))) + val searchSourceBuilder = + SearchSourceBuilder() + .version(true) + .seqNoAndPrimaryTerm(true) + .query(queryBuilder) + + // search all alerts, since user might want to create a comment + // on a completed alert + val searchRequest = + SearchRequest() + .indices(CommentsIndices.ALL_COMMENTS_INDEX_PATTERN) + .source(searchSourceBuilder) + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val comments = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val comment = Comment.parse(xcp, hit.id) + comment + } + + if (comments.isEmpty()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Comment not found", RestStatus.NOT_FOUND), + ), + ) + return null + } else if (comments.size > 1) { + actionListener.onFailure( + AlertingException.wrap(IllegalStateException("Multiple comments were found with the same ID")), + ) + return null + } + + return comments[0] + } + + private fun checkShardsFailure(response: IndexResponse): String? { + val failureReasons = StringBuilder() + if (response.shardInfo.failed > 0) { + response.shardInfo.failures.forEach { entry -> + failureReasons.append(entry.reason()) + } + return failureReasons.toString() + } + return null + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt new file mode 100644 index 000000000..2e6199726 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchAlertingCommentAction.kt @@ -0,0 +1,182 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionRequest +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.SearchCommentRequest +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.Comment +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.action.ActionListener +import org.opensearch.core.common.io.stream.NamedWriteableRegistry +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.BoolQueryBuilder +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.io.IOException + +private val log = LogManager.getLogger(TransportSearchAlertingCommentAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportSearchAlertingCommentAction @Inject constructor( + transportService: TransportService, + val settings: Settings, + val client: Client, + clusterService: ClusterService, + actionFilters: ActionFilters, + val namedWriteableRegistry: NamedWriteableRegistry +) : HandledTransportAction( + AlertingActions.SEARCH_COMMENTS_ACTION_NAME, transportService, actionFilters, ::SearchRequest +), + SecureTransportAction { + + @Volatile private var alertingCommentsEnabled = AlertingSettings.ALERTING_COMMENTS_ENABLED.get(settings) + @Volatile override var filterByEnabled: Boolean = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.ALERTING_COMMENTS_ENABLED) { + alertingCommentsEnabled = it + } + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener) { + // validate feature flag enabled + if (!alertingCommentsEnabled) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Comments for Alerting is currently disabled", RestStatus.FORBIDDEN), + ) + ) + return + } + + val transformedRequest = request as? SearchCommentRequest + ?: recreateObject(request, namedWriteableRegistry) { + SearchCommentRequest(it) + } + + val searchSourceBuilder = transformedRequest.searchRequest.source() + .seqNoAndPrimaryTerm(true) + .version(true) + val queryBuilder = if (searchSourceBuilder.query() == null) BoolQueryBuilder() + else QueryBuilders.boolQuery().must(searchSourceBuilder.query()) + + searchSourceBuilder.query(queryBuilder) + .seqNoAndPrimaryTerm(true) + .version(true) + + val user = readUserFromThreadContext(client) + client.threadPool().threadContext.stashContext().use { + scope.launch { + resolve(transformedRequest, actionListener, user) + } + } + } + + suspend fun resolve(searchCommentRequest: SearchCommentRequest, actionListener: ActionListener, user: User?) { + if (user == null) { + // user is null when: 1/ security is disabled. 2/when user is super-admin. + search(searchCommentRequest.searchRequest, actionListener) + } else if (!doFilterForUser(user)) { + // security is enabled and filterby is disabled. + search(searchCommentRequest.searchRequest, actionListener) + } else { + // security is enabled and filterby is enabled. + try { + log.debug("Filtering result by: {}", user.backendRoles) + + // first retrieve all Alert IDs current User can see after filtering by backend roles + val alertIDs = getFilteredAlertIDs(user) + + // then filter the returned Comments based on the Alert IDs they're allowed to see + val queryBuilder = searchCommentRequest.searchRequest.source().query() as BoolQueryBuilder + searchCommentRequest.searchRequest.source().query( + queryBuilder.filter( + QueryBuilders.termsQuery(Comment.ENTITY_ID_FIELD, alertIDs) + ) + ) + + search(searchCommentRequest.searchRequest, actionListener) + } catch (ex: IOException) { + actionListener.onFailure(AlertingException.wrap(ex)) + } + } + } + + fun search(searchRequest: SearchRequest, actionListener: ActionListener) { + client.search( + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + actionListener.onResponse(response) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } + + // retrieve the IDs of all Alerts after filtering by current User's + // backend roles + private suspend fun getFilteredAlertIDs(user: User): List { + val queryBuilder = QueryBuilders + .boolQuery() + .filter(QueryBuilders.termsQuery("monitor_user.backend_roles.keyword", user.backendRoles)) + val searchSourceBuilder = + SearchSourceBuilder() + .version(true) + .seqNoAndPrimaryTerm(true) + .query(queryBuilder) + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(ALL_ALERT_INDEX_PATTERN) + // .preference(Preference.PRIMARY_FIRST.type()) // expensive, be careful + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val alertIDs = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val alert = Alert.parse(xcp, hit.id, hit.version) + alert.id + } + + return alertIDs + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/CommentsUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/CommentsUtils.kt new file mode 100644 index 000000000..15cc50c27 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/CommentsUtils.kt @@ -0,0 +1,117 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.client.Client +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.Comment +import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder +import org.opensearch.search.builder.SearchSourceBuilder +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +private val log = LogManager.getLogger(CommentsUtils::class.java) + +class CommentsUtils { + companion object { + // Searches through all Comments history indices and returns a list of all Comments associated + // with the Entities given by the list of Entity IDs + suspend fun getCommentsByAlertIDs(client: Client, alertIDs: List): List { + val queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("entity_id", alertIDs)) + val searchSourceBuilder = + SearchSourceBuilder() + .version(true) + .seqNoAndPrimaryTerm(true) + .query(queryBuilder) + + val searchRequest = + SearchRequest() + .indices(ALL_COMMENTS_INDEX_PATTERN) + .source(searchSourceBuilder) + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + val comments = searchResponse.hits.map { hit -> + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val comment = Comment.parse(xcp, hit.id) + comment + } + + return comments + } + + // Identical to getCommentsByAlertIDs, just returns list of Comment IDs instead of list of Comment objects + suspend fun getCommentIDsByAlertIDs(client: Client, alertIDs: List): List { + val comments = getCommentsByAlertIDs(client, alertIDs) + return comments.map { it.id } + } + + /** + * Performs a Search request to retrieve the top maxComments most recent comments associated with the + * given Alert, where maxComments is a cluster setting. + */ + suspend fun getCommentsForAlertNotification( + client: Client, + alertIds: List, + maxComments: Int + ): Map> { + val allComments = getCommentsByAlertIDs(client, alertIds) + val sortedComments = allComments.sortedByDescending { it.createdTime } + val alertIdToComments = mutableMapOf>() + for (comment in sortedComments) { + if (!alertIdToComments.containsKey(comment.entityId)) { + alertIdToComments[comment.entityId] = mutableListOf() + } else if (alertIdToComments[comment.entityId]!!.size >= maxComments) { + continue + } + alertIdToComments[comment.entityId]!!.add(comment) + } + return alertIdToComments.mapValues { it.value.toList() } + } + + // Deletes all Comments given by the list of Comments IDs + suspend fun deleteComments(client: Client, commentIDs: List) { + if (commentIDs.isEmpty()) return + val deleteResponse: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ALL_COMMENTS_INDEX_PATTERN) + .filter(QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("_id", commentIDs))) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } + deleteResponse.bulkFailures.forEach { + log.error("Failed to delete Comment. Comment ID: [${it.id}] cause: [${it.cause}] ") + } + } + + // TODO: make getCommentsByAlertID and getCommentIDsByAlertID + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 41e4a7bbd..8e73128a8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -9,6 +9,7 @@ import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.comments.CommentsIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.ClusterState @@ -35,6 +36,8 @@ class IndexUtils { private set var findingIndexSchemaVersion: Int private set + var alertingCommentIndexSchemaVersion: Int + private set var scheduledJobIndexUpdated: Boolean = false private set @@ -42,13 +45,17 @@ class IndexUtils { private set var findingIndexUpdated: Boolean = false private set + var commentsIndexUpdated: Boolean = false + private set var lastUpdatedAlertHistoryIndex: String? = null var lastUpdatedFindingHistoryIndex: String? = null + var lastUpdatedCommentsHistoryIndex: String? = null init { scheduledJobIndexSchemaVersion = getSchemaVersion(ScheduledJobIndices.scheduledJobMappings()) alertIndexSchemaVersion = getSchemaVersion(AlertIndices.alertMapping()) findingIndexSchemaVersion = getSchemaVersion(AlertIndices.findingMapping()) + alertingCommentIndexSchemaVersion = getSchemaVersion(CommentsIndices.commentsMapping()) } @JvmStatic @@ -66,6 +73,11 @@ class IndexUtils { findingIndexUpdated = true } + @JvmStatic + fun commentsIndexUpdated() { + commentsIndexUpdated = true + } + @JvmStatic fun getSchemaVersion(mapping: String): Int { val xcp = XContentType.JSON.xContent().createParser( diff --git a/alerting/src/main/resources/org/opensearch/alerting/comments/alerting_comments.json b/alerting/src/main/resources/org/opensearch/alerting/comments/alerting_comments.json new file mode 100644 index 000000000..967b6dfd2 --- /dev/null +++ b/alerting/src/main/resources/org/opensearch/alerting/comments/alerting_comments.json @@ -0,0 +1,56 @@ +{ + "dynamic": "false", + "properties": { + "entity_id": { + "type": "keyword" + }, + "content": { + "type": "text" + }, + "created_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + } + } +} \ No newline at end of file diff --git a/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt b/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt index bd1f94482..243f4cd60 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt +++ b/alerting/src/main/resources/org/opensearch/alerting/org.opensearch.alerting.txt @@ -27,7 +27,7 @@ class org.opensearch.alerting.script.QueryLevelTriggerExecutionContext { List getResults() java.time.Instant getPeriodStart() java.time.Instant getPeriodEnd() - Alert getAlert() + AlertContext getAlertContext() Exception getError() } @@ -45,6 +45,10 @@ class org.opensearch.commons.alerting.model.QueryLevelTrigger { List getActions() } +class org.opensearch.alerting.model.AlertContext { + Alert getAlert() +} + class org.opensearch.commons.alerting.model.Alert { String getId() long getVersion() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 4d4c9ff6f..e3cb5121a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -14,6 +14,7 @@ import org.apache.http.message.BasicHeader import org.junit.AfterClass import org.junit.rules.DisableOnDebug import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.AlertingPlugin.Companion.COMMENTS_BASE_URI import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_ACCOUNT_BASE_URI import org.opensearch.alerting.AlertingPlugin.Companion.EMAIL_GROUP_BASE_URI import org.opensearch.alerting.alerts.AlertIndices @@ -45,6 +46,7 @@ import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.ChainedAlertTrigger +import org.opensearch.commons.alerting.model.Comment import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -56,6 +58,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.alerting.util.string +import org.opensearch.commons.authuser.User import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent @@ -534,6 +537,40 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return alert.copy(id = alertJson["_id"] as String, version = (alertJson["_version"] as Int).toLong()) } + protected fun createAlertComment(alertId: String, content: String): Comment { + val createRequestBody = jsonBuilder() + .startObject() + .field(Comment.COMMENT_CONTENT_FIELD, content) + .endObject() + .string() + + val createResponse = client().makeRequest( + "POST", + "$COMMENTS_BASE_URI/$alertId", + StringEntity(createRequestBody, APPLICATION_JSON) + ) + + assertEquals("Unable to create a new alert", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val commentId = responseBody["_id"] as String + assertNotEquals("response is missing Id", Comment.NO_ID, commentId) + + val comment = responseBody["comment"] as Map<*, *> + + return Comment( + id = commentId, + entityId = comment["entity_id"] as String, + entityType = comment["entity_type"] as String, + content = comment["content"] as String, + createdTime = Instant.ofEpochMilli(comment["created_time"] as Long), + lastUpdatedTime = if (comment["last_updated_time"] != null) { + Instant.ofEpochMilli(comment["last_updated_time"] as Long) + } else null, + user = comment["user"]?.let { User(it as String, emptyList(), emptyList(), emptyList()) } + ) + } + protected fun createRandomMonitor(refresh: Boolean = false, withMetadata: Boolean = false): Monitor { val monitor = randomQueryLevelMonitor(withMetadata = withMetadata) val monitorId = createMonitor(monitor, refresh).id diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/NotesIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/NotesIndicesIT.kt new file mode 100644 index 000000000..dd04bf3f3 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/NotesIndicesIT.kt @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.alerts + +import org.opensearch.alerting.AlertingRestTestCase + +class NotesIndicesIT : AlertingRestTestCase() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/AlertingCommentsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/AlertingCommentsRestApiIT.kt new file mode 100644 index 000000000..5527d3531 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/AlertingCommentsRestApiIT.kt @@ -0,0 +1,153 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.opensearch.alerting.AlertingPlugin.Companion.COMMENTS_BASE_URI +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.randomAlert +import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_COMMENTS_ENABLED +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.Comment.Companion.COMMENT_CONTENT_FIELD +import org.opensearch.commons.alerting.util.string +import org.opensearch.core.rest.RestStatus +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.junit.annotations.TestLogging +import java.util.concurrent.TimeUnit + +@TestLogging("level:DEBUG", reason = "Debug for tests.") +@Suppress("UNCHECKED_CAST") +class AlertingCommentsRestApiIT : AlertingRestTestCase() { + + fun `test creating comment`() { + client().updateSettings(ALERTING_COMMENTS_ENABLED.key, "true") + + val monitor = createRandomMonitor(refresh = true) + val alert = createAlert(randomAlert(monitor).copy(state = Alert.State.ACTIVE)) + val alertId = alert.id + val commentContent = "test comment" + + val comment = createAlertComment(alertId, commentContent) + + assertEquals("Comment does not have correct content", commentContent, comment.content) + assertEquals("Comment does not have correct alert ID", alertId, comment.entityId) + } + + fun `test updating comment`() { + client().updateSettings(ALERTING_COMMENTS_ENABLED.key, "true") + + val monitor = createRandomMonitor(refresh = true) + val alert = createAlert(randomAlert(monitor).copy(state = Alert.State.ACTIVE)) + val alertId = alert.id + val commentContent = "test comment" + + val commentId = createAlertComment(alertId, commentContent).id + + val updateContent = "updated comment" + val updateRequestBody = XContentFactory.jsonBuilder() + .startObject() + .field(COMMENT_CONTENT_FIELD, updateContent) + .endObject() + .string() + + val updateResponse = client().makeRequest( + "PUT", + "$COMMENTS_BASE_URI/$commentId", + StringEntity(updateRequestBody, ContentType.APPLICATION_JSON) + ) + + assertEquals("Update comment failed", RestStatus.OK, updateResponse.restStatus()) + + val updateResponseBody = updateResponse.asMap() + + val comment = updateResponseBody["comment"] as Map<*, *> + val actualContent = comment["content"] as String + assertEquals("Comment does not have correct content after update", updateContent, actualContent) + } + + fun `test searching single comment by alert id`() { + client().updateSettings(ALERTING_COMMENTS_ENABLED.key, "true") + + val monitor = createRandomMonitor(refresh = true) + val alert = createAlert(randomAlert(monitor).copy(state = Alert.State.ACTIVE)) + val alertId = alert.id + val commentContent = "test comment" + + createAlertComment(alertId, commentContent) + + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 3, TimeUnit.SECONDS) + + val search = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).toString() + val searchResponse = client().makeRequest( + "GET", + "$COMMENTS_BASE_URI/_search", + StringEntity(search, ContentType.APPLICATION_JSON) + ) + + val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) + val hits = xcp.map()["hits"]!! as Map> + logger.info("hits: $hits") + val numberDocsFound = hits["total"]?.get("value") + assertEquals("No Comments found", 1, numberDocsFound) + + val searchHits = hits["hits"] as List<*> + val hit = searchHits[0] as Map<*, *> + val commentHit = hit["_source"] as Map<*, *> + assertEquals("returned Comment does not match alert id in search query", alertId, commentHit["entity_id"]) + assertEquals("returned Comment does not have expected content", commentContent, commentHit["content"]) + } + + fun `test deleting comments`() { + client().updateSettings(ALERTING_COMMENTS_ENABLED.key, "true") + + val monitor = createRandomMonitor(refresh = true) + val alert = createAlert(randomAlert(monitor).copy(state = Alert.State.ACTIVE)) + val alertId = alert.id + val commentContent = "test comment" + + val commentId = createAlertComment(alertId, commentContent).id + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 3, TimeUnit.SECONDS) + + val deleteResponse = client().makeRequest( + "DELETE", + "$COMMENTS_BASE_URI/$commentId" + ) + + assertEquals("Delete comment failed", RestStatus.OK, deleteResponse.restStatus()) + + val deleteResponseBody = deleteResponse.asMap() + + val deletedCommentId = deleteResponseBody["_id"] as String + assertEquals("Deleted Comment ID does not match Comment ID in delete request", commentId, deletedCommentId) + } + + // TODO: test list + /* + create comment with empty content should fail + create without alert id should fail + update without comment id should fail + search comments across multiple alerts + (belongs in CommentsIT) create comment thats too large based on cluster setting should fail + create comment on alert that alrdy has max comments based on cluster setting should fail + create comment on alert user doesn't have backend roles to view should fail + search comment on alert user doesn't have backend roles to view should fail + comments are shown in notifications for query monitor + comments are shown in notifications for bucket monitor + (belongs in CommentsIT) update comment that user didn't author should fail + (belongs in CommentsIT) delete comment that user didn't author should fail + (belongs in CommentsIT) update comment that user didn't author but user is Admin should succeed + */ +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureAlertingNotesRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureAlertingNotesRestApiIT.kt new file mode 100644 index 000000000..a16f92d3b --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureAlertingNotesRestApiIT.kt @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.opensearch.alerting.AlertingRestTestCase + +class SecureAlertingNotesRestApiIT : AlertingRestTestCase() diff --git a/release-notes/opensearch-alerting.release-notes-2.15.0.0.md b/release-notes/opensearch-alerting.release-notes-2.15.0.0.md index 3f2fed10c..d933c3413 100644 --- a/release-notes/opensearch-alerting.release-notes-2.15.0.0.md +++ b/release-notes/opensearch-alerting.release-notes-2.15.0.0.md @@ -10,6 +10,7 @@ Compatible with OpenSearch 2.15.0 ### Enhancements * Add start_time and end_time to GetAlertsRequest. ([#1551](https://github.com/opensearch-project/alerting/pull/1551)) +* Add Alerting Comments experimental feature ([#1561](https://github.com/opensearch-project/alerting/pull/1561)) ### Documentation * Added 2.15 release notes. ([#1569](https://github.com/opensearch-project/alerting/pull/1569)) \ No newline at end of file