Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support delete inactive topic when subscriptions caught up #6077

Merged
merged 5 commits into from
Jan 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# Set the inactive topic delete mode. Default is delete_when_no_subscriptions
# 'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers
# 'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)
# and no active producers/consumers
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions

# Max duration of topic inactivity in seconds, default is not present
# If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used
# Topics that are inactive for longer than this value will be deleted
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=

# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
Expand Down Expand Up @@ -254,6 +255,24 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How often to check for inactive topics"
)
private int brokerDeleteInactiveTopicsFrequencySeconds = 60;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Set the inactive topic delete mode. Default is delete_when_no_subscriptions\n"
+ "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers\n"
+ "'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)"
+ "and no active producers/consumers"
)
private InactiveTopicDeleteMode brokerDeleteInactiveTopicsMode = InactiveTopicDeleteMode.delete_when_no_subscriptions;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Max duration of topic inactivity in seconds, default is not present\n"
+ "If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used\n"
+ "Topics that are inactive for longer than this value will be deleted"
)
private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired messages"
Expand Down Expand Up @@ -1449,6 +1468,14 @@ public boolean isDefaultTopicTypePartitioned() {
return TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType);
}

public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
if (brokerDeleteInactiveTopicsMaxInactiveDurationSeconds == null) {
return brokerDeleteInactiveTopicsFrequencySeconds;
} else {
return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
}
}

enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ protected void startStatsUpdater(int statsUpdateInitailDelayInSecs, int statsUpd
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(interval)), interval, interval,
int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval,
TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -1095,8 +1096,9 @@ public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}

public void checkGC(int gcIntervalInSeconds) {
forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds));
public void checkGC(int maxInactiveDurationInSec) {
forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
}

public void checkMessageExpiry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -123,7 +124,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

void checkGC(int gcInterval);
void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode);

void checkInactiveSubscriptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
Expand Down Expand Up @@ -822,19 +823,19 @@ public boolean isActive() {
}

@Override
public void checkGC(int gcIntervalInSeconds) {
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (isActive()) {
lastActive = System.nanoTime();
} else {
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {

if (TopicName.get(topic).isGlobal()) {
// For global namespace, close repl producers first.
// Once all repl producers are closed, we can delete the topic,
// provided no remote producers connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
gcIntervalInSeconds);
maxInactiveDurationInSec);
}

stopReplProducers().thenCompose(v -> delete(true, false, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
Expand Down Expand Up @@ -764,11 +765,11 @@ void removeSubscription(String subscriptionName) {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false, false);
return delete(false, false, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean deleteSchema) {
return delete(failIfHasSubscriptions, false, deleteSchema);
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean deleteSchema) {
return delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema);
}

/**
Expand All @@ -780,7 +781,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean d
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true, false);
return delete(false, false, true, false);
}

/**
Expand All @@ -800,6 +801,7 @@ public CompletableFuture<Void> deleteForcefully() {
* IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails
*/
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean failIfHasBacklogs,
boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -841,6 +843,12 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
return;
}
} else if (failIfHasBacklogs) {
if (hasBacklogs()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions did not catch up"));
return;
}
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
Expand Down Expand Up @@ -1586,19 +1594,36 @@ public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}

public boolean isActive() {
public boolean isActive(InactiveTopicDeleteMode deleteMode) {
switch (deleteMode) {
case delete_when_no_subscriptions:
if (!subscriptions.isEmpty()) {
return true;
}
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
break;
case delete_when_subscriptions_caught_up:
if (hasBacklogs()) {
return true;
}
break;
}
if (TopicName.get(topic).isGlobal()) {
// No local consumers and no local producers
return !subscriptions.isEmpty() || hasLocalProducers();
// no local producers
return hasLocalProducers();
} else {
return USAGE_COUNT_UPDATER.get(this) != 0;
}
return USAGE_COUNT_UPDATER.get(this) != 0 || !subscriptions.isEmpty();
}

private boolean hasBacklogs() {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog() > 0);
}

@Override
public void checkGC(int gcIntervalInSeconds) {
if (isActive()) {
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
// Gc interval did not expire yet
return;
} else if (shouldTopicBeRetained()) {
Expand All @@ -1613,7 +1638,7 @@ public void checkGC(int gcIntervalInSeconds) {
// provided no remote producers connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
gcIntervalInSeconds);
maxInactiveDurationInSec);
}
closeReplProducersIfNoBacklog().thenRun(() -> {
if (hasRemoteProducers()) {
Expand All @@ -1625,7 +1650,7 @@ public void checkGC(int gcIntervalInSeconds) {
.completeExceptionally(new TopicBusyException("Topic has connected remote producers"));
} else {
log.info("[{}] Global topic inactive for {} seconds, closed repl producers", topic,
gcIntervalInSeconds);
maxInactiveDurationInSec);
replCloseFuture.complete(null);
}
}).exceptionally(e -> {
Expand All @@ -1639,7 +1664,8 @@ public void checkGC(int gcIntervalInSeconds) {
replCloseFuture.complete(null);
}

replCloseFuture.thenCompose(v -> delete(true, true))
replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true))
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof TopicBusyException) {
Expand Down Expand Up @@ -1969,7 +1995,7 @@ public synchronized OffloadProcessStatus offloadStatus() {
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
if (hasSchema || isActive(InactiveTopicDeleteMode.delete_when_no_subscriptions) || ledger.getTotalSize() != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion ->
Expand Down
Loading