Skip to content

Commit

Permalink
[improve][broker] Should notify bundle ownership listener onLoad even…
Browse files Browse the repository at this point in the history
…t when ServiceUnitState start (ExtensibleLoadManagerImpl only) (apache#23152)

(cherry picked from commit 3053b64)
(cherry picked from commit 9a090f7)
  • Loading branch information
Demogorgon314 authored and nikhil-ctds committed Aug 22, 2024
1 parent f8b8aac commit cfa1e0c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public synchronized void start() throws PulsarServerException {
"topicCompactionStrategyClassName",
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
tableview.listen((key, value) -> handle(key, value));
tableview.listen(this::handleEvent);
tableview.forEach(this::handleExisting);
var strategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance(TABLE_VIEW_TAG);
if (strategy == null) {
String err = TABLE_VIEW_TAG + "tag TopicCompactionStrategy is null.";
Expand Down Expand Up @@ -663,7 +664,7 @@ public CompletableFuture<Void> publishSplitEventAsync(Split split) {
}).thenApply(__ -> null);
}

private void handle(String serviceUnit, ServiceUnitStateData data) {
private void handleEvent(String serviceUnit, ServiceUnitStateData data) {
long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet();
if (debug()) {
log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}",
Expand All @@ -689,6 +690,17 @@ private void handle(String serviceUnit, ServiceUnitStateData data) {
}
}

private void handleExisting(String serviceUnit, ServiceUnitStateData data) {
if (debug()) {
log.info("Loaded the service unit state data. serviceUnit: {}, data: {}", serviceUnit, data);
}
ServiceUnitState state = state(data);
if (state.equals(Owned) && isTargetBroker(data.dstBroker())) {
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
}
}

private static boolean isTransferCommand(ServiceUnitStateData data) {
if (data == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,56 @@ public boolean test(NamespaceBundle namespaceBundle) {
}
}

@Test(timeOut = 30 * 1000)
public void testNamespaceOwnershipListener() throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-namespace-ownership-listener");
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();

String broker = admin.lookups().lookupTopic(topicName.toString());
log.info("Assign the bundle {} to {}", bundle, broker);

checkOwnershipState(broker, bundle);

AtomicInteger onloadCount = new AtomicInteger(0);
AtomicInteger unloadCount = new AtomicInteger(0);

NamespaceBundleOwnershipListener listener = new NamespaceBundleOwnershipListener() {
@Override
public void onLoad(NamespaceBundle bundle) {
onloadCount.incrementAndGet();
}

@Override
public void unLoad(NamespaceBundle bundle) {
unloadCount.incrementAndGet();
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return namespaceBundle.equals(bundle);
}
};
pulsar1.getNamespaceService().addNamespaceBundleOwnershipListener(listener);
pulsar2.getNamespaceService().addNamespaceBundleOwnershipListener(listener);

// There are a service unit state channel already started, when add listener, it will trigger the onload event.
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 1);
assertEquals(unloadCount.get(), 0);
});

ServiceUnitStateChannelImpl channel = new ServiceUnitStateChannelImpl(pulsar1);
channel.start();
Awaitility.await().untilAsserted(() -> {
assertEquals(onloadCount.get(), 2);
assertEquals(unloadCount.get(), 0);
});

channel.close();
}

private void checkOwnershipState(String broker, NamespaceBundle bundle)
throws ExecutionException, InterruptedException {
var targetLoadManager = secondaryLoadManager;
Expand Down

0 comments on commit cfa1e0c

Please sign in to comment.