Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Sep 18, 2024
1 parent 7faccbb commit ed14f2a
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2885,11 +2885,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Enable ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and "
doc = "Specify ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and "
+ "system topic table views during migration from one to the other. One could enable this"
+ " syncer before migration and disable it after the migration finishes."
+ " syncer before migration and disable it after the migration finishes. "
+ "It accepts `MetadataStoreToSystemTopicSyncer` or `SystemTopicToMetadataStoreSyncer` to "
+ "enable it. Null value disables it."
)
private boolean loadBalancerServiceUnitTableViewSyncerEnabled = false;
private ServiceUnitTableViewSyncerType loadBalancerServiceUnitTableViewSyncer = null;

/**** --- Replication. --- ****/
@FieldContext(
Expand Down Expand Up @@ -3793,4 +3795,13 @@ public Map<String, String> lookupProperties() {
});
return map;
}

public boolean isLoadBalancerServiceUnitTableViewSyncerEnabled() {
return loadBalancerServiceUnitTableViewSyncer != null;
}

public enum ServiceUnitTableViewSyncerType {
MetadataStoreToSystemTopicSyncer,
SystemTopicToMetadataStoreSyncer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,14 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer
createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}

private static boolean configureSystemTopics(PulsarService pulsar) {
public static boolean configureSystemTopics(PulsarService pulsar, long target) {
try {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) {
Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD);
log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC);
if (threshold == null || target != threshold.longValue()) {
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, target);
log.info("Set compaction threshold: {} bytes for system topic {}.", target, TOPIC);
}
} else {
log.warn("System topic or topic level policies is disabled. "
Expand Down Expand Up @@ -954,7 +954,7 @@ protected void monitor() {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
if (!configuredSystemTopics) {
configuredSystemTopics = configureSystemTopics(pulsar);
configuredSystemTopics = configureSystemTopics(pulsar, COMPACTION_THRESHOLD);
}
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@

package org.apache.pulsar.broker.loadbalance.extensions.channel;

import static org.apache.pulsar.broker.ServiceConfiguration.ServiceUnitTableViewSyncerType.SystemTopicToMetadataStoreSyncer;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.COMPACTION_THRESHOLD;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.configureSystemTopics;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -43,11 +46,13 @@
@Slf4j
public class ServiceUnitStateTableViewSyncer implements Closeable {
private static final int MAX_CONCURRENT_SYNC_COUNT = 100;
private static final int MAX_SYNC_WAIT_TIME_IN_SECS = 300;
private static final int SYNC_WAIT_TIME_IN_SECS = 300;
private PulsarService pulsar;
private volatile ServiceUnitStateTableView systemTopicTableView;
private volatile ServiceUnitStateTableView metadataStoreTableView;
private volatile boolean isActive = false;


public void start(PulsarService pulsar)
throws IOException, TimeoutException, InterruptedException, ExecutionException {
if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
Expand All @@ -57,82 +62,17 @@ public void start(PulsarService pulsar)
if (isActive) {
return;
}
this.pulsar = pulsar;

try {
long started = System.currentTimeMillis();

if (metadataStoreTableView != null) {
metadataStoreTableView.close();
metadataStoreTableView = null;
syncExistingItems();
// disable compaction
if (!configureSystemTopics(pulsar, 0)) {
throw new IllegalStateException("Failed to disable compaction");
}
metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
metadataStoreTableView.start(
pulsar,
this::syncToSystemTopic,
(k, v) -> {}
);
log.info("Started MetadataStoreTableView");
syncTailItems();

if (systemTopicTableView != null) {
systemTopicTableView.close();
systemTopicTableView = null;
}
systemTopicTableView = new ServiceUnitStateTableViewImpl();
systemTopicTableView.start(
pulsar,
this::syncToMetadataStore,
(k, v) -> {}
);
log.info("Started SystemTopicTableView");

Map<String, ServiceUnitStateData> merged = new HashMap<>();
metadataStoreTableView.entrySet().forEach(e -> merged.put(e.getKey(), e.getValue()));
systemTopicTableView.entrySet().forEach(e -> merged.put(e.getKey(), e.getValue()));

List<CompletableFuture<Void>> futures = new ArrayList<>();
var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();

// Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out)
var store = pulsar.getLocalMetadataStore();
var writer = ObjectMapperFactory.getMapper().writer();
for (var e : merged.entrySet()) {
futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(),
writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null));
if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) {
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
futures.clear();
}
}
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
futures.clear();

for (var e : merged.entrySet()) {
futures.add(syncToSystemTopic(e.getKey(), e.getValue()));
if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) {
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
futures.clear();
}
}
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
futures.clear();

int size = merged.size();
while (metadataStoreTableView.entrySet().size() != size || systemTopicTableView.entrySet().size() != size) {
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)
> MAX_SYNC_WAIT_TIME_IN_SECS) {
throw new TimeoutException(
"Failed to sync tableviews. MetadataStoreTableView.size: "
+ metadataStoreTableView.entrySet().size()
+ ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in "
+ MAX_SYNC_WAIT_TIME_IN_SECS + " secs");
}
Thread.sleep(100);
}

log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , "
+ "SystemTopicTableView.size: {} in {} secs",
metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(),
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started));
isActive = true;

} catch (Throwable e) {
Expand All @@ -149,12 +89,161 @@ private CompletableFuture<Void> syncToMetadataStore(String key, ServiceUnitState
return metadataStoreTableView.put(key, data);
}

private CompletableFuture<Void> dummy(String key, ServiceUnitStateData data) {
return CompletableFuture.completedFuture(null);
}

private void syncExistingItems()
throws IOException, ExecutionException, InterruptedException, TimeoutException {
long started = System.currentTimeMillis();
@Cleanup
ServiceUnitStateTableView metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
metadataStoreTableView.start(
pulsar,
this::dummy,
this::dummy
);

@Cleanup
ServiceUnitStateTableView systemTopicTableView = new ServiceUnitStateTableViewImpl();
systemTopicTableView.start(
pulsar,
this::dummy,
this::dummy
);


var syncer = pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
if (syncer == SystemTopicToMetadataStoreSyncer) {
clean(metadataStoreTableView);
syncExistingItemsToMetadataStore(systemTopicTableView);
} else {
clean(systemTopicTableView);
syncExistingItemsToSystemTopic(metadataStoreTableView, systemTopicTableView);
}

while (metadataStoreTableView.entrySet().size() != systemTopicTableView.entrySet().size()) {
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)
> SYNC_WAIT_TIME_IN_SECS) {
throw new TimeoutException(
syncer + " failed to sync existing items in tableviews. MetadataStoreTableView.size: "
+ metadataStoreTableView.entrySet().size()
+ ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in "
+ SYNC_WAIT_TIME_IN_SECS + " secs");
}
Thread.sleep(100);
}

