diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 722f132024435..fdb6a979b9a23 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2885,11 +2885,13 @@ public double getLoadBalancerBandwidthOutResourceWeight() { @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "Enable ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and " + doc = "Specify ServiceUnitTableViewSyncer to sync service unit(bundle) states between metadata store and " + "system topic table views during migration from one to the other. One could enable this" - + " syncer before migration and disable it after the migration finishes." + + " syncer before migration and disable it after the migration finishes. " + + "It accepts `MetadataStoreToSystemTopicSyncer` or `SystemTopicToMetadataStoreSyncer` to " + + "enable it. Null value disables it." ) - private boolean loadBalancerServiceUnitTableViewSyncerEnabled = false; + private ServiceUnitTableViewSyncerType loadBalancerServiceUnitTableViewSyncer = null; /**** --- Replication. --- ****/ @FieldContext( @@ -3793,4 +3795,13 @@ public Map lookupProperties() { }); return map; } + + public boolean isLoadBalancerServiceUnitTableViewSyncerEnabled() { + return loadBalancerServiceUnitTableViewSyncer != null; + } + + public enum ServiceUnitTableViewSyncerType { + MetadataStoreToSystemTopicSyncer, + SystemTopicToMetadataStoreSyncer; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 243232b747e1b..64e04fac7ed2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -288,14 +288,14 @@ private static void createSystemTopics(PulsarService pulsar) throws PulsarServer createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); } - private static boolean configureSystemTopics(PulsarService pulsar) { + public static boolean configureSystemTopics(PulsarService pulsar, long target) { try { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar) && pulsar.getConfiguration().isSystemTopicAndTopicLevelPoliciesEnabled()) { Long threshold = pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC); - if (threshold == null || COMPACTION_THRESHOLD != threshold.longValue()) { - pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, COMPACTION_THRESHOLD); - log.info("Set compaction threshold: {} bytes for system topic {}.", COMPACTION_THRESHOLD, TOPIC); + if (threshold == null || target != threshold.longValue()) { + pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, target); + log.info("Set compaction threshold: {} bytes for system topic {}.", target, TOPIC); } } else { log.warn("System topic or topic level policies is disabled. " @@ -954,7 +954,7 @@ protected void monitor() { // System topic config might fail due to the race condition // with topic policy init(Topic policies cache have not init). if (!configuredSystemTopics) { - configuredSystemTopics = configureSystemTopics(pulsar); + configuredSystemTopics = configureSystemTopics(pulsar, COMPACTION_THRESHOLD); } if (role != Leader) { log.warn("Current role:{} does not match with the channel ownership:{}. " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java index 4008555990923..065e1cbbf6ab3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.java @@ -19,17 +19,20 @@ package org.apache.pulsar.broker.loadbalance.extensions.channel; +import static org.apache.pulsar.broker.ServiceConfiguration.ServiceUnitTableViewSyncerType.SystemTopicToMetadataStoreSyncer; +import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.COMPACTION_THRESHOLD; +import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.configureSystemTopics; +import com.fasterxml.jackson.core.JsonProcessingException; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.util.FutureUtil; @@ -43,11 +46,13 @@ @Slf4j public class ServiceUnitStateTableViewSyncer implements Closeable { private static final int MAX_CONCURRENT_SYNC_COUNT = 100; - private static final int MAX_SYNC_WAIT_TIME_IN_SECS = 300; + private static final int SYNC_WAIT_TIME_IN_SECS = 300; + private PulsarService pulsar; private volatile ServiceUnitStateTableView systemTopicTableView; private volatile ServiceUnitStateTableView metadataStoreTableView; private volatile boolean isActive = false; + public void start(PulsarService pulsar) throws IOException, TimeoutException, InterruptedException, ExecutionException { if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) { @@ -57,82 +62,17 @@ public void start(PulsarService pulsar) if (isActive) { return; } + this.pulsar = pulsar; try { - long started = System.currentTimeMillis(); - if (metadataStoreTableView != null) { - metadataStoreTableView.close(); - metadataStoreTableView = null; + syncExistingItems(); + // disable compaction + if (!configureSystemTopics(pulsar, 0)) { + throw new IllegalStateException("Failed to disable compaction"); } - metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl(); - metadataStoreTableView.start( - pulsar, - this::syncToSystemTopic, - (k, v) -> {} - ); - log.info("Started MetadataStoreTableView"); + syncTailItems(); - if (systemTopicTableView != null) { - systemTopicTableView.close(); - systemTopicTableView = null; - } - systemTopicTableView = new ServiceUnitStateTableViewImpl(); - systemTopicTableView.start( - pulsar, - this::syncToMetadataStore, - (k, v) -> {} - ); - log.info("Started SystemTopicTableView"); - - Map merged = new HashMap<>(); - metadataStoreTableView.entrySet().forEach(e -> merged.put(e.getKey(), e.getValue())); - systemTopicTableView.entrySet().forEach(e -> merged.put(e.getKey(), e.getValue())); - - List> futures = new ArrayList<>(); - var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); - - // Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out) - var store = pulsar.getLocalMetadataStore(); - var writer = ObjectMapperFactory.getMapper().writer(); - for (var e : merged.entrySet()) { - futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(), - writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null)); - if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) { - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - futures.clear(); - } - } - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - futures.clear(); - - for (var e : merged.entrySet()) { - futures.add(syncToSystemTopic(e.getKey(), e.getValue())); - if (futures.size() == MAX_CONCURRENT_SYNC_COUNT) { - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - futures.clear(); - } - } - FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); - futures.clear(); - - int size = merged.size(); - while (metadataStoreTableView.entrySet().size() != size || systemTopicTableView.entrySet().size() != size) { - if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started) - > MAX_SYNC_WAIT_TIME_IN_SECS) { - throw new TimeoutException( - "Failed to sync tableviews. MetadataStoreTableView.size: " - + metadataStoreTableView.entrySet().size() - + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in " - + MAX_SYNC_WAIT_TIME_IN_SECS + " secs"); - } - Thread.sleep(100); - } - - log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , " - + "SystemTopicTableView.size: {} in {} secs", - metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(), - TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)); isActive = true; } catch (Throwable e) { @@ -149,12 +89,161 @@ private CompletableFuture syncToMetadataStore(String key, ServiceUnitState return metadataStoreTableView.put(key, data); } + private CompletableFuture dummy(String key, ServiceUnitStateData data) { + return CompletableFuture.completedFuture(null); + } + + private void syncExistingItems() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + long started = System.currentTimeMillis(); + @Cleanup + ServiceUnitStateTableView metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl(); + metadataStoreTableView.start( + pulsar, + this::dummy, + this::dummy + ); + + @Cleanup + ServiceUnitStateTableView systemTopicTableView = new ServiceUnitStateTableViewImpl(); + systemTopicTableView.start( + pulsar, + this::dummy, + this::dummy + ); + + + var syncer = pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer(); + if (syncer == SystemTopicToMetadataStoreSyncer) { + clean(metadataStoreTableView); + syncExistingItemsToMetadataStore(systemTopicTableView); + } else { + clean(systemTopicTableView); + syncExistingItemsToSystemTopic(metadataStoreTableView, systemTopicTableView); + } + + while (metadataStoreTableView.entrySet().size() != systemTopicTableView.entrySet().size()) { + if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started) + > SYNC_WAIT_TIME_IN_SECS) { + throw new TimeoutException( + syncer + " failed to sync existing items in tableviews. MetadataStoreTableView.size: " + + metadataStoreTableView.entrySet().size() + + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in " + + SYNC_WAIT_TIME_IN_SECS + " secs"); + } + Thread.sleep(100); + } + + log.info("Synced existing items MetadataStoreTableView.size:{} , " + + "SystemTopicTableView.size: {} in {} secs", + metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(), + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)); + } + + private void syncTailItems() throws InterruptedException, IOException, TimeoutException { + long started = System.currentTimeMillis(); + + if (metadataStoreTableView != null) { + metadataStoreTableView.close(); + metadataStoreTableView = null; + } + + if (systemTopicTableView != null) { + systemTopicTableView.close(); + systemTopicTableView = null; + } + + this.metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl(); + this.metadataStoreTableView.start( + pulsar, + this::syncToSystemTopic, + this::dummy + ); + log.info("Started MetadataStoreTableView"); + + this.systemTopicTableView = new ServiceUnitStateTableViewImpl(); + this.systemTopicTableView.start( + pulsar, + this::syncToMetadataStore, + this::dummy + ); + log.info("Started SystemTopicTableView"); + + while (metadataStoreTableView.entrySet().size() != systemTopicTableView.entrySet().size()) { + if (TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started) + > SYNC_WAIT_TIME_IN_SECS) { + throw new TimeoutException( + "Failed to sync tableviews. MetadataStoreTableView.size: " + + metadataStoreTableView.entrySet().size() + + ", SystemTopicTableView.size: " + systemTopicTableView.entrySet().size() + " in " + + SYNC_WAIT_TIME_IN_SECS + " secs"); + } + Thread.sleep(100); + } + + log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , " + + "SystemTopicTableView.size: {} in {} secs", + metadataStoreTableView.entrySet().size(), systemTopicTableView.entrySet().size(), + TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - started)); + } + + private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView src) + throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException { + // Directly use store to sync existing items to metadataStoreTableView(otherwise, they are conflicted out) + var store = pulsar.getLocalMetadataStore(); + var writer = ObjectMapperFactory.getMapper().writer(); + var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + List> futures = new ArrayList<>(); + var srcIter = src.entrySet().iterator(); + while (srcIter.hasNext()) { + var e = srcIter.next(); + futures.add(store.put(ServiceUnitStateMetadataStoreTableViewImpl.PATH_PREFIX + "/" + e.getKey(), + writer.writeValueAsBytes(e.getValue()), Optional.empty()).thenApply(__ -> null)); + if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !srcIter.hasNext()) { + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + } + } + } + + private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView src, + ServiceUnitStateTableView dst) + throws ExecutionException, InterruptedException, TimeoutException { + var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + List> futures = new ArrayList<>(); + var srcIter = src.entrySet().iterator(); + while (srcIter.hasNext()) { + var e = srcIter.next(); + futures.add(dst.put(e.getKey(), e.getValue())); + if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !srcIter.hasNext()) { + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + } + } + } + + private void clean(ServiceUnitStateTableView dst) + throws ExecutionException, InterruptedException, TimeoutException { + var opTimeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(); + var dstIter = dst.entrySet().iterator(); + List> futures = new ArrayList<>(); + while (dstIter.hasNext()) { + var e = dstIter.next(); + futures.add(dst.delete(e.getKey())); + if (futures.size() == MAX_CONCURRENT_SYNC_COUNT || !dstIter.hasNext()) { + FutureUtil.waitForAll(futures).get(opTimeout, TimeUnit.SECONDS); + } + } + } + @Override public void close() throws IOException { if (!isActive) { return; } + if (!configureSystemTopics(pulsar, COMPACTION_THRESHOLD)) { + throw new IllegalStateException("Failed to enable compaction"); + } + try { if (systemTopicTableView != null) { systemTopicTableView.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 1ff605859a9c9..4f6a006918318 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1241,8 +1241,35 @@ public void testDeployAndRollbackLoadManager() throws Exception { @Test(priority = 200) public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer"); + TopicName topicName = topicAndBundle.getLeft(); + NamespaceBundle bundle = topicAndBundle.getRight(); + String topic = topicName.toString(); + + String lookupResultBefore1 = pulsar1.getAdminClient().lookups().lookupTopic(topic); + String lookupResultBefore2 = pulsar2.getAdminClient().lookups().lookupTopic(topic); + assertEquals(lookupResultBefore1, lookupResultBefore2); + + LookupOptions options = LookupOptions.builder() + .authoritative(false) + .requestHttps(false) + .readOnly(false) + .loadTopicsInBundle(false).build(); + Optional webServiceUrlBefore1 = + pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrlBefore1.isPresent()); + + Optional webServiceUrlBefore2 = + pulsar2.getNamespaceService().getWebServiceUrl(bundle, options); + assertTrue(webServiceUrlBefore2.isPresent()); + assertEquals(webServiceUrlBefore2.get().toString(), webServiceUrlBefore1.get().toString()); + + + String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) ? + "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() - .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncerEnabled", "true"); + .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerTyp); makeSecondaryAsLeader(); makePrimaryAsLeader(); Awaitility.waitAtMost(10, TimeUnit.SECONDS) @@ -1259,23 +1286,14 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { // start pulsar3 with ServiceUnitStateTableViewImpl @Cleanup var pulsar3 = additionalPulsarTestContext.getPulsarService(); - Pair topicAndBundle = - getBundleIsNotOwnByChangeEventTopic("testDeployAndRollbackLoadManager"); - TopicName topicName = topicAndBundle.getLeft(); - NamespaceBundle bundle = topicAndBundle.getRight(); - String topic = topicName.toString(); String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic); String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic); assertEquals(lookupResult1, lookupResult2); assertEquals(lookupResult1, lookupResult3); + assertEquals(lookupResult1, lookupResultBefore1); - LookupOptions options = LookupOptions.builder() - .authoritative(false) - .requestHttps(false) - .readOnly(false) - .loadTopicsInBundle(false).build(); Optional webServiceUrl1 = pulsar1.getNamespaceService().getWebServiceUrl(bundle, options); assertTrue(webServiceUrl1.isPresent()); @@ -1290,6 +1308,8 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertTrue(webServiceUrl3.isPresent()); assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString()); + assertEquals(webServiceUrl3.get().toString(), webServiceUrlBefore1.get().toString()); + List pulsarServices = List.of(pulsar1, pulsar2, pulsar3); for (PulsarService pulsarService : pulsarServices) { // Test lookup heartbeat namespace's topic @@ -1330,6 +1350,20 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertEquals(lookupResult4, lookupResult5); assertEquals(lookupResult4, lookupResult6); assertEquals(lookupResult4, lookupResult7); + assertEquals(lookupResult4, lookupResultBefore1); + + + Pair topicAndBundle2 = + getBundleIsNotOwnByChangeEventTopic("testLoadBalancerServiceUnitTableViewSyncer2"); + String topic2 = topicAndBundle2.getLeft().toString(); + + String lookupResult8 = pulsar1.getAdminClient().lookups().lookupTopic(topic2); + String lookupResult9 = pulsar2.getAdminClient().lookups().lookupTopic(topic2); + String lookupResult10 = pulsar3.getAdminClient().lookups().lookupTopic(topic2); + String lookupResult11 = pulsar4.getAdminClient().lookups().lookupTopic(topic2); + assertEquals(lookupResult9, lookupResult8); + assertEquals(lookupResult10, lookupResult8); + assertEquals(lookupResult11, lookupResult8); Set availableWebUrlCandidates = Sets.newHashSet( pulsar1.getWebServiceAddress(), @@ -1356,6 +1390,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { pulsar4.getNamespaceService().getWebServiceUrl(bundle, options); assertTrue(webServiceUrl4.isPresent()); assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString()); + assertEquals(webServiceUrl4.get().toString(), webServiceUrlBefore1.get().toString()); pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4); for (PulsarService pulsarService : pulsarServices) { @@ -1415,7 +1450,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { } pulsar.getAdminClient().brokers() - .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncerEnabled", "false"); + .deleteDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer"); makeSecondaryAsLeader(); Awaitility.waitAtMost(5, TimeUnit.SECONDS) .untilAsserted(() -> assertFalse(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()));