diff --git a/conf/broker.conf b/conf/broker.conf index 2709a83d972c6..444180ed2109d 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -410,6 +410,10 @@ systemTopicEnabled=false # Please enable the system topic first. topicLevelPoliciesEnabled=false +# If a topic remains fenced for this number of seconds, it will be closed forcefully. +# If it is set to 0 or a negative number, the fenced topic will not be closed. +topicFencingTimeoutSeconds=0 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/conf/standalone.conf b/conf/standalone.conf index 0ce88dc975ffd..7ca742a5bf903 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -350,6 +350,10 @@ systemTopicEnabled=false # Please enable the system topic first. topicLevelPoliciesEnabled=false +# If a topic remains fenced for this number of seconds, it will be closed forcefully. +# If it is set to 0 or a negative number, the fenced topic will not be closed. +topicFencingTimeoutSeconds=0 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4da87d86c31a7..7d52f783630dc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -755,6 +755,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String zookeeperSessionExpiredPolicy = "shutdown"; + @FieldContext( + category = CATEGORY_SERVER, + doc = "If a topic remains fenced for this number of seconds, it will be closed forcefully.\n" + + " If it is set to 0 or a negative number, the fenced topic will not be closed." + ) + private int topicFencingTimeoutSeconds = 0; /**** --- Messaging Protocols --- ****/ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f46773210e9ea..198195d082d5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -37,6 +37,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -184,6 +185,8 @@ protected TopicStatsHelper initialValue() { public volatile int maxUnackedMessagesOnSubscription = -1; private volatile boolean isClosingOrDeleting = false; + private ScheduledFuture fencedTopicMonitoringTask = null; + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -353,7 +356,7 @@ private void decrementPendingWriteOpsAndCheck() { // signal to managed ledger that we are ready to resume by creating a new ledger ledger.readyToCreateNewLedger(); - isFenced = false; + unfence(); } } @@ -380,7 +383,7 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx) } else { // fence topic when failed to write a message to BK - isFenced = true; + fence(); // close all producers List> futures = Lists.newArrayList(); producers.values().forEach(producer -> futures.add(producer.disconnect())); @@ -2206,6 +2209,40 @@ public boolean isSystemTopic() { return false; } + private synchronized void fence() { + isFenced = true; + ScheduledFuture monitoringTask = this.fencedTopicMonitoringTask; + if (monitoringTask == null || monitoringTask.isDone()) { + final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds(); + if (timeout > 0) { + this.fencedTopicMonitoringTask = brokerService.executor().schedule(this::closeFencedTopicForcefully, + timeout, TimeUnit.SECONDS); + } + } + } + + private synchronized void unfence() { + isFenced = false; + ScheduledFuture monitoringTask = this.fencedTopicMonitoringTask; + if (monitoringTask != null && !monitoringTask.isDone()) { + monitoringTask.cancel(false); + } + } + + private void closeFencedTopicForcefully() { + if (isFenced) { + final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds(); + if (isClosingOrDeleting) { + log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})", topic, + timeout, pendingWriteOps.get()); + } else { + log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})", topic, + timeout, pendingWriteOps.get()); + close(); + } + } + } + private void fenceTopicToCloseOrDelete() { isClosingOrDeleting = true; isFenced = true; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 970277dc34e38..bd56f35b91040 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -59,6 +59,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1747,6 +1748,43 @@ public void testCheckInactiveSubscriptions() throws Exception { verify(nonDeletableSubscription2, times(0)).delete(); } + @Test + public void testTopicFencingTimeout() throws Exception { + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + doReturn(svcConfig).when(pulsar).getConfiguration(); + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + + Method fence = PersistentTopic.class.getDeclaredMethod("fence"); + fence.setAccessible(true); + Method unfence = PersistentTopic.class.getDeclaredMethod("unfence"); + unfence.setAccessible(true); + + Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask"); + fencedTopicMonitoringTaskField.setAccessible(true); + Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced"); + isFencedField.setAccessible(true); + Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting"); + isClosingOrDeletingField.setAccessible(true); + + doReturn(10).when(svcConfig).getTopicFencingTimeoutSeconds(); + fence.invoke(topic); + unfence.invoke(topic); + ScheduledFuture fencedTopicMonitoringTask = (ScheduledFuture) fencedTopicMonitoringTaskField.get(topic); + assertTrue(fencedTopicMonitoringTask.isDone()); + assertTrue(fencedTopicMonitoringTask.isCancelled()); + assertFalse((boolean) isFencedField.get(topic)); + assertFalse((boolean) isClosingOrDeletingField.get(topic)); + + doReturn(1).when(svcConfig).getTopicFencingTimeoutSeconds(); + fence.invoke(topic); + Thread.sleep(2000); + fencedTopicMonitoringTask = (ScheduledFuture) fencedTopicMonitoringTaskField.get(topic); + assertTrue(fencedTopicMonitoringTask.isDone()); + assertFalse(fencedTopicMonitoringTask.isCancelled()); + assertTrue((boolean) isFencedField.get(topic)); + assertTrue((boolean) isClosingOrDeletingField.get(topic)); + } + private ByteBuf getMessageWithMetadata(byte[] data) throws IOException { MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis()) .setProducerName("prod-name").setSequenceId(0).build();