TracingConsumer related exception due to missing currentLag method with Kafka greater than or equal to 3.0.0 #6432

Closed
ppatierno opened this issue Aug 7, 2022 · 7 comments · Fixed by #6457
Labels
bug Something isn't working

Comments

@ppatierno
Contributor

I am using the kafka-clients-2.6 instrumentation library in a Kafka Streams API based application that depends on org.apache.kafka:kafka-streams:3.2.0.
To wrap the consumer and producer that such an application uses internally, the approach is to write a class implementing the KafkaClientSupplier interface.

For example, the class implementing the KafkaClientSupplier interface wraps a Kafka consumer as follows:

```java
@Override
public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
    KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
    return telemetry.wrap(new KafkaConsumer<>(config));
}
```

However, running such an application throws the following exception:

```
java.lang.AbstractMethodError: Receiver class io.opentelemetry.instrumentation.kafkaclients.TracingConsumer does not define or inherit an implementation of the resolved method 'abstract java.util.OptionalLong currentLag(org.apache.kafka.common.TopicPartition)' of interface org.apache.kafka.clients.consumer.Consumer.
	at org.apache.kafka.streams.processor.internals.PartitionGroup.readyToProcess(PartitionGroup.java:143) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.isProcessable(StreamTask.java:674) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:694) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589) ~[kafka-streams-3.2.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) ~[kafka-streams-3.2.0.jar:?]
```

The problem is that the Java instrumentation library is built against Kafka 2.6.
Starting with Kafka 3.0.0, the Consumer interface declares a new method, OptionalLong currentLag(TopicPartition topicPartition), which the current TracingConsumer does not implement because it is based on Kafka 2.6, where that method does not exist.
As a result, instrumenting a Kafka Streams API based application that uses Kafka 3.0.0 or later no longer works.

The instrumentation library should be upgraded to Kafka 3.0.0 (although I would go straight to the latest, 3.2.0) and implement the missing method.
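A rough sketch of the "upgrade and implement the missing method" option follows. It uses hypothetical, heavily simplified stand-ins (SimpleConsumer, TracingSimpleConsumer) rather than Kafka's real Consumer and the real TracingConsumer, and the poll counter is only a placeholder for span handling. A hand-written delegating wrapper must override every abstract interface method explicitly, so currentLag is forwarded by hand. A wrapper compiled against an interface version that lacked the method has no such override, which is what surfaces at runtime as the AbstractMethodError above.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified stand-ins; NOT Kafka's Consumer or the real TracingConsumer.
public final class DelegatingConsumerSketch {

  /** Stand-in for the Consumer interface as of Kafka 3.x. */
  public interface SimpleConsumer {
    List<String> poll();

    // Modelled on Consumer#currentLag(TopicPartition), added in Kafka 3.0.0.
    OptionalLong currentLag(String topicPartition);
  }

  /** Stand-in for a delegating wrapper compiled against the 3.x interface. */
  public static final class TracingSimpleConsumer implements SimpleConsumer {
    private final SimpleConsumer delegate;
    private int tracedPolls = 0;

    public TracingSimpleConsumer(SimpleConsumer delegate) {
      this.delegate = delegate;
    }

    @Override
    public List<String> poll() {
      List<String> records = delegate.poll();
      tracedPolls++; // placeholder for building and finishing a span
      return records;
    }

    // The override a 2.6-based wrapper cannot have: plain delegation.
    @Override
    public OptionalLong currentLag(String topicPartition) {
      return delegate.currentLag(topicPartition);
    }

    public int tracedPolls() {
      return tracedPolls;
    }
  }

  private DelegatingConsumerSketch() {}
}
```

The drawback of this style is that every method added to the interface in a future Kafka release needs another explicit override.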

ppatierno added the bug (Something isn't working) label Aug 7, 2022
@trask
Member

trask commented Aug 7, 2022

hi @ppatierno! check out https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/VERSIONING.md#dropping-support-for-older-library-versions

tl;dr, updating the library instrumentation "min version" to 3.0.0 seems reasonable to me. in general we wouldn't update all the way to 3.2.0 unless that is necessary (the library instrumentation doesn't pull in the library transitively, so it's not like the library instrumentation is pulling in an old version of the library)

@ppatierno
Contributor Author

Thanks @trask ... so AFAIU, we can "easily" bump to 3.0.0 and fix the TracingConsumer class to implement the new (missing) method.
Once there is a new release, users on Kafka >= 3.0.0 can use it, while users on Kafka < 3.0.0 stay on the previous library release.
Another question is about the directory and library name, kafka-clients-2.6, which explicitly states the "2.6" version. Should we drop the version from the name from now on?

@laurit
Contributor

laurit commented Aug 8, 2022

We could replace TracingConsumer with a JDK proxy. That way we could instrument only the methods we care about and simply delegate the rest to the underlying instance, without having to deal with methods that were added in later versions.
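The JDK proxy idea can be sketched with java.lang.reflect.Proxy. Everything named here (ProxyConsumerSketch, SimpleConsumer, wrap, the poll counter) is a hypothetical simplification rather than the instrumentation's actual code, and the counter merely stands in for starting and ending a span. Because the proxy class is generated at runtime from whatever interface is present, a method the handler never mentions, such as currentLag below, is still forwarded to the delegate.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a proxy-based consumer wrapper; not the real instrumentation.
public final class ProxyConsumerSketch {

  /** Stand-in for Consumer; currentLag plays the role of a method added in a later version. */
  public interface SimpleConsumer {
    List<String> poll();

    OptionalLong currentLag(String topicPartition);
  }

  /** Forwards every call to the delegate and counts successful poll() calls. */
  public static SimpleConsumer wrap(SimpleConsumer delegate, AtomicInteger pollCounter) {
    InvocationHandler handler =
        (proxy, method, args) -> {
          Object result;
          try {
            result = method.invoke(delegate, args);
          } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the delegate threw, not the reflection wrapper
          }
          if (method.getName().equals("poll")) {
            pollCounter.incrementAndGet(); // placeholder: only poll() is "instrumented"
          }
          return result;
        };
    return (SimpleConsumer)
        Proxy.newProxyInstance(
            SimpleConsumer.class.getClassLoader(), new Class<?>[] {SimpleConsumer.class}, handler);
  }

  private ProxyConsumerSketch() {}
}
```

Unwrapping InvocationTargetException matters: without it, any exception thrown by the delegate would reach the caller wrapped in an UndeclaredThrowableException instead of as the original exception. Note that equals, hashCode and toString also pass through the handler and are forwarded to the delegate here.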

@ppatierno
Contributor Author

I think this is a pretty good idea. After all, TracingConsumer already acts as a proxy by delegating all calls to the underlying Consumer instance. The poll method is the only one that needs instrumenting, to build and finish the span: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumer.java#L83

We could use the same approach for TracingProducer, which delegates all calls to the underlying Producer instance. The send method is the only one that needs instrumenting, to build and inject the span: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducer.java#L70
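On the producer side the same idea only needs to hook send. In this sketch, interceptBefore is a hypothetical generic helper, SimpleProducer and SimpleRecord are simplified stand-ins for Producer and ProducerRecord, and the "traceparent" header value comes from a caller-supplied Supplier in place of a real OpenTelemetry context propagator. It illustrates the shape of the approach, not the code that eventually fixed this issue.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-ins for ProducerRecord and Producer; not Kafka's API.
public final class ProxyProducerSketch {

  public static final class SimpleRecord {
    public final String topic;
    public final String value;
    public final Map<String, String> headers = new HashMap<>();

    public SimpleRecord(String topic, String value) {
      this.topic = topic;
      this.value = value;
    }
  }

  public interface SimpleProducer {
    void send(SimpleRecord record);

    void flush();
  }

  /** Runs beforeCall on the arguments of one named method, then delegates every call. */
  public static <T> T interceptBefore(
      Class<T> iface, T delegate, String methodName, Consumer<Object[]> beforeCall) {
    return iface.cast(
        Proxy.newProxyInstance(
            iface.getClassLoader(),
            new Class<?>[] {iface},
            (proxy, method, args) -> {
              if (method.getName().equals(methodName)) {
                beforeCall.accept(args);
              }
              try {
                return method.invoke(delegate, args);
              } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the delegate's own exception
              }
            }));
  }

  /** Adds a placeholder trace header to each record before it is sent. */
  public static SimpleProducer wrap(SimpleProducer delegate, Supplier<String> currentTraceparent) {
    return interceptBefore(
        SimpleProducer.class,
        delegate,
        "send",
        args -> ((SimpleRecord) args[0]).headers.put("traceparent", currentTraceparent.get()));
  }

  private ProxyProducerSketch() {}
}
```

flush, and any method a later interface version might add, is forwarded untouched.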

@ppatierno
Contributor Author

@laurit @trask thanks for fixing this. When do you think a release including this fix will be available?

@laurit
Contributor

laurit commented Aug 12, 2022

@ppatierno it should be available in the latest snapshot: https://oss.sonatype.org/content/repositories/snapshots/io/opentelemetry/javaagent/opentelemetry-javaagent/1.17.0-SNAPSHOT/ The current plan is to release next week.

@ppatierno
Contributor Author

I used the snapshot and the bug was fixed ;-)

3 participants