Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce ack trackers to prevent batch message loss #1277

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

KennyChenFight
Copy link

Motivation

In the current implementation of messageID.Serialize(), the ackTracker information is not recorded. This causes an issue when the client uses DeserializeMessageID to restore the message ID. The Pulsar consumer will mistakenly acknowledge the message within the batch, causing other unacknowledged messages in the batch to be acknowledged as well. This results in message loss for the client.

Modifications

In the current consumer implementation, maintain the ackTrackers structure. When a trackingMessageID lacks an ackTracker, attempt to retrieve it from ackTrackers. Only remove the corresponding ackTracker from ackTrackers after all messages in the batch have been acknowledged.

@KennyChenFight KennyChenFight marked this pull request as ready for review August 30, 2024 09:32
@KennyChenFight KennyChenFight changed the title feat: introduce ack trackers feat: introduce ack trackers to prevent batch message loss Aug 31, 2024
@nodece
Copy link
Member

nodece commented Sep 5, 2024

Good work!

I think we should fix this issue in the ack_grouping_tracker.go.

When acknowledging a batch message, ensure that the ledger ID, entry ID, and tracker are properly managed within a map. The logic for handling the tracker is as follows:

  • New batch Message without a Tracker:
    If the incoming batch message does not have a tracker but a matching ledger ID and entry ID exist in the map, use the tracker already stored in the map.

  • New batch Message with a Tracker:
    If the incoming batch message has a tracker but it differs from the one already in the map for the same ledger ID and entry ID, merge the two trackers to ensure consistency.

What do you think?

@KennyChenFight
Copy link
Author

Good work!

I think we should fix this issue in the ack_grouping_tracker.go.

When acknowledging a batch message, ensure that the ledger ID, entry ID, and tracker are properly managed within a map. The logic for handling the tracker is as follows:

  • New batch Message without a Tracker:
    If the incoming batch message does not have a tracker but a matching ledger ID and entry ID exist in the map, use the tracker already stored in the map.
  • New batch Message with a Tracker:
    If the incoming batch message has a tracker but it differs from the one already in the map for the same ledger ID and entry ID, merge the two trackers to ensure consistency.

What do you think?

Hello!
I have previously considered implementing it in ack_grouping_tracker.go, but there is an issue. In the current implementation, the messages added to ack_grouping_tracker are considered ready to be acknowledged.

However, if we record the batch index here, it means that adding a message does not necessarily mean it can be acknowledged, which might be a bit strange.

Or perhaps we need to move this part:

if trackingID != nil && trackingID.ack() {
	// All messages in the same batch have been acknowledged, we only need to acknowledge the
	// MessageID that represents the entry that stores the whole batch
	trackingID = &trackingMessageID{
		messageID: &messageID{
			ledgerID: trackingID.ledgerID,
			entryID:  trackingID.entryID,
		},
	}

	pc.metrics.AcksCounter.Inc()
	pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
}

to ack_grouping_tracker as well?

@nodece
Copy link
Member

nodece commented Sep 5, 2024

the messages added to ack_grouping_tracker are considered ready to be acknowledged.

You can create a map to record the batch message, such as batchMessagePendingAcks map[[2]uint64]*batchMessageTracker.

However, if we record the batch index here, it means that adding a message does not necessarily mean it can be acknowledged, which might be a bit strange.

Only record the ledger ID and entry ID.

Or perhaps we need to move this part: ... to ack_grouping_tracker as well?

Sounds good!

@KennyChenFight
Copy link
Author

the messages added to ack_grouping_tracker are considered ready to be acknowledged.

You can create a map to record the batch message, such as batchMessagePendingAcks map[[2]uint64]*batchMessageTracker.

However, if we record the batch index here, it means that adding a message does not necessarily mean it can be acknowledged, which might be a bit strange.

Only record the ledger ID and entry ID.

Or perhaps we need to move this part: ... to ack_grouping_tracker as well?

Sounds good!

Sounds good!
Let me try it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants