diff --git a/conf/broker.conf b/conf/broker.conf index 79b00fbab7246..6fcd02cdf5051 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -100,6 +100,17 @@ brokerDeleteInactiveTopicsEnabled=true # How often to check for inactive topics brokerDeleteInactiveTopicsFrequencySeconds=60 +# Set the inactive topic delete mode. Default is delete_when_no_subscriptions +# 'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers +# 'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up) +# and no active producers/consumers +brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions + +# Max duration of topic inactivity in seconds, default is not present +# If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used +# Topics that are inactive for longer than this value will be deleted +brokerDeleteInactiveTopicsMaxInactiveDurationSeconds= + # How frequently to proactively check and purge expired messages messageExpiryCheckIntervalInMinutes=5 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d437362ea4bee..54cbe621b9675 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -33,6 +33,7 @@ import lombok.Setter; import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.configuration.Category; import org.apache.pulsar.common.configuration.FieldContext; @@ -254,6 +255,24 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "How often to check for inactive topics" ) private int brokerDeleteInactiveTopicsFrequencySeconds = 60; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Set the inactive topic delete mode. Default is delete_when_no_subscriptions\n" + + "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers\n" + + "'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)" + + "and no active producers/consumers" + ) + private InactiveTopicDeleteMode brokerDeleteInactiveTopicsMode = InactiveTopicDeleteMode.delete_when_no_subscriptions; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "Max duration of topic inactivity in seconds, default is not present\n" + + "If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used\n" + + "Topics that are inactive for longer than this value will be deleted" + ) + private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null; + @FieldContext( category = CATEGORY_POLICIES, doc = "How frequently to proactively check and purge expired messages" @@ -1458,6 +1477,14 @@ public boolean isDefaultTopicTypePartitioned() { return TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType); } + public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() { + if (brokerDeleteInactiveTopicsMaxInactiveDurationSeconds == null) { + return brokerDeleteInactiveTopicsFrequencySeconds; + } else { + return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds; + } + } + enum TopicType { PARTITIONED("partitioned"), NON_PARTITIONED("non-partitioned"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index eb8a05fd60ad6..4ea2d472cff76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -403,7 +403,8 @@ protected void startStatsUpdater(int statsUpdateInitailDelayInSecs, int statsUpd protected void startInactivityMonitor() { if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) { int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds(); - inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(interval)), interval, interval, + int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(); + inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval, TimeUnit.SECONDS); } @@ -1095,8 +1096,9 @@ public Semaphore getLookupRequestSemaphore() { return lookupRequestSemaphore.get(); } - public void checkGC(int gcIntervalInSeconds) { - forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds)); + public void checkGC(int maxInactiveDurationInSec) { + forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec, + pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode())); } public void checkMessageExpiry() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 529b746867ed6..26af1c1c5c8bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicStats; @@ -123,7 +124,7 @@ CompletableFuture createSubscription(String subscriptionName, Init CompletableFuture close(boolean closeWithoutWaitingClientDisconnect); - void checkGC(int gcInterval); + void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode); void checkInactiveSubscriptions(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 6f054cae814c4..8e3b14fa7f43a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -68,6 +68,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats; import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats; import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats; @@ -822,11 +823,11 @@ public boolean isActive() { } @Override - public void checkGC(int gcIntervalInSeconds) { + public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) { if (isActive()) { lastActive = System.nanoTime(); } else { - if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) { + if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) { if (TopicName.get(topic).isGlobal()) { // For global namespace, close repl producers first. @@ -834,7 +835,7 @@ public void checkGC(int gcIntervalInSeconds) { // provided no remote producers connected to the broker. if (log.isDebugEnabled()) { log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic, - gcIntervalInSeconds); + maxInactiveDurationInSec); } stopReplProducers().thenCompose(v -> delete(true, false, true)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c87f642ec9892..b177b116a735b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -101,6 +101,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo; @@ -769,11 +770,11 @@ void removeSubscription(String subscriptionName) { */ @Override public CompletableFuture delete() { - return delete(false, false); + return delete(false, false, false); } - private CompletableFuture delete(boolean failIfHasSubscriptions, boolean deleteSchema) { - return delete(failIfHasSubscriptions, false, deleteSchema); + private CompletableFuture delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean deleteSchema) { + return delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema); } /** @@ -785,7 +786,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean d */ @Override public CompletableFuture deleteForcefully() { - return delete(false, true, false); + return delete(false, false, true, false); } /** @@ -805,6 +806,7 @@ public CompletableFuture deleteForcefully() { * IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails */ private CompletableFuture delete(boolean failIfHasSubscriptions, + boolean failIfHasBacklogs, boolean closeIfClientsConnected, boolean deleteSchema) { CompletableFuture deleteFuture = new CompletableFuture<>(); @@ -846,6 +848,12 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions")); return; } + } else if (failIfHasBacklogs) { + if (hasBacklogs()) { + isFenced = false; + deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions did not catch up")); + return; + } } else { subscriptions.forEach((s, sub) -> futures.add(sub.delete())); } @@ -1591,19 +1599,36 @@ public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } - public boolean isActive() { + public boolean isActive(InactiveTopicDeleteMode deleteMode) { + switch (deleteMode) { + case delete_when_no_subscriptions: + if (!subscriptions.isEmpty()) { + return true; + } + break; + case delete_when_subscriptions_caught_up: + if (hasBacklogs()) { + return true; + } + break; + } if (TopicName.get(topic).isGlobal()) { - // No local consumers and no local producers - return !subscriptions.isEmpty() || hasLocalProducers(); + // no local producers + return hasLocalProducers(); + } else { + return USAGE_COUNT_UPDATER.get(this) != 0; } - return USAGE_COUNT_UPDATER.get(this) != 0 || !subscriptions.isEmpty(); + } + + private boolean hasBacklogs() { + return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog() > 0); } @Override - public void checkGC(int gcIntervalInSeconds) { - if (isActive()) { + public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) { + if (isActive(deleteMode)) { lastActive = System.nanoTime(); - } else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) { + } else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) { // Gc interval did not expire yet return; } else if (shouldTopicBeRetained()) { @@ -1618,7 +1643,7 @@ public void checkGC(int gcIntervalInSeconds) { // provided no remote producers connected to the broker. if (log.isDebugEnabled()) { log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic, - gcIntervalInSeconds); + maxInactiveDurationInSec); } closeReplProducersIfNoBacklog().thenRun(() -> { if (hasRemoteProducers()) { @@ -1630,7 +1655,7 @@ public void checkGC(int gcIntervalInSeconds) { .completeExceptionally(new TopicBusyException("Topic has connected remote producers")); } else { log.info("[{}] Global topic inactive for {} seconds, closed repl producers", topic, - gcIntervalInSeconds); + maxInactiveDurationInSec); replCloseFuture.complete(null); } }).exceptionally(e -> { @@ -1644,7 +1669,8 @@ public void checkGC(int gcIntervalInSeconds) { replCloseFuture.complete(null); } - replCloseFuture.thenCompose(v -> delete(true, true)) + replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions, + deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true)) .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic)) .exceptionally(e -> { if (e.getCause() instanceof TopicBusyException) { @@ -1979,7 +2005,7 @@ public synchronized OffloadProcessStatus offloadStatus() { public CompletableFuture addSchemaIfIdleOrCheckCompatible(SchemaData schema) { return hasSchema() .thenCompose((hasSchema) -> { - if (hasSchema || isActive() || ledger.getTotalSize() != 0) { + if (hasSchema || isActive(InactiveTopicDeleteMode.delete_when_no_subscriptions) || ledger.getTotalSize() != 0) { return checkSchemaCompatibleForConsumer(schema); } else { return addSchema(schema).thenCompose(schemaVersion -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java new file mode 100644 index 0000000000000..c3d353b05247e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + + +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Consumer; + +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class InactiveTopicDeleteTest extends BrokerTestBase { + + protected void setup() throws Exception { + // No-op + } + + protected void cleanup() throws Exception { + // No-op + } + + @Test + public void testDeleteWhenNoSubscriptions() throws Exception { + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + super.baseSetup(); + + final String topic = "persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"; + + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + + consumer.close(); + producer.close(); + + Thread.sleep(2000); + Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(topic)); + + admin.topics().deleteSubscription(topic, "sub"); + Thread.sleep(2000); + Assert.assertFalse(admin.topics().getList("prop/ns-abc") + .contains(topic)); + + super.internalCleanup(); + } + + @Test + public void testDeleteWhenNoBacklogs() throws Exception { + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + super.baseSetup(); + + final String topic = "persistent://prop/ns-abc/testDeleteWhenNoBacklogs"; + + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + + for (int i = 0; i < 10; i++) { + producer.send("Pulsar".getBytes()); + } + + consumer.close(); + producer.close(); + + Thread.sleep(2000); + Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(topic)); + + admin.topics().skipAllMessages(topic, "sub"); + Thread.sleep(2000); + Assert.assertFalse(admin.topics().getList("prop/ns-abc") + .contains(topic)); + + super.internalCleanup(); + } + + @Test + public void testMaxInactiveDuration() throws Exception { + conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); + conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(5); + super.baseSetup(); + + final String topic = "persistent://prop/ns-abc/testMaxInactiveDuration"; + + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + producer.close(); + Thread.sleep(2000); + Assert.assertTrue(admin.topics().getList("prop/ns-abc") + .contains(topic)); + + Thread.sleep(4000); + Assert.assertFalse(admin.topics().getList("prop/ns-abc") + .contains(topic)); + + super.internalCleanup(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index d192c2c30c67b..da8a203fa097e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -197,7 +198,7 @@ public void run() { // Thread.sleep(5,0); log.info("{} forcing topic GC ", Thread.currentThread()); for (int i = 0; i < 2000; i++) { - topic.checkGC(0); + topic.checkGC(0, InactiveTopicDeleteMode.delete_when_no_subscriptions); } log.info("GC done.."); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index fd51c067022fa..c154d29a0ef6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.testng.annotations.Test; /** @@ -59,6 +60,7 @@ public void testInit() throws Exception { assertTrue(config.getBrokerServicePort().isPresent() && config.getBrokerServicePort().get().equals(brokerServicePort)); assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); + assertEquals(config.getBrokerDeleteInactiveTopicsMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); } @Test diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 75b6d03311092..5d37a11ec60f4 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -85,3 +85,4 @@ replicationMetricsEnabled=true replicationConnectionsPerBroker=16 replicationProducerQueueSize=1000 replicatorPrefix=pulsar.repl +brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java new file mode 100644 index 0000000000000..eb7d1ba3ee75f --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/InactiveTopicDeleteMode.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +/** + * Inactive topic deletion mode. + */ +public enum InactiveTopicDeleteMode { + + /** + * The topic can be deleted when no subscriptions and no active producers. + */ + delete_when_no_subscriptions, + + /** + * The topic can be deleted when all subscriptions catchup and no active producers/consumers. + */ + delete_when_subscriptions_caught_up +}