
[Bug] [ARM] Partially acknowledged batches are not redelivered #424

Open
bph-sag opened this issue Apr 24, 2024 · 6 comments
Comments


bph-sag commented Apr 24, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Version

Debian GNU/Linux 10 (buster), aarch64 (but the executable is arm32), pulsar-client-cpp 3.3.0

Minimal reproduce step

  1. Send a batch of events to the Pulsar broker (in our reproduction, it is a batch size of 309).
  2. Receive these messages (we're using pulsar::Consumer::receive in a loop), while continuously acknowledging them asynchronously; acknowledgements are done using pulsar::Consumer::acknowledge(const MessageId &messageId), where messageId comes from pulsar::Message::getMessageId. (A sketch of this loop follows the steps.)
  3. Terminate the program in the middle of receiving a batch (we're terminating after having received and acknowledged 190 of the 309 events in the batch).
  4. Resume the program.
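
For reference, a minimal sketch of the receive/acknowledge loop described in step 2 (service URL, topic, and subscription names are placeholders; acknowledgeAsync is shown here because the result is logged in a callback, while the report names the acknowledge(const MessageId &) overload):

#include <pulsar/Client.h>
#include <iostream>

int main() {
    pulsar::Client client("pulsar://localhost:6650");   // placeholder service URL

    pulsar::Consumer consumer;
    // Exclusive is the default subscription type, matching the report.
    if (client.subscribe("my-topic", "my-subscription", consumer) != pulsar::ResultOk) {
        return 1;
    }

    while (true) {
        pulsar::Message msg;
        if (consumer.receive(msg) != pulsar::ResultOk) {
            break;
        }

        // Acknowledge asynchronously and log the result in the callback.
        pulsar::MessageId id = msg.getMessageId();
        consumer.acknowledgeAsync(id, [id](pulsar::Result res) {
            std::cout << "ACK " << id << ": " << pulsar::strResult(res) << std::endl;
        });
    }

    client.close();
    return 0;
}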

What did you expect to see?

All of the events in the batch re-sent.

What did you see instead?

None of the events in the batch, including the ones never received, are re-sent.

Anything else?

  • We log the result of the acknowledgement in the callback method, which shows us that the most we've acknowledged is the 190th message in a batch.
  • Exclusive subscription.
  • Running it on Graviton2/3 processors.
  • We've used the same broker and test with an x86_64 Pulsar 3.3.0 client, and we've had no issues (RHEL8).
  • We can actively see missing messages - we are just sending consecutive numbers and logging them. We're missing exactly 119, the number remaining in the batch that we've not fully acknowledged.
  • We've tried not terminating immediately - i.e. we stop receiving messages but continue to process any callbacks, to verify that we're not simply missing 119 callbacks - and still nothing arrives.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
BewareMyPower (Contributor) commented:

Debian GNU/Linux 10 (buster), aarch64 (but the executable is arm32), pulsar-client-cpp 3.3.0
We've used the same broker and test with an x86_64 Pulsar 3.5.0 Pulsar client, and we've had no issues (RHEL8).

So you tested 3.5.0-x86_64 and 3.3.0-aarch64?


Since a batch is stored as a BK entry, you can check the topic stats for the mark-delete position and compare it with the message id you received. The result you see might be because the whole batch was somehow acknowledged.

BTW, if you suspect the issue is related to the DEB packages, you can try building from source and testing again. You can try vcpkg to save time. (Currently only 3.4.2 is supported; I will add 3.5.0 to vcpkg soon.)


bph-sag commented Apr 24, 2024

So you tested 3.5.0-x86_64 and 3.3.0-aarch64?

Oops, typo'd the x86_64 version, I meant to say 3.3.0-x86_64. Corrected the parent comment.

And no, we're not using the Debian packages; we're compiling it ourselves for aarch32 (and x86_64). So, 3.3.0-aarch32.


I'll try and take a look at the topic stats, and get back to you.


bph-sag commented Apr 26, 2024

So, looking at the internal stats for the topic, it does appear that the mark-delete position indicates that the full batch is indeed acknowledged for our aarch32 runs.

Unsuccessful run, with missed events

Log lines before termination of first consumer (last acknowledgement and the message associated with that acknowledgement, aarch32)

(Ledger 17, entry 1, 190/309)

2024-04-26 14:44:32.322 INFO  [3944048800] - <connectivity.pulsarTransport.my_dynamic_chain_instance> Got Message with batch index: 190, of batch size: 309, ledger: 17, entry: 1, full message was: [...]
2024-04-26 14:44:32.324 INFO  [3944048800] - <connectivity.pulsarTransport.my_dynamic_chain_instance> SUCCESS ACK - partition: -1, ledgerId: 17, entryId: 1, batchIndex: 190, topicName: persistent://public/default/ConnPlugincor24546ea9837f45e49629df60696e54502e4

Mark delete position from persistent/:tenant/:namespace/:topic/internalStats?authoritative=false&metadata=false

            "markDeletePosition": "17:1",

First message received on the 2nd consumer

(Ledger 17, entry 2, 1/309)

2024-04-26 14:45:03.577 INFO  [3948247200] - <connectivity.pulsarTransport.my_dynamic_chain_instance> Got Message with batch index: 0, of batch size: 309, ledger: 17, entry: 2, full message was: [...]

Successful run, no missing messages (x86_64)

Last messages prior to shutdown

(Ledger 36, entry 2, 198/309)

2024-04-25 14:18:49.657 INFO  [3946145952] - <connectivity.pulsarTransport.my_dynamic_chain_instance> SUCCESS ACK - partition: -1, ledgerId: 36, entryId: 2, batchIndex: 198, topicName: persistent://public/default/ConnPlugincor245ae0980739d4344e98f3f3b29f0f07d8b
2024-04-25 14:18:49.657 INFO  [3946145952] - <connectivity.pulsarTransport.my_dynamic_chain_instance> Got Message with batch index: 198, of batch size: 309, ledger: 36, entry: 2, full message was: [...]

Mark delete position from persistent/:tenant/:namespace/:topic/internalStats?authoritative=false&metadata=false

            "markDeletePosition": "36:1",

First message received on 2nd consumer

(Ledger 36, entry 2, 1/309)

2024-04-25 14:19:11.523 INFO  [3948247200] - <connectivity.pulsarTransport.my_dynamic_chain_instance> Got Message with batch index: 0, of batch size: 309, ledger: 36, entry: 2, full message was: [...]


bph-sag commented Apr 29, 2024

OK, this turns out to have been some strange oddity I can't reproduce in isolation.

We were putting our MessageIds into a raw pulsar::MessageId[] array and acknowledging them all after some miscellaneous work. For some reason, storing them in a pulsar::MessageId[], at least on aarch32 (and with some other triggering factor I can't quite isolate), would break the acknowledgement in a way that caused Pulsar to acknowledge the entire batch.

Either way, switching to a std::list (I'm sure a std::vector, à la MessageIdList, would also work) fixes this issue for us. I'm going to mark this as closed, unless you think there's something else interesting here.
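
A hypothetical sketch of this workaround (names are illustrative; the actual application code differs): buffer the MessageIds in a std::list while the other work runs, then acknowledge them afterwards.

#include <pulsar/Client.h>
#include <list>

// Buffer the MessageIds of received messages in a std::list instead of a
// raw pulsar::MessageId[] array, then acknowledge them after the other work.
void receiveAndAck(pulsar::Consumer &consumer, int count) {
    std::list<pulsar::MessageId> pending;
    for (int i = 0; i < count; ++i) {
        pulsar::Message msg;
        if (consumer.receive(msg) != pulsar::ResultOk) {
            break;
        }
        // ... miscellaneous work on msg ...
        pending.push_back(msg.getMessageId());
    }
    for (const auto &id : pending) {
        consumer.acknowledge(id);   // synchronous per-MessageId acknowledge
    }
}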

bph-sag closed this as completed Apr 29, 2024
BewareMyPower (Contributor) commented:

  • If the MessageId comes from non-batched messages, the implementation is MessageIdImpl, which does not have an acker. When it's acknowledged, a CommandAck will be sent.
  • If the MessageId comes from batched messages, the implementation is BatchedMessageIdImpl, which has an acker field: a shared pointer to a BatchMessageAcker that maintains a bit set.

For example, a batched message whose batch size is 2 has two message IDs.

id1:
  ledger: 100
  entry: 0
  partition: -1
  batch_index: 0
  batch_size: 2
  acker: 0b11

id2:
  ledger: 100
  entry: 0
  partition: -1
  batch_index: 1
  batch_size: 2
  acker: 0b11

0b11 represents a bit set with 2 bits, both of them set.

  1. After acknowledging id1, the bit set becomes 0b10. Nothing will happen.
  2. After acknowledging id2, the bit set becomes 0b00. Since it's now empty, a CommandAck will be sent to the broker indicating that the entry at ledger 100, entry 0 should be acknowledged.
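
A toy sketch of this bookkeeping (a simplified illustration, not the actual BatchMessageAcker implementation in pulsar-client-cpp):

#include <cstdint>
#include <iostream>

// One bit per batch index, all set initially; the entry-level CommandAck
// should only be sent once every bit has been cleared.
struct ToyBatchAcker {
    uint64_t bits;   // supports batch sizes up to 64 in this toy
    explicit ToyBatchAcker(int batchSize)
        : bits(batchSize >= 64 ? ~0ULL : (1ULL << batchSize) - 1) {}

    // Individual acknowledge: clear one bit; return true once the whole
    // batch has been acknowledged (i.e. the CommandAck should be sent).
    bool ackIndividual(int batchIndex) {
        bits &= ~(1ULL << batchIndex);
        return bits == 0;
    }
};

int main() {
    ToyBatchAcker acker(2);                       // the 2-message batch above: 0b11
    std::cout << acker.ackIndividual(0) << "\n";  // prints 0 - 0b10 remains, nothing sent
    std::cout << acker.ackIndividual(1) << "\n";  // prints 1 - bit set empty, send CommandAck
}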

I'm not sure how you stored the message IDs in a raw array. But if your message ID array could contain default-constructed MessageId objects, whose implementation is MessageIdImpl, see

static const MessageIdImplPtr emptyMessageId = std::make_shared<MessageIdImpl>();

BewareMyPower (Contributor) commented:

From the markDeletePosition you provided,

            "markDeletePosition": "17:1",

it means the entry 17:1 has been acknowledged. However, from the logs:

Message with batch index: 190, of batch size: 309, ledger: 17, entry: 1, full message was: [...]
2024-04-26 14:44:32.324 INFO  [3944048800] - <connectivity.pulsarTransport.my_dynamic_chain_instance> SUCCESS ACK - partition: -1, ledgerId: 17, entryId: 1, batchIndex: 190, topicName: persistent://public/default/ConnPlugincor24546ea9837f45e49629df60696e54502e4

If you didn't acknowledge the batch messages in the index range [191, 309), then there must be something wrong with the BitSet::clear operation, which depends on the method you used (acknowledge clears a single bit, acknowledgeCumulative clears the whole range up to and including that index):

bitSet_.clear(batchIndex);

bitSet_.clear(0, batchIndex + 1);

BewareMyPower reopened this May 6, 2024