Skip to content

Commit

Permalink
Added ServiceUnitStateTableViewSyncer
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Sep 18, 2024
1 parent d0d13a2 commit d42170e
Show file tree
Hide file tree
Showing 4 changed files with 397 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2882,6 +2882,15 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
private String loadManagerServiceUnitStateTableViewClassName =
"org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl";

// Dynamic toggle for the ServiceUnitStateTableViewSyncer. The extensible load manager reads this flag
// before starting the syncer and closes the syncer when the flag is off. Defaults to disabled.
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Enable ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and "
+ "system topic table views during migration from one to the other. One could enable this"
+ " syncer before migration and disable it after the migration finishes."
)
private boolean loadBalancerServiceUnitTableViewSyncerEnabled = false;

/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewSyncer;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
Expand Down Expand Up @@ -167,6 +168,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private TopBundleLoadDataReporter topBundleLoadDataReporter;

/**
 * Syncer that mirrors service unit (bundle) states between the metadata store and system topic
 * table views. It is started from the leader path when
 * {@code loadBalancerServiceUnitTableViewSyncerEnabled} is set, and closed on the follower path
 * and when this load manager is closed.
 */
@Getter
protected ServiceUnitStateTableViewSyncer serviceUnitStateTableViewSyncer;

private volatile ScheduledFuture brokerLoadDataReportTask;
private volatile ScheduledFuture topBundlesLoadDataReportTask;

