Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture request body for HttpUrlConnection #3724

Merged
merged 6 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,30 @@
import co.elastic.apm.agent.impl.context.SpanContextImpl;
import co.elastic.apm.agent.impl.context.UrlImpl;
import co.elastic.apm.agent.impl.stacktrace.StacktraceConfigurationImpl;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.util.ResultUtil;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.tracer.util.ResultUtil;
import co.elastic.apm.agent.util.CharSequenceUtils;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class SpanImpl extends AbstractSpanImpl<SpanImpl> implements Recyclable, Span<SpanImpl> {

/**
* Protection against excessive memory usage and span ending run times:
* We limit the maximum allowed number of end listeners.
*/
static final int MAX_END_LISTENERS = 100;
private static final Logger logger = LoggerFactory.getLogger(SpanImpl.class);
public static final long MAX_LOG_INTERVAL_MICRO_SECS = TimeUnit.MINUTES.toMicros(5);
private static long lastSpanMaxWarningTimestamp;
Expand Down Expand Up @@ -75,6 +84,9 @@ public class SpanImpl extends AbstractSpanImpl<SpanImpl> implements Recyclable,
@Nullable
private List<StackFrame> stackFrames;

private final Set<SpanEndListener<? super SpanImpl>> endListeners =
Collections.newSetFromMap(new ConcurrentHashMap<SpanEndListener<? super SpanImpl>, Boolean>());

/**
* If a span is non-discardable, all the spans leading up to it are non-discardable as well
*/
Expand Down Expand Up @@ -174,6 +186,25 @@ public SpanImpl withAction(@Nullable String action) {
return this;
}

@Override
public void addEndListener(SpanEndListener<? super SpanImpl> listener) {
if (endListeners.size() < MAX_END_LISTENERS) {
endListeners.add(listener);
} else {
if (logger.isDebugEnabled()) {
logger.warn("Not adding span end listener because limit is reached: {}," +
" throwable stacktrace will be added for debugging", listener, new Throwable());
} else {
logger.warn("Not adding span end listener because limit is reached: {}", listener);
}
}
}

@Override
public void removeEndListener(SpanEndListener<? super SpanImpl> listener) {
endListeners.remove(listener);
}


/**
* Sets span.type, span.subtype and span.action. If no subtype and action are provided, assumes the legacy usage of hierarchical
Expand Down Expand Up @@ -221,6 +252,9 @@ public String getAction() {

@Override
public void beforeEnd(long epochMicros) {
for (SpanEndListener<? super SpanImpl> endListener : endListeners) {
endListener.onEnd(this);
}
// set outcome when not explicitly set by user nor instrumentation
if (outcomeNotSet()) {
Outcome outcome;
Expand Down Expand Up @@ -476,6 +510,7 @@ public void resetState() {
super.resetState();
context.resetState();
composite.resetState();
endListeners.clear();
stacktrace = null;
subtype = null;
action = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@
import co.elastic.apm.agent.impl.sampling.ConstantSampler;
import co.elastic.apm.agent.objectpool.TestObjectPoolFactory;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class SpanTest {

Expand Down Expand Up @@ -87,6 +93,64 @@ void testOutcomeExplicitlyToUnknown() {
assertThat(span.getOutcome()).isEqualTo(Outcome.UNKNOWN);
}

@Test
void checkEndListenersConcurrencySafe() {
TransactionImpl transaction = new TransactionImpl(tracer);
transaction.startRoot(0, ConstantSampler.of(true), BaggageImpl.EMPTY);
try {
SpanImpl span = new SpanImpl(tracer);
span.start(TraceContextImpl.fromParent(), transaction, BaggageImpl.EMPTY, -1L);

AtomicInteger invocationCounter = new AtomicInteger();
SpanEndListener<Span<?>> callback = new SpanEndListener<Span<?>>() {
@Override
public void onEnd(Span<?> span) {
span.removeEndListener(this);
invocationCounter.incrementAndGet();
}
};
span.addEndListener(callback);
span.end();
assertThat(invocationCounter.get()).isEqualTo(1);
} finally {
transaction.end();
}

}

@Test
@SuppressWarnings("unchecked")
void checkEndListenersLimit() {
TransactionImpl transaction = new TransactionImpl(tracer);
transaction.startRoot(0, ConstantSampler.of(true), BaggageImpl.EMPTY);
try {
SpanImpl span = new SpanImpl(tracer);
span.start(TraceContextImpl.fromParent(), transaction, BaggageImpl.EMPTY, -1L);

for (int i = 0; i < SpanImpl.MAX_END_LISTENERS - 1; i++) {
span.addEndListener(new SpanEndListener<SpanImpl>() {
@Override
public void onEnd(SpanImpl span) {

}
});
}

SpanEndListener<SpanImpl> invokeMe = (SpanEndListener<SpanImpl>) Mockito.mock(SpanEndListener.class);
SpanEndListener<SpanImpl> dontInvokeMe = (SpanEndListener<SpanImpl>) Mockito.mock(SpanEndListener.class);
span.addEndListener(invokeMe);
span.addEndListener(dontInvokeMe);

span.end();

verify(invokeMe).onEnd(span);
verifyNoInteractions(dontInvokeMe);
} finally {
transaction.end();
}

}

@Test
void normalizeEmptyFields() {
SpanImpl span = new SpanImpl(tracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ public static <T> CoderResult decodeUtf8BytesFromSource(ByteSourceReader<T> read
}
}

@Nullable
public static byte[] copyToByteArray(@Nullable ByteBuffer buf) {
if (buf == null) {
return null;
}
byte[] data = new byte[buf.position()];
buf.position(0);
buf.get(data);
return data;
}

public interface ByteSourceReader<S> {
int availableBytes(S source);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public boolean isNestedCallAndDecrement() {
return decrement() != 0;
}

private int get() {
public int get() {
Integer callDepthForCurrentThread = callDepthPerThread.get();
if (callDepthForCurrentThread == null) {
callDepthForCurrentThread = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ protected boolean isBodyCapturingSupported() {
return true;
}

@Override
public void testPostBodyCaptureForExistingSpan() throws Exception {
//TODO: async http client instrumentation does not support capturing bodies for existing spans yet
}

@Override
protected void performPost(String path, byte[] data, String contentTypeHeader) throws Exception {
final CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package co.elastic.apm.agent.httpclient;

import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;

class RequestBodyRecordingHelper implements SpanEndListener<Span<?>> {

/**
* We do not need to participate in span reference counting here.
* Instead, we only hold a reference to the span for the time it is not ended.
* This is ensured via the {@link #onEnd(Span)} callback.
*/
// Visible for testing
Span<?> clientSpan;

public RequestBodyRecordingHelper(Span<?> clientSpan) {
if (!clientSpan.isFinished()) {
this.clientSpan = clientSpan;
clientSpan.addEndListener(this);
}
}

void appendToBody(byte b) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

void appendToBody(byte[] b, int off, int len) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b, off, len);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

