From ba3153a87b8b1541a833cd4282e7417e695b115c Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Wed, 19 Apr 2023 20:07:58 -0700 Subject: [PATCH] Initial Framework Changes --- .idea/inspectionProfiles/Project_Default.xml | 9 - .idea/runConfigurations/Debug_OpenSearch.xml | 11 -- .idea/vcs.xml | 18 +- .../gradle/test/ClusterConfiguration.groovy | 2 +- .../gradle/testclusters/RunTask.java | 2 +- .../tools/launchers/SystemJvmOptions.java | 5 + gradle.properties | 2 +- gradle/run.gradle | 1 + server/build.gradle | 25 +++ .../util/concurrent/OpenSearchExecutors.java | 4 +- .../OpenSearchThreadPoolExecutor.java | 2 +- .../index/seqno/RetentionLeaseSyncAction.java | 5 +- .../LocalStorePeerRecoverySourceHandler.java | 66 ++++++- .../recovery/PeerRecoverySourceService.java | 21 +- .../recovery/RecoverySourceHandler.java | 86 ++++++++- .../recovery/RetryableTransportClient.java | 6 +- .../SegmentFileTransferHandler.java | 6 +- .../main/java/org/opensearch/node/Node.java | 12 +- .../java/org/opensearch/plugins/Plugin.java | 5 + .../threadpool/ScalingExecutorBuilder.java | 7 +- .../org/opensearch/threadpool/ThreadPool.java | 11 +- .../opensearch/tracing/TaskEventListener.java | 50 +++++ .../OTelContextPreservingActionListener.java | 85 ++++++++ .../OpenSearchConcurrentExecutorService.java | 119 ++++++++++++ .../OpenSearchForwardingExecutorService.java | 51 +++++ .../OpenTelemetryContextWrapper.java | 31 +++ .../opentelemetry/OpenTelemetryService.java | 182 ++++++++++++++++++ .../org/opensearch/bootstrap/security.policy | 6 + ...alStorePeerRecoverySourceHandlerTests.java | 11 +- .../index/shard/IndexShardTestCase.java | 2 +- 30 files changed, 765 insertions(+), 78 deletions(-) delete mode 100644 .idea/inspectionProfiles/Project_Default.xml delete mode 100644 .idea/runConfigurations/Debug_OpenSearch.xml create mode 100644 server/src/main/java/org/opensearch/tracing/TaskEventListener.java create mode 100644 server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java create mode 100644 server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java create mode 100644 server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java create mode 100644 server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java create mode 100644 server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml deleted file mode 100644 index 5cf789707c58c..0000000000000 --- a/.idea/inspectionProfiles/Project_Default.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/runConfigurations/Debug_OpenSearch.xml b/.idea/runConfigurations/Debug_OpenSearch.xml deleted file mode 100644 index 0d8bf59823acf..0000000000000 --- a/.idea/runConfigurations/Debug_OpenSearch.xml +++ /dev/null @@ -1,11 +0,0 @@ - - - - diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 48557884a8893..35eb1ddfbbc02 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,20 +1,6 @@ - - - - + - + \ No newline at end of file diff --git a/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy b/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy index a5207933c3c72..2877d243bf838 100644 --- a/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy @@ -42,7 +42,7 @@ class ClusterConfiguration { String distribution = 'archive' @Input - int numNodes = 1 + int numNodes = 2 @Input int numBwcNodes = 0 diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java index c5035f3b082fe..e62827bda5fcc 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/RunTask.java @@ -64,7 +64,7 @@ public class RunTask extends DefaultTestClustersTask { private static final int DEFAULT_DEBUG_PORT = 5005; public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:"; - private Boolean debug = false; + private Boolean debug = true; private Boolean debugServer = false; diff --git a/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java b/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java index fc613ccdaae68..4fc0c921fafd0 100644 --- a/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java +++ b/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/SystemJvmOptions.java @@ -47,6 +47,11 @@ static List systemJvmOptions() { * networkaddress.cache.ttl; can be set to -1 to cache forever. */ "-Dopensearch.networkaddress.cache.ttl=60", + "-Djava.security.policy=/home/rishma/ws/pa/performance-analyzer/config/opensearch_security.policy", + "-Djdk.attach.allowAttachSelf=true", + "-Dclk.tck=100", + "--add-opens=jdk.attach/sun.tools.attach=ALL-UNNAMED", + /* * Cache ttl in seconds for negative DNS lookups noting that this overrides the JDK security property * networkaddress.cache.negative ttl; set to -1 to cache forever. diff --git a/gradle.properties b/gradle.properties index 73df0940ce181..02d578e1bc1e5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -27,7 +27,7 @@ systemProp.org.gradle.dependency.duplicate.project.detection=false # Enforce the build to fail on deprecated gradle api usage systemProp.org.gradle.warning.mode=fail - +kotlin.stdlib.default.dependency=false # forcing to use TLS1.2 to avoid failure in vault # see https://github.com/hashicorp/vault/issues/8750#issuecomment-631236121 systemProp.jdk.tls.client.protocols=TLSv1.2 diff --git a/gradle/run.gradle b/gradle/run.gradle index 5a1fed06c0ef7..9073b69f827c4 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -34,6 +34,7 @@ apply plugin: 'opensearch.testclusters' testClusters { runTask { testDistribution = 'archive' + numberOfNodes = 2 } } diff --git a/server/build.gradle b/server/build.gradle index 1b5d4fb41cb78..d7f061df04f3b 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -133,6 +133,31 @@ dependencies { api "org.apache.logging.log4j:log4j-jul:${versions.log4j}" api "org.apache.logging.log4j:log4j-core:${versions.log4j}", optional + api 'io.opentelemetry:opentelemetry-api:1.23.1' + api 'io.opentelemetry:opentelemetry-api-logs:1.23.1-alpha' + api 'io.opentelemetry:opentelemetry-api-events:1.23.1-alpha' + api 'io.opentelemetry:opentelemetry-sdk:1.23.1' + api 'io.opentelemetry:opentelemetry-sdk-metrics:1.23.1' + api 'io.opentelemetry:opentelemetry-sdk-common:1.23.1' + api 'io.opentelemetry:opentelemetry-sdk-trace:1.23.1' + api 'io.opentelemetry:opentelemetry-context:1.23.1' + api 'io.opentelemetry:opentelemetry-sdk-logs:1.23.1-alpha' + api 'io.opentelemetry:opentelemetry-semconv:1.23.1-alpha' + + // exporters + api 'io.opentelemetry:opentelemetry-exporter-common:1.23.1' + //implementation 'io.opentelemetry:opentelemetry-exporter-jaeger:1.23.1' + api 'io.opentelemetry:opentelemetry-exporter-otlp:1.23.1' + api 'io.opentelemetry:opentelemetry-exporter-otlp-common:1.23.1' + api("com.squareup.okhttp3:okhttp:4.10.0") + api 'org.jetbrains.kotlin:kotlin-stdlib:1.6.20' + api 'com.squareup.okio:okio-jvm:3.0.0' + + //implementation "com.squareup.okio:okio:2.8.0" + + //implementation 'io.opentelemetry:opentelemetry-exporter-logging:1.23.1' + //implementation 'io.opentelemetry:opentelemetry-exporter-jaeger:1.23.1' + // jna api "net.java.dev.jna:jna:${versions.jna}" diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java index e83f541101b69..5ba761acb3f98 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java @@ -42,6 +42,7 @@ import org.opensearch.node.Node; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.TaskAwareRunnable; +import org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper; import java.util.List; import java.util.Optional; @@ -128,7 +129,6 @@ public static PrioritizedOpenSearchThreadPoolExecutor newSinglePrioritizing( ) { return new PrioritizedOpenSearchThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer); } - public static OpenSearchThreadPoolExecutor newScaling( String name, int min, @@ -354,7 +354,7 @@ public void execute(Runnable command) { * @return an {@link ExecutorService} that executes submitted tasks on the current thread */ public static ExecutorService newDirectExecutorService() { - return DIRECT_EXECUTOR_SERVICE; + return OpenTelemetryContextWrapper.wrapTask(DIRECT_EXECUTOR_SERVICE); } public static String threadName(Settings settings, String namePrefix) { diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index 7fda911fe7959..d85cfb56bfb1a 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -84,7 +84,7 @@ final String getName() { } @SuppressForbidden(reason = "properly rethrowing errors, see OpenSearchExecutors.rethrowErrors") - OpenSearchThreadPoolExecutor( + protected OpenSearchThreadPoolExecutor( String name, int corePoolSize, int maximumPoolSize, diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index afcf5c6766194..1775c633f5caf 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -32,6 +32,7 @@ package org.opensearch.index.seqno; +import io.opentelemetry.context.Context; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -63,6 +64,7 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tracing.opentelemetry.OTelContextPreservingActionListener; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; @@ -129,8 +131,9 @@ final void sync( String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases, - ActionListener listener + ActionListener listener1 ) { + final ActionListener listener = new OTelContextPreservingActionListener<>(listener1, Context.current()); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the sync is authorized diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index c8d3b2d09ad28..2b0d5535904bf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -28,12 +28,15 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog; import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.tracing.opentelemetry.OpenTelemetryService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; import java.io.Closeable; import java.io.IOException; +import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.IntSupplier; /** * This handler is used for node-to-node peer recovery when the recovery target is a replica/ or a relocating primary @@ -106,6 +109,7 @@ && isTargetSameHistory() logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); startingSeqNo = request.startingSeqNo(); if (retentionLeaseRef.get() == null) { + createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY)); } else { sendFileStep.onResponse(SendFileResult.EMPTY); @@ -144,15 +148,23 @@ && isTargetSameHistory() // If the target previously had a copy of this shard then a file-based recovery might move its global // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a // new one later on in the recovery. - shard.removePeerRecoveryRetentionLease( - request.targetNode().getId(), + BiFunction, Void> removePeerRecoveryRetentionLeaseFun = + (args, actionListener) -> { + shard.removePeerRecoveryRetentionLease((String) args[0], + (ActionListener) actionListener); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "removePeerRecoveryRetentionLease", + removePeerRecoveryRetentionLeaseFun, new ThreadedActionListener<>( logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, deleteRetentionLeaseStep, false - ) + ), + request.targetNode().getId() ); } catch (RetentionLeaseNotFoundException e) { logger.debug("no peer-recovery retention lease for " + request.targetAllocationId()); @@ -162,7 +174,21 @@ && isTargetSameHistory() deleteRetentionLeaseStep.whenComplete(ignored -> { assert Transports.assertNotTransportThread(this + "[phase1]"); - phase1(wrappedSafeCommit.get(), startingSeqNo, () -> estimateNumOps, sendFileStep, false); + BiFunction, Void> phase1Fun = + (args, actionListener) -> { + phase1((IndexCommit)args[0], (long) args[1], (IntSupplier) args[2], + (ActionListener) actionListener, (boolean) args[3]); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "phase1", + phase1Fun, + sendFileStep, + wrappedSafeCommit.get(), + startingSeqNo, + (IntSupplier)(() -> estimateNumOps), + false + ); }, onFailure); } catch (final Exception e) { @@ -174,7 +200,17 @@ && isTargetSameHistory() sendFileStep.whenComplete(r -> { assert Transports.assertNotTransportThread(this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); + BiFunction, Void> prepareTargetForTranslogFun = + (args, actionListener) -> { + prepareTargetForTranslog((int)args[0], (ActionListener) actionListener); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "prepareTargetForTranslog", + prepareTargetForTranslogFun, + prepareEngineStep, + countNumberOfHistoryOperations(startingSeqNo) + ); }, onFailure); prepareEngineStep.whenComplete(prepareEngineTime -> { @@ -213,18 +249,30 @@ && isTargetSameHistory() final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); final RetentionLeases retentionLeases = shard.getRetentionLeases(); final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); - phase2( + BiFunction, Void> phase2Fun = + (args, actionListener) -> { + try { + phase2((long) args[0], (long) args[1], (Translog.Snapshot) args[2], (long)args[3], (long)args[4], + (RetentionLeases)args[5], (long)args[6], (ActionListener) actionListener); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "phase2", + phase2Fun, + sendSnapshotStep, startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, - mappingVersionOnPrimary, - sendSnapshotStep + mappingVersionOnPrimary ); - }, onFailure); + finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java index 8bea14a1a1c86..a40d1044975c1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java @@ -32,6 +32,7 @@ package org.opensearch.indices.recovery; +import io.opentelemetry.api.trace.Span; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; @@ -57,6 +58,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tracing.opentelemetry.OpenTelemetryService; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; @@ -68,6 +70,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; /** * The source recovery accepts recovery requests from other peer shards and start the recovery process from this @@ -157,9 +163,14 @@ public void clusterChanged(ClusterChangedEvent event) { } private void recover(StartRecoveryRequest request, ActionListener listener) { + Span span = Span.current(); + span.setAttribute(stringKey("index-name"), request.shardId().getIndexName()); + span.setAttribute(longKey("shard-id"), request.shardId().id()); + span.setAttribute(stringKey("source-node"), request.sourceNode().getId()); + span.setAttribute(stringKey("target-node"), request.targetNode().getId()); + final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); - final ShardRouting routingEntry = shard.routingEntry(); if (routingEntry.primary() == false || routingEntry.active() == false) { @@ -183,6 +194,7 @@ private void recover(StartRecoveryRequest request, ActionListener ongoingRecoveries.remove(shard, handler))); } @@ -202,7 +214,12 @@ private void reestablish(ReestablishRecoveryRequest request, ActionListener { @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception { - recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); + BiFunction, Void> recoverFunction = (args, actionListener) -> { + recover((StartRecoveryRequest) args[0], (ActionListener) actionListener); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan("recover", recoverFunction, + new ChannelActionListener<>(channel, Actions.START_RECOVERY, request), request); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 3fef056ac7f81..a707e9384a225 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -72,6 +72,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.replication.SegmentFileTransferHandler; +import org.opensearch.tracing.opentelemetry.OpenTelemetryService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; @@ -86,6 +87,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.stream.StreamSupport; @@ -192,6 +194,7 @@ public void recoverToTarget(ActionListener listener) { protected abstract void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException; + protected void finalizeStepAndCompleteFuture( long startingSeqNo, StepListener sendSnapshotStep, @@ -202,7 +205,26 @@ protected void finalizeStepAndCompleteFuture( final StepListener finalizeStep = new StepListener<>(); // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2 final long trimAboveSeqNo = startingSeqNo - 1; - sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure); + sendSnapshotStep.whenComplete(r -> { + BiFunction, Void> finalizeRecoveryFun = + (args, actionListener) -> { + try { + finalizeRecovery((long) args[0], (long) args[1], (ActionListener) actionListener); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "finalizeRecovery", + finalizeRecoveryFun, + finalizeStep, + r.targetLocalCheckpoint, + trimAboveSeqNo + ); + }, + onFailure + ); finalizeStep.whenComplete(r -> { final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time @@ -432,30 +454,77 @@ void phase1( phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes) ); - final StepListener sendFileInfoStep = new StepListener<>(); + StepListener sendFileInfoStep = new StepListener<>(); final StepListener sendFilesStep = new StepListener<>(); final StepListener createRetentionLeaseStep = new StepListener<>(); final StepListener cleanFilesStep = new StepListener<>(); cancellableThreads.checkForCancel(); - recoveryTarget.receiveFileInfo( + BiFunction, Void> receiveFileInfoFunction = (args, actionListener) -> { + recoveryTarget.receiveFileInfo( + (List) args[0], + (List)args[1], + (List) args[2], + (List) args[3], + (int) args[4], + (ActionListener)actionListener + ); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "sendFileInfo", + receiveFileInfoFunction, + sendFileInfoStep, phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, - translogOps.getAsInt(), - sendFileInfoStep + translogOps.getAsInt() ); sendFileInfoStep.whenComplete( - r -> sendFiles(store, phase1Files.toArray(new StoreFileMetadata[0]), translogOps, sendFilesStep), + r -> { + BiFunction, Void> sendFileFunction = (args, actionListener) -> { + sendFiles( + (Store) args[0], + (StoreFileMetadata[])args[1], + (IntSupplier) args[2], + (ActionListener) actionListener + ); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "sendFiles", + sendFileFunction, + sendFilesStep, + store, + phase1Files.toArray(new StoreFileMetadata[0]), + translogOps + ); + }, listener::onFailure ); + // When doing peer recovery of remote store enabled replica, retention leases are not required. if (skipCreateRetentionLeaseStep) { sendFilesStep.whenComplete(r -> createRetentionLeaseStep.onResponse(null), listener::onFailure); } else { - sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); + sendFilesStep.whenComplete(r -> { + BiFunction, Void> createRetentionLeaseFunction = (args, actionListener) -> { + createRetentionLease( + (long) args[0], + (ActionListener) actionListener + ); + return null; + }; + OpenTelemetryService.callFunctionAndStartSpan( + "createRetentionLease", + createRetentionLeaseFunction, + createRetentionLeaseStep, + startingSeqNo + ); + }, + listener::onFailure); } createRetentionLeaseStep.whenComplete(retentionLease -> { @@ -637,7 +706,7 @@ void phase2( final long maxSeqNoOfUpdatesOrDeletes, final RetentionLeases retentionLeases, final long mappingVersion, - final ActionListener listener + ActionListener listener ) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); @@ -898,6 +967,7 @@ private void cleanFiles( // Once the files have been renamed, any other files that are not // related to this recovery (out of date segments, for example) // are deleted + cancellableThreads.checkForCancel(); recoveryTarget.cleanFiles( translogOps.getAsInt(), diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java index a7113fb3fee5c..42db686d1a0c1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java @@ -8,6 +8,7 @@ package org.opensearch.indices.recovery; +import io.opentelemetry.context.Context; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; @@ -22,6 +23,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tracing.opentelemetry.OTelContextPreservingActionListener; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.SendRequestTransportException; @@ -93,7 +95,9 @@ public void tryAction(ActionListener listener) { action, request, options, - new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC) + new ActionListenerResponseHandler<>( + new OTelContextPreservingActionListener<>(listener, Context.current()), + reader, ThreadPool.Names.GENERIC) ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java index e95c2c6470b4b..8ce2ab6909816 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -143,15 +143,15 @@ protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { } @Override - protected void executeChunkRequest(FileChunk request, ActionListener listener1) { - cancellableThreads.checkForCancel(); + protected void executeChunkRequest(FileChunk request, ActionListener listener) { + chunkWriter.writeFileChunk( request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), - ActionListener.runBefore(listener1, request::close) + ActionListener.runBefore(listener, request::close) ); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index f961b8688aefc..c87a78602a2d3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -39,6 +39,8 @@ import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexingPressureService; +import org.opensearch.tracing.TaskEventListener; +import org.opensearch.tracing.opentelemetry.OpenTelemetryService; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.common.util.FeatureFlags; @@ -53,8 +55,6 @@ import org.opensearch.monitor.fs.FsProbe; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.tasks.TaskResourceTrackingService; -import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; @@ -354,6 +354,7 @@ public static class DiscoverySettings { private final Collection pluginLifecycleComponents; private final LocalNodeFactory localNodeFactory; private final NodeService nodeService; + final NamedWriteableRegistry namedWriteableRegistry; private final AtomicReference runnableTaskListener; @@ -478,6 +479,13 @@ protected Node( final List> executorBuilders = pluginsService.getExecutorBuilders(settings); runnableTaskListener = new AtomicReference<>(); + List taskEventListeners = pluginsService.filterPlugins(Plugin.class) + .stream() + .map(Plugin::getTaskEventListeners) + .flatMap(List::stream) + .collect(Collectors.toList()); + OpenTelemetryService.TaskEventListeners.getInstance(taskEventListeners); + final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, executorBuilders.toArray(new ExecutorBuilder[0])); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); diff --git a/server/src/main/java/org/opensearch/plugins/Plugin.java b/server/src/main/java/org/opensearch/plugins/Plugin.java index b51de5693dbf4..f67fcd21a7e02 100644 --- a/server/src/main/java/org/opensearch/plugins/Plugin.java +++ b/server/src/main/java/org/opensearch/plugins/Plugin.java @@ -32,6 +32,7 @@ package org.opensearch.plugins; +import org.opensearch.tracing.TaskEventListener; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.bootstrap.BootstrapCheck; import org.opensearch.client.Client; @@ -206,6 +207,10 @@ public List> getSettingUpgraders() { return Collections.emptyList(); } + public List getTaskEventListeners() { + return Collections.emptyList(); + } + /** * Provides a function to modify index template meta data on startup. *

diff --git a/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java index 2690a3cc30238..a2e9cc0b3bbf5 100644 --- a/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java @@ -38,6 +38,8 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.node.Node; +import org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper; +import org.opensearch.tracing.opentelemetry.OpenTelemetryService; import java.util.Arrays; import java.util.List; @@ -111,7 +113,7 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th final ThreadFactory threadFactory = OpenSearchExecutors.daemonThreadFactory( OpenSearchExecutors.threadName(settings.nodeName, name()) ); - final ExecutorService executor = OpenSearchExecutors.newScaling( + ExecutorService executor = OpenSearchExecutors.newScaling( settings.nodeName + "/" + name(), core, max, @@ -120,6 +122,9 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th threadFactory, threadContext ); + if (OpenTelemetryService.isThreadPoolAllowed(name())) { + executor = OpenTelemetryContextWrapper.wrapTask(executor); + } return new ThreadPool.ExecutorHolder(executor, info); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 2c7a4db5b8679..931ddcca5ec98 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -45,13 +45,14 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; -import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.XRejectedExecutionHandler; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.node.Node; import org.opensearch.node.ReportingService; +import org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper; +import org.opensearch.tracing.opentelemetry.OpenTelemetryService; import java.io.IOException; import java.util.ArrayList; @@ -215,7 +216,6 @@ public ThreadPool( final ExecutorBuilder... customBuilders ) { assert Node.NODE_NAME_SETTING.exists(settings); - final Map builders = new HashMap<>(); final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings); final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors); @@ -407,9 +407,14 @@ public ExecutorService executor(String name) { if (holder == null) { throw new IllegalArgumentException("no executor service found for [" + name + "]"); } + if (OpenTelemetryService.isThreadPoolAllowed(Names.GENERIC)) { + return OpenTelemetryContextWrapper.wrapTask(holder.executor()); + } return holder.executor(); } + + /** * Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread. * @@ -687,7 +692,7 @@ static class ExecutorHolder { public final Info info; ExecutorHolder(ExecutorService executor, Info info) { - assert executor instanceof OpenSearchThreadPoolExecutor || executor == DIRECT_EXECUTOR; +// assert executor instanceof OpenSearchThreadPoolExecutor || executor == DIRECT_EXECUTOR; this.executor = executor; this.info = info; } diff --git a/server/src/main/java/org/opensearch/tracing/TaskEventListener.java b/server/src/main/java/org/opensearch/tracing/TaskEventListener.java new file mode 100644 index 0000000000000..e060a361247e5 --- /dev/null +++ b/server/src/main/java/org/opensearch/tracing/TaskEventListener.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tracing; + +import io.opentelemetry.context.Context; + +import java.util.concurrent.ExecutorService; + +/** + * SPI for event listener when a thread picks a task to be executed associated with Context aware ExecutorService. + */ +public interface TaskEventListener { + + /** Invoked when a thread start working a task associated with Context aware ExecutorService {@link org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper#wrapTask(ExecutorService)}. + * Context and Span{@link io.opentelemetry.api.trace.Span} information can be derived by calling Context{@link Context#current()} + * when OpenTelemetry implementation is chosen, as current thread will have the context propagated to it. Span events can be added as necessary. + * This can be used to start metering resource consumption by a thread. + * + * Note: current thread will have the Context set + * @param t Thread to start working on a task + */ + void onStart(String operationName, String eventName, Thread t); + + /** Invoked when a thread completes a task associated with Context aware ExecutorService{@link org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper#wrapTask(ExecutorService)} + * for both success and failure scenarios. Context and Span{@link io.opentelemetry.api.trace.Span} information can + * be derived by calling Context{@link Context#current()} when OpenTelemetry implementation is chosen, as + * current thread will have the context propagated to it. Span events can be added as necessary. + * + * This can be used to stop metering resource consumption by a thread. + * + * @param t Thread which completed the task + */ + void onEnd(String operationName, String eventName, Thread t); + + /** + * This is used to check if the TaskEventListener should be called for provided operation and/or event. + * Contract here is, individual service provider should know which all operations are needs to be onboarded to + * this TaskEventListener. It doesn't make sense to call all available TaskEventListeners for all operations using + * current executor service. + * @param operationName name of the operation associated with a trace. + * @return + */ + boolean isApplicable(String operationName, String eventName); +} diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java new file mode 100644 index 0000000000000..a97d1053e0c4f --- /dev/null +++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tracing.opentelemetry; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.opensearch.action.ActionListener; + +import java.util.Objects; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; + +/** + * It does follow - + * 1. Pass the context to thread working on response/failure of delegated ActionListener. + * 2. Close the Span if one passed while its construction. + * 3. Set the scope back to previous context prior to starting the new Span. + * In case, no Span was started and needs to be closed + * {@link OTelContextPreservingActionListener#OTelContextPreservingActionListener(ActionListener, Context)} can be used + * with beforeAttachContext as {@link Context#current()}. + * @param Response object type + */ +public final class OTelContextPreservingActionListener implements ActionListener { + private final ActionListener delegate; + private final Context beforeAttachContext; + private final Context afterAttachContext; + private final String spanID; + + public OTelContextPreservingActionListener(ActionListener delegate, Context beforeAttachContext, String spanID) { + this.delegate = delegate; + this.beforeAttachContext = beforeAttachContext; + this.afterAttachContext = Context.current(); + this.spanID = spanID; + } + + public OTelContextPreservingActionListener(ActionListener delegate, Context beforeAttachContext) { + this(delegate, beforeAttachContext, null); + } + + @Override + public void onResponse(Response r) { + try (Scope ignored = Objects.requireNonNull(afterAttachContext).makeCurrent()) { + Span span = Span.current(); + closeCurrentScope(span); + } + try (Scope ignored = Objects.requireNonNull(beforeAttachContext).makeCurrent()) { + delegate.onResponse(r); + } + } + + @Override + public void onFailure(Exception e) { + try (Scope ignored = Objects.requireNonNull(afterAttachContext).makeCurrent()) { + Span span = Span.current(); + span.setStatus(StatusCode.ERROR); + closeCurrentScope(span); + } + try (Scope ignored = Objects.requireNonNull(beforeAttachContext).makeCurrent()) { + delegate.onFailure(e); + } + } + + private void closeCurrentScope(Span span) { + assert spanID == null || span.getSpanContext().getSpanId().equals(spanID); + span.setAttribute(stringKey("finish-thread-name"), Thread.currentThread().getName()); + span.setAttribute(longKey("finish-thread-id"), Thread.currentThread().getId()); + if (spanID != null) { + Span.current().end(); + } + } + + @Override + public String toString() { + return getClass().getName() + "/" + delegate.toString(); + } +} diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java new file mode 100644 index 0000000000000..69381d9661d53 --- /dev/null +++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tracing.opentelemetry; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.opensearch.tracing.TaskEventListener; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +final public class OpenSearchConcurrentExecutorService extends OpenSearchForwardingExecutorService { + private final List taskEventListeners; + + OpenSearchConcurrentExecutorService(ExecutorService delegate) { + this(delegate, OpenTelemetryService.TaskEventListeners.getInstance(null)); + } + + OpenSearchConcurrentExecutorService(ExecutorService delegate, List taskEventListeners) { + super(delegate); + this.taskEventListeners = taskEventListeners; + } + + @Override + public Future submit(Callable task) { + return delegate().submit(wrapTask(task, taskEventListeners)); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate().submit(wrapTask(task, taskEventListeners), result); + } + + @Override + public Future submit(Runnable task) { + return delegate().submit(wrapTask(task, taskEventListeners)); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return delegate().invokeAll(wrap(tasks, taskEventListeners)); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate().invokeAll(wrap(tasks, taskEventListeners), timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return delegate().invokeAny(wrap(tasks, taskEventListeners)); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate().invokeAny(wrap(tasks, taskEventListeners), timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate().execute(wrapTask(command, taskEventListeners)); + } + + private static Collection> wrap(Collection> tasks, + List taskEventListeners) { + List> wrapped = new ArrayList<>(); + for (Callable task : tasks) { + wrapped.add(wrapTask(task, taskEventListeners)); + } + return wrapped; + } + + private static Callable wrapTask(Callable callable, List taskEventListeners) { + return () -> { + try (Scope ignored = Context.current().makeCurrent()) { + OpenTelemetryService.callTaskEventListeners(true, "", Span.current().getSpanContext().getSpanId() + "-" + + Thread.currentThread().getName() + "-Start", Thread.currentThread(), taskEventListeners); + return callable.call(); + } finally { + OpenTelemetryService.callTaskEventListeners(false, "", Span.current().getSpanContext().getSpanId() + "-" + + Thread.currentThread().getName() + "-End", Thread.currentThread(), taskEventListeners); + } + }; + } + + static Runnable wrapTask(Runnable runnable, List taskEventListeners) { + return () -> { + try (Scope ignored = Context.current().makeCurrent()) { + OpenTelemetryService.callTaskEventListeners(true, "", Span.current().getSpanContext().getSpanId() + "-" + + Thread.currentThread().getName() + "-Start", Thread.currentThread(), taskEventListeners); + runnable.run(); + } finally { + OpenTelemetryService.callTaskEventListeners(false, "", Span.current().getSpanContext().getSpanId() + "-" + + Thread.currentThread().getName() + "-End", Thread.currentThread(), taskEventListeners); + } + }; + } + + +} diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java new file mode 100644 index 0000000000000..63435677f31c3 --- /dev/null +++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tracing.opentelemetry; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +abstract class OpenSearchForwardingExecutorService implements ExecutorService { + + private final ExecutorService delegate; + + protected OpenSearchForwardingExecutorService(ExecutorService delegate) { + this.delegate = delegate; + } + + ExecutorService delegate() { + return delegate; + } + + @Override + public final void shutdown() { + delegate.shutdown(); + } + + @Override + public final List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public final boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public final boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } +} diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java new file mode 100644 index 0000000000000..370fce1f7094a --- /dev/null +++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tracing.opentelemetry; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +public class OpenTelemetryContextWrapper { + + /** + * Wraps the ExecutorService for context propagation across threads in the ThreadPool. + * @param executorService executor service to be wrapped + */ + public static ExecutorService wrapTask(ExecutorService executorService) { + return new OpenSearchConcurrentExecutorService(executorService); + } + + /** + * Passes the context to the provided delegate executor + * @param executor executor to be wrapped with. + */ + public static Executor wrapTask(Executor executor) { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java new file mode 100644 index 0000000000000..61f971aea7025 --- /dev/null +++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tracing.opentelemetry; + +import com.sun.management.ThreadMXBean; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import org.opensearch.action.ActionListener; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tracing.TaskEventListener; + +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.function.BiFunction; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; + + +public class OpenTelemetryService { + public static Resource resource; + public static SdkTracerProvider sdkTracerProvider; + public static SdkMeterProvider sdkMeterProvider; + public static OpenTelemetry openTelemetry; + private static final List DEFAULT_TASK_EVENT_LISTENERS; + private static final List allowedThreadPools = List.of(ThreadPool.Names.GENERIC); + private static final ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean(); + + static { + resource = Resource.getDefault() + .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "opensearch-tasks"))); + + sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(OtlpHttpSpanExporter.builder().build())) + .setResource(resource) + .build(); + + sdkMeterProvider = SdkMeterProvider.builder() + .registerMetricReader(PeriodicMetricReader.builder(OtlpGrpcMetricExporter.builder().build()).build()) + .setResource(resource) + .build(); + + openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .setMeterProvider(sdkMeterProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); + + DEFAULT_TASK_EVENT_LISTENERS = List.of(new TaskEventListener() { + @Override + public void onStart(String operationName, String eventName, Thread t) { + Span span = Span.current(); + if (span != Span.getInvalid()) { + span.addEvent(eventName, + Attributes.of( + AttributeKey.longKey("ThreadID"), t.getId(), + AttributeKey.stringKey("ThreadName"), t.getName(), + AttributeKey.longKey("CPUUsage"), threadMXBean.getThreadCpuTime(t.getId()), + AttributeKey.longKey("MemoryUsage"), threadMXBean.getThreadAllocatedBytes(t.getId()), + AttributeKey.longKey("ContentionTime"), threadMXBean.getThreadInfo(t.getId()).getBlockedTime() + ) + ); + } + } + + @Override + public void onEnd(String operationName, String eventName, Thread t) { + Span span = Span.current(); + if (span != Span.getInvalid()) { + span.addEvent(eventName, + Attributes.of( + AttributeKey.longKey("ThreadID"), t.getId(), + AttributeKey.stringKey("ThreadName"), t.getName(), + AttributeKey.longKey("CPUUsage"), threadMXBean.getThreadCpuTime(t.getId()), + AttributeKey.longKey("MemoryUsage"), threadMXBean.getThreadAllocatedBytes(t.getId()), + AttributeKey.longKey("ContentionTime"), threadMXBean.getThreadInfo(t.getId()).getBlockedTime() + ) + ); + } + } + + @Override + public boolean isApplicable(String operationName, String eventName) { + return true; + } + }); + } + + public static boolean isThreadPoolAllowed(String threadPoolName) { + return allowedThreadPools.contains(threadPoolName); + } + + /** + * starts the span and invokes the function under the scope of new span, closes the scope when function is invoked. + * Wraps the ActionListener with {@link OTelContextPreservingActionListener} for context propagation and ends the span + * on response/failure of action listener. + */ + public static void callFunctionAndStartSpan(String spanName, BiFunction, R> function, + ActionListener actionListener, Object... args) { + Context beforeAttach = Context.current(); + Span span = startSpan(spanName); + try(Scope ignored = span.makeCurrent()) { + actionListener = new OTelContextPreservingActionListener<>(actionListener, beforeAttach, span.getSpanContext().getSpanId()); + callTaskEventListeners(true, "", spanName + "-Start", Thread.currentThread(), + TaskEventListeners.getInstance(null)); + function.apply(args, actionListener); + } finally { + callTaskEventListeners(false, "", spanName + "-End", Thread.currentThread(), + TaskEventListeners.getInstance(null)); + } + } + + /** + * TODO - to be replaced when OpenSearch tracing APIs are available + */ + private static Span startSpan(String spanName) { + Tracer tracer = OpenTelemetryService.sdkTracerProvider.get("recover"); + Span span = tracer.spanBuilder(spanName).setParent(Context.current()).startSpan(); + span.setAttribute(stringKey("start-thread-name"), Thread.currentThread().getName()); + span.setAttribute(longKey("start-thread-id"), Thread.currentThread().getId()); + return span; + } + + /** + * Lazy initialization of all TaskEventListener. + */ + public static class TaskEventListeners { + static volatile List INSTANCE; + public static List getInstance(List otelEventListenerList) { + if (INSTANCE == null) { + synchronized (TaskEventListener.class) { + if (INSTANCE == null) { + INSTANCE = otelEventListenerList; + INSTANCE.addAll(DEFAULT_TASK_EVENT_LISTENERS); + } + } + } + return INSTANCE; + } + } + + protected static void callTaskEventListeners(boolean startEvent, String operationName, String eventName, Thread t, + List taskEventListeners) { + if (Context.current() != Context.root()) { + try (Scope ignored = Span.current().makeCurrent()) { + if (taskEventListeners != null && !taskEventListeners.isEmpty()) { + for (TaskEventListener eventListener : taskEventListeners) { + if (eventListener.isApplicable(operationName, eventName)) { + if (startEvent) { + eventListener.onStart(operationName, eventName, t); + } else { + eventListener.onStart(operationName, eventName, t); + } + } + } + } + } + } + } +} diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index 3671782b9d12f..0b96ebfe49529 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -182,4 +182,10 @@ grant { permission java.io.FilePermission "/sys/fs/cgroup/memory", "read"; permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read"; + permission java.net.NetPermission "getProxySelector"; + permission java.util.PropertyPermission "*", "read,write"; + permission java.net.SocketPermission "127.0.0.1:4318", "connect,resolve"; + permission java.net.SocketPermission "127.0.0.1:4317", "connect,resolve"; + permission java.net.SocketPermission "[0:0:0:0:0:0:0:1]:4317", "connect,resolve"; + permission java.net.SocketPermission "[0:0:0:0:0:0:0:1]:4318", "connect,resolve"; }; diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 307cb854f000a..48dde6f3cca55 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -228,7 +228,7 @@ public void writeFileChunk( between(1, 5) ); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); - handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); + handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture, null); sendFilesFuture.actionGet(); Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); @@ -570,7 +570,7 @@ public void writeFileChunk( store, metas.toArray(new StoreFileMetadata[0]), () -> 0, - new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), latch) + new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), latch), null ); latch.await(); assertThat(sendFilesError.get(), instanceOf(IOException.class)); @@ -640,7 +640,7 @@ public void writeFileChunk( between(1, 4) ); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); - handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); + handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture, null); Exception ex = expectThrows(Exception.class, sendFilesFuture::actionGet); final IOException unwrappedCorruption = ExceptionsHelper.unwrapCorruption(ex); if (throwCorruptedIndexException) { @@ -808,7 +808,7 @@ public void writeFileChunk( List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); - handler.sendFiles(store, files.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); + handler.sendFiles(store, files.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture, null); assertBusy(() -> { assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); assertThat(unrepliedChunks, hasSize(sentChunks.get())); @@ -886,7 +886,8 @@ public void writeFileChunk( store, files.toArray(new StoreFileMetadata[0]), () -> 0, - new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), sendFilesLatch) + new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), sendFilesLatch), + null ); assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)))); List failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index d39e190a6f124..da1dba3083707 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -966,7 +966,7 @@ protected final void recoverUnstartedReplica( primary, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic(), primary, replica, replicatePrimaryFunction), request, - recoverySettings + recoverySettings, null ); primary.updateShardState( primary.routingEntry(),