[fix][broker] fix delete_when_subscriptions_caught_up doesn't work while have active consumers #18283

Merged
@@ -1143,6 +1143,9 @@ public CompletableFuture<Void> deleteForcefully() {
* Flag indicating whether delete should succeed if topic still has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs too), and set to true when called from GC
* thread
* @param failIfHasBacklogs
* Flag indicating whether delete should succeed if topic has backlogs. Set to false when called from
* admin API (it will delete the subs too), and set to true when called from GC thread
* @param closeIfClientsConnected
* Flag indicate whether explicitly close connected
* producers/consumers/replicators before trying to delete topic.
@@ -1157,6 +1160,12 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,

lock.writeLock().lock();
try {
// We can proceed with the deletion if either:
// 1. No one is connected and no subscriptions
// 2. The topic have subscriptions but no backlogs for all subscriptions
// if delete_when_no_subscriptions is applied
// 3. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
@@ -1170,6 +1179,9 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
} else if (!closeIfClientsConnected && currentUsageCount() != 0 && !failIfHasBacklogs) {
return FutureUtil.failedFuture(new TopicBusyException(
"Topic has " + currentUsageCount() + " connected producers/consumers"));
Contributor

The delete needs to fail whenever there are connections, so this check should not depend on !failIfHasBacklogs.

Or, if failIfHasBacklogs is what triggers the failure, the returned error message should not read like this.

Contributor @poorbarcode, Nov 2, 2022

Hi @315157973

When failIfHasBacklogs is true, the expected behavior is:

  • fail if any producer exists
  • if any consumer exists, close its connection

Actually, the code should be like this:

if (!closeIfClientsConnected && currentUsageCount() != 0) {
   fail...
} else if (!closeIfClientsConnected && !failIfHasBacklogs && anyProducerExists()) {
   fail...
}

This code may not be easy to understand from the context, but the logic is correct
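
To make the expected behavior described above concrete, here is a minimal Java sketch of the pre-delete check as this comment words it. It is not the code merged in this PR: DeletePrecheckSketch, busyReason, and the explicit producerCount/consumerCount parameters are hypothetical names used only for illustration, and the real method reads the topic's own state instead.

```java
// Hypothetical, simplified model of the pre-delete check discussed in this thread.
final class DeletePrecheckSketch {

    /** Returns null if deletion may proceed, or a reason string if the topic is busy. */
    static String busyReason(boolean closeIfClientsConnected, boolean failIfHasBacklogs,
                             int producerCount, int consumerCount) {
        if (closeIfClientsConnected) {
            // Forced delete: every client is disconnected later, so nothing blocks here.
            return null;
        }
        if (failIfHasBacklogs) {
            // Assumed GC path: only producers block deletion;
            // connected consumers are closed afterwards.
            return producerCount > 0
                    ? "Topic has " + producerCount + " connected producers"
                    : null;
        }
        int usage = producerCount + consumerCount;
        return usage > 0
                ? "Topic has " + usage + " connected producers/consumers"
                : null;
    }
}
```

With this shape, a caught-up topic that only has consumers attached passes the check when failIfHasBacklogs is true, while a single connected producer still blocks the delete.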

Contributor

There is also a concurrency problem that may lead to the incorrect deletion of a topic that still has producers. For example:

| checkGC | new producer registry |
| --- | --- |
| ensure no producer exists | |
| delete topic (false, true, false) | |
| ensure no backlog | |
| | new producer registry |
| fence topic | |
| disconnect all clients | |
| delete topic | |

The flow above shows the case where a producer exists but the topic is still deleted. Would it be better to add a double-check?
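
One possible shape for such a double-check, sketched in Java under simplifying assumptions: FencedDeleteSketch and its methods are hypothetical, a single monitor stands in for the topic's read/write lock, and only producers are modeled. The idea is to fence first and then re-check, so a producer that registered after the first check aborts the delete.

```java
// Hypothetical model of "fence, then re-check" for the race described above.
final class FencedDeleteSketch {
    private int producerCount;
    private boolean fenced;
    private boolean deleted;

    /** A producer can only register while the topic is neither fenced nor deleted. */
    synchronized boolean addProducer() {
        if (fenced || deleted) {
            return false;
        }
        producerCount++;
        return true;
    }

    /** First check, as the GC pass would do before it calls delete. */
    synchronized boolean hasNoProducers() {
        return producerCount == 0;
    }

    /** Fences the topic, then checks again; gives up if a producer slipped in. */
    synchronized boolean deleteWithDoubleCheck() {
        if (deleted) {
            return false;
        }
        fenced = true;
        if (producerCount != 0) {
            fenced = false; // unfence so the existing producer keeps working
            return false;
        }
        deleted = true;
        return true;
    }

    synchronized boolean isDeleted() {
        return deleted;
    }
}
```

In the interleaving from the table, hasNoProducers() returns true, the producer then registers, and deleteWithDoubleCheck() returns false instead of deleting a topic that is in use.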

}

fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
@@ -1179,30 +1191,24 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});
} else {
closeClientFuture.complete(null);
}
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});

closeClientFuture.thenAccept(delete -> {
// We can proceed with the deletion if either:
// 1. No one is connected
// 2. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
if (currentUsageCount() == 0) {
Contributor

It seems we need to keep the original condition (closeIfClientsConnected && !failIfHasSubscriptions).

Contributor

I feel the current code is correct. We have already called the disconnect operation before this point, so if any connections still exist now, a new connection has come in. We should treat that as a concurrency problem and give up deleting the topic.

Contributor Author

Yes, you are right @poorbarcode

closeIfClientsConnected and failIfHasSubscriptions are validated before we reach this point, so checking them again here looks like duplicated validation.
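
The reasoning in this thread can be illustrated with a small Java sketch: wait for every disconnect to finish, then proceed only if the usage count is zero. The names here (DisconnectThenCheckSketch, deleteAfterDisconnect, the usageCount supplier, the doDelete callback) are hypothetical, and CompletableFuture.allOf stands in for the FutureUtil.waitForAll call in the diff.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntSupplier;

// Hypothetical sketch: disconnect everyone first, then treat any remaining
// connection as a concurrent reconnect and give up on the delete.
final class DisconnectThenCheckSketch {

    static CompletableFuture<Void> deleteAfterDisconnect(
            List<CompletableFuture<Void>> disconnects,
            IntSupplier usageCount,
            Runnable doDelete) {
        return CompletableFuture
                .allOf(disconnects.toArray(new CompletableFuture<?>[0]))
                .thenRun(() -> {
                    int usage = usageCount.getAsInt();
                    if (usage != 0) {
                        // Someone connected after the disconnect step: abort.
                        throw new IllegalStateException(
                                "Topic has " + usage + " connected producers/consumers");
                    }
                    doDelete.run();
                });
    }
}
```

If the supplier reports a non-zero count, the returned future completes exceptionally and doDelete never runs, which matches the "give up deleting the topic" behavior argued for above.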

CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);

@@ -328,7 +328,6 @@ public void testDeleteWhenNoBacklogs() throws Exception {
producer.send("Pulsar".getBytes());
}

consumer.close();
producer.close();

Thread.sleep(2000);
@@ -338,6 +337,7 @@ public void testDeleteWhenNoBacklogs() throws Exception {
admin.topics().skipAllMessages(topic, "sub");
Awaitility.await()
.untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
consumer.close();
}

@Test