Skip to content

Commit

Permalink
Refactor zookeeper session timeout handling into an interface (apache…
Browse files Browse the repository at this point in the history
…#6347)

Refactor zookeeper session timeout handling into an interface
  • Loading branch information
codelipenghui authored and cdbartholomew committed Jul 24, 2020
1 parent 0d08d17 commit df7a8ce
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 31 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,12 @@ retentionCheckIntervalInSeconds=120
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

# There are two policies when zookeeper session expired happens, "shutdown" and "reconnect".
# If uses "shutdown" policy, shutdown the broker when zookeeper session expired happens.
# If uses "reconnect" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper.
# Node: the "reconnect" policy is an experiment feature
zookeeperSessionExpiredPolicy=shutdown

# Enable or disable system topic
systemTopicEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int maxNumPartitionsPerPartitionedTopic = 0;

@FieldContext(
doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n"
+ " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n"
+ " If uses \"reconnect\" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper."
)
private String zookeeperSessionExpiredPolicy = "shutdown";

/**** --- Messaging Protocols --- ****/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;

/**
* Handlers for broker service to handle Zookeeper session expired
*/
public class ZookeeperSessionExpiredHandlers {

public static final String SHUTDOWN_POLICY = "shutdown";
public static final String RECONNECT_POLICY = "reconnect";

public static ZookeeperSessionExpiredHandler shutdownWhenZookeeperSessionExpired(ShutdownService shutdownService) {
return new ShutDownWhenSessionExpired(shutdownService);
}

public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
return new ReconnectWhenSessionExpired(pulsarService, shutdownService);
}

// Shutdown the messaging service when Zookeeper session expired.
public static class ShutDownWhenSessionExpired implements ZookeeperSessionExpiredHandler {

private final ShutdownService shutdownService;
private ZooKeeperSessionWatcher watcher;

public ShutDownWhenSessionExpired(ShutdownService shutdownService) {
this.shutdownService = shutdownService;
}

@Override
public void setWatcher(ZooKeeperSessionWatcher watcher) {
this.watcher = watcher;
}

@Override
public void onSessionExpired() {
this.watcher.close();
this.shutdownService.shutdown(-1);
}
}

// Reconnect to the zookeeper server and re-register ownership cache to avoid ownership change.
public static class ReconnectWhenSessionExpired implements ZookeeperSessionExpiredHandler {

private final PulsarService pulsarService;
private ZooKeeperSessionWatcher watcher;
private final ShutdownService shutdownService;

public ReconnectWhenSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) {
this.pulsarService = pulsarService;
this.shutdownService = shutdownService;
}

@Override
public void onSessionExpired() {
if (this.pulsarService.getNamespaceService() == null) {
this.watcher.close();
this.shutdownService.shutdown(-1);
}
this.pulsarService.getNamespaceService().registerOwnedBundles();
}

@Override
public void setWatcher(ZooKeeperSessionWatcher watcher) {
this.watcher = watcher;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -417,7 +419,15 @@ public void start() throws PulsarServerException {
// Now we are ready to start services
localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
localZooKeeperConnectionProvider.start(shutdownService);
ZookeeperSessionExpiredHandler sessionExpiredHandler = null;
if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) {
sessionExpiredHandler = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired(this, shutdownService);
} else if (ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) {
sessionExpiredHandler = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(shutdownService);
} else {
throw new IllegalArgumentException("Invalid zookeeper session expired policy " + config.getZookeeperSessionExpiredPolicy());
}
localZooKeeperConnectionProvider.start(sessionExpiredHandler);

// Initialize and start service to access configuration repository.
this.startZkCacheService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -1267,4 +1268,27 @@ public boolean registerSLANamespace() throws PulsarServerException {
}
return isNameSpaceRegistered;
}

public void registerOwnedBundles() {
List<OwnedBundle> ownedBundles = new ArrayList<>(ownershipCache.getOwnedBundles().values());
ownershipCache.invalidateLocalOwnerCache();
ownedBundles.forEach(ownedBundle -> {
String path = ServiceUnitZkUtils.path(ownedBundle.getNamespaceBundle());
try {
if (!pulsar.getLocalZkCache().checkRegNodeAndWaitExpired(path)) {
ownershipCache.tryAcquiringOwnership(ownedBundle.getNamespaceBundle());
}
} catch (Exception e) {
try {
ownedBundle.handleUnloadRequest(pulsar, 5, TimeUnit.MINUTES);
} catch (IllegalStateException ex) {
// The owned bundle is not in active state.
} catch (Exception ex) {
LOG.error("Unexpected exception occur when register owned bundle {}. Shutdown broker now !!!",
ownedBundle.getNamespaceBundle(), ex);
pulsar.getShutdownService().shutdown(-1);
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws E
}
}

public void invalidateLocalOwnerCache() {
this.ownedBundlesCache.synchronous().invalidateAll();
}

public NamespaceEphemeralData getSelfOwnerInfo() {
return selfOwnerInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher;
import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,17 +68,33 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke
int zookeeperSessionTimeoutMs) throws Exception {
localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers,
zookeeperSessionTimeoutMs);
localZkConnectionSvc.start(exitCode -> {
log.error("Shutting down ZK sessions: {}", exitCode);
localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
@Override
public void onSessionExpired() {
log.error("Shutting down ZK sessions: {}", -1);
}

@Override
public void setWatcher(ZooKeeperSessionWatcher watcher) {

}
});

this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
(int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
} catch (InterruptedException e) {
log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() {
@Override
public void onSessionExpired() {
try {
localZkCache.getZooKeeper().close();
} catch (InterruptedException e) {
log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e);
}
}

@Override
public void setWatcher(ZooKeeperSessionWatcher watcher) {

}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, S
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
}

public void start(ShutdownService shutdownService) throws IOException {
public void start(ZookeeperSessionExpiredHandler sessionExpiredHandler) throws IOException {
// Connect to local ZK
CompletableFuture<ZooKeeper> zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite,
(int) zkSessionTimeoutMillis);

try {
localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS);
localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis,
shutdownService);
sessionExpiredHandler);
localZooKeeperSessionWatcher.start();
localZooKeeper.register(localZooKeeperSessionWatcher);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -49,6 +51,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
Expand Down Expand Up @@ -503,4 +506,49 @@ public void stop() {

this.backgroundExecutor.shutdown();
}

public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
final CountDownLatch prevNodeLatch = new CountDownLatch(1);
Watcher zkPrevRegNodewatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// Check for prev znode deletion. Connection expiration is
// not handling, since bookie has logic to shutdown.
if (EventType.NodeDeleted == event.getType()) {
prevNodeLatch.countDown();
}
}
};
try {
Stat stat = getZooKeeper().exists(regPath, zkPrevRegNodewatcher);
if (null != stat) {
// if the ephemeral owner isn't current zookeeper client
// wait for it to be expired.
if (stat.getEphemeralOwner() != getZooKeeper().getSessionId()) {
log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
+ " {} ms for znode deletion", regPath, getZooKeeper().getSessionTimeout());
// waiting for the previous bookie reg znode deletion
if (!prevNodeLatch.await(getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS)) {
throw new NodeExistsException(regPath);
} else {
return false;
}
}
return true;
} else {
return false;
}
} catch (KeeperException ke) {
log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
throw new IOException("ZK exception checking and wait ephemeral znode "
+ regPath + " expired", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
throw new IOException("Interrupted checking and wait ephemeral znode "
+ regPath + " expired", ie);
}
}

