
Clarify kafka commit strategy "latest" in documentation #40914

Closed
vsevel opened this issue May 31, 2024 · 5 comments · Fixed by #40978
Labels
area/kafka kind/enhancement New feature or request

@vsevel
Contributor

vsevel commented May 31, 2024

Description

https://quarkus.io/guides/kafka#commit-strategies states "This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing.". According to this discussion, this should be read as "if the channel performs any asynchronous processing, there could be message loss".
Message loss is bad enough for many applications that I believe it should be stated explicitly, and "asynchronous processing" should be clearly explained. Are we talking about reactive-style programming? Or running subtasks on an executor service and acknowledging the message without waiting for the result? Or doing straight processing with concurrency enabled (e.g. @Blocking(ordered = false), or @RunOnVirtualThread)?

I ran some tests to check the latest strategy. I have a producer that pushes 5 messages per second, and a consumer that processes them with @RunOnVirtualThread, simulating some processing with a sleep of between 0 and 1000 ms and printing the ids to a file. From time to time I kill the consumer and restart it. Separately, I have a checker that reads the file of ids and checks that there may be some duplicates but no holes.
With both the throttled and latest commit strategies, I did not observe any message loss, which is good news. But the question remains: what asynchronous processing are we talking about?
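A minimal sketch of a checker like the one described above, assuming the producer emits consecutive integer ids and the consumer writes one id per line. The class `IdGapChecker` and its methods are hypothetical, not the code used in these tests. Duplicates are expected with at-least-once delivery; holes would indicate message loss.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical checker: assumes the producer emits consecutive integer ids
// and the consumer writes one id per line to its output file.
final class IdGapChecker {

    /** Ids missing between the smallest and largest id seen (the "holes"). */
    static List<Long> holes(List<Long> ids) {
        List<Long> missing = new ArrayList<>();
        if (ids.isEmpty()) {
            return missing;
        }
        TreeSet<Long> seen = new TreeSet<>(ids); // sorted and de-duplicated
        for (long id = seen.first(); id <= seen.last(); id++) {
            if (!seen.contains(id)) {
                missing.add(id);
            }
        }
        return missing;
    }

    /** Ids processed more than once, in ascending order (acceptable for at-least-once). */
    static List<Long> duplicates(List<Long> ids) {
        Map<Long, Integer> counts = new TreeMap<>(); // sorted by id
        for (Long id : ids) {
            counts.merge(id, 1, Integer::sum);
        }
        List<Long> dups = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                dups.add(e.getKey());
            }
        }
        return dups;
    }
}
```

To run it against the consumer's output, the lines could be loaded with `Files.readAllLines(...)` and parsed with `Long.parseLong` before calling `holes` and `duplicates`.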

Implementation ideas

No response

@vsevel vsevel added the kind/enhancement New feature or request label May 31, 2024
@quarkus-bot

quarkus-bot bot commented May 31, 2024

/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka)

@quarkus-bot

quarkus-bot bot commented May 31, 2024

You added a link to a Zulip discussion; please make sure the description of the issue is comprehensive and doesn't require accessing Zulip.

This message is automatically generated by a bot.

@ozangunalp
Contributor

Indeed, that phrase could use some clarification. Async processing in that case refers to processing where order is not preserved, like @Blocking(ordered = false) or @RunOnVirtualThread.

The message loss scenario is not that complicated really:
Imagine receiving 3 records from a topic-partition with offsets 3, 4 and 5, and processing them concurrently. If the processing of 5 completes first and its offset gets committed back to Kafka, and the processing of 4 then fails, you'd never receive 4 again, because you've already committed a later offset.
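The scenario above can be replayed with a small, simplified model of the latest strategy for a single partition. It assumes each successful ack commits `offset + 1` whenever that is higher than the current committed offset; `LatestCommitModel` is a hypothetical class written for illustration, not SmallRye Reactive Messaging's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the "latest" commit strategy for ONE partition (illustration only,
// not SmallRye's code): each ack commits offset + 1 if that is higher than the current commit.
final class LatestCommitModel {
    private long committed; // offset a restarted consumer resumes from

    LatestCommitModel(long startOffset) {
        this.committed = startOffset;
    }

    /** Called once the record at {@code offset} has been processed and acknowledged. */
    void ack(long offset) {
        committed = Math.max(committed, offset + 1);
    }

    long committed() {
        return committed;
    }

    /** Never-acked offsets below the committed offset: a restart skips them, so they are lost. */
    static List<Long> lostAfterRestart(List<Long> neverAcked, long committedOffset) {
        List<Long> lost = new ArrayList<>();
        for (long offset : neverAcked) {
            if (offset < committedOffset) {
                lost.add(offset);
            }
        }
        return lost;
    }

    /** Offsets 3, 4 and 5 processed concurrently: 5 finishes first, then 3, and 4 fails. */
    static List<Long> demo() {
        LatestCommitModel model = new LatestCommitModel(3);
        model.ack(5); // commit moves to 6
        model.ack(3); // 3 + 1 < 6, commit stays at 6
        // offset 4 fails and is never acked
        return lostAfterRestart(List.of(4L), model.committed()); // [4]
    }
}
```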

The throttled strategy prevents exactly this case by tracking processed offsets and only committing an offset once all earlier offsets received from that partition have been acked.
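A minimal sketch of that bookkeeping for a single partition, assuming records are handed out in offset order. `ContiguousOffsetTracker` is a hypothetical name; the real throttled strategy also commits on a timer and flags records that stay unprocessed for too long, which this model leaves out.

```java
import java.util.TreeSet;

// Simplified single-partition model of the idea behind the "throttled" strategy
// (illustration only, not SmallRye's code): the committable offset never moves
// past a record that was received but not yet acked.
final class ContiguousOffsetTracker {
    private final TreeSet<Long> pending = new TreeSet<>(); // received, not yet acked
    private long highestReceived;

    ContiguousOffsetTracker(long startOffset) {
        this.highestReceived = startOffset - 1;
    }

    void received(long offset) {
        pending.add(offset);
        highestReceived = Math.max(highestReceived, offset);
    }

    void acked(long offset) {
        pending.remove(offset);
    }

    /** Lowest un-acked offset, or one past the highest received offset if all are acked. */
    long committableOffset() {
        return pending.isEmpty() ? highestReceived + 1 : pending.first();
    }
}
```

Replaying the scenario with offsets 3, 4 and 5, acking 5 and then 3 leaves the committable offset at 4, so if processing of 4 fails, a restart redelivers 4 (and 5, as a duplicate) instead of losing it.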

Hope this clarifies.

@vsevel
Contributor Author

vsevel commented Jun 3, 2024

Thanks for the clarification. I can submit a doc PR if you want.
An alternative enhancement would be to provide a "latest-consecutive-message" strategy to avoid message loss, but I am not sure this would add much value compared to throttled.

@pcasaes
Contributor

pcasaes commented Jun 14, 2024

Hi, I believe I wrote the original documentation. At that time, with latest, if the user's code returned a completed future before acking the record, then data could be lost (the record's offset could be committed before processing actually takes place). With throttled that is not the case, since offsets are updated only after acking. This presupposes an async method signature:

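```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@Incoming("my-data")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> process(Message<MyData> message) {

    // ack message out of order on some other vertx context, executor thread, etc.

    return CompletableFuture.completedFuture(null);
}
```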


EDIT: latest will commit on ack, so data can be lost when the user processes records out of order. Say the user requests 3 records and receives them with offsets 1, 2, 3. If the user only acks 3, latest will commit to 3+1 even though the previous 2 records haven't been acked yet. Throttled, on the other hand, will wait until 1 and 2 are acked before committing to 3+1.


4 participants