Skip to content

Commit

Permalink
Preserve thread context in coroutine
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Li <joshuali925@gmail.com>
  • Loading branch information
joshuali925 committed Aug 10, 2021
1 parent aeffad9 commit 3271b58
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.client.Client
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.indices.InvalidIndexNameException
Expand Down Expand Up @@ -73,20 +74,29 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
request: Request,
listener: ActionListener<Response>
) {
val userStr: String? = client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val userStr: String? =
client.threadPool().threadContext.getTransient<String>(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val user: User? = User.parse(userStr)
val storedThreadContext = client.threadPool().threadContext.newStoredContext(false)
scope.launch {
try {
listener.onResponse(executeRequest(request, user))
client.threadPool().threadContext.stashContext().use {
storedThreadContext.restore()
listener.onResponse(executeRequest(request, user))
}
} catch (exception: OpenSearchStatusException) {
Metrics.REPORT_EXCEPTIONS_ES_STATUS_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}")
listener.onFailure(exception)
} catch (exception: OpenSearchSecurityException) {
Metrics.REPORT_EXCEPTIONS_ES_SECURITY_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:OpenSearchSecurityException:", exception)
listener.onFailure(OpenSearchStatusException("Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN))
listener.onFailure(
OpenSearchStatusException(
"Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN
)
)
} catch (exception: VersionConflictEngineException) {
Metrics.REPORT_EXCEPTIONS_VERSION_CONFLICT_ENGINE_EXCEPTION.counter.increment()
log.warn("$LOG_PREFIX:VersionConflictEngineException:", exception)
Expand Down Expand Up @@ -125,4 +135,44 @@ abstract class PluginBaseAction<Request : ActionRequest, Response : ActionRespon
* @return the response to return.
*/
abstract fun executeRequest(request: Request, user: User?): Response

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ internal object NotificationsActions {
object : ActionListener<SendNotificationResponse> {
override fun onResponse(p0: SendNotificationResponse) {
log.info("$LOG_PREFIX:NotificationsActions-send:${p0.notificationId}")
// TODO need to get listener and return listener.onResponse(p0)
}

override fun onFailure(p0: java.lang.Exception) {
Expand Down

0 comments on commit 3271b58

Please sign in to comment.