Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WithSpan uni and multi #39223

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -28,6 +29,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
Expand All @@ -37,6 +39,8 @@
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.config.SmallRyeConfig;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.ext.web.Router;

public class WithSpanInterceptorTest {
Expand Down Expand Up @@ -137,6 +141,64 @@ void spanWithException() {
((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage());
}

@Test
void spanUni() {
assertEquals("hello Uni", spanBean.spanUni().await().atMost(Duration.ofSeconds(1)));
List<SpanData> spans = spanExporter.getFinishedSpanItems(1);

final SpanData parent = getSpanByKindAndParentId(spans, INTERNAL, "0000000000000000");
assertEquals("withSpanAndUni", parent.getName());
assertEquals(StatusCode.UNSET, parent.getStatus().getStatusCode());
}

@Test
void spanUniWithException() {
try {
spanBean.spanUniWithException().await().atMost(Duration.ofSeconds(1));
fail("Exception expected");
} catch (Exception e) {
assertThrows(RuntimeException.class, () -> {
throw e;
});
}
List<SpanData> spanItems = spanExporter.getFinishedSpanItems(1);
assertEquals("withSpanAndUni", spanItems.get(0).getName());
assertEquals(INTERNAL, spanItems.get(0).getKind());
assertEquals(ERROR, spanItems.get(0).getStatus().getStatusCode());
assertEquals(1, spanItems.get(0).getEvents().size());
assertEquals("hello Uni",
((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage());
}

@Test
void spanMulti() {
assertEquals("hello Multi 2", spanBean.spanMulti().collect().last().await().atMost(Duration.ofSeconds(1)));
List<SpanData> spans = spanExporter.getFinishedSpanItems(1);

final SpanData parent = getSpanByKindAndParentId(spans, INTERNAL, "0000000000000000");
assertEquals("withSpanAndMulti", parent.getName());
assertEquals(StatusCode.UNSET, parent.getStatus().getStatusCode());
}

@Test
void spanMultiWithException() {
try {
spanBean.spanMultiWithException().collect().last().await().atMost(Duration.ofSeconds(1));
fail("Exception expected");
} catch (Exception e) {
assertThrows(RuntimeException.class, () -> {
throw e;
});
}
List<SpanData> spanItems = spanExporter.getFinishedSpanItems(1);
assertEquals("withSpanAndMulti", spanItems.get(0).getName());
assertEquals(INTERNAL, spanItems.get(0).getKind());
assertEquals(ERROR, spanItems.get(0).getStatus().getStatusCode());
assertEquals(1, spanItems.get(0).getEvents().size());
assertEquals("hello Multi",
((ExceptionEventData) spanItems.get(0).getEvents().get(0)).getException().getMessage());
}

@ApplicationScoped
public static class SpanBean {
@WithSpan
Expand Down Expand Up @@ -179,6 +241,26 @@ public void spanChild() {
public void spanRestClient() {
spanRestClient.spanRestClient();
}

@WithSpan(value = "withSpanAndUni")
public Uni<String> spanUni() {
return Uni.createFrom().item("hello Uni");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would duplicate all these to have truly async responses and not only items emitted immediately.

Something like:

@Inject Vertx vertx;

public Uni<String> spanUniAsync() {
        return Uni.createFrom().emitter(e -> {
            vertx.setTimer(100, x -> e.complete("hello Uni");
        });
}

None of these run on a duplicated context, too, which might make propagation tricky. I recommend also testing with and without duplicated context.

}

@WithSpan(value = "withSpanAndUni")
public Uni<String> spanUniWithException() {
return Uni.createFrom().failure(new RuntimeException("hello Uni"));
}

@WithSpan(value = "withSpanAndMulti")
public Multi<String> spanMulti() {
return Multi.createFrom().items("hello Multi 1", "hello Multi 2");
}

@WithSpan(value = "withSpanAndMulti")
public Multi<String> spanMultiWithException() {
return Multi.createFrom().failure(new RuntimeException("hello Multi"));
}
}

@ApplicationScoped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.eclipse.microprofile.context.spi.ThreadContextProvider;
import org.eclipse.microprofile.context.spi.ThreadContextSnapshot;

import io.opentelemetry.api.trace.Span;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;

public class OpenTelemetryMpContextPropagationProvider implements ThreadContextProvider {
Expand All @@ -26,7 +27,10 @@ public ThreadContextController begin() {
return new ThreadContextController() {
@Override
public void endContext() throws IllegalStateException {
QuarkusContextStorage.INSTANCE.attach(currentContext);
Span span = Span.fromContext(currentContext);
brunobat marked this conversation as resolved.
Show resolved Hide resolved
if (span != null && span.isRecording()) {
QuarkusContextStorage.INSTANCE.attach(currentContext);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import jakarta.annotation.Priority;
import jakarta.interceptor.AroundInvoke;
Expand All @@ -21,9 +24,13 @@
import io.opentelemetry.instrumentation.api.annotation.support.ParameterAttributeNamesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.util.SpanNames;
import io.quarkus.arc.ArcInvocationContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Functions;

@SuppressWarnings("CdiInterceptorInspection")
@Interceptor
Expand All @@ -43,7 +50,12 @@ public WithSpanInterceptor(final OpenTelemetry openTelemetry) {
MethodRequest::getArgs);

this.instrumenter = builder.addAttributesExtractor(attributesExtractor)
.buildInstrumenter(methodRequest -> spanKindFromMethod(methodRequest.getAnnotationBindings()));
.buildInstrumenter(new SpanKindExtractor<MethodRequest>() {
@Override
public SpanKind extract(MethodRequest methodRequest) {
return spanKindFromMethod(methodRequest.getAnnotationBindings());
}
});
}

@AroundInvoke
Expand All @@ -53,35 +65,108 @@ public Object span(final ArcInvocationContext invocationContext) throws Exceptio
invocationContext.getParameters(),
invocationContext.getInterceptorBindings());

final Class<?> returnType = invocationContext.getMethod().getReturnType();
Context parentContext = Context.current();
brunobat marked this conversation as resolved.
Show resolved Hide resolved
Context spanContext = null;
Scope scope = null;
boolean shouldStart = instrumenter.shouldStart(parentContext, methodRequest);
if (shouldStart) {
spanContext = instrumenter.start(parentContext, methodRequest);
scope = spanContext.makeCurrent();
}

try {
Object result = invocationContext.proceed();

if (shouldStart) {
instrumenter.end(spanContext, methodRequest, null, null);
}
if (!shouldStart) {
return invocationContext.proceed();
}

return result;
} catch (Throwable t) {
if (shouldStart) {
instrumenter.end(spanContext, methodRequest, null, t);
}
throw t;
} finally {
if (scope != null) {
scope.close();
if (isUni(returnType)) {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();
return ((Uni<Object>) invocationContext.proceed()).onTermination()
.invoke(new Functions.TriConsumer<Object, Throwable, Boolean>() {
@Override
public void accept(Object o, Throwable throwable, Boolean isCancelled) {
try {
if (isCancelled) {
instrumenter.end(currentSpanContext, methodRequest, null,
new CancellationException());
} else if (throwable != null) {
instrumenter.end(currentSpanContext, methodRequest, null, throwable);
} else {
instrumenter.end(currentSpanContext, methodRequest, null, null);
}
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
});
} else if (isMulti(returnType)) {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();

return ((Multi<Object>) invocationContext.proceed()).onTermination().invoke(new BiConsumer<Throwable, Boolean>() {
@Override
public void accept(Throwable throwable, Boolean isCancelled) {
try {
if (isCancelled) {
instrumenter.end(currentSpanContext, methodRequest, null, new CancellationException());
} else if (throwable != null) {
instrumenter.end(currentSpanContext, methodRequest, null, throwable);
} else {
instrumenter.end(currentSpanContext, methodRequest, null, null);
}
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
});
} else if (isCompletionStage(returnType)) {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();
return ((CompletionStage<?>) invocationContext.proceed()).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
try {
if (throwable != null) {
instrumenter.end(currentSpanContext, methodRequest, null, throwable);
} else {
instrumenter.end(currentSpanContext, methodRequest, null, null);
}
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
});
} else {
final Context currentSpanContext = instrumenter.start(parentContext, methodRequest);
final Scope currentScope = currentSpanContext.makeCurrent();
try {
Object result = invocationContext.proceed();
instrumenter.end(currentSpanContext, methodRequest, null, null);
return result;
} catch (Throwable t) {
instrumenter.end(currentSpanContext, methodRequest, null, t);
throw t;
} finally {
if (currentScope != null) {
currentScope.close();
}
}
}
}

private static boolean isUni(Class<?> clazz) {
return Uni.class.isAssignableFrom(clazz);
}

private static boolean isMulti(Class<?> clazz) {
return Multi.class.isAssignableFrom(clazz);
}

private static boolean isCompletionStage(Class<?> clazz) {
return CompletionStage.class.isAssignableFrom(clazz);
}

private static SpanKind spanKindFromMethod(Set<Annotation> annotations) {
SpanKind spanKind = null;
for (Annotation annotation : annotations) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.quarkus.it.opentelemetry.reactive;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
Expand All @@ -12,23 +15,47 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.smallrye.mutiny.Uni;

@Path("/reactive")
public class ReactiveResource {
public static final int MILLISECONDS_DELAY = 100;
@Inject
Tracer tracer;
@Inject
@RestClient
ReactiveRestClient client;

private ScheduledExecutorService executor;

@PostConstruct
public void init() {
executor = Executors.newScheduledThreadPool(2);
}

@GET
public Uni<String> helloGet(@QueryParam("name") String name) {
Span span = tracer.spanBuilder("helloGet").startSpan();
return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofSeconds(2))
return Uni.createFrom().item("Hello " + name).onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY))
.eventually((Runnable) span::end);
}

@GET
@Path("/hello-get-uni-delay")
@WithSpan("helloGetUniDelay")
public Uni<String> helloGetUniDelay() {
return Uni.createFrom().item("helloGetUniDelay").onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY));
}

@GET
@Path("/hello-get-uni-executor")
@WithSpan("helloGetUniExecutor")
public Uni<String> helloGetUniExecutor() {
return Uni.createFrom().item("helloGetUniExecutor")
.onItem().delayIt().onExecutor(executor).by(Duration.ofMillis(MILLISECONDS_DELAY));
}

@GET
@Path("/multiple-chain")
public Uni<String> helloMultipleUsingChain() {
Expand All @@ -48,7 +75,7 @@ public Uni<String> helloMultipleUsingCombine() {
@POST
public Uni<String> helloPost(String body) {
Span span = tracer.spanBuilder("helloPost").startSpan();
return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofSeconds(2))
return Uni.createFrom().item("Hello " + body).onItem().delayIt().by(Duration.ofMillis(MILLISECONDS_DELAY))
.eventually((Runnable) span::end);
}

Expand Down
Loading
Loading