Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate notifications backend #129

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion reports-scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ integTest {

if (System.getProperty("tests.clustername") != null) {
exclude 'org/opensearch/reportsscheduler/ReportsSchedulerPluginIT.class'
} else {
// assuming that opensearch-notifications will be installed when running integTest against a remote cluster
exclude 'org/opensearch/reportsscheduler/rest/ReportWithNotificationIT.class'
}
}

Expand All @@ -247,7 +250,7 @@ integTest.dependsOn(bundle)
integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))}

testClusters.integTest {
testDistribution = "ARCHIVE"
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package org.opensearch.reportsscheduler

import org.opensearch.OpenSearchStatusException
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -58,6 +59,7 @@ import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
Expand All @@ -72,9 +74,11 @@ import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestStatus
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool
import org.opensearch.watcher.ResourceWatcherService
Expand Down Expand Up @@ -122,6 +126,12 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
(client as? NodeClient)?.let { NotificationsActions.initialize(it) } ?: run {
throw OpenSearchStatusException(
"Unable to cast client to NodeClient for Notifications call",
RestStatus.INTERNAL_SERVER_ERROR
)
}
return emptyList()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.client.Client
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indices.InvalidIndexNameException
Expand Down Expand Up @@ -73,20 +74,29 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
request: Request,
listener: ActionListener<Response>
) {
val userStr: String? = client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val userStr: String? =
client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val user: User? = User.parse(userStr)
val storedThreadContext = client.threadPool().threadContext.newStoredContext(false)
scope.launch {
try {
listener.onResponse(executeRequest(request, user))
client.threadPool().threadContext.stashContext().use {
storedThreadContext.restore()
listener.onResponse(executeRequest(request, user))
}
} catch (exception: OpenSearchStatusException) {
Metrics.REPORT_EXCEPTIONS_ES_STATUS_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}")
listener.onFailure(exception)
} catch (exception: OpenSearchSecurityException) {
Metrics.REPORT_EXCEPTIONS_ES_SECURITY_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchSecurityException:", exception)
listener.onFailure(OpenSearchStatusException("Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN))
listener.onFailure(
OpenSearchStatusException(
"Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN
)
)
} catch (exception: VersionConflictEngineException) {
Metrics.REPORT_EXCEPTIONS_VERSION_CONFLICT_ENGINE_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:VersionConflictEngineException:", exception)
Expand Down Expand Up @@ -125,4 +135,43 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
* @return the response to return.
*/
abstract fun executeRequest(request: Request, user: User?): Response

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package org.opensearch.reportsscheduler.action

import org.opensearch.OpenSearchStatusException
import org.opensearch.commons.authuser.User
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportDefinitionsIndex
Expand All @@ -45,10 +46,10 @@ import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstance.Status
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusRequest
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusResponse
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.rest.RestStatus
import java.time.Instant
import kotlin.random.Random
Expand Down Expand Up @@ -125,6 +126,8 @@ internal object ReportInstanceActions {
Metrics.REPORT_FROM_DEFINITION_ID_SYSTEM_ERROR.counter.increment()
throw OpenSearchStatusException("Report Instance Creation failed", RestStatus.INTERNAL_SERVER_ERROR)
}
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, docId)
val reportInstanceCopy = reportInstance.copy(id = docId)
return OnDemandReportCreateResponse(reportInstanceCopy, UserAccessManager.hasAllInfoAccess(user))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.reportsscheduler.notifications

import org.opensearch.action.ActionListener
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.notifications.NotificationConstants.FEATURE_REPORTS
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.model.CreateReportDefinitionResponse
import org.opensearch.reportsscheduler.model.ReportDefinition
import org.opensearch.reportsscheduler.util.logger

/**
* Report definitions index operation actions.
*/
internal object NotificationsActions {
private val log by logger(NotificationsActions::class.java)

private lateinit var client: NodeClient

/**
* Initialize the class
* @param client NodeClient for transport call
*/
fun initialize(client: NodeClient) {
this.client = client
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String) {
log.info("$LOG_PREFIX:NotificationsActions-send")
NotificationsPluginInterface.sendNotification(
client,
EventSource(delivery.title, referenceId, FEATURE_REPORTS, SeverityType.INFO),
ChannelMessage(delivery.textDescription, delivery.htmlDescription, null),
delivery.configIds,
object : ActionListener<SendNotificationResponse> {
override fun onResponse(sendNotificationResponse: SendNotificationResponse) {
log.info("$LOG_PREFIX:NotificationsActions-send:$sendNotificationResponse")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please record response and also add metrics around success and failure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it in another PR, merging this to unblock others

}

override fun onFailure(exception: Exception) {
log.error("$LOG_PREFIX:NotificationsActions-send Error:$exception")
}
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREF
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.util.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -68,6 +69,8 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, id)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,29 @@ import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.apache.http.message.BasicHeader
import org.apache.http.ssl.SSLContextBuilder
import org.junit.After
import org.junit.AfterClass
import org.junit.Before
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.client.RestClient
import org.opensearch.client.RestClientBuilder
import org.opensearch.client.WarningFailureException
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.test.rest.OpenSearchRestTestCase
import org.junit.After
import org.junit.AfterClass
import org.junit.Before
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Path
import java.nio.charset.StandardCharsets
import java.security.cert.X509Certificate
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
Expand Down Expand Up @@ -184,6 +185,8 @@ abstract class PluginRestTestCase : OpenSearchRestTestCase() {
client().performRequest(request)
} catch (exception: ResponseException) {
exception.response
} catch (exception: WarningFailureException) {
exception.response
}
if (expectedRestStatus != null) {
assertEquals(expectedRestStatus, response.statusLine.statusCode)
Expand Down Expand Up @@ -275,10 +278,10 @@ abstract class PluginRestTestCase : OpenSearchRestTestCase() {
val serverUrl = "service:jmx:rmi:///jndi/rmi://127.0.0.1:7777/jmxrmi"
JMXConnectorFactory.connect(JMXServiceURL(serverUrl)).use { connector ->
val proxy = MBeanServerInvocationHandler.newProxyInstance(
connector.mBeanServerConnection,
ObjectName("org.jacoco:type=Runtime"),
IProxy::class.java,
false
connector.mBeanServerConnection,
ObjectName("org.jacoco:type=Runtime"),
IProxy::class.java,
false
)
proxy.getExecutionData(false)?.let {
val path = Path.of("$jacocoBuildPath/integTest.exec")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ fun constructReportDefinitionRequest(
"triggerType":"OnDemand"
},
""".trimIndent(),
name: String = "report_definition"
name: String = "report_definition",
delivery: String = ""
): String {
return """
{
Expand All @@ -54,21 +55,14 @@ fun constructReportDefinitionRequest(
"origin":"localhost:5601",
"id":"id"
},
$trigger
$delivery
"format":{
"duration":"PT1H",
"fileFormat":"Pdf",
"limit":1000,
"header":"optional header",
"footer":"optional footer"
},
$trigger
joshuali925 marked this conversation as resolved.
Show resolved Hide resolved
"delivery":{
"recipients":["nobody@email.com"],
"deliveryFormat":"LinkOnly",
"title":"title",
"textDescription":"textDescription",
"htmlDescription":"optional htmlDescription",
"configIds":["optional_configIds"]
}
}
}
Expand Down
Loading