Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate notifications backend #129

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion reports-scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ integTest {

if (System.getProperty("tests.clustername") != null) {
exclude 'org/opensearch/reportsscheduler/ReportsSchedulerPluginIT.class'
} else {
// assuming that opensearch-notifications will be installed when running integTest against a remote cluster
exclude 'org/opensearch/reportsscheduler/rest/ReportWithNotificationIT.class'
}
}

Expand All @@ -247,7 +250,7 @@ integTest.dependsOn(bundle)
integTest.getClusters().forEach{c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))}

testClusters.integTest {
testDistribution = "ARCHIVE"
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package org.opensearch.reportsscheduler

import org.opensearch.OpenSearchStatusException
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -55,6 +56,7 @@ import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
import org.opensearch.cluster.service.ClusterService
Expand All @@ -69,9 +71,11 @@ import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestStatus
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool
import org.opensearch.watcher.ResourceWatcherService
Expand Down Expand Up @@ -118,6 +122,12 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
(client as? NodeClient)?.let { NotificationsActions.initialize(it) } ?: run {
throw OpenSearchStatusException(
"Unable to cast client to NodeClient for Notifications call",
RestStatus.INTERNAL_SERVER_ERROR
)
}
return emptyList()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.client.Client
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indices.InvalidIndexNameException
Expand Down Expand Up @@ -73,20 +74,29 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
request: Request,
listener: ActionListener<Response>
) {
val userStr: String? = client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val userStr: String? =
client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val user: User? = User.parse(userStr)
val storedThreadContext = client.threadPool().threadContext.newStoredContext(false)
scope.launch {
try {
listener.onResponse(executeRequest(request, user))
client.threadPool().threadContext.stashContext().use {
storedThreadContext.restore()
listener.onResponse(executeRequest(request, user))
}
} catch (exception: OpenSearchStatusException) {
Metrics.REPORT_EXCEPTIONS_ES_STATUS_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}")
listener.onFailure(exception)
} catch (exception: OpenSearchSecurityException) {
Metrics.REPORT_EXCEPTIONS_ES_SECURITY_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchSecurityException:", exception)
listener.onFailure(OpenSearchStatusException("Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN))
listener.onFailure(
OpenSearchStatusException(
"Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN
)
)
} catch (exception: VersionConflictEngineException) {
Metrics.REPORT_EXCEPTIONS_VERSION_CONFLICT_ENGINE_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:VersionConflictEngineException:", exception)
Expand Down Expand Up @@ -125,4 +135,43 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
* @return the response to return.
*/
abstract fun executeRequest(request: Request, user: User?): Response

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package org.opensearch.reportsscheduler.action

import org.opensearch.OpenSearchStatusException
import org.opensearch.commons.authuser.User
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportDefinitionsIndex
Expand All @@ -44,9 +45,9 @@ import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstance.Status
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusRequest
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusResponse
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.rest.RestStatus
import java.time.Instant

Expand Down Expand Up @@ -122,6 +123,8 @@ internal object ReportInstanceActions {
Metrics.REPORT_FROM_DEFINITION_ID_SYSTEM_ERROR.counter.increment()
throw OpenSearchStatusException("Report Instance Creation failed", RestStatus.INTERNAL_SERVER_ERROR)
}
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, docId)
val reportInstanceCopy = reportInstance.copy(id = docId)
return OnDemandReportCreateResponse(reportInstanceCopy, true)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.reportsscheduler.notifications

import org.opensearch.action.ActionListener
import org.opensearch.client.node.NodeClient
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.notifications.NotificationConstants.FEATURE_REPORTS
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.SendNotificationResponse
import org.opensearch.commons.notifications.model.ChannelMessage
import org.opensearch.commons.notifications.model.EventSource
import org.opensearch.commons.notifications.model.SeverityType
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.model.CreateReportDefinitionResponse
import org.opensearch.reportsscheduler.model.ReportDefinition
import org.opensearch.reportsscheduler.util.logger

/**
* Report definitions index operation actions.
*/
internal object NotificationsActions {
private val log by logger(NotificationsActions::class.java)

private lateinit var client: NodeClient

/**
* Initialize the class
* @param client NodeClient for transport call
*/
fun initialize(client: NodeClient) {
this.client = client
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @param referenceId [String] object
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String): SendNotificationResponse? {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/opensearch-project/dashboards-reports/blob/371f1d18855de49688379634fa867f3f8eaef8e4/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/model/ReportDefinition.kt#L53

I think by combining the origin + path("_plugin/reporting/report/") + refernceId we can get the link and add to the notification message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhongnansu the origin will always be localhost right? it's used to access UI internally, but should not be sent to user as a report link? Also this doesn't seem to include basePath

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt that it's always the localhost, let me check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see, how is origin used in reporting? I checked it doens't seem to include basepath, not sure if something breaks if we append basepath

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/opensearch-project/dashboards-reports/blob/371f1d18855de49688379634fa867f3f8eaef8e4/dashboards-reports/server/routes/report.ts#L72

It gets origin from the request headers, which should be good. It's not used to access UI internally, it was used for the old implementation of notification where we using polling to get job, and compose the link at kibana side, then send to user.

But I agreed that it's still missing the basePath. will start work on the solution after this PR is merged

return send(delivery, referenceId, "")
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @param referenceId [String] object
* @param userStr [String] object,
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String, userStr: String?): SendNotificationResponse? {
if (userStr.isNullOrEmpty()) {
return sendNotificationHelper(delivery, referenceId)
}

var sendNotificationResponse: SendNotificationResponse? = null
client.threadPool().threadContext.stashContext().use {
client.threadPool().threadContext.putTransient(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT,
userStr
)
sendNotificationResponse = sendNotificationHelper(delivery, referenceId)
}
return sendNotificationResponse
}

private fun sendNotificationHelper(
delivery: ReportDefinition.Delivery,
referenceId: String
): SendNotificationResponse? {
log.info("$LOG_PREFIX:NotificationsActions-send")
var sendNotificationResponse: SendNotificationResponse? = null
NotificationsPluginInterface.sendNotification(
client,
EventSource(delivery.title, referenceId, FEATURE_REPORTS, SeverityType.INFO),
ChannelMessage(delivery.textDescription, delivery.htmlDescription, null),
delivery.configIds,
object : ActionListener<SendNotificationResponse> {
override fun onResponse(response: SendNotificationResponse) {
sendNotificationResponse = response
log.info("$LOG_PREFIX:NotificationsActions-send:$sendNotificationResponse")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please record response and also add metrics around success and failure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it in another PR, merging this to unblock others

}

override fun onFailure(exception: Exception) {
log.error("$LOG_PREFIX:NotificationsActions-send Error:$exception")
}
}
)
return sendNotificationResponse
}

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@

package org.opensearch.reportsscheduler.scheduler

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.util.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import java.time.Instant

internal object ReportDefinitionJobRunner : ScheduledJobRunner {
Expand Down Expand Up @@ -68,6 +70,11 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")
if (reportDefinitionDetails.reportDefinition.delivery != null) {
val user = UserAccessManager.getUserFromAccess(job.access)
val userStr = user?.let { it.toString() } ?: ""
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, id, userStr)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ internal object UserAccessManager {
fun validateUser(user: User?) {
if (isUserPrivateTenant(user) && user?.name == null) {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("User name not provided for private tenant access",
RestStatus.FORBIDDEN)
throw OpenSearchStatusException(
"User name not provided for private tenant access",
RestStatus.FORBIDDEN
)
}
if (PluginSettings.isRbacEnabled()) {
// backend roles must be present
if (user?.backendRoles.isNullOrEmpty()) {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("User doesn't have backend roles configured. Contact administrator.",
RestStatus.FORBIDDEN)
throw OpenSearchStatusException(
"User doesn't have backend roles configured. Contact administrator.",
RestStatus.FORBIDDEN
)
}
}
}
Expand Down Expand Up @@ -96,6 +100,19 @@ internal object UserAccessManager {
return retList
}

/**
* Get user object from all user access info.
*/
fun getUserFromAccess(access: List<String>): User? {
if (access.isNullOrEmpty()) {
return null
}
val name = access.find { it.startsWith(USER_TAG) }?.substring(USER_TAG.length)
val backendRoles = access.filter { it.startsWith(ROLE_TAG) }.map { it.substring(ROLE_TAG.length) }
val roles = access.filter { it.startsWith(BACKEND_ROLE_TAG) }.map { it.substring(BACKEND_ROLE_TAG.length) }
return User(name, backendRoles, roles, listOf())
}

/**
* Get access info for search filtering
*/
Expand Down
Loading