log.info("Synced existing items MetadataStoreTableView.size:{} , "
+ "SystemTopicTableView.size: {} in {} secs",
metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(),
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started));
}

private void syncTailItems() throws InterruptedException, IOException, TimeoutException {
long started = System.currentTimeMillis();

if (metadataStoreTableView != null) {
metadataStoreTableView.close();
metadataStoreTableView = null;
}

if (systemTopicTableView != null) {
systemTopicTableView.close();
systemTopicTableView = null;
}

this.metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
this.metadataStoreTableView.start(
pulsar,
this::syncToSystemTopic,
this::dummy
);
log.info("Started MetadataStoreTableView");

this.systemTopicTableView = new ServiceUnitStateTableViewImpl();
this.systemTopicTableView.start(
pulsar,
this::syncToMetadataStore,
this::dummy
);
log.info("Started SystemTopicTableView");

while (metadataStoreTableView.entrySet().size() != systemTopicTableView.entrySet().size()) {
if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)
> SYNC_WAIT_TIME_IN_SECS) {
throw new TimeoutException(
"Failed to sync tableviews. MetadataStoreTableView.size: "
+ metadataStoreTableView.entrySet().size()
+ ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in "
+ SYNC_WAIT_TIME_IN_SECS + " secs");
}
Thread.sleep(100);
}

log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , "
+ "SystemTopicTableView.size: {} in {} secs",
metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(),
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started));
}

private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView src)
throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
// Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out)
var store = pulsar.getLocalMetadataStore();
var writer = ObjectMapperFactory.getMapper().writer();
var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
List<CompletableFuture<Void>> futures = new ArrayList<>();
var srcIter = src.entrySet().iterator();
while (srcIter.hasNext()) {
var e = srcIter.next();
futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(),
writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null));
if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !srcIter.hasNext()) {
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
}
}
}

private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src,
ServiceUnitStateTableView dst)
throws ExecutionException, InterruptedException, TimeoutException {
var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
List<CompletableFuture<Void>> futures = new ArrayList<>();
var srcIter = src.entrySet().iterator();
while (srcIter.hasNext()) {
var e = srcIter.next();
futures.add(dst.put(e.getKey(), e.getValue()));
if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !srcIter.hasNext()) {
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
}
}
}

private void clean(ServiceUnitStateTableView dst)
throws ExecutionException, InterruptedException, TimeoutException {
var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
var dstIter = dst.entrySet().iterator();
List<CompletableFuture<Void>> futures = new ArrayList<>();
while (dstIter.hasNext()) {
var e = dstIter.next();
futures.add(dst.delete(e.getKey()));
if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !dstIter.hasNext()) {
FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS);
}
}
}

@Override
public void close() throws IOException {
if (!isActive) {
return;
}

if (!configureSystemTopics(pulsar, COMPACTION_THRESHOLD)) {
throw new IllegalStateException("Failed to enable compaction");
}

try {
if (systemTopicTableView != null) {
systemTopicTableView.close();
Expand Down
Loading

0 comments on commit ed14f2a

Please sign in to comment.