Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix retry backoff for PersistentDispatcherMultipleConsumers #23284

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,15 @@ protected void reScheduleReadInMs(long readAfterMs) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
readAfterMs, TimeUnit.MILLISECONDS);
Runnable runnable = () -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
};
if (readAfterMs > 0) {
topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
} else {
topic.getBrokerService().executor().execute(runnable);
}
}
}

Expand Down Expand Up @@ -836,6 +839,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
Expand All @@ -848,9 +852,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});

lastNumberOfEntriesDispatched = entriesToDispatch;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
private AtomicInteger consumerMockAvailablePermits;
int retryBackoffInitialTimeInMs = 10;
int retryBackoffMaxTimeInMs = 50;

@BeforeMethod
public void setup() throws Exception {
Expand All @@ -120,8 +122,8 @@ public void setup() throws Exception {
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

Expand Down Expand Up @@ -825,42 +827,53 @@ public void testLastSentPositionAndIndividuallySentPositions(final boolean initi
assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString());
}

@DataProvider(name = "dispatchMessagesInSubscriptionThread")
private Object[][] dispatchMessagesInSubscriptionThread() {
return new Object[][] { { false }, { true } };
@DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } };
}

@Test(dataProvider = "dispatchMessagesInSubscriptionThread")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread)
@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
}

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
persistentDispatcher.addConsumer(consumerMock);
dispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
Expand All @@ -870,7 +883,7 @@ protected void reScheduleReadInMs(long readAfterMs) {
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
Expand All @@ -881,21 +894,104 @@ protected void reScheduleReadInMs(long readAfterMs) {
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress()));
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms");
}
);
}

@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();

// it should be possible to disable the retry delay
// by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0
retryBackoffInitialTimeInMs=0;
retryBackoffMaxTimeInMs=0;

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
.isDispatcherDispatchMessagesInSubscriptionThread();

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
}

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
dispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
assertEquals(delay, 0, 0, "Second retry delay should be 0ms");
}
);
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
double delay = retryDelays.get(101);
assertEquals(delay, 0, 0, "Max delay should be 0ms");
}
);
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms");
}
);
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
Expand Down
Loading