From 3271b580b2d021e5509c28b82d4ec731bcbfad33 Mon Sep 17 00:00:00 2001 From: Joshua Li Date: Tue, 10 Aug 2021 16:32:47 -0700 Subject: [PATCH] Preserve thread context in coroutine Signed-off-by: Joshua Li --- .../action/PluginBaseAction.kt | 58 +++++++++++++++++-- .../notifications/NotificationsActions.kt | 1 + 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt b/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt index cda4fbb9..b1cde0e9 100644 --- a/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt +++ b/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/action/PluginBaseAction.kt @@ -44,6 +44,7 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.client.Client import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.index.IndexNotFoundException import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.indices.InvalidIndexNameException @@ -73,11 +74,16 @@ abstract class PluginBaseAction ) { - val userStr: String? = client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + val userStr: String? = + client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) val user: User? = User.parse(userStr) + val storedThreadContext = client.threadPool().threadContext.newStoredContext(false) scope.launch { try { - listener.onResponse(executeRequest(request, user)) + client.threadPool().threadContext.stashContext().use { + storedThreadContext.restore() + listener.onResponse(executeRequest(request, user)) + } } catch (exception: OpenSearchStatusException) { Metrics.REPORT_EXCEPTIONS_ES_STATUS_EXCEPTION.counter.increment() log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}") @@ -85,8 +91,12 @@ abstract class PluginBaseAction T.use(block: (T) -> R): R { + var exception: Throwable? = null + try { + return block(this) + } catch (e: Throwable) { + exception = e + throw e + } finally { + closeFinally(exception) + } + } + + /** + * Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when + * it's being closed due to some other [cause] exception occurred. + * + * The suppressed exception is added to the list of suppressed exceptions of [cause] exception. + */ + @Suppress("TooGenericExceptionCaught") + private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) { + null -> close() + else -> try { + close() + } catch (closeException: Throwable) { + cause.addSuppressed(closeException) + } + } + } diff --git a/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/notifications/NotificationsActions.kt b/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/notifications/NotificationsActions.kt index ac2d6618..577dd7bd 100644 --- a/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/notifications/NotificationsActions.kt +++ b/reports-scheduler/src/main/kotlin/org/opensearch/reportsscheduler/notifications/NotificationsActions.kt @@ -55,6 +55,7 @@ internal object NotificationsActions { object : ActionListener { override fun onResponse(p0: SendNotificationResponse) { log.info("$LOG_PREFIX:NotificationsActions-send:${p0.notificationId}") + // TODO need to get listener and return listener.onResponse(p0) } override fun onFailure(p0: java.lang.Exception) {