Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[bot-cherry-pick][broker] Close topics that remain fenced forcefully (a…
Browse files Browse the repository at this point in the history
…pache#8636)

* [bot-cherry-pick][broker] Close topics that remain fenced forcefully

* Fix conflicts.

Co-authored-by: Masahiro Sakamoto <massakam@yahoo-corp.jp>
(cherry picked from commit 291ec28)
  • Loading branch information
codelipenghui committed Nov 20, 2020
1 parent c528dcf commit 490f3bf
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 2 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ systemTopicEnabled=false
# Please enable the system topic first.
topicLevelPoliciesEnabled=false

# If a topic remains fenced for this number of seconds, it will be closed forcefully.
# If it is set to 0 or a negative number, the fenced topic will not be closed.
topicFencingTimeoutSeconds=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ systemTopicEnabled=false
# Please enable the system topic first.
topicLevelPoliciesEnabled=false

# If a topic remains fenced for this number of seconds, it will be closed forcefully.
# If it is set to 0 or a negative number, the fenced topic will not be closed.
topicFencingTimeoutSeconds=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String zookeeperSessionExpiredPolicy = "shutdown";

@FieldContext(
category = CATEGORY_SERVER,
doc = "If a topic remains fenced for this number of seconds, it will be closed forcefully.\n"
+ " If it is set to 0 or a negative number, the fenced topic will not be closed."
)
private int topicFencingTimeoutSeconds = 0;

/**** --- Messaging Protocols --- ****/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -184,6 +185,8 @@ protected TopicStatsHelper initialValue() {
public volatile int maxUnackedMessagesOnSubscription = -1;
private volatile boolean isClosingOrDeleting = false;

private ScheduledFuture<?> fencedTopicMonitoringTask = null;

private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
Expand Down Expand Up @@ -353,7 +356,7 @@ private void decrementPendingWriteOpsAndCheck() {
// signal to managed ledger that we are ready to resume by creating a new ledger
ledger.readyToCreateNewLedger();

isFenced = false;
unfence();
}

}
Expand All @@ -380,7 +383,7 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
} else {

// fence topic when failed to write a message to BK
isFenced = true;
fence();
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.values().forEach(producer -> futures.add(producer.disconnect()));
Expand Down Expand Up @@ -2206,6 +2209,40 @@ public boolean isSystemTopic() {
return false;
}

private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask == null || monitoringTask.isDone()) {
final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
if (timeout > 0) {
this.fencedTopicMonitoringTask = brokerService.executor().schedule(this::closeFencedTopicForcefully,
timeout, TimeUnit.SECONDS);
}
}
}

private synchronized void unfence() {
isFenced = false;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask != null && !monitoringTask.isDone()) {
monitoringTask.cancel(false);
}
}

private void closeFencedTopicForcefully() {
if (isFenced) {
final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
if (isClosingOrDeleting) {
log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})", topic,
timeout, pendingWriteOps.get());
} else {
log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})", topic,
timeout, pendingWriteOps.get());
close();
}
}
}

private void fenceTopicToCloseOrDelete() {
isClosingOrDeleting = true;
isFenced = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1747,6 +1748,43 @@ public void testCheckInactiveSubscriptions() throws Exception {
verify(nonDeletableSubscription2, times(0)).delete();
}

@Test
public void testTopicFencingTimeout() throws Exception {
ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
doReturn(svcConfig).when(pulsar).getConfiguration();
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);

Method fence = PersistentTopic.class.getDeclaredMethod("fence");
fence.setAccessible(true);
Method unfence = PersistentTopic.class.getDeclaredMethod("unfence");
unfence.setAccessible(true);

Field fencedTopicMonitoringTaskField = PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
fencedTopicMonitoringTaskField.setAccessible(true);
Field isFencedField = AbstractTopic.class.getDeclaredField("isFenced");
isFencedField.setAccessible(true);
Field isClosingOrDeletingField = PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);

doReturn(10).when(svcConfig).getTopicFencingTimeoutSeconds();
fence.invoke(topic);
unfence.invoke(topic);
ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
assertTrue(fencedTopicMonitoringTask.isDone());
assertTrue(fencedTopicMonitoringTask.isCancelled());
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));

doReturn(1).when(svcConfig).getTopicFencingTimeoutSeconds();
fence.invoke(topic);
Thread.sleep(2000);
fencedTopicMonitoringTask = (ScheduledFuture<?>) fencedTopicMonitoringTaskField.get(topic);
assertTrue(fencedTopicMonitoringTask.isDone());
assertFalse(fencedTopicMonitoringTask.isCancelled());
assertTrue((boolean) isFencedField.get(topic));
assertTrue((boolean) isClosingOrDeletingField.get(topic));
}

private ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
MessageMetadata messageData = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name").setSequenceId(0).build();
Expand Down

0 comments on commit 490f3bf

Please sign in to comment.