Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UnAcked message tracker based on TimePartition #3118

Merged
merged 14 commits into from
Dec 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testSharedAckedNormalTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -210,7 +210,7 @@ public void testExclusiveAckedNormalTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -308,7 +308,7 @@ public void testFailoverAckedNormalTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down Expand Up @@ -415,8 +415,7 @@ public void testSharedAckedPartitionedTopic() throws Exception {
assertEquals(received, 5);

// 7. Simulate ackTimeout
((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
Thread.sleep(ackTimeOutMillis);

// 8. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ public void testConsumerUnackedRedelivery() throws Exception {
assertEquals(received, totalMessages);

// 8. Simulate ackTimeout
((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
Thread.sleep(ackTimeOutMillis);

// 9. producer publish more messages
for (int i = 0; i < totalMessages / 3; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

/**
* Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
* attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
Expand Down Expand Up @@ -60,6 +60,10 @@ public long getAckTimeoutMillis() {
return conf.getAckTimeoutMillis();
}

public long getTickDurationMillis() {
return conf.getTickDurationMillis();
}

/**
* Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
* 10 seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pulsar.common.util.FutureUtil;

import com.google.common.collect.Lists;

import lombok.NonNull;

public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;

import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;

Expand All @@ -52,11 +51,11 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;


import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -66,7 +65,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.PulsarDecoder;
Expand Down Expand Up @@ -188,7 +186,11 @@ enum SubscriptionMode {
}

if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
} else {
this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.allTopicPartitionsNumber = new AtomicInteger(0);

if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
} else {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,36 @@
*/
package org.apache.pulsar.client.impl;

import com.google.common.base.Preconditions;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnAckedMessageTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
protected ConcurrentOpenHashSet<MessageId> currentSet;
protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
private final ReentrantReadWriteLock readWriteLock;

protected final ConcurrentHashMap<MessageId, ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
protected final LinkedList<ConcurrentOpenHashSet<MessageId>> timePartitions;

protected final Lock readLock;
private final Lock writeLock;
private Timeout timeout;
protected final Lock writeLock;

public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();
private final long ackTimeoutMillis;
private final long tickDurationInMs;

private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker {
@Override
Expand All @@ -67,116 +74,138 @@ public void close() {
}
}

private Timeout timeout;

public UnAckedMessageTracker() {
readWriteLock = null;
readLock = null;
writeLock = null;
timePartitions = null;
messageIdPartitionMap = null;
this.ackTimeoutMillis = 0;
this.tickDurationInMs = 0;
}

public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
currentSet = new ConcurrentOpenHashSet<MessageId>();
oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
start(client, consumerBase, ackTimeoutMillis);
this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
}

public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
this.stop();
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
this.ackTimeoutMillis = ackTimeoutMillis;
this.tickDurationInMs = tickDurationInMs;
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.messageIdPartitionMap = new ConcurrentHashMap<>();
this.timePartitions = new LinkedList<>();

int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
}

timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
oldOpenSet.forEach(messageIds::add);
oldOpenSet.clear();
Set<MessageId> messageIds = new HashSet<>();
writeLock.lock();
try {
timePartitions.addLast(new ConcurrentOpenHashSet<>());
ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
headPartition.forEach(messageId -> {
messageIds.add(messageId);
messageIdPartitionMap.remove(messageId);
});
}
} finally {
writeLock.unlock();
}
if (messageIds.size() > 0) {
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
toggle();
timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}

void toggle() {
public void clear() {
writeLock.lock();
try {
ConcurrentOpenHashSet<MessageId> temp = currentSet;
currentSet = oldOpenSet;
oldOpenSet = temp;
messageIdPartitionMap.clear();
timePartitions.clear();
int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
}
} finally {
writeLock.unlock();
}
}

public void clear() {
readLock.lock();
try {
currentSet.clear();
oldOpenSet.clear();
} finally {
readLock.unlock();
}
}

public boolean add(MessageId m) {
readLock.lock();
public boolean add(MessageId messageId) {
writeLock.lock();
try {
oldOpenSet.remove(m);
return currentSet.add(m);
ConcurrentOpenHashSet<MessageId> partition = timePartitions.peekLast();
messageIdPartitionMap.put(messageId, partition);
return partition.add(messageId);
} finally {
readLock.unlock();
writeLock.unlock();
}

}

boolean isEmpty() {
readLock.lock();
try {
return currentSet.isEmpty() && oldOpenSet.isEmpty();
return messageIdPartitionMap.isEmpty();
} finally {
readLock.unlock();
}
}

public boolean remove(MessageId m) {
readLock.lock();
public boolean remove(MessageId messageId) {
writeLock.lock();
try {
return currentSet.remove(m) || oldOpenSet.remove(m);
boolean removed = false;
ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.remove(messageId);
if (exist != null) {
removed = exist.remove(messageId);
}
return removed;
} finally {
readLock.unlock();
writeLock.unlock();
}
}

long size() {
readLock.lock();
try {
return currentSet.size() + oldOpenSet.size();
} finally {
readLock.unlock();
}
}

private boolean isAckTimeout() {
readLock.lock();
try {
return !oldOpenSet.isEmpty();
return messageIdPartitionMap.size();
} finally {
readLock.unlock();
}
}

public int removeMessagesTill(MessageId msgId) {
readLock.lock();
writeLock.lock();
try {
int currentSetRemovedMsgCount = currentSet.removeIf(m -> (m.compareTo(msgId) <= 0));
int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> (m.compareTo(msgId) <= 0));

return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
int removed = 0;
Iterator<MessageId> iterator = messageIdPartitionMap.keySet().iterator();
while (iterator.hasNext()) {
MessageId messageId = iterator.next();
if (messageId.compareTo(msgId) <= 0) {
ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.get(messageId);
if (exist != null) {
exist.remove(messageId);
}
iterator.remove();
removed ++;
}
}
return removed;
} finally {
readLock.unlock();
writeLock.unlock();
}
}

Expand Down
Loading