[improve][broker] Make MessageRedeliveryController work more efficiently #17804

Merged

codelipenghui
Contributor

@codelipenghui codelipenghui commented Sep 22, 2022

Fixes: #15445

Motivation

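The implementation of MessageRedeliveryController is inefficient. In the thread dump below, a BookKeeperClientWorker thread is busy in TreeSet.add, reached from MessageRedeliveryController.getMessagesToReplayNow through ConcurrentSortedLongPairSet.items.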

"BookKeeperClientWorker-OrderedExecutor-1-0" #40 prio=5 os_prio=0 cpu=27401742.31ms elapsed=788038.50s tid=0x00007f4cf95bc800 nid=0x56 runnable  [0x00007f4cd49a0000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TreeMap.put(java.base@11.0.15/TreeMap.java:566)
	at java.util.TreeSet.add(java.base@11.0.15/TreeSet.java:255)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.lambda$items$4(ConcurrentSortedLongPairSet.java:152)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet$$Lambda$1150/0x00000008408d9040.accept(Unknown Source)
	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet$Section.forEach(ConcurrentLongPairSet.java:563)
	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.forEach(ConcurrentLongPairSet.java:242)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:151)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:143)
	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:128)
	at org.apache.pulsar.broker.service.persistent.MessageRedeliveryController.getMessagesToReplayNow(MessageRedeliveryController.java:112)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:850)
	- eliminated <0x0000000757da7548> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentStickyKeyDispatcherMultipleConsumers.java:432)
	- locked <0x0000000757da7548> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:175)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:483)
	- locked <0x0000000757da7548> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$11.readEntryComplete(ManagedCursorImpl.java:1326)
	- locked <0x00000007129d8210> (a org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$11)
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:222)
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl$$Lambda$1158/0x00000008408db440.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(java.base@11.0.15/CompletableFuture.java:714)
	at java.util.concurrent.CompletableFuture$Completion.run(java.base@11.0.15/CompletableFuture.java:478)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.15/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.15/ThreadPoolExecutor.java:628)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@11.0.15/Thread.java:829)

The issue usually happens when a Key_Shared subscription has a large backlog. Publish latency rises because the IO thread and the BookKeeperClientWorker thread run at high CPU usage.
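
Judging from the stack trace above, each call to getMessagesToReplayNow ends up adding entries to a TreeSet before returning the first few. The following is only a sketch of that access pattern, using plain java.util collections and hypothetical names (CopyThenSortSketch, firstN) rather than Pulsar's classes. It illustrates why the cost of a call can grow with the number of pending entries instead of with the number of entries requested.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; these are not Pulsar's collection classes.
class CopyThenSortSketch {
    // Returns the n smallest values by copying the unordered set into a TreeSet,
    // so every call pays O(size * log size) even when n is tiny.
    static List<Long> firstN(Set<Long> unordered, int n) {
        TreeSet<Long> sorted = new TreeSet<>();
        unordered.forEach(sorted::add); // analogous to the TreeSet.add frame in the trace
        List<Long> result = new ArrayList<>();
        for (Long value : sorted) {
            if (result.size() >= n) {
                break;
            }
            result.add(value);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Long> pending = new HashSet<>();
        for (long i = 100_000; i > 0; i--) {
            pending.add(i);
        }
        // All 100,000 entries are sorted just to return three of them.
        System.out.println(firstN(pending, 3)); // prints [1, 2, 3]
    }
}
```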

A related fix is #15354, but it does not cover the case where the MessageRedeliveryController holds many messages to redeliver.
This PR makes the MessageRedeliveryController work more efficiently by introducing a new ordered LongPair set backed by a sorted map and bitmaps.


The issue can be reproduced with a standalone and pulsar-perf:

  1. Start a standalone
  2. Use pulsar-perf to produce messages: bin/pulsar-perf produce test -r 1000 -bm 1 -mk random
  3. Open 3 terminals and run in each: bin/pulsar-perf consume -st Key_Shared -r 100 -n 50 test
  4. After a while, stop 2 of the perf consume processes
  5. You will see the publish latency increase

With this PR, the latency increase no longer occurs.

Modifications

  • Added the RoaringBitmap dependency, a widely used compressed bitset. The dependency is added to the broker module rather than the common module, because the common module is also used by the Java client
  • Added BitmapSortedLongPairSet for MessageRedeliveryController
  • Updated the MessageRedeliveryController to use BitmapSortedLongPairSet for messagesToRedeliver
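
The idea behind the modifications above can be sketched as a sorted map from the first element of each pair to a bitmap of second elements. This is a minimal sketch under stated assumptions, not the class added by this PR: java.util.BitSet stands in for RoaringBitmap so the example needs no extra dependency, the class name SortedLongPairSetSketch and its method shapes are hypothetical, and removeUpTo is assumed to drop every pair strictly smaller than (item1, item2).

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch, not the class from this PR. BitSet stands in for
// RoaringBitmap, so the second element must be a non-negative int here.
class SortedLongPairSetSketch {
    // First element -> bitmap of second elements; the TreeMap keeps keys sorted.
    private final NavigableMap<Long, BitSet> map = new TreeMap<>();

    synchronized void add(long item1, int item2) {
        map.computeIfAbsent(item1, k -> new BitSet()).set(item2);
    }

    // Assumption: removes every pair strictly smaller than (item1, item2).
    synchronized void removeUpTo(long item1, int item2) {
        Map.Entry<Long, BitSet> first = map.firstEntry();
        while (first != null && first.getKey() <= item1) {
            if (first.getKey() < item1) {
                map.remove(first.getKey()); // the whole bitmap is below the bound
            } else {
                first.getValue().clear(0, item2); // clears bits in [0, item2)
                if (first.getValue().isEmpty()) {
                    map.remove(first.getKey());
                }
                break; // every remaining pair is >= (item1, item2)
            }
            first = map.firstEntry();
        }
    }

    // Returns up to maxItems pairs in ascending (item1, item2) order without
    // re-sorting: both the map keys and the bits are already ordered.
    synchronized List<long[]> items(int maxItems) {
        List<long[]> result = new ArrayList<>();
        for (Map.Entry<Long, BitSet> entry : map.entrySet()) {
            BitSet bits = entry.getValue();
            for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
                if (result.size() >= maxItems) {
                    return result;
                }
                result.add(new long[] {entry.getKey(), i});
                if (i == Integer.MAX_VALUE) {
                    break; // avoid overflow in i + 1
                }
            }
        }
        return result;
    }
}
```

With this layout, reading the first few pairs only touches the entries that are actually returned, which is the property the old copy-and-sort path lacked.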

Verifying this change

Added a new test, BitmapSortedLongPairSetTest.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • Anything that affects deployment

Documentation

  • doc-required
    (Your PR needs to update docs, and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Matching PR in forked repository

PR in forked repository: codelipenghui#14

@codelipenghui codelipenghui self-assigned this Sep 22, 2022
@codelipenghui codelipenghui added this to the 2.12.0 milestone Sep 22, 2022
@codelipenghui codelipenghui added the type/enhancement (The enhancements for the existing features or docs, e.g. reduce memory usage of the delayed messages), area/broker, doc-not-needed (Your PR changes do not impact docs), release/2.9.4, release/2.11.1, and release/2.10.3 labels Sep 22, 2022
Contributor

@merlimat merlimat left a comment

👍 Nice!

}
}

public synchronized void removeUpTo(long item1, long item2) {
Member

Question: why do we need synchronized here?

Contributor Author

😁, I copied the method signature from another class

import org.apache.pulsar.common.util.collections.LongPairSet;
import org.roaringbitmap.RoaringBitmap;

public class BitmapSortedLongPairSet {
Member

Maybe we can indicate this class is thread-safe by adding a comment or name like Concurrent***?

Comment on lines 76 to 79
Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
while (firstEntry != null && firstEntry.getKey() <= item1) {
    if (firstEntry.getKey() < item1) {
        map.remove(firstEntry.getKey(), firstEntry.getValue());
Contributor

Is it possible to avoid loop traversal by using subMap?

Contributor Author

A sub map has lower and upper bounds. If we changed to a sub map here, e.g. from 3 to 10, you would not be able to add 2 or 11 again.
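
The reply above relies on standard java.util.TreeMap behavior: a sub-map view rejects keys outside its range. A small sketch with made-up keys and values (the class name SubMapBoundsSketch and the helper rejected are hypothetical):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class SubMapBoundsSketch {
    // Returns true if inserting `key` into a [3, 10] sub-map view is rejected.
    static boolean rejected(long key) {
        NavigableMap<Long, String> map = new TreeMap<>();
        map.put(5L, "five");
        NavigableMap<Long, String> view = map.subMap(3L, true, 10L, true);
        try {
            view.put(key, "x");
            return false;
        } catch (IllegalArgumentException e) {
            return true; // the key lies outside the view's bounds
        }
    }

    public static void main(String[] args) {
        System.out.println(rejected(2L));  // prints true
        System.out.println(rejected(11L)); // prints true
        System.out.println(rejected(7L));  // prints false
    }
}
```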

@codelipenghui codelipenghui merged commit c60f895 into apache:master Sep 23, 2022
codelipenghui added a commit that referenced this pull request Sep 23, 2022
codelipenghui added a commit that referenced this pull request Sep 23, 2022
codelipenghui added a commit that referenced this pull request Sep 23, 2022
@codelipenghui codelipenghui modified the milestones: 2.12.0, 2.11.0 Sep 23, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 28, 2022
…tly (apache#17804)

(cherry picked from commit c60f895)
(cherry picked from commit 19eb842)
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
…request !71)

[improve][broker] Make MessageRedeliveryController work more efficiently (apache#17804)
@codelipenghui codelipenghui deleted the penghui/improve-relay-queue branch October 25, 2022 15:21
Labels
area/broker, cherry-picked/branch-2.9 (Archived: 2.9 is end of life), cherry-picked/branch-2.10, cherry-picked/branch-2.11, doc-not-needed (Your PR changes do not impact docs), release/2.9.4, release/2.10.2, type/enhancement (The enhancements for the existing features or docs, e.g. reduce memory usage of the delayed messages)
Development

Successfully merging this pull request may close these issues.

CPU usage 100% when use Key_Shared
6 participants