From 050a1477ab6b9efd2b3ee091af59276d20960ed8 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 12 Aug 2024 13:55:44 +0800 Subject: [PATCH 1/5] Should notify bundle ownership listener onLoad event when ServiceUnitState start --- .../channel/ServiceUnitStateChannelImpl.java | 11 ++++ .../ExtensibleLoadManagerImplTest.java | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index dbe3b88b61f28..b41e2e3b1c539 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -324,6 +324,17 @@ public synchronized void start() throws PulsarServerException { "topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())) .create(); + tableview.forEach((serviceUnit, data) -> { + if (debug) { + log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); + } + ServiceUnitState state = state(data); + if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { + pulsar.getNamespaceService() + .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); + stateChangeListeners.notify(serviceUnit, data, null); + } + }); tableview.listen((key, value) -> handle(key, value)); var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); if (strategy == null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 69a65caf2943c..51966f420bf25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -417,6 +417,56 @@ public boolean test(NamespaceBundle namespaceBundle) { } } + @Test(timeOut = 30 * 1000) + public void testNamespaceOwnershipListener() throws Exception { + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("test-namespace-ownership-listener"); + TopicName topicName = topicAndBundle.getLeft(); + NamespaceBundle bundle = topicAndBundle.getRight(); + + String broker = admin.lookups().lookupTopic(topicName.toString()); + log.info("Assign the bundle {} to {}", bundle, broker); + + checkOwnershipState(broker, bundle); + + AtomicInteger onloadCount = new AtomicInteger(0); + AtomicInteger unloadCount = new AtomicInteger(0); + + NamespaceBundleOwnershipListener listener = new NamespaceBundleOwnershipListener() { + @Override + public void onLoad(NamespaceBundle bundle) { + onloadCount.incrementAndGet(); + } + + @Override + public void unLoad(NamespaceBundle bundle) { + unloadCount.incrementAndGet(); + } + + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return namespaceBundle.equals(bundle); + } + }; + pulsar1.getNamespaceService().addNamespaceBundleOwnershipListener(listener); + pulsar2.getNamespaceService().addNamespaceBundleOwnershipListener(listener); + + // There are a service unit state channel already started, when add listener, it will trigger the onload event. + Awaitility.await().untilAsserted(() -> { + assertEquals(onloadCount.get(), 1); + assertEquals(unloadCount.get(), 0); + }); + + ServiceUnitStateChannelImpl channel = new ServiceUnitStateChannelImpl(pulsar1); + channel.start(); + Awaitility.await().untilAsserted(() -> { + assertEquals(onloadCount.get(), 2); + assertEquals(unloadCount.get(), 0); + }); + + channel.close(); + } + @DataProvider(name = "isPersistentTopicSubscriptionTypeTest") public Object[][] isPersistentTopicSubscriptionTypeTest() { return new Object[][]{ From 653afb4bf58b3f5d6b95c88e6117a71c655eddf2 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 12 Aug 2024 14:22:38 +0800 Subject: [PATCH 2/5] Update --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index b41e2e3b1c539..a238fd59dc194 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -324,6 +324,7 @@ public synchronized void start() throws PulsarServerException { "topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())) .create(); + tableview.listen((key, value) -> handle(key, value)); tableview.forEach((serviceUnit, data) -> { if (debug) { log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); @@ -332,10 +333,8 @@ public synchronized void start() throws PulsarServerException { if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { pulsar.getNamespaceService() .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); - stateChangeListeners.notify(serviceUnit, data, null); } }); - tableview.listen((key, value) -> handle(key, value)); var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); if (strategy == null) { String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null."; From 7dabfd49d8e0674c4b1fe82fb4b1be118f682679 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 12 Aug 2024 22:47:57 +0800 Subject: [PATCH 3/5] Use `forEachAndListen` --- .../channel/ServiceUnitStateChannelImpl.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index a238fd59dc194..fd418242545fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -324,17 +324,7 @@ public synchronized void start() throws PulsarServerException { "topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())) .create(); - tableview.listen((key, value) -> handle(key, value)); - tableview.forEach((serviceUnit, data) -> { - if (debug) { - log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); - } - ServiceUnitState state = state(data); - if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { - pulsar.getNamespaceService() - .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); - } - }); + tableview.forEachAndListen((key, value) -> handle(key, value)); var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); if (strategy == null) { String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null."; From db3a31efec5e90a3af7f79f780a4337635748afb Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 12 Aug 2024 23:08:43 +0800 Subject: [PATCH 4/5] Revert "Use `forEachAndListen`" This reverts commit 7dabfd49d8e0674c4b1fe82fb4b1be118f682679. --- .../channel/ServiceUnitStateChannelImpl.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index fd418242545fa..a238fd59dc194 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -324,7 +324,17 @@ public synchronized void start() throws PulsarServerException { "topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())) .create(); - tableview.forEachAndListen((key, value) -> handle(key, value)); + tableview.listen((key, value) -> handle(key, value)); + tableview.forEach((serviceUnit, data) -> { + if (debug) { + log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); + } + ServiceUnitState state = state(data); + if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { + pulsar.getNamespaceService() + .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); + } + }); var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); if (strategy == null) { String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null."; From 8b02802b3e9aa2ef4d2537bf5650d20129b18240 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 16 Aug 2024 19:41:07 +0800 Subject: [PATCH 5/5] Rename the handle event method --- .../channel/ServiceUnitStateChannelImpl.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index a238fd59dc194..1063f8124ece8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -324,17 +324,8 @@ public synchronized void start() throws PulsarServerException { "topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())) .create(); - tableview.listen((key, value) -> handle(key, value)); - tableview.forEach((serviceUnit, data) -> { - if (debug) { - log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); - } - ServiceUnitState state = state(data); - if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { - pulsar.getNamespaceService() - .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); - } - }); + tableview.listen(this::handleEvent); + tableview.forEach(this::handleExisting); var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG); if (strategy == null) { String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null."; @@ -700,7 +691,7 @@ public CompletableFuture publishSplitEventAsync(Split split) { }).thenApply(__ -> null); } - private void handle(String serviceUnit, ServiceUnitStateData data) { + private void handleEvent(String serviceUnit, ServiceUnitStateData data) { long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet(); if (debug()) { log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}", @@ -726,6 +717,17 @@ private void handle(String serviceUnit, ServiceUnitStateData data) { } } + private void handleExisting(String serviceUnit, ServiceUnitStateData data) { + if (debug()) { + log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data); + } + ServiceUnitState state = state(data); + if (state.equals(Owned) && isTargetBroker(data.dstBroker())) { + pulsar.getNamespaceService() + .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); + } + } + private static boolean isTransferCommand(ServiceUnitStateData data) { if (data == null) { return false;