Skip to content

Commit

Permalink
Start a CONSUMER span for Kafka poll(); and refactor spring-kafka... (#…
Browse files Browse the repository at this point in the history
…4041)

* Start a separate CONSUMER receive span for each non-empty KafkaConsumer#poll() call

* One batch receive + one batch process span in spring-kafka

* Add CONSUMER receive spans to kafka-streams too

* codenarc

* code review comments
  • Loading branch information
Mateusz Rzeszutek committed Sep 8, 2021
1 parent 28e5cb5 commit e30b082
Show file tree
Hide file tree
Showing 23 changed files with 669 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ muzzle {
}

dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))

library("org.apache.kafka:kafka-clients:0.11.0.0")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import java.util.Iterator;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerRecordsInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.ConsumerRecords");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, String.class))
.and(returns(Iterable.class)),
ConsumerRecordsInstrumentation.class.getName() + "$IterableAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
.and(returns(List.class)),
ConsumerRecordsInstrumentation.class.getName() + "$ListAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("iterator"))
.and(takesArguments(0))
.and(returns(Iterator.class)),
ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice");
}

@SuppressWarnings("unused")
public static class IterableAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
ContextStore<ConsumerRecords, SpanContext> consumerRecordsSpan =
InstrumentationContext.get(ConsumerRecords.class, SpanContext.class);
iterable = new TracingIterable(iterable);
}
}
}

@SuppressWarnings("unused")
public static class ListAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(@Advice.Return(readOnly = false) List<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
iterable = new TracingList(iterable);
}
}
}

@SuppressWarnings("unused")
public static class IteratorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<?, ?>> iterator) {
if (iterator != null) {
iterator = new TracingIterator(iterator);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public KafkaClientsInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new KafkaConsumerInstrumentation(), new KafkaProducerInstrumentation());
return asList(
new KafkaProducerInstrumentation(),
new KafkaConsumerInstrumentation(),
new ConsumerRecordsInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,86 +5,76 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
import java.util.List;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import java.time.Duration;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.ConsumerRecords");
return named("org.apache.kafka.clients.consumer.KafkaConsumer");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
named("poll")
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, String.class))
.and(returns(Iterable.class)),
KafkaConsumerInstrumentation.class.getName() + "$IterableAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
.and(returns(List.class)),
KafkaConsumerInstrumentation.class.getName() + "$ListAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("iterator"))
.and(takesArguments(0))
.and(returns(Iterator.class)),
KafkaConsumerInstrumentation.class.getName() + "$IteratorAdvice");
.and(takesArguments(1))
.and(takesArgument(0, long.class).or(takesArgument(0, Duration.class)))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
this.getClass().getName() + "$PollAdvice");
}

@SuppressWarnings("unused")
public static class IterableAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
iterable = new TracingIterable(iterable);
}
public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Timer onEnter() {
return Timer.start();
}
}

