From f5c9354b89f1b894200cc439c2f8f8679f35e754 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Sat, 5 Nov 2022 08:45:43 +0800 Subject: [PATCH] [fix][broker] fix delete_when_subscriptions_caught_up doesn't work while have active consumers (#18320) (cherry picked from commit 8de67dc1dd625e4e73673df20732b149eccee26f) --- .../service/persistent/PersistentTopic.java | 173 ++++++++++-------- .../service/InactiveTopicDeleteTest.java | 2 +- 2 files changed, 94 insertions(+), 81 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 44c7c962d1eed..8c10f6ca20c84 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1108,6 +1108,9 @@ public CompletableFuture deleteForcefully() { * Flag indicating whether delete should succeed if topic still has unconnected subscriptions. Set to * false when called from admin API (it will delete the subs too), and set to true when called from GC * thread + * @param failIfHasBacklogs + * Flag indicating whether delete should succeed if topic has backlogs. Set to false when called from + * admin API (it will delete the subs too), and set to true when called from GC thread * @param closeIfClientsConnected * Flag indicate whether explicitly close connected * producers/consumers/replicators before trying to delete topic. @@ -1129,97 +1132,107 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, if (isClosingOrDeleting) { log.warn("[{}] Topic is already being closed or deleted", topic); return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced")); - } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) { - return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions")); - } else if (failIfHasBacklogs && hasBacklogs()) { - return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up")); + } + // We can proceed with the deletion if either: + // 1. No one is connected and no subscriptions + // 2. The topic have subscriptions but no backlogs for all subscriptions + // if delete_when_no_subscriptions is applied + // 3. We want to kick out everyone and forcefully delete the topic. + // In this case, we shouldn't care if the usageCount is 0 or not, just proceed + if (!closeIfClientsConnected) { + if (failIfHasSubscriptions && !subscriptions.isEmpty()) { + return FutureUtil.failedFuture( + new TopicBusyException("Topic has subscriptions: " + subscriptions.keys())); + } else if (failIfHasBacklogs) { + if (hasBacklogs()) { + List backlogSubs = + subscriptions.values().stream() + .filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0) + .map(PersistentSubscription::getName).collect(Collectors.toList()); + return FutureUtil.failedFuture( + new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); + } else if (!producers.isEmpty()) { + return FutureUtil.failedFuture(new TopicBusyException( + "Topic has " + producers.size() + " connected producers")); + } + } else if (currentUsageCount() > 0) { + return FutureUtil.failedFuture(new TopicBusyException( + "Topic has " + currentUsageCount() + " connected producers/consumers")); + } } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting CompletableFuture closeClientFuture = new CompletableFuture<>(); + List> futures = new ArrayList<>(); + subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (closeIfClientsConnected) { - List> futures = Lists.newArrayList(); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); - subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); - FutureUtil.waitForAll(futures).thenRun(() -> { - closeClientFuture.complete(null); - }).exceptionally(ex -> { - log.error("[{}] Error closing clients", topic, ex); - unfenceTopicToResume(); - closeClientFuture.completeExceptionally(ex); - return null; - }); - } else { - closeClientFuture.complete(null); } + FutureUtil.waitForAll(futures).thenRun(() -> { + closeClientFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}] Error closing clients", topic, ex); + unfenceTopicToResume(); + closeClientFuture.completeExceptionally(ex); + return null; + }); closeClientFuture.thenAccept(delete -> { - // We can proceed with the deletion if either: - // 1. No one is connected - // 2. We want to kick out everyone and forcefully delete the topic. - // In this case, we shouldn't care if the usageCount is 0 or not, just proceed - if (currentUsageCount() == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) { - CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); - brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); - - deleteTopicAuthenticationFuture.thenCompose( - __ -> deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null)) - .thenAccept(__ -> deleteTopicPolicies()) - .thenCompose(__ -> transactionBufferCleanupAndClose()) - .whenComplete((v, ex) -> { - if (ex != null) { - log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(ex); - } else { - List> subsDeleteFutures = new ArrayList<>(); - subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); - - FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { - if (e != null) { - log.error("[{}] Error deleting topic", topic, e); - unfenceTopicToResume(); - deleteFuture.completeExceptionally(e); - } else { - ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { - @Override - public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); - - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - - unregisterTopicPolicyListener(); - - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - } - - @Override - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { - if (exception.getCause() - instanceof MetadataStoreException.NotFoundException) { - log.info("[{}] Topic is already deleted {}", - topic, exception.getMessage()); - deleteLedgerComplete(ctx); - } else { - unfenceTopicToResume(); - log.error("[{}] Error deleting topic", topic, exception); - deleteFuture.completeExceptionally(new PersistenceException(exception)); + CompletableFuture deleteTopicAuthenticationFuture = new CompletableFuture<>(); + brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); + deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema()) + .thenCompose(__ -> deleteTopicPolicies()) + .thenCompose(__ -> transactionBufferCleanupAndClose()) + .whenComplete((v, ex) -> { + if (ex != null) { + log.error("[{}] Error deleting topic", topic, ex); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(ex); + } else { + List> subsDeleteFutures = new ArrayList<>(); + subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub))); + FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { + if (e != null) { + log.error("[{}] Error deleting topic", topic, e); + unfenceTopicToResume(); + deleteFuture.completeExceptionally(e); + } else { + ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + brokerService.removeTopicFromCache(PersistentTopic.this); + + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + + unregisterTopicPolicyListener(); + + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); } - } - }, null); - } - }); - } - }); - } else { - unfenceTopicToResume(); - deleteFuture.completeExceptionally(new TopicBusyException( - "Topic has " + currentUsageCount() + " connected producers/consumers")); - } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, + Object ctx) { + if (exception.getCause() + instanceof MetadataStoreException.NotFoundException) { + log.info("[{}] Topic is already deleted {}", + topic, exception.getMessage()); + deleteLedgerComplete(ctx); + } else { + unfenceTopicToResume(); + log.error("[{}] Error deleting topic", topic, exception); + deleteFuture.completeExceptionally( + new PersistenceException(exception)); + } + } + }, null); + } + }); + } + }); }).exceptionally(ex->{ unfenceTopicToResume(); deleteFuture.completeExceptionally( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index abbcdc1abe747..c972d0ea5bb7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -327,7 +327,6 @@ public void testDeleteWhenNoBacklogs() throws Exception { producer.send("Pulsar".getBytes()); } - consumer.close(); producer.close(); Thread.sleep(2000); @@ -337,6 +336,7 @@ public void testDeleteWhenNoBacklogs() throws Exception { admin.topics().skipAllMessages(topic, "sub"); Awaitility.await() .untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic))); + consumer.close(); } @Test