void releaseSpan() {
if (clientSpan != null) {
clientSpan.removeEndListener(this);
}
clientSpan = null;
}

@Override
public void onEnd(Span<?> span) {
releaseSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,29 @@
package co.elastic.apm.agent.httpclient;

import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;

public class RequestBodyRecordingInputStream extends InputStream {

private final InputStream delegate;

@Nullable
private Span<?> clientSpan;
private final RequestBodyRecordingHelper recordingHelper;

public RequestBodyRecordingInputStream(InputStream delegate, Span<?> clientSpan) {
this.delegate = delegate;
clientSpan.incrementReferences();
this.clientSpan = clientSpan;
this.recordingHelper = new RequestBodyRecordingHelper(clientSpan);
}


protected void appendToBody(byte b) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

protected void appendToBody(byte[] b, int off, int len) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b, off, len);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

public void releaseSpan() {
if (clientSpan != null) {
clientSpan.decrementReferences();
clientSpan = null;
}
}

@Override
public int read() throws IOException {
int character = delegate.read();
if (character == -1) {
releaseSpan();
recordingHelper.releaseSpan();
} else {
appendToBody((byte) character);
recordingHelper.appendToBody((byte) character);
}
return character;
}
Expand All @@ -81,9 +50,9 @@ public int read() throws IOException {
public int read(byte[] b, int off, int len) throws IOException {
int readBytes = delegate.read(b, off, len);
if (readBytes == -1) {
releaseSpan();
recordingHelper.releaseSpan();
} else {
appendToBody(b, off, readBytes);
recordingHelper.appendToBody(b, off, readBytes);
}
return readBytes;
}
Expand All @@ -96,7 +65,7 @@ public int available() throws IOException {
@Override
public void close() throws IOException {
try {
releaseSpan();
recordingHelper.releaseSpan();
} finally {
delegate.close();
}
Expand Down
Loading
Loading