Expand Down Expand Up @@ -399,6 +403,7 @@ public void start() throws PulsarServerException {
serviceUnitStateChannel, unloadCounter, unloadMetrics);
this.splitScheduler = new SplitScheduler(
pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
this.serviceUnitStateTableViewSyncer = new ServiceUnitStateTableViewSyncer();

pulsar.runWhenReadyForIncomingRequests(() -> {
try {
Expand Down Expand Up @@ -770,6 +775,7 @@ public void close() throws PulsarServerException {
this.topBundlesLoadDataStore.shutdown();
this.unloadScheduler.close();
this.splitScheduler.close();
this.serviceUnitStateTableViewSyncer.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
Expand Down Expand Up @@ -824,6 +830,9 @@ synchronized void playLeader() {
topBundlesLoadDataStore.init();
unloadScheduler.start();
serviceUnitStateChannel.scheduleOwnershipMonitor();
if (pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
serviceUnitStateTableViewSyncer.start(pulsar);
}
break;
} catch (Throwable e) {
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
Expand Down Expand Up @@ -873,6 +882,7 @@ synchronized void playFollower() {
brokerLoadDataStore.init();
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
serviceUnitStateTableViewSyncer.close();
break;
} catch (Throwable e) {
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
Expand Down Expand Up @@ -951,12 +961,20 @@ protected void monitor() {
+ "Playing the leader role.", role, isChannelOwner);
playLeader();
}

if (pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
serviceUnitStateTableViewSyncer.start(pulsar);
} else {
serviceUnitStateTableViewSyncer.close();
}

} else {
if (role != Follower) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the follower role.", role, isChannelOwner);
playFollower();
}
serviceUnitStateTableViewSyncer.close();
}
} catch (Throwable e) {
log.error("Failed to get the channel ownership.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.loadbalance.extensions.channel;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;

/**
 * Defines ServiceUnitStateTableViewSyncer.
 * It syncs service unit(bundle) states between metadata store and system topic table views.
 * One could enable this syncer before migration from one to the other and disable it after the migration finishes.
 *
 * <p>Lifecycle: {@link #start(PulsarService)} starts both table views, copies the union of their
 * existing entries to both sides, and then marks the syncer active. {@link #close()} releases both
 * table views. Both methods may be called repeatedly.
 *
 * <p>NOTE(review): {@code start} and {@code close} are not synchronized; confirm that callers
 * serialize them.
 */
@Slf4j
public class ServiceUnitStateTableViewSyncer implements Closeable {
    /** Maximum number of in-flight writes before the initial sync waits for the batch to finish. */
    private static final int MAX_CONCURRENT_SYNC_COUNT = 100;
    /** Poll interval used while waiting for both table views to reflect the initial sync. */
    private static final long SYNC_CHECK_INTERVAL_IN_MILLIS = 100;

    private volatile ServiceUnitStateTableView systemTopicTableView;
    private volatile ServiceUnitStateTableView metadataStoreTableView;
    private volatile boolean isActive = false;

    /**
     * Starts the syncer if it is enabled in the broker configuration and not already active.
     *
     * <p>On failure, any table view that was already started is closed again so that its sync
     * listener does not keep running while the syncer reports itself as inactive.
     *
     * @param pulsar the broker service used to start the table views and to access the metadata store
     * @throws IOException if a table view fails to start or a state fails to serialize
     * @throws TimeoutException if a write batch or the final convergence check times out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a sync write completes exceptionally
     */
    public void start(PulsarService pulsar)
            throws IOException, TimeoutException, InterruptedException, ExecutionException {
        if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
            return;
        }

        if (isActive) {
            return;
        }

        try {
            long started = System.currentTimeMillis();

            ServiceUnitStateTableView metadataView = startMetadataStoreTableView(pulsar);
            ServiceUnitStateTableView systemView = startSystemTopicTableView(pulsar);

            // System topic entries are applied last, so they win when both views hold the same key.
            Map<String, ServiceUnitStateData> merged = new HashMap<>();
            metadataView.entrySet().forEach(e -> merged.put(e.getKey(), e.getValue()));
            systemView.entrySet().forEach(e -> merged.put(e.getKey(), e.getValue()));

            int opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
            syncExistingItems(pulsar, systemView, merged, opTimeout);
            waitUntilSynced(metadataView, systemView, merged.size(), opTimeout, started);

            log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , "
                            + "SystemTopicTableView.size: {} in {} secs",
                    metadataView.entrySet().size(), systemView.entrySet().size(),
                    TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started));
            isActive = true;

        } catch (Throwable e) {
            log.error("Failed to start ServiceUnitStateTableViewSyncer", e);
            // Release whatever was started; otherwise the half-started views would leak because
            // the syncer never became active.
            try {
                close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /** Closes any previous metadata store table view, then creates and starts a fresh one. */
    private ServiceUnitStateTableView startMetadataStoreTableView(PulsarService pulsar)
            throws IOException, TimeoutException, InterruptedException, ExecutionException {
        ServiceUnitStateTableView previous = metadataStoreTableView;
        if (previous != null) {
            previous.close();
            metadataStoreTableView = null;
        }
        // Publish the view to the field before starting it so a failed start can still be cleaned up.
        ServiceUnitStateTableView view = new ServiceUnitStateMetadataStoreTableViewImpl();
        metadataStoreTableView = view;
        view.start(
                pulsar,
                this::syncToSystemTopic,
                (k, v) -> {}
        );
        log.info("Started MetadataStoreTableView");
        return view;
    }

    /** Closes any previous system topic table view, then creates and starts a fresh one. */
    private ServiceUnitStateTableView startSystemTopicTableView(PulsarService pulsar)
            throws IOException, TimeoutException, InterruptedException, ExecutionException {
        ServiceUnitStateTableView previous = systemTopicTableView;
        if (previous != null) {
            previous.close();
            systemTopicTableView = null;
        }
        // Publish the view to the field before starting it so a failed start can still be cleaned up.
        ServiceUnitStateTableView view = new ServiceUnitStateTableViewImpl();
        systemTopicTableView = view;
        view.start(
                pulsar,
                this::syncToMetadataStore,
                (k, v) -> {}
        );
        log.info("Started SystemTopicTableView");
        return view;
    }

    /** Writes every merged entry to the metadata store and to the system topic view, in bounded batches. */
    private void syncExistingItems(PulsarService pulsar,
                                   ServiceUnitStateTableView systemView,
                                   Map<String, ServiceUnitStateData> merged,
                                   int opTimeout)
            throws IOException, TimeoutException, InterruptedException, ExecutionException {
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        // Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out)
        var store = pulsar.getLocalMetadataStore();
        var writer = ObjectMapperFactory.getMapper().writer();
        for (var e : merged.entrySet()) {
            futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(),
                    writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null));
            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) {
                awaitBatch(futures, opTimeout);
            }
        }
        awaitBatch(futures, opTimeout);

        for (var e : merged.entrySet()) {
            futures.add(systemView.put(e.getKey(), e.getValue()));
            if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) {
                awaitBatch(futures, opTimeout);
            }
        }
        awaitBatch(futures, opTimeout);
    }

    /** Waits for all pending futures, bounded by the given timeout, and then empties the list. */
    private static void awaitBatch(List<CompletableFuture<Void>> futures, int timeoutInSecs)
            throws TimeoutException, InterruptedException, ExecutionException {
        FutureUtil.waitForAll(futures).get(timeoutInSecs, TimeUnit.SECONDS);
        futures.clear();
    }

    /** Polls until both table views hold exactly {@code size} entries, or the sync timeout elapses. */
    private static void waitUntilSynced(ServiceUnitStateTableView metadataView,
                                        ServiceUnitStateTableView systemView,
                                        int size,
                                        int opTimeout,
                                        long started)
            throws TimeoutException, InterruptedException {
        // Computed as long so that a large entry count cannot overflow the timeout.
        long syncTimeout = (long) opTimeout * size;
        while (metadataView.entrySet().size() != size || systemView.entrySet().size() != size) {
            if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started) > syncTimeout) {
                throw new TimeoutException(
                        "Failed to sync tableviews. MetadataStoreTableView.size: "
                                + metadataView.entrySet().size()
                                + ", SystemTopicTableView.size: " + systemView.entrySet().size() + " in "
                                + syncTimeout + " secs");
            }
            Thread.sleep(SYNC_CHECK_INTERVAL_IN_MILLIS);
        }
    }

    /**
     * Forwards an update to the system topic table view.
     * Returns an already-completed future when that view is not available (not yet created, or closed).
     */
    private CompletableFuture<Void> syncToSystemTopic(String key, ServiceUnitStateData data) {
        ServiceUnitStateTableView view = systemTopicTableView;
        if (view == null) {
            return CompletableFuture.completedFuture(null);
        }
        return view.put(key, data);
    }

    /**
     * Forwards an update to the metadata store table view.
     * Returns an already-completed future when that view is not available (not yet created, or closed).
     */
    private CompletableFuture<Void> syncToMetadataStore(String key, ServiceUnitStateData data) {
        ServiceUnitStateTableView view = metadataStoreTableView;
        if (view == null) {
            return CompletableFuture.completedFuture(null);
        }
        return view.put(key, data);
    }

    /**
     * Closes both table views and marks the syncer inactive.
     *
     * <p>Both views are always attempted, even if closing the first one fails. A view whose close
     * failed is kept so that a later {@code close()} or {@code start()} can retry releasing it.
     *
     * @throws IOException if closing either table view fails; a second failure is attached as suppressed
     */
    @Override
    public void close() throws IOException {
        // Also proceed when inactive but a view is still held, e.g. after a failed start.
        if (!isActive && systemTopicTableView == null && metadataStoreTableView == null) {
            return;
        }

        Exception failure = null;

        try {
            ServiceUnitStateTableView view = systemTopicTableView;
            if (view != null) {
                view.close();
                systemTopicTableView = null;
                log.info("Closed SystemTopicTableView");
            }
        } catch (Exception e) {
            log.error("Failed to close SystemTopicTableView", e);
            failure = e;
        }

        try {
            ServiceUnitStateTableView view = metadataStoreTableView;
            if (view != null) {
                view.close();
                metadataStoreTableView = null;
                log.info("Closed MetadataStoreTableView");
            }
        } catch (Exception e) {
            log.error("Failed to close MetadataStoreTableView", e);
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        // Never stay "active" after a close attempt; a later start() rebuilds both views.
        isActive = false;

        if (failure != null) {
            if (failure instanceof IOException) {
                throw (IOException) failure;
            }
            if (failure instanceof RuntimeException) {
                throw (RuntimeException) failure;
            }
            throw new IOException(failure);
        }

        log.info("Successfully closed ServiceUnitStateTableViewSyncer.");
    }

    /**
     * @return {@code true} once {@link #start(PulsarService)} has completed successfully and
     *         {@link #close()} has not been called since
     */
    public boolean isActive() {
        return isActive;
    }
}
Loading

0 comments on commit d42170e

Please sign in to comment.