Skip to content

Commit

Permalink
Fix zero queue consumer message redelivery (#6076)
Browse files Browse the repository at this point in the history
Motivation
Message redelivery is not work well with zero queue consumer when using receive() or listeners to consume messages. This pull request is try to fix it.

Modifications
Add missed trackMessage() method call at zero queue size consumer.

Verifying this change
New unit tests added.

(cherry picked from commit 787bee1)
  • Loading branch information
codelipenghui authored and tuteng committed Apr 13, 2020
1 parent 32164db commit 5ecbdb7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import static org.testng.Assert.assertEquals;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -30,10 +32,12 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -303,4 +307,73 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException {
consumer.close();
}
}

@Test
public void testZeroQueueSizeMessageRedelivery() throws PulsarClientException {
final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedelivery";
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.receiverQueueSize(0)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

final int messages = 10;
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < messages; i++) {
producer.send(i);
}

Set<Integer> receivedMessages = new HashSet<>();
for (int i = 0; i < messages * 2; i++) {
receivedMessages.add(consumer.receive().getValue());
}

Assert.assertEquals(receivedMessages.size(), messages);

consumer.close();
producer.close();
}

@Test
public void testZeroQueueSizeMessageRedeliveryForListener() throws Exception {
final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForListener";
final int messages = 10;
final CountDownLatch latch = new CountDownLatch(messages * 2);
Set<Integer> receivedMessages = new HashSet<>();
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.receiverQueueSize(0)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.messageListener((MessageListener<Integer>) (c, msg) -> {
try {
receivedMessages.add(msg.getValue());
} finally {
latch.countDown();
}
})
.subscribe();

Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < messages; i++) {
producer.send(i);
}

latch.await();
Assert.assertEquals(receivedMessages.size(), messages);

consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf
protected Message<T> internalReceive() throws PulsarClientException {
zeroQueueLock.lock();
try {
return beforeConsume(fetchSingleMessageFromBroker());
Message<T> msg = fetchSingleMessageFromBroker();
trackMessage(msg);
return beforeConsume(msg);
} finally {
zeroQueueLock.unlock();
}
Expand Down Expand Up @@ -155,6 +157,7 @@ private void triggerZeroQueueSizeListener(final Message<T> message) {
log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription,
message.getMessageId());
}
trackMessage(message);
listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message));
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,
Expand Down

0 comments on commit 5ecbdb7

Please sign in to comment.