@SuppressWarnings("unused")
public static class ListAdvice {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(
@Advice.Enter Timer timer,
@Advice.Return ConsumerRecords<?, ?> records,
@Advice.Thrown Throwable error) {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(@Advice.Return(readOnly = false) List<ConsumerRecord<?, ?>> iterable) {
if (iterable != null) {
iterable = new TracingList(iterable);
// don't create spans when no records were received
if (records == null || records.isEmpty()) {
return;
}
}
}

@SuppressWarnings("unused")
public static class IteratorAdvice {
Context parentContext = currentContext();
ReceivedRecords receivedRecords = ReceivedRecords.create(records, timer);
if (consumerReceiveInstrumenter().shouldStart(parentContext, receivedRecords)) {
Context context = consumerReceiveInstrumenter().start(parentContext, receivedRecords);
consumerReceiveInstrumenter().end(context, receivedRecords, null, error);

@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<?, ?>> iterator) {
if (iterator != null) {
iterator = new TracingIterator(iterator, consumerInstrumenter());
// we're storing the context of the receive span so that process spans can use it as parent
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
ContextStore<ConsumerRecords, SpanContext> consumerRecordsSpan =
InstrumentationContext.get(ConsumerRecords.class, SpanContext.class);
consumerRecordsSpan.put(records, spanFromContext(context).getSpanContext());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
Expand All @@ -13,70 +13,71 @@
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;

public final class BatchConsumerAttributesExtractor
extends MessagingAttributesExtractor<BatchRecords<?, ?>, Void> {
public final class KafkaReceiveAttributesExtractor
extends MessagingAttributesExtractor<ReceivedRecords, Void> {

@Override
protected String system(BatchRecords<?, ?> batchRecords) {
protected String system(ReceivedRecords receivedRecords) {
return "kafka";
}

@Override
protected String destinationKind(BatchRecords<?, ?> batchRecords) {
protected String destinationKind(ReceivedRecords receivedRecords) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}

@Override
protected @Nullable String destination(BatchRecords<?, ?> batchRecords) {
protected @Nullable String destination(ReceivedRecords receivedRecords) {
Set<String> topics =
batchRecords.records().partitions().stream()
receivedRecords.records().partitions().stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());
// only return topic when there's exactly one in the batch
return topics.size() == 1 ? topics.iterator().next() : null;
}

@Override
protected boolean temporaryDestination(BatchRecords<?, ?> batchRecords) {
protected boolean temporaryDestination(ReceivedRecords receivedRecords) {
return false;
}

@Override
protected @Nullable String protocol(BatchRecords<?, ?> batchRecords) {
protected @Nullable String protocol(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable String protocolVersion(BatchRecords<?, ?> batchRecords) {
protected @Nullable String protocolVersion(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable String url(BatchRecords<?, ?> batchRecords) {
protected @Nullable String url(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable String conversationId(BatchRecords<?, ?> batchRecords) {
protected @Nullable String conversationId(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable Long messagePayloadSize(BatchRecords<?, ?> batchRecords) {
protected @Nullable Long messagePayloadSize(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected @Nullable Long messagePayloadCompressedSize(BatchRecords<?, ?> batchRecords) {
protected @Nullable Long messagePayloadCompressedSize(ReceivedRecords receivedRecords) {
return null;
}

@Override
protected MessageOperation operation(BatchRecords<?, ?> batchRecords) {
return MessageOperation.PROCESS;
protected MessageOperation operation(ReceivedRecords receivedRecords) {
return MessageOperation.RECEIVE;
}

@Override
protected @Nullable String messageId(BatchRecords<?, ?> batchRecords, @Nullable Void unused) {
protected @Nullable String messageId(ReceivedRecords receivedRecords, @Nullable Void unused) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ public final class KafkaSingletons {

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
buildProducerInstrumenter();
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_INSTRUMENTER =
buildConsumerInstrumenter();
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
buildConsumerReceiveInstrumenter();
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
buildConsumerProcessInstrumenter();

private static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter() {
KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
Expand All @@ -39,7 +41,19 @@ public final class KafkaSingletons {
.newInstrumenter(SpanKindExtractor.alwaysProducer());
}

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerInstrumenter() {
private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter() {
KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
SpanNameExtractor<ReceivedRecords> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);

return Instrumenter.<ReceivedRecords, Void>newBuilder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(attributesExtractor)
.setTimeExtractors(ReceivedRecords::startTime, (request, response) -> request.now())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter() {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
Expand All @@ -62,8 +76,12 @@ public final class KafkaSingletons {
return PRODUCER_INSTRUMENTER;
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> consumerInstrumenter() {
return CONSUMER_INSTRUMENTER;
public static Instrumenter<ReceivedRecords, Void> consumerReceiveInstrumenter() {
return CONSUMER_RECEIVE_INSTRUMENTER;
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter() {
return CONSUMER_PROCESS_INSTRUMENTER;
}

private KafkaSingletons() {}
Expand Down
Loading

0 comments on commit e30b082

Please sign in to comment.