From db26946520d3fe2c66bc0bc7c87815a8ef3754f4 Mon Sep 17 00:00:00 2001 From: penghui Date: Sat, 22 Dec 2018 22:04:48 +0800 Subject: [PATCH] UnAcked message tracker based on TimePartition (#3118) ### Motivation currently un-ack message tracker redelivery is done in batch, not taking into account the actual arrival time of the message. Split acktimeout into several time-partition, user can define the tick duration. ### Modifications Improve UnAckedMessageTracker use time partitions to maintain un-acked messages, UnAckedMessageTracker main partitions by LinkedList, remove first partition from the LinkedList and process it and add last partition to the LinkedList when timeout. All un-acked messages add into the last partition. ### Result UT passed --- ...erMessageUnAcknowledgedRedeliveryTest.java | 9 +- .../client/impl/TopicsConsumerImplTest.java | 3 +- .../client/api/ConsumerConfiguration.java | 8 +- .../client/impl/ConsumerBuilderImpl.java | 1 - .../pulsar/client/impl/ConsumerImpl.java | 10 +- .../client/impl/MultiTopicsConsumerImpl.java | 6 +- .../client/impl/UnAckedMessageTracker.java | 163 +++++++++++------- .../impl/UnAckedTopicMessageTracker.java | 40 +++-- .../impl/conf/ConsumerConfigurationData.java | 8 +- 9 files changed, 148 insertions(+), 100 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index 87f8d366dc954..6db887b37f2a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -112,7 +112,7 @@ public void testSharedAckedNormalTopic() throws Exception { assertEquals(received, 5); // 7. Simulate ackTimeout - ((ConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); + Thread.sleep(ackTimeOutMillis); // 8. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { @@ -210,7 +210,7 @@ public void testExclusiveAckedNormalTopic() throws Exception { assertEquals(received, 5); // 7. Simulate ackTimeout - ((ConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); + Thread.sleep(ackTimeOutMillis); // 8. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { @@ -308,7 +308,7 @@ public void testFailoverAckedNormalTopic() throws Exception { assertEquals(received, 5); // 7. Simulate ackTimeout - ((ConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); + Thread.sleep(ackTimeOutMillis); // 8. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { @@ -415,8 +415,7 @@ public void testSharedAckedPartitionedTopic() throws Exception { assertEquals(received, 5); // 7. Simulate ackTimeout - ((MultiTopicsConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); - ((MultiTopicsConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + Thread.sleep(ackTimeOutMillis); // 8. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 55be9ecbd497b..b791033c9dad0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -381,8 +381,7 @@ public void testConsumerUnackedRedelivery() throws Exception { assertEquals(received, totalMessages); // 8. Simulate ackTimeout - ((MultiTopicsConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); - ((MultiTopicsConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + Thread.sleep(ackTimeOutMillis); // 9. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 5ffba38ffceeb..4539d6d6e004f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -20,13 +20,13 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import java.io.Serializable; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; + /** * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers @@ -60,6 +60,10 @@ public long getAckTimeoutMillis() { return conf.getAckTimeoutMillis(); } + public long getTickDurationMillis() { + return conf.getTickDurationMillis(); + } + /** * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than * 10 seconds. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 1060460e3889c..5792188e37998 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -49,7 +49,6 @@ import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; - import lombok.NonNull; public class ConsumerBuilderImpl implements ConsumerBuilder { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 3e2d4d6a8a0ab..df65361b9a42a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; - import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; @@ -52,11 +51,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerStats; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -66,7 +65,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.PulsarDecoder; @@ -188,7 +186,11 @@ enum SubscriptionMode { } if (conf.getAckTimeoutMillis() != 0) { - this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()); + if (conf.getTickDurationMillis() > 0) { + this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis()); + } else { + this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()); + } } else { this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 41821ad93a613..e9ef632d1a914 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -99,7 +99,11 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { this.allTopicPartitionsNumber = new AtomicInteger(0); if (conf.getAckTimeoutMillis() != 0) { - this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis()); + if (conf.getTickDurationMillis() > 0) { + this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis()); + } else { + this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis()); + } } else { this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 266eb3b9db7bd..d31d353ac8050 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -18,29 +18,36 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.base.Preconditions; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class UnAckedMessageTracker implements Closeable { private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class); - protected ConcurrentOpenHashSet currentSet; - protected ConcurrentOpenHashSet oldOpenSet; - private final ReentrantReadWriteLock readWriteLock; + + protected final ConcurrentHashMap> messageIdPartitionMap; + protected final LinkedList> timePartitions; + protected final Lock readLock; - private final Lock writeLock; - private Timeout timeout; + protected final Lock writeLock; public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled(); + private final long ackTimeoutMillis; + private final long tickDurationInMs; private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker { @Override @@ -67,116 +74,138 @@ public void close() { } } + private Timeout timeout; + public UnAckedMessageTracker() { - readWriteLock = null; readLock = null; writeLock = null; + timePartitions = null; + messageIdPartitionMap = null; + this.ackTimeoutMillis = 0; + this.tickDurationInMs = 0; } public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) { - currentSet = new ConcurrentOpenHashSet(); - oldOpenSet = new ConcurrentOpenHashSet(); - readWriteLock = new ReentrantReadWriteLock(); - readLock = readWriteLock.readLock(); - writeLock = readWriteLock.writeLock(); - start(client, consumerBase, ackTimeoutMillis); + this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis); } - public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) { - this.stop(); + public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis, long tickDurationInMs) { + Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs); + this.ackTimeoutMillis = ackTimeoutMillis; + this.tickDurationInMs = tickDurationInMs; + ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); + this.messageIdPartitionMap = new ConcurrentHashMap<>(); + this.timePartitions = new LinkedList<>(); + + int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs); + for (int i = 0; i < blankPartitions + 1; i++) { + timePartitions.add(new ConcurrentOpenHashSet<>()); + } + timeout = client.timer().newTimeout(new TimerTask() { @Override public void run(Timeout t) throws Exception { - if (isAckTimeout()) { - log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size()); - Set messageIds = new HashSet<>(); - oldOpenSet.forEach(messageIds::add); - oldOpenSet.clear(); + Set messageIds = new HashSet<>(); + writeLock.lock(); + try { + timePartitions.addLast(new ConcurrentOpenHashSet<>()); + ConcurrentOpenHashSet headPartition = timePartitions.removeFirst(); + if (!headPartition.isEmpty()) { + log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size()); + headPartition.forEach(messageId -> { + messageIds.add(messageId); + messageIdPartitionMap.remove(messageId); + }); + } + } finally { + writeLock.unlock(); + } + if (messageIds.size() > 0) { consumerBase.redeliverUnacknowledgedMessages(messageIds); } - toggle(); - timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS); + timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS); } - }, ackTimeoutMillis, TimeUnit.MILLISECONDS); + }, this.tickDurationInMs, TimeUnit.MILLISECONDS); } - void toggle() { + public void clear() { writeLock.lock(); try { - ConcurrentOpenHashSet temp = currentSet; - currentSet = oldOpenSet; - oldOpenSet = temp; + messageIdPartitionMap.clear(); + timePartitions.clear(); + int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs); + for (int i = 0; i < blankPartitions + 1; i++) { + timePartitions.add(new ConcurrentOpenHashSet<>()); + } } finally { writeLock.unlock(); } } - public void clear() { - readLock.lock(); - try { - currentSet.clear(); - oldOpenSet.clear(); - } finally { - readLock.unlock(); - } - } - - public boolean add(MessageId m) { - readLock.lock(); + public boolean add(MessageId messageId) { + writeLock.lock(); try { - oldOpenSet.remove(m); - return currentSet.add(m); + ConcurrentOpenHashSet partition = timePartitions.peekLast(); + messageIdPartitionMap.put(messageId, partition); + return partition.add(messageId); } finally { - readLock.unlock(); + writeLock.unlock(); } - } boolean isEmpty() { readLock.lock(); try { - return currentSet.isEmpty() && oldOpenSet.isEmpty(); + return messageIdPartitionMap.isEmpty(); } finally { readLock.unlock(); } } - public boolean remove(MessageId m) { - readLock.lock(); + public boolean remove(MessageId messageId) { + writeLock.lock(); try { - return currentSet.remove(m) || oldOpenSet.remove(m); + boolean removed = false; + ConcurrentOpenHashSet exist = messageIdPartitionMap.remove(messageId); + if (exist != null) { + removed = exist.remove(messageId); + } + return removed; } finally { - readLock.unlock(); + writeLock.unlock(); } } long size() { readLock.lock(); try { - return currentSet.size() + oldOpenSet.size(); - } finally { - readLock.unlock(); - } - } - - private boolean isAckTimeout() { - readLock.lock(); - try { - return !oldOpenSet.isEmpty(); + return messageIdPartitionMap.size(); } finally { readLock.unlock(); } } public int removeMessagesTill(MessageId msgId) { - readLock.lock(); + writeLock.lock(); try { - int currentSetRemovedMsgCount = currentSet.removeIf(m -> (m.compareTo(msgId) <= 0)); - int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> (m.compareTo(msgId) <= 0)); - - return currentSetRemovedMsgCount + oldSetRemovedMsgCount; + int removed = 0; + Iterator iterator = messageIdPartitionMap.keySet().iterator(); + while (iterator.hasNext()) { + MessageId messageId = iterator.next(); + if (messageId.compareTo(msgId) <= 0) { + ConcurrentOpenHashSet exist = messageIdPartitionMap.get(messageId); + if (exist != null) { + exist.remove(messageId); + } + iterator.remove(); + removed ++; + } + } + return removed; } finally { - readLock.unlock(); + writeLock.unlock(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java index f500fda040fb9..afe9a5e072642 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java @@ -18,7 +18,10 @@ */ package org.apache.pulsar.client.impl; -import static com.google.common.base.Preconditions.checkState; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; + +import java.util.Iterator; public class UnAckedTopicMessageTracker extends UnAckedMessageTracker { @@ -26,23 +29,30 @@ public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase consu super(client, consumerBase, ackTimeoutMillis); } + public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis, long tickDurationMillis) { + super(client, consumerBase, ackTimeoutMillis, tickDurationMillis); + } + public int removeTopicMessages(String topicName) { - readLock.lock(); + writeLock.lock(); try { - int currentSetRemovedMsgCount = currentSet.removeIf(m -> { - checkState(m instanceof TopicMessageIdImpl, - "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); - }); - int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> { - checkState(m instanceof TopicMessageIdImpl, - "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); - }); - - return currentSetRemovedMsgCount + oldSetRemovedMsgCount; + int removed = 0; + Iterator iterator = messageIdPartitionMap.keySet().iterator(); + while (iterator.hasNext()) { + MessageId messageId = iterator.next(); + if (messageId instanceof TopicMessageIdImpl && + ((TopicMessageIdImpl)messageId).getTopicPartitionName().contains(topicName)) { + ConcurrentOpenHashSet exist = messageIdPartitionMap.get(messageId); + if (exist != null) { + exist.remove(messageId); + } + iterator.remove(); + removed ++; + } + } + return removed; } finally { - readLock.unlock(); + writeLock.unlock(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 7e92d1245d69b..2c56b61dd0f4f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -23,22 +23,22 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Maps; import com.google.common.collect.Sets; + import java.io.Serializable; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import lombok.Data; - -import java.util.regex.Pattern; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageListener; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; @Data @@ -69,6 +69,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private long ackTimeoutMillis = 0; + private long tickDurationMillis = 1000; + private int priorityLevel = 0; @JsonIgnore