From 43161e581054c8d92a815475da3ad4689194c96b Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Tue, 18 Feb 2020 00:36:38 +0800 Subject: [PATCH 1/6] Add zookeeper session expired policy. --- conf/broker.conf | 5 ++ .../pulsar/broker/ServiceConfiguration.java | 7 ++ .../ZookeeperSessionExpiredHandlers.java | 90 +++++++++++++++++++ .../apache/pulsar/broker/PulsarService.java | 12 ++- .../broker/namespace/NamespaceService.java | 23 +++++ .../broker/namespace/OwnershipCache.java | 4 + .../apache/pulsar/client/impl/MQTTTests.java | 18 ++++ .../service/web/ZookeeperCacheLoader.java | 32 +++++-- .../LocalZooKeeperConnectionService.java | 4 +- .../pulsar/zookeeper/ZooKeeperCache.java | 47 ++++++++++ .../zookeeper/ZooKeeperSessionWatcher.java | 22 ++--- .../ZookeeperSessionExpiredHandler.java | 32 +++++++ .../ZooKeeperSessionWatcherTest.java | 47 +++++++++- 13 files changed, 316 insertions(+), 27 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java create mode 100644 pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java diff --git a/conf/broker.conf b/conf/broker.conf index 7594e1e4a8e37..c5a2edb70c344 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -357,6 +357,11 @@ retentionCheckIntervalInSeconds=120 # Use 0 or negative number to disable the check maxNumPartitionsPerPartitionedTopic=0 +# There are two policies when zookeeper session expired happens, "shutdown" and "reconnect". +# If uses "shutdown" policy, shutdown the broker when zookeeper session expired happens. +# If uses "reconnect" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper. +zookeeperSessionExpiredPolicy=shutdown + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 34163b489380a..89ad7faee46b6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -660,6 +660,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int maxNumPartitionsPerPartitionedTopic = 0; + @FieldContext( + doc = "There are two policies when zookeeper session expired happens, \"shutdown\" and \"reconnect\". \n\n" + + " If uses \"shutdown\" policy, shutdown the broker when zookeeper session expired happens.\n\n" + + " If uses \"reconnect\" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper." + ) + private String zookeeperSessionExpiredPolicy = "shutdown"; + /**** --- Messaging Protocols --- ****/ @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java new file mode 100644 index 0000000000000..e6541939d7dec --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar; + +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher; +import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; +import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; + +/** + * Handlers for broker service to handle Zookeeper session expired + */ +public class ZookeeperSessionExpiredHandlers { + + public static final String SHUTDOWN_POLICY = "shutdown"; + public static final String RECONNECT_POLICY = "reconnect"; + + public static ZookeeperSessionExpiredHandler shutdownWhenZookeeperSessionExpired(ShutdownService shutdownService) { + return new ShutDownWhenSessionExpired(shutdownService); + } + + public static ZookeeperSessionExpiredHandler reconnectWhenZookeeperSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) { + return new ReconnectWhenSessionExpired(pulsarService, shutdownService); + } + + // Shutdown the messaging service when Zookeeper session expired. + public static class ShutDownWhenSessionExpired implements ZookeeperSessionExpiredHandler { + + private final ShutdownService shutdownService; + private ZooKeeperSessionWatcher watcher; + + public ShutDownWhenSessionExpired(ShutdownService shutdownService) { + this.shutdownService = shutdownService; + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + + @Override + public void onSessionExpired() { + this.watcher.close(); + this.shutdownService.shutdown(-1); + } + } + + // Reconnect to the zookeeper server and re-register ownership cache to avoid ownership change. + public static class ReconnectWhenSessionExpired implements ZookeeperSessionExpiredHandler { + + private final PulsarService pulsarService; + private ZooKeeperSessionWatcher watcher; + private final ShutdownService shutdownService; + + public ReconnectWhenSessionExpired(PulsarService pulsarService, ShutdownService shutdownService) { + this.pulsarService = pulsarService; + this.shutdownService = shutdownService; + } + + @Override + public void onSessionExpired() { + if (this.pulsarService.getNamespaceService() == null) { + this.watcher.close(); + this.shutdownService.shutdown(-1); + } + this.pulsarService.getNamespaceService().registerOwnedBundles(); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 21a73a0b6bdc2..e7cc98b7e79b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -126,6 +126,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl; import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; +import org.apache.pulsar.ZookeeperSessionExpiredHandlers; +import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -393,7 +395,15 @@ public void start() throws PulsarServerException { // Now we are ready to start services localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis()); - localZooKeeperConnectionProvider.start(shutdownService); + ZookeeperSessionExpiredHandler sessionExpiredHandler = null; + if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) { + sessionExpiredHandler = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired(this, shutdownService); + } else if (ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) { + sessionExpiredHandler = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired(shutdownService); + } else { + throw new IllegalArgumentException("Invalid zookeeper session expired policy " + config.getZookeeperSessionExpiredPolicy()); + } + localZooKeeperConnectionProvider.start(sessionExpiredHandler); // Initialize and start service to access configuration repository. this.startZkCacheService(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 24d933d0f2b3f..ee6e424c2e38b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1214,4 +1214,27 @@ public boolean registerSLANamespace() throws PulsarServerException { } return isNameSpaceRegistered; } + + public void registerOwnedBundles() { + List ownedBundles = new ArrayList<>(ownershipCache.getOwnedBundles().values()); + ownershipCache.invalidateLocalOwnerCache(); + ownedBundles.forEach(ownedBundle -> { + String path = ServiceUnitZkUtils.path(ownedBundle.getNamespaceBundle()); + try { + if (!pulsar.getLocalZkCache().checkRegNodeAndWaitExpired(path)) { + ownershipCache.tryAcquiringOwnership(ownedBundle.getNamespaceBundle()); + } + } catch (Exception e) { + try { + ownedBundle.handleUnloadRequest(pulsar, 5, TimeUnit.MINUTES); + } catch (IllegalStateException ex) { + // The owned bundle is not in active state. + } catch (Exception ex) { + LOG.error("Unexpected exception occur when register owned bundle {}. Shutdown broker now !!!", + ownedBundle.getNamespaceBundle(), ex); + pulsar.getShutdownService().shutdown(-1); + } + } + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index 50e96fadf51fb..46b0b2faa55f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -372,6 +372,10 @@ public void updateBundleState(NamespaceBundle bundle, boolean isActive) throws E } } + public void invalidateLocalOwnerCache() { + this.ownedBundlesCache.synchronous().invalidateAll(); + } + public NamespaceEphemeralData getSelfOwnerInfo() { return selfOwnerInfo; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java new file mode 100644 index 0000000000000..41e23f191b245 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java @@ -0,0 +1,18 @@ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; + +public class MQTTTests { + + public static void main(String[] args) throws PulsarClientException, InterruptedException { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:15002").build(); + + Producer producer = client.newProducer(Schema.STRING).topic("persistent://public/default/mop").enableBatching(false).create(); + for (int i = 0 ; i < 100000; i++) { + producer.send("Mop -> " + i); + } + } +} diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java index 66341fd6aab56..56332e7316fd4 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java @@ -34,6 +34,8 @@ import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher; +import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,17 +68,33 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke int zookeeperSessionTimeoutMs) throws Exception { localZkConnectionSvc = new LocalZooKeeperConnectionService(zkClientFactory, zookeeperServers, zookeeperSessionTimeoutMs); - localZkConnectionSvc.start(exitCode -> { - log.error("Shutting down ZK sessions: {}", exitCode); + localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + log.error("Shutting down ZK sessions: {}", -1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + + } }); this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor); - localZkConnectionSvc.start(exitCode -> { - try { - localZkCache.getZooKeeper().close(); - } catch (InterruptedException e) { - log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e); + localZkConnectionSvc.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + try { + localZkCache.getZooKeeper().close(); + } catch (InterruptedException e) { + log.warn("Failed to shutdown ZooKeeper gracefully {}", e.getMessage(), e); + } + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + } }); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java index 6334c29d53fa7..e4be27e53a9d6 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java @@ -59,7 +59,7 @@ public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, S this.zkSessionTimeoutMillis = zkSessionTimeoutMillis; } - public void start(ShutdownService shutdownService) throws IOException { + public void start(ZookeeperSessionExpiredHandler sessionExpiredHandler) throws IOException { // Connect to local ZK CompletableFuture zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite, (int) zkSessionTimeoutMillis); @@ -67,7 +67,7 @@ public void start(ShutdownService shutdownService) throws IOException { try { localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS); localZooKeeperSessionWatcher = new ZooKeeperSessionWatcher(localZooKeeper, zkSessionTimeoutMillis, - shutdownService); + sessionExpiredHandler); localZooKeeperSessionWatcher.start(); localZooKeeper.register(localZooKeeperSessionWatcher); } catch (Exception e) { diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index 2688c10c40fd3..5ad16843d0f9d 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -34,12 +34,14 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.pulsar.common.util.FutureUtil; @@ -49,6 +51,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; @@ -64,6 +67,7 @@ * * @param */ +@Slf4j public abstract class ZooKeeperCache implements Watcher { public static interface Deserializer { T deserialize(String key, byte[] content) throws Exception; @@ -503,4 +507,47 @@ public void stop() { this.backgroundExecutor.shutdown(); } + + public boolean checkRegNodeAndWaitExpired(String regPath) throws IOException { + final CountDownLatch prevNodeLatch = new CountDownLatch(1); + Watcher zkPrevRegNodewatcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + // Check for prev znode deletion. Connection expiration is + // not handling, since bookie has logic to shutdown. + if (EventType.NodeDeleted == event.getType()) { + prevNodeLatch.countDown(); + } + } + }; + try { + Stat stat = getZooKeeper().exists(regPath, zkPrevRegNodewatcher); + if (null != stat) { + // if the ephemeral owner isn't current zookeeper client + // wait for it to be expired. + if (stat.getEphemeralOwner() != getZooKeeper().getSessionId()) { + log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:" + + " {} ms for znode deletion", regPath, getZooKeeper().getSessionTimeout()); + // waiting for the previous bookie reg znode deletion + if (!prevNodeLatch.await(getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS)) { + throw new NodeExistsException(regPath); + } else { + return false; + } + } + return true; + } else { + return false; + } + } catch (KeeperException ke) { + log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke); + throw new IOException("ZK exception checking and wait ephemeral znode " + + regPath + " expired", ke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie); + throw new IOException("Interrupted checking and wait ephemeral znode " + + regPath + " expired", ie); + } + } } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java index 5bd93264eea13..fa70c9ea47529 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcher.java @@ -54,7 +54,7 @@ default void run() { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSessionWatcher.class); - private final ShutdownService shutdownService; + private final ZookeeperSessionExpiredHandler sessionExpiredHandler; private final ZooKeeper zk; // Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout) private final long monitorTimeoutMillis; @@ -68,11 +68,12 @@ default void run() { private volatile boolean zkOperationCompleted = false; private ScheduledFuture task; - public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ShutdownService shutdownService) { + public ZooKeeperSessionWatcher(ZooKeeper zk, long zkSessionTimeoutMillis, ZookeeperSessionExpiredHandler sessionExpiredHandler) { this.zk = zk; this.monitorTimeoutMillis = zkSessionTimeoutMillis * 5 / 6; this.tickTimeMillis = zkSessionTimeoutMillis / 15; - this.shutdownService = shutdownService; + this.sessionExpiredHandler = sessionExpiredHandler; + this.sessionExpiredHandler.setWatcher(this); } public void start() { @@ -100,9 +101,7 @@ public void process(WatchedEvent event) { case None: if (eventState == Watcher.Event.KeeperState.Expired) { LOG.error("ZooKeeper session already expired, invoking shutdown"); - close(); - shuttingDown = true; - shutdownService.shutdown(-1); + sessionExpiredHandler.onSessionExpired(); } break; default: @@ -151,10 +150,8 @@ public synchronized void run() { keeperState = Watcher.Event.KeeperState.Disconnected; } if (keeperState == Watcher.Event.KeeperState.Expired) { - LOG.error("zoo keeper session expired, invoking shutdown service"); - close(); - shuttingDown = true; - shutdownService.shutdown(-1); + LOG.error("zookeeper session expired, invoking shutdown service"); + sessionExpiredHandler.onSessionExpired(); } else if (keeperState == Watcher.Event.KeeperState.Disconnected) { if (disconnectedAt == 0) { // this is the first disconnect, we should monitor the time out from now, so we record the time of @@ -166,9 +163,7 @@ public synchronized void run() { - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt); if (timeRemainingMillis <= 0) { LOG.error("timeout expired for reconnecting, invoking shutdown service"); - close(); - shuttingDown = true; - shutdownService.shutdown(-1); + sessionExpiredHandler.onSessionExpired(); } else { LOG.warn("zoo keeper disconnected, waiting to reconnect, time remaining = {} seconds", TimeUnit.MILLISECONDS.toSeconds(timeRemainingMillis)); @@ -189,5 +184,6 @@ public void close() { if (scheduler != null) { scheduler.shutdownNow(); } + shuttingDown = true; } } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java new file mode 100644 index 0000000000000..fc2f180660946 --- /dev/null +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZookeeperSessionExpiredHandler.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.zookeeper; + +/** + * Handler interface on Zookeeper session expired + */ +public interface ZookeeperSessionExpiredHandler { + + /** + * Signal when zookeeper session is expired. + */ + void onSessionExpired(); + + void setWatcher(ZooKeeperSessionWatcher watcher); +} diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java index 30c4c2d074751..8a0bec9ab0e7d 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertTrue; import org.apache.zookeeper.KeeperException.Code; -import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; @@ -56,7 +55,20 @@ public int getExitCode() { void setup() { zkClient = MockZooKeeper.newInstance(); shutdownService = new MockShutdownService(); - sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, shutdownService); + sessionWatcher = new ZooKeeperSessionWatcher(zkClient, 1000, new ZookeeperSessionExpiredHandler() { + + private ZooKeeperSessionWatcher watcher; + @Override + public void onSessionExpired() { + watcher.close(); + shutdownService.shutdown(-1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + }); } @AfterMethod @@ -115,7 +127,20 @@ void testProcessResultNoNode() { @Test void testRun1() throws Exception { - ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, shutdownService); + ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, + new ZookeeperSessionExpiredHandler() { + private ZooKeeperSessionWatcher watcher; + @Override + public void onSessionExpired() { + watcher.close(); + shutdownService.shutdown(-1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + }); sessionWatcherZkNull.run(); assertFalse(sessionWatcherZkNull.isShutdownStarted()); assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected); @@ -125,7 +150,21 @@ void testRun1() throws Exception { @Test void testRun2() throws Exception { - ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, shutdownService); + ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, + new ZookeeperSessionExpiredHandler() { + + private ZooKeeperSessionWatcher watcher; + @Override + public void onSessionExpired() { + watcher.close(); + shutdownService.shutdown(-1); + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + this.watcher = watcher; + } + }); sessionWatcherZkNull.run(); assertTrue(sessionWatcherZkNull.isShutdownStarted()); assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected); From 8f39e20a8b7b8aab805a63e8bc2087cdec46d474 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Tue, 18 Feb 2020 00:46:49 +0800 Subject: [PATCH 2/6] Fix log --- .../main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index 5ad16843d0f9d..ff132f27b5a0a 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -67,7 +67,6 @@ * * @param */ -@Slf4j public abstract class ZooKeeperCache implements Watcher { public static interface Deserializer { T deserialize(String key, byte[] content) throws Exception; @@ -550,4 +549,6 @@ public void process(WatchedEvent event) { + regPath + " expired", ie); } } + + private static Logger log = LoggerFactory.getLogger(ZooKeeperCache.class); } From e205849f8000c428614d5066f1aabd8ffce9ee82 Mon Sep 17 00:00:00 2001 From: penghui Date: Fri, 22 May 2020 18:25:34 +0800 Subject: [PATCH 3/6] delete unused code --- .../apache/pulsar/client/impl/MQTTTests.java | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java deleted file mode 100644 index 41e23f191b245..0000000000000 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MQTTTests.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.pulsar.client.impl; - -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; - -public class MQTTTests { - - public static void main(String[] args) throws PulsarClientException, InterruptedException { - PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:15002").build(); - - Producer producer = client.newProducer(Schema.STRING).topic("persistent://public/default/mop").enableBatching(false).create(); - for (int i = 0 ; i < 100000; i++) { - producer.send("Mop -> " + i); - } - } -} From 30347ff6c446778b555ef8374833f255cced5e46 Mon Sep 17 00:00:00 2001 From: penghui Date: Fri, 22 May 2020 19:32:58 +0800 Subject: [PATCH 4/6] Fix missed imports --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ee6e424c2e38b..ae6231a8bdf1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -73,6 +73,7 @@ import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; From 127ea4ecfa78715c097887ba8bbaa57468846180 Mon Sep 17 00:00:00 2001 From: penghui Date: Wed, 27 May 2020 14:03:29 +0800 Subject: [PATCH 5/6] Fix tests issue --- .../LocalZooKeeperConnectionServiceTest.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java index aaf8b2486d64f..14e17840ddf57 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java @@ -36,7 +36,16 @@ void testSimpleZooKeeperConnection() throws Exception { MockedZooKeeperClientFactoryImpl mockZkClientFactory = new MockedZooKeeperClientFactoryImpl(); LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService( mockZkClientFactory, "dummy", 1000); - localZkConnectionService.start(null); + localZkConnectionService.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + + } + }); // Get ZooKeeper client MockZooKeeper zk = (MockZooKeeper) localZkConnectionService.getLocalZooKeeper(); @@ -92,7 +101,16 @@ void testSimpleZooKeeperConnectionFail() throws Exception { LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService( new ZookeeperClientFactoryImpl(), "dummy", 1000); try { - localZkConnectionService.start(null); + localZkConnectionService.start(new ZookeeperSessionExpiredHandler() { + @Override + public void onSessionExpired() { + } + + @Override + public void setWatcher(ZooKeeperSessionWatcher watcher) { + + } + }); fail("should fail"); } catch (Exception e) { assertTrue(e.getMessage().contains("Failed to establish session with local ZK")); From 013199adc932a33e821455568eda8dd3c3944a6b Mon Sep 17 00:00:00 2001 From: penghui Date: Fri, 5 Jun 2020 11:13:59 +0800 Subject: [PATCH 6/6] Fix tests --- .../pulsar/zookeeper/ZooKeeperSessionWatcherTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java index 68fb4af4f4eeb..5b49b2b781261 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java @@ -140,11 +140,6 @@ public void setWatcher(ZooKeeperSessionWatcher watcher) { this.watcher = watcher; } }); - } - - @Test - public void testRun1() throws Exception { - ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 1000, shutdownService); sessionWatcherZkNull.run(); assertFalse(sessionWatcherZkNull.isShutdownStarted()); assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected); @@ -169,11 +164,6 @@ public void setWatcher(ZooKeeperSessionWatcher watcher) { this.watcher = watcher; } }); - } - - @Test - public void testRun2() throws Exception { - ZooKeeperSessionWatcher sessionWatcherZkNull = new ZooKeeperSessionWatcher(null, 0, shutdownService); sessionWatcherZkNull.run(); assertTrue(sessionWatcherZkNull.isShutdownStarted()); assertEquals(sessionWatcherZkNull.getKeeperState(), KeeperState.Disconnected);