From 5ecbdb7606c0972b475a76d365e464fc6814f7f6 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 17 Jan 2020 19:14:05 +0800 Subject: [PATCH] Fix zero queue consumer message redelivery (#6076) Motivation Message redelivery is not work well with zero queue consumer when using receive() or listeners to consume messages. This pull request is try to fix it. Modifications Add missed trackMessage() method call at zero queue size consumer. Verifying this change New unit tests added. (cherry picked from commit 787bee19788faaafacd157625595a70a4b94ccd9) --- .../pulsar/client/impl/ZeroQueueSizeTest.java | 73 +++++++++++++++++++ .../client/impl/ZeroQueueConsumerImpl.java | 5 +- 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index 2859047fae1a5..e5465b2c44f7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -20,7 +20,9 @@ import static org.testng.Assert.assertEquals; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -30,10 +32,12 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -303,4 +307,73 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException { consumer.close(); } } + + @Test + public void testZeroQueueSizeMessageRedelivery() throws PulsarClientException { + final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedelivery"; + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .receiverQueueSize(0) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + + final int messages = 10; + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < messages; i++) { + producer.send(i); + } + + Set receivedMessages = new HashSet<>(); + for (int i = 0; i < messages * 2; i++) { + receivedMessages.add(consumer.receive().getValue()); + } + + Assert.assertEquals(receivedMessages.size(), messages); + + consumer.close(); + producer.close(); + } + + @Test + public void testZeroQueueSizeMessageRedeliveryForListener() throws Exception { + final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForListener"; + final int messages = 10; + final CountDownLatch latch = new CountDownLatch(messages * 2); + Set receivedMessages = new HashSet<>(); + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .receiverQueueSize(0) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .messageListener((MessageListener) (c, msg) -> { + try { + receivedMessages.add(msg.getValue()); + } finally { + latch.countDown(); + } + }) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < messages; i++) { + producer.send(i); + } + + latch.await(); + Assert.assertEquals(receivedMessages.size(), messages); + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index c3c2f7605bd0f..f7f402faf9744 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -60,7 +60,9 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf protected Message internalReceive() throws PulsarClientException { zeroQueueLock.lock(); try { - return beforeConsume(fetchSingleMessageFromBroker()); + Message msg = fetchSingleMessageFromBroker(); + trackMessage(msg); + return beforeConsume(msg); } finally { zeroQueueLock.unlock(); } @@ -155,6 +157,7 @@ private void triggerZeroQueueSizeListener(final Message message) { log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription, message.getMessageId()); } + trackMessage(message); listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message)); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,