diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 631a728ccce4d..264bac7cb6aab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -451,12 +451,15 @@ protected void reScheduleReadInMs(long readAfterMs) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs); } - topic.getBrokerService().executor().schedule( - () -> { - isRescheduleReadInProgress.set(false); - readMoreEntries(); - }, - readAfterMs, TimeUnit.MILLISECONDS); + Runnable runnable = () -> { + isRescheduleReadInProgress.set(false); + readMoreEntries(); + }; + if (readAfterMs > 0) { + topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS); + } else { + topic.getBrokerService().executor().execute(runnable); + } } } @@ -836,6 +839,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis totalBytesSent += sendMessageInfo.getTotalBytes(); } + lastNumberOfEntriesDispatched = (int) totalEntries; acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); if (entriesToDispatch > 0) { @@ -848,9 +852,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); - - lastNumberOfEntriesDispatched = entriesToDispatch; } + return true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index af99741d09bb6..a7ff9eb9c11f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -110,6 +110,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest { final String topicName = "persistent://public/default/testTopic"; final String subscriptionName = "testSubscription"; private AtomicInteger consumerMockAvailablePermits; + int retryBackoffInitialTimeInMs = 10; + int retryBackoffMaxTimeInMs = 50; @BeforeMethod public void setup() throws Exception { @@ -120,8 +122,8 @@ public void setup() throws Exception { doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); doReturn(false).when(configMock).isAllowOverrideEntryFilters(); - doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); - doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); + doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs(); + doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); @@ -825,34 +827,45 @@ public void testLastSentPositionAndIndividuallySentPositions(final boolean initi assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString()); } - @DataProvider(name = "dispatchMessagesInSubscriptionThread") - private Object[][] dispatchMessagesInSubscriptionThread() { - return new Object[][] { { false }, { true } }; + @DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched") + private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() { + return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } }; } - @Test(dataProvider = "dispatchMessagesInSubscriptionThread") - public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread) + @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") + public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) throws Exception { persistentDispatcher.close(); List retryDelays = new CopyOnWriteArrayList<>(); doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); - persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( - topicMock, cursorMock, subscriptionMock, configMock, - new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { - @Override - protected void reScheduleReadInMs(long readAfterMs) { - retryDelays.add(readAfterMs); - } - }; + + PersistentDispatcherMultipleConsumers dispatcher; + if (isKeyShared) { + dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + retryDelays.add(readAfterMs); + } + }; + } else { + dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + retryDelays.add(readAfterMs); + } + }; + } // add a consumer without permits to trigger the retry behavior consumerMockAvailablePermits.set(0); - persistentDispatcher.addConsumer(consumerMock); + dispatcher.addConsumer(consumerMock); // call "readEntriesComplete" directly to test the retry behavior List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 1); assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms"); @@ -860,7 +873,7 @@ protected void reScheduleReadInMs(long readAfterMs) { ); // test the second retry delay entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 2); double delay = retryDelays.get(1); @@ -870,7 +883,7 @@ protected void reScheduleReadInMs(long readAfterMs) { // verify the max retry delay for (int i = 0; i < 100; i++) { entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); } Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 102); @@ -881,14 +894,14 @@ protected void reScheduleReadInMs(long readAfterMs) { // unblock to check that the retry delay is reset consumerMockAvailablePermits.set(1000); entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); // wait that the possibly async handling has completed - Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress())); + Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); // now block again to check the next retry delay so verify it was reset consumerMockAvailablePermits.set(0); entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); - persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); Awaitility.await().untilAsserted(() -> { assertEquals(retryDelays.size(), 103); assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms"); @@ -896,6 +909,89 @@ protected void reScheduleReadInMs(long readAfterMs) { ); } + @Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched") + public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared) + throws Exception { + persistentDispatcher.close(); + + // it should be possible to disable the retry delay + // by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0 + retryBackoffInitialTimeInMs=0; + retryBackoffMaxTimeInMs=0; + + List retryDelays = new CopyOnWriteArrayList<>(); + doReturn(dispatchMessagesInSubscriptionThread).when(configMock) + .isDispatcherDispatchMessagesInSubscriptionThread(); + + PersistentDispatcherMultipleConsumers dispatcher; + if (isKeyShared) { + dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( + topicMock, cursorMock, subscriptionMock, configMock, + new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + retryDelays.add(readAfterMs); + } + }; + } else { + dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) { + @Override + protected void reScheduleReadInMs(long readAfterMs) { + retryDelays.add(readAfterMs); + } + }; + } + + // add a consumer without permits to trigger the retry behavior + consumerMockAvailablePermits.set(0); + dispatcher.addConsumer(consumerMock); + + // call "readEntriesComplete" directly to test the retry behavior + List entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + Awaitility.await().untilAsserted(() -> { + assertEquals(retryDelays.size(), 1); + assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms"); + } + ); + // test the second retry delay + entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + Awaitility.await().untilAsserted(() -> { + assertEquals(retryDelays.size(), 2); + double delay = retryDelays.get(1); + assertEquals(delay, 0, 0, "Second retry delay should be 0ms"); + } + ); + // verify the max retry delay + for (int i = 0; i < 100; i++) { + entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1))); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + } + Awaitility.await().untilAsserted(() -> { + assertEquals(retryDelays.size(), 102); + double delay = retryDelays.get(101); + assertEquals(delay, 0, 0, "Max delay should be 0ms"); + } + ); + // unblock to check that the retry delay is reset + consumerMockAvailablePermits.set(1000); + entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2"))); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + // wait that the possibly async handling has completed + Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress())); + + // now block again to check the next retry delay so verify it was reset + consumerMockAvailablePermits.set(0); + entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3"))); + dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal); + Awaitility.await().untilAsserted(() -> { + assertEquals(retryDelays.size(), 103); + assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms"); + } + ); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); }