Skip to content

Commit

Permalink
Change Async Servlet span end logic to fix race condition on Undertow (
Browse files Browse the repository at this point in the history
…#2992)

* Attach servlet async listener with asyncStart instrumentation

* Exclude Spring packages containing servlet request classes from global ignores

* Exclude Tapestry HSR proxy with global ignore

* Improve comments.

* Fix for Liberty - request response when adding async listener

* Removed unused methods

* Explicit response to async listeners on all servlet engines

* Attach response to request on Jetty

* Fix broken build due to rebase, improved a comment

* Address PR comments

* Added a comment.

* Addressed PR comments
  • Loading branch information
agoallikmaa committed May 26, 2021
1 parent 5f373b3 commit fd132d4
Show file tree
Hide file tree
Showing 33 changed files with 403 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class Jetty11HandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Object source,
@Advice.Argument(value = 2, readOnly = false) HttpServletRequest request,
@Advice.Argument(2) HttpServletRequest request,
@Advice.Argument(3) HttpServletResponse response,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Expand All @@ -31,6 +32,9 @@ public static void onEnter(

context = tracer().startServerSpan(request);
scope = context.makeCurrent();

// Must be set here since Jetty handlers can use startAsync outside of servlet scope.
tracer().setAsyncListenerResponse(request, response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class Jetty8HandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Object source,
@Advice.Argument(value = 2, readOnly = false) HttpServletRequest request,
@Advice.Argument(2) HttpServletRequest request,
@Advice.Argument(3) HttpServletResponse response,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Expand All @@ -31,6 +32,9 @@ public static void onEnter(

context = tracer().startServerSpan(request);
scope = context.makeCurrent();

// Must be set here since Jetty handlers can use startAsync outside of servlet scope.
tracer().setAsyncListenerResponse(request, response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ apply from: "$rootDir/gradle/instrumentation.gradle"

dependencies {
api(project(':instrumentation:servlet:servlet-common:library'))
implementation(project(':instrumentation:servlet:servlet-common:javaagent'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.servlet.ServletAccessor;
import io.opentelemetry.instrumentation.servlet.ServletHttpServerTracer;
import io.opentelemetry.instrumentation.servlet.TagSettingAsyncListener;
import java.util.concurrent.atomic.AtomicBoolean;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterAdviceHelper;

public class JettyHandlerAdviceHelper {
/** Shared method exit implementation for Jetty handler advices. */
Expand Down Expand Up @@ -45,23 +43,7 @@ public static <REQUEST, RESPONSE> void stopSpan(
return;
}

AtomicBoolean responseHandled = new AtomicBoolean(false);
ServletAccessor<REQUEST, RESPONSE> servletAccessor = tracer.getServletAccessor();

// In case of async servlets wait for the actual response to be ready
if (servletAccessor.isRequestAsyncStarted(request)) {
try {
servletAccessor.addRequestAsyncListener(
request, new TagSettingAsyncListener<>(tracer, responseHandled, context));
} catch (IllegalStateException e) {
// org.eclipse.jetty.server.Request may throw an exception here if request became
// finished after check above. We just ignore that exception and move on.
}
}

// Check again in case the request finished before adding the listener.
if (!servletAccessor.isRequestAsyncStarted(request)
&& responseHandled.compareAndSet(false, true)) {
if (ServletAndFilterAdviceHelper.mustEndOnHandlerMethodExit(tracer, request)) {
tracer.end(context, response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ apply from: "$rootDir/gradle/instrumentation.gradle"
dependencies {
compileOnly "javax.servlet:javax.servlet-api:3.0.1"

implementation project(':instrumentation:servlet:servlet-common:javaagent')
implementation project(':instrumentation:servlet:servlet-3.0:javaagent')
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.servlet.v3_0.TagSettingAsyncListener;
import java.util.concurrent.atomic.AtomicBoolean;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterAdviceHelper;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
Expand All @@ -33,7 +32,7 @@ public static void onEnter(
// it is a bit too early to start span at this point because calling
// some methods on HttpServletRequest will give a NPE
// just remember the request and use it a bit later to start the span
ThreadLocalContext.startRequest(httpServletRequest);
ThreadLocalContext.startRequest(httpServletRequest, (HttpServletResponse) response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down Expand Up @@ -68,17 +67,7 @@ public static void stopSpan(
return;
}

AtomicBoolean responseHandled = new AtomicBoolean(false);

// In case of async servlets wait for the actual response to be ready
if (request.isAsyncStarted()) {
request
.getAsyncContext()
.addListener(new TagSettingAsyncListener(responseHandled, context), request, response);
}

// Check again in case the request finished before adding the listener.
if (!request.isAsyncStarted() && responseHandled.compareAndSet(false, true)) {
if (ServletAndFilterAdviceHelper.mustEndOnHandlerMethodExit(tracer(), request)) {
tracer().end(context, response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ public static void onEnter() {

ctx.setContext(context);
ctx.setScope(scope);

// Must be set here since Liberty RequestProcessors can use startAsync outside of servlet scope.
tracer().setAsyncListenerResponse(ctx.getRequest(), ctx.getResponse());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class ThreadLocalContext {

private static final ThreadLocal<ThreadLocalContext> local = new ThreadLocal<>();

private final HttpServletRequest req;
private final HttpServletRequest request;
private final HttpServletResponse response;
private Context context;
private Scope scope;
private boolean started;

private ThreadLocalContext(HttpServletRequest req) {
this.req = req;
private ThreadLocalContext(HttpServletRequest request, HttpServletResponse response) {
this.request = request;
this.response = response;
}

public Context getContext() {
Expand All @@ -39,7 +42,11 @@ public void setScope(Scope scope) {
}

public HttpServletRequest getRequest() {
return req;
return request;
}

public HttpServletResponse getResponse() {
return response;
}

/**
Expand All @@ -53,8 +60,8 @@ public boolean startSpan() {
return !b;
}

public static void startRequest(HttpServletRequest req) {
ThreadLocalContext ctx = new ThreadLocalContext(req);
public static void startRequest(HttpServletRequest request, HttpServletResponse response) {
ThreadLocalContext ctx = new ThreadLocalContext(request, response);
local.set(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ public Integer getRequestRemotePort(HttpServletRequest httpServletRequest) {
return null;
}

@Override
public boolean isRequestAsyncStarted(HttpServletRequest request) {
return false;
}

@Override
public void addRequestAsyncListener(
HttpServletRequest request, ServletAsyncListener<ResponseWithStatus> listener) {
HttpServletRequest request,
ServletAsyncListener<ResponseWithStatus> listener,
Object response) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public static void onEnter(

context = tracer().startSpan(httpServletRequest, mappingResolver, servlet);
scope = context.makeCurrent();

tracer().setAsyncListenerResponse(httpServletRequest, (HttpServletResponse) response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.servlet.v3_0;

import static io.opentelemetry.instrumentation.servlet.v3_0.Servlet3HttpServerTracer.tracer;

import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import net.bytebuddy.asm.Advice;

public class Servlet3AsyncStartAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void startAsyncEnter() {
// This allows to detect the outermost invocation of startAsync in method exit
CallDepthThreadLocalMap.incrementCallDepth(AsyncContext.class);
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void startAsyncExit(@Advice.This ServletRequest servletRequest) {
int callDepth = CallDepthThreadLocalMap.decrementCallDepth(AsyncContext.class);

if (callDepth != 0) {
// This is not the outermost invocation, ignore.
return;
}

if (servletRequest instanceof HttpServletRequest) {
HttpServletRequest request = (HttpServletRequest) servletRequest;

if (!tracer().isAsyncListenerAttached(request)) {
tracer().attachAsyncListener(request);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncContextInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncStartInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterInstrumentation;
import java.util.List;

Expand All @@ -30,7 +31,8 @@ public List<TypeInstrumentation> typeInstrumentations() {
BASE_PACKAGE,
adviceClassName(".Servlet3Advice"),
adviceClassName(".Servlet3InitAdvice"),
adviceClassName(".Servlet3FilterInitAdvice")));
adviceClassName(".Servlet3FilterInitAdvice")),
new AsyncStartInstrumentation(BASE_PACKAGE, adviceClassName(".Servlet3AsyncStartAdvice")));
}

private static String adviceClassName(String suffix) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ public Integer getRequestRemotePort(HttpServletRequest request) {
return request.getRemotePort();
}

@Override
public boolean isRequestAsyncStarted(HttpServletRequest request) {
return request.isAsyncStarted();
}

@Override
public void addRequestAsyncListener(
HttpServletRequest request, ServletAsyncListener<HttpServletResponse> listener) {
request.getAsyncContext().addListener(new Listener(listener));
HttpServletRequest request,
ServletAsyncListener<HttpServletResponse> listener,
Object response) {
if (response instanceof HttpServletResponse) {
request
.getAsyncContext()
.addListener(new Listener(listener), request, (HttpServletResponse) response);
} else {
request.getAsyncContext().addListener(new Listener(listener));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncContextInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncStartInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.response.HttpServletResponseInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterInstrumentation;
import java.util.Arrays;
Expand All @@ -27,6 +28,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return Arrays.asList(
new AsyncContextInstrumentation(
BASE_PACKAGE, adviceClassName(".async.AsyncDispatchAdvice")),
new AsyncStartInstrumentation(BASE_PACKAGE, adviceClassName(".async.AsyncStartAdvice")),
new ServletAndFilterInstrumentation(
BASE_PACKAGE,
adviceClassName(".service.JakartaServletServiceAdvice"),
Expand Down
Loading

0 comments on commit fd132d4

Please sign in to comment.