Skip to content

Commit

Permalink
UnAcked message tracker based on TimePartition (#3118)
Browse files Browse the repository at this point in the history
### Motivation

currently un-ack message tracker  redelivery is done in batch, not taking into account the actual arrival time of the message.

Split acktimeout into several time-partition, user can define the tick duration.

### Modifications

Improve UnAckedMessageTracker use time partitions to maintain un-acked messages, UnAckedMessageTracker main partitions by LinkedList, remove first partition from the LinkedList and process it and add last partition to the LinkedList when timeout. All un-acked messages add into the last partition.

### Result

UT passed
  • Loading branch information
codelipenghui authored and sijie committed Dec 22, 2018
1 parent a901a35 commit db26946
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testSharedAckedNormalTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -210,7 +210,7 @@ public void testExclusiveAckedNormalTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -308,7 +308,7 @@ public void testFailoverAckedNormalTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -415,8 +415,7 @@ public void testSharedAckedPartitionedTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ public void testConsumerUnackedRedelivery() throws Exception {
assertEquals(received, totalMessages);

// 8. Simulate ackTimeout
((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
Thread.sleep(ackTimeOutMillis);

// 9. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

/**
* Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
* attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
Expand Down Expand Up @@ -60,6 +60,10 @@ public long getAckTimeoutMillis() {
return conf.getAckTimeoutMillis();
}

public long getTickDurationMillis() {
return conf.getTickDurationMillis();
}

/**
* Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
* 10 seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pulsar.common.util.FutureUtil;

import com.google.common.collect.Lists;

import lombok.NonNull;

public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;

import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;

Expand All @@ -52,11 +51,11 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;


import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -66,7 +65,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.PulsarDecoder;
Expand Down Expand Up @@ -188,7 +186,11 @@ enum SubscriptionMode {
}

if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
} else {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.allTopicPartitionsNumber = new AtomicInteger(0);

if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
} else {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,36 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.base.Preconditions;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnAckedMessageTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
protected ConcurrentOpenHashSet<MessageId> currentSet;
protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
private final ReentrantReadWriteLock readWriteLock;

protected final ConcurrentHashMap<MessageId, ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
protected final LinkedList<ConcurrentOpenHashSet<MessageId>> timePartitions;

protected final Lock readLock;
private final Lock writeLock;
private Timeout timeout;
protected final Lock writeLock;

public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();
private final long ackTimeoutMillis;
private final long tickDurationInMs;

private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker {
@Override
Expand All @@ -67,116 +74,138 @@ public void close() {
}
}

private Timeout timeout;

public UnAckedMessageTracker() {
readWriteLock = null;
readLock = null;
writeLock = null;
timePartitions = null;
messageIdPartitionMap = null;
this.ackTimeoutMillis = 0;
this.tickDurationInMs = 0;
}

public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
currentSet = new ConcurrentOpenHashSet<MessageId>();
oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
start(client, consumerBase, ackTimeoutMillis);
this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
}

public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
this.stop();
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
this.ackTimeoutMillis = ackTimeoutMillis;
this.tickDurationInMs = tickDurationInMs;
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.messageIdPartitionMap = new ConcurrentHashMap<>();
this.timePartitions = new LinkedList<>();

int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
}

timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
oldOpenSet.forEach(messageIds::add);
oldOpenSet.clear();
Set<MessageId> messageIds = new HashSet<>();
writeLock.lock();
try {
timePartitions.addLast(new ConcurrentOpenHashSet<>());
ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
headPartition.forEach(messageId -> {
messageIds.add(messageId);
messageIdPartitionMap.remove(messageId);
});
}
} finally {
writeLock.unlock();
}
if (messageIds.size() > 0) {
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
toggle();
timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}

void toggle() {
public void clear() {
writeLock.lock();
try {
ConcurrentOpenHashSet<MessageId> temp = currentSet;
currentSet = oldOpenSet;
oldOpenSet = temp;
messageIdPartitionMap.clear();
timePartitions.clear();
int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
}
} finally {
writeLock.unlock();
}
}

public void clear() {
readLock.lock();
try {
currentSet.clear();
oldOpenSet.clear();
} finally {
readLock.unlock();
}
}

public boolean add(MessageId m) {
readLock.lock();
public boolean add(MessageId messageId) {
writeLock.lock();
try {
oldOpenSet.remove(m);
return currentSet.add(m);
ConcurrentOpenHashSet<MessageId> partition = timePartitions.peekLast();
messageIdPartitionMap.put(messageId, partition);
return partition.add(messageId);
} finally {
readLock.unlock();
writeLock.unlock();
}

}

boolean isEmpty() {
readLock.lock();
try {
return currentSet.isEmpty() && oldOpenSet.isEmpty();
return messageIdPartitionMap.isEmpty();
} finally {
readLock.unlock();
}
}

public boolean remove(MessageId m) {
readLock.lock();
public boolean remove(MessageId messageId) {
writeLock.lock();
try {
return currentSet.remove(m) || oldOpenSet.remove(m);
boolean removed = false;
ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.remove(messageId);
if (exist != null) {
removed = exist.remove(messageId);
}
return removed;
} finally {
readLock.unlock();
writeLock.unlock();
}
}

long size() {
readLock.lock();
try {
return currentSet.size() + oldOpenSet.size();
} finally {
readLock.unlock();
}
}

private boolean isAckTimeout() {
readLock.lock();
try {
return !oldOpenSet.isEmpty();
return messageIdPartitionMap.size();
} finally {
readLock.unlock();
}
}

public int removeMessagesTill(MessageId msgId) {
readLock.lock();
writeLock.lock();
try {
int currentSetRemovedMsgCount = currentSet.removeIf(m -> (m.compareTo(msgId) <= 0));
int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> (m.compareTo(msgId) <= 0));

return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
int removed = 0;
Iterator<MessageId> iterator = messageIdPartitionMap.keySet().iterator();
while (iterator.hasNext()) {
MessageId messageId = iterator.next();
if (messageId.compareTo(msgId) <= 0) {
ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.get(messageId);
if (exist != null) {
exist.remove(messageId);
}
iterator.remove();
removed ++;
}
}
return removed;
} finally {
readLock.unlock();
writeLock.unlock();
}
}

Expand Down
Loading

0 comments on commit db26946

Please sign in to comment.