private static Logger log = LoggerFactory.getLogger(ZooKeeperCache.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ default void run() {

private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSessionWatcher.class);

private final ShutdownService shutdownService;
private final ZookeeperSessionExpiredHandler sessionExpiredHandler;
private final ZooKeeper zk;
// Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout)
private final long monitorTimeoutMillis;
Expand All @@ -68,11 +68,12 @@ default void run() {
private volatile boolean zkOperationCompleted = false;
private ScheduledFuture<?> task;

public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ShutdownService shutdownService) {
public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ZookeeperSessionExpiredHandler sessionExpiredHandler) {
this.zk = zk;
this.monitorTimeoutMillis = zkSessionTimeoutMillis * 5 / 6;
this.tickTimeMillis = zkSessionTimeoutMillis / 15;
this.shutdownService = shutdownService;
this.sessionExpiredHandler = sessionExpiredHandler;
this.sessionExpiredHandler.setWatcher(this);
}

public void start() {
Expand Down Expand Up @@ -100,9 +101,7 @@ public void process(WatchedEvent event) {
case None:
if (eventState == Watcher.Event.KeeperState.Expired) {
LOG.error("ZooKeeper session already expired, invoking shutdown");
close();
shuttingDown = true;
shutdownService.shutdown(-1);
sessionExpiredHandler.onSessionExpired();
}
break;
default:
Expand Down Expand Up @@ -151,10 +150,8 @@ public synchronized void run() {
keeperState = Watcher.Event.KeeperState.Disconnected;
}
if (keeperState == Watcher.Event.KeeperState.Expired) {
LOG.error("zoo keeper session expired, invoking shutdown service");
close();
shuttingDown = true;
shutdownService.shutdown(-1);
LOG.error("zookeeper session expired, invoking shutdown service");
sessionExpiredHandler.onSessionExpired();
} else if (keeperState == Watcher.Event.KeeperState.Disconnected) {
if (disconnectedAt == 0) {
// this is the first disconnect, we should monitor the time out from now, so we record the time of
Expand All @@ -166,9 +163,7 @@ public synchronized void run() {
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
if (timeRemainingMillis <= 0) {
LOG.error("timeout expired for reconnecting, invoking shutdown service");
close();
shuttingDown = true;
shutdownService.shutdown(-1);
sessionExpiredHandler.onSessionExpired();
} else {
LOG.warn("zoo keeper disconnected, waiting to reconnect, time remaining = {} seconds",
TimeUnit.MILLISECONDS.toSeconds(timeRemainingMillis));
Expand All @@ -189,5 +184,6 @@ public void close() {
if (scheduler != null) {
scheduler.shutdownNow();
}
shuttingDown = true;
}
}
Loading

0 comments on commit df7a8ce

Please sign in to comment.