Skip to content

Commit

Permalink
make reactor 0.9 behave the same way 1.0 does, add request error test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek committed Apr 28, 2021
1 parent 2b80e1f commit 1f267aa
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,23 @@ public static boolean shouldDecorate(Class<?> callbackClass) {

private abstract static class OnMessageDecorator<M> implements BiConsumer<M, Connection> {
private final BiConsumer<? super M, ? super Connection> delegate;
private final boolean forceParentContext;

public OnMessageDecorator(BiConsumer<? super M, ? super Connection> delegate) {
public OnMessageDecorator(BiConsumer<? super M, ? super Connection> delegate,
boolean forceParentContext) {
this.delegate = delegate;
this.forceParentContext = forceParentContext;
}

@Override
public final void accept(M message, Connection connection) {
Context context = getChannelContext(currentContext(message), connection.channel());
Channel channel = connection.channel();
// don't try to get the client span from the netty channel when forceParentSpan is true
// this way the parent context will always be propagated
if (forceParentContext) {
channel = null;
}
Context context = getChannelContext(currentContext(message), channel);
if (context == null) {
delegate.accept(message, connection);
} else {
Expand All @@ -46,7 +55,7 @@ public final void accept(M message, Connection connection) {

public static final class OnRequestDecorator extends OnMessageDecorator<HttpClientRequest> {
public OnRequestDecorator(BiConsumer<? super HttpClientRequest, ? super Connection> delegate) {
super(delegate);
super(delegate, false);
}

@Override
Expand All @@ -57,8 +66,9 @@ reactor.util.context.Context currentContext(HttpClientRequest message) {

public static final class OnResponseDecorator extends OnMessageDecorator<HttpClientResponse> {
public OnResponseDecorator(
BiConsumer<? super HttpClientResponse, ? super Connection> delegate) {
super(delegate);
BiConsumer<? super HttpClientResponse, ? super Connection> delegate,
boolean forceParentContext) {
super(delegate, forceParentContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,12 @@ public static class OnResponseAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Connection> callback) {
BiConsumer<? super HttpClientResponse, ? super Connection> callback,
@Advice.Origin("#m") String methodName) {
if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnResponseDecorator(callback);
boolean forceParentContext = methodName.equals("doAfterResponse");
callback = new DecoratorFunctions.OnResponseDecorator(callback,
forceParentContext);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/


import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace

import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.sdk.trace.data.SpanData
Expand Down Expand Up @@ -94,11 +98,48 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
assertSameSpan(parentSpan, onRequestSpan)
assertSameSpan(nettyClientSpan, afterRequestSpan)
assertSameSpan(nettyClientSpan, onResponseSpan)
assertSameSpan(nettyClientSpan, afterResponseSpan)
assertSameSpan(parentSpan, afterResponseSpan)
}
}
}

def "should expose context to http request error callback"() {
given:
def onRequestErrorSpan = new AtomicReference<Span>()

def httpClient = createHttpClient()
.doOnRequestError({ rq, err -> onRequestErrorSpan.set(Span.current()) })

when:
runUnderTrace("parent") {
httpClient.get()
.uri("http://localhost:$UNUSABLE_PORT/")
.response()
.block()
}

then:
def ex = thrown(Exception)

assertTraces(1) {
trace(0, 2) {
def parentSpan = span(0)

basicSpan(it, 0, "parent", null, ex)
span(1) {
def actualException = ex.cause
kind SpanKind.CLIENT
childOf parentSpan
status StatusCode.ERROR
errorEvent(actualException.class, actualException.message)
}

assertSameSpan(parentSpan, onRequestErrorSpan)
}
}
}


private static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
def expectedSpanContext = expected.spanContext
def actualSpanContext = actual.get().spanContext
Expand Down

0 comments on commit 1f267aa

Please sign in to comment.