diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index 2859047fae1a5..e5465b2c44f7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -20,7 +20,9 @@ import static org.testng.Assert.assertEquals; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -30,10 +32,12 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -303,4 +307,73 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException { consumer.close(); } } + + @Test + public void testZeroQueueSizeMessageRedelivery() throws PulsarClientException { + final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedelivery"; + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .receiverQueueSize(0) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + + final int messages = 10; + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < messages; i++) { + producer.send(i); + } + + Set receivedMessages = new HashSet<>(); + for (int i = 0; i < messages * 2; i++) { + receivedMessages.add(consumer.receive().getValue()); + } + + Assert.assertEquals(receivedMessages.size(), messages); + + consumer.close(); + producer.close(); + } + + @Test + public void testZeroQueueSizeMessageRedeliveryForListener() throws Exception { + final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForListener"; + final int messages = 10; + final CountDownLatch latch = new CountDownLatch(messages * 2); + Set receivedMessages = new HashSet<>(); + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .receiverQueueSize(0) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .messageListener((MessageListener) (c, msg) -> { + try { + receivedMessages.add(msg.getValue()); + } finally { + latch.countDown(); + } + }) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < messages; i++) { + producer.send(i); + } + + latch.await(); + Assert.assertEquals(receivedMessages.size(), messages); + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index c3c2f7605bd0f..f7f402faf9744 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -60,7 +60,9 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf protected Message internalReceive() throws PulsarClientException { zeroQueueLock.lock(); try { - return beforeConsume(fetchSingleMessageFromBroker()); + Message msg = fetchSingleMessageFromBroker(); + trackMessage(msg); + return beforeConsume(msg); } finally { zeroQueueLock.unlock(); } @@ -155,6 +157,7 @@ private void triggerZeroQueueSizeListener(final Message message) { log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription, message.getMessageId()); } + trackMessage(message); listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message)); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing unqueued message: {}", topic, subscription,