diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 73b4f318f3a36..8882c2cc6c56b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.util.ArrayList; @@ -659,11 +660,24 @@ public synchronized void doLoadShedding() { if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { return; } + NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle); + Optional destBroker = this.selectBroker(bundleToUnload); + if (!destBroker.isPresent()) { + log.info("[{}] No broker available to unload bundle {} from broker {}", + strategy.getClass().getSimpleName(), bundle, broker); + return; + } + if (destBroker.get().equals(broker)) { + log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}", + strategy.getClass().getSimpleName(), destBroker.get(), bundle); + return; + } - log.info("[{}] Unloading bundle: {} from broker {}", - strategy.getClass().getSimpleName(), bundle, broker); + log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}", + strategy.getClass().getSimpleName(), bundle, broker, destBroker.get()); try { - pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange); + pulsar.getAdminClient().namespaces() + .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get()); loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis()); } catch (PulsarServerException | PulsarAdminException e) { log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e); @@ -837,99 +851,119 @@ public Optional selectBrokerForAssignment(final ServiceUnitId serviceUni // If the given bundle is already in preallocated, return the selected broker. return Optional.of(preallocatedBundleToBroker.get(bundle)); } - final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, - key -> getBundleDataOrDefault(bundle)); - brokerCandidateCache.clear(); - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - // filter brokers which owns topic higher than threshold - LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData, - conf.getLoadBalancerBrokerMaxTopics()); - - // distribute namespaces to domain and brokers according to anti-affinity-group - LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), - brokerCandidateCache, - brokerToNamespaceToBundleRange, brokerToFailureDomainMap); - - // distribute bundles evenly to candidate-brokers if enable - if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) { - LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), - brokerCandidateCache, - brokerToNamespaceToBundleRange); - if (log.isDebugEnabled()) { - log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}", - brokerCandidateCache.size()); - } + Optional broker = selectBroker(serviceUnit); + if (!broker.isPresent()) { + // If no broker is selected, return empty. + return broker; } - log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle); + // Add new bundle to preallocated. + preallocateBundle(bundle, broker.get()); + return broker; + } + } finally { + selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + } + } - // Use the filter pipeline to finalize broker candidates. - try { - for (BrokerFilter filter : filterPipeline) { - filter.filter(brokerCandidateCache, data, loadData, conf); - } - } catch (BrokerFilterException x) { - // restore the list of brokers to the full set - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - } + private void preallocateBundle(String bundle, String broker) { + final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, + key -> getBundleDataOrDefault(bundle)); + loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data); + preallocatedBundleToBroker.put(bundle, broker); + + final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); + final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); + final ConcurrentOpenHashMap> namespaceToBundleRange = + brokerToNamespaceToBundleRange + .computeIfAbsent(broker, + k -> ConcurrentOpenHashMap.>newBuilder() + .build()); + synchronized (namespaceToBundleRange) { + namespaceToBundleRange.computeIfAbsent(namespaceName, + k -> ConcurrentOpenHashSet.newBuilder().build()) + .add(bundleRange); + } + } - if (brokerCandidateCache.isEmpty()) { - // restore the list of brokers to the full set - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - } + @VisibleForTesting + Optional selectBroker(final ServiceUnitId serviceUnit) { + synchronized (brokerCandidateCache) { + final String bundle = serviceUnit.toString(); + final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, + key -> getBundleDataOrDefault(bundle)); + brokerCandidateCache.clear(); + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + + // filter brokers which owns topic higher than threshold + LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData, + conf.getLoadBalancerBrokerMaxTopics()); - // Choose a broker among the potentially smaller filtered list, when possible - Optional broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + // distribute namespaces to domain and brokers according to anti-affinity-group + LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, bundle, + brokerCandidateCache, + brokerToNamespaceToBundleRange, brokerToFailureDomainMap); + + // distribute bundles evenly to candidate-brokers if enable + if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) { + LoadManagerShared.removeMostServicingBrokersForNamespace(bundle, + brokerCandidateCache, + brokerToNamespaceToBundleRange); if (log.isDebugEnabled()) { - log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache); + log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}", + brokerCandidateCache.size()); } + } - if (!broker.isPresent()) { - // No brokers available - return broker; - } + log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle); - final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; - final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage(); - if (maxUsage > overloadThreshold) { - // All brokers that were in the filtered list were overloaded, so check if there is a better broker - LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, - getAvailableBrokers(), - brokerTopicLoadingPredicate); - Optional brokerTmp = - placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); - if (brokerTmp.isPresent()) { - broker = brokerTmp; - } + // Use the filter pipeline to finalize broker candidates. + try { + for (BrokerFilter filter : filterPipeline) { + filter.filter(brokerCandidateCache, data, loadData, conf); } + } catch (BrokerFilterException x) { + // restore the list of brokers to the full set + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + } - // Add new bundle to preallocated. - loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data); - preallocatedBundleToBroker.put(bundle, broker.get()); - - final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle); - final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle); - final ConcurrentOpenHashMap> namespaceToBundleRange = - brokerToNamespaceToBundleRange - .computeIfAbsent(broker.get(), - k -> ConcurrentOpenHashMap.>newBuilder() - .build()); - synchronized (namespaceToBundleRange) { - namespaceToBundleRange.computeIfAbsent(namespaceName, - k -> ConcurrentOpenHashSet.newBuilder().build()) - .add(bundleRange); - } + if (brokerCandidateCache.isEmpty()) { + // restore the list of brokers to the full set + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + } + + // Choose a broker among the potentially smaller filtered list, when possible + Optional broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + if (log.isDebugEnabled()) { + log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache); + } + + if (!broker.isPresent()) { + // No brokers available return broker; } - } finally { - selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + + final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0; + final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage(); + if (maxUsage > overloadThreshold) { + // All brokers that were in the filtered list were overloaded, so check if there is a better broker + LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, + getAvailableBrokers(), + brokerTopicLoadingPredicate); + Optional brokerTmp = + placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf); + if (brokerTmp.isPresent()) { + broker = brokerTmp; + } + } + return broker; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java similarity index 92% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 4b9f679f19d1f..3fb62f486ab8d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -16,11 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance; +package org.apache.pulsar.broker.loadbalance.impl; import static java.lang.Thread.sleep; import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -55,11 +57,10 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; +import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate; -import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; -import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; -import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Producer; @@ -410,39 +411,71 @@ public void testLoadShedding() throws Exception { doAnswer(invocation -> { bundleReference.set(invocation.getArguments()[0].toString() + '/' + invocation.getArguments()[1]); return null; - }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + }).when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); + + AtomicReference> selectedBrokerRef = new AtomicReference<>(); + ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager); + doAnswer(invocation -> { + ServiceUnitId serviceUnitId = (ServiceUnitId) invocation.getArguments()[0]; + Optional broker = primaryLoadManager.selectBroker(serviceUnitId); + selectedBrokerRef.set(broker); + return broker; + }).when(primaryLoadManagerSpy).selectBroker(any()); + setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1); pulsar1.getConfiguration().setLoadBalancerEnabled(true); - final LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData"); + final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData"); final Map brokerDataMap = loadData.getBrokerData(); final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost)); when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData); brokerDataMap.put(primaryHost, brokerDataSpy1); // Need to update all the bundle data for the shredder to see the spy. - primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); + primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); sleep(100); localBrokerData.setCpu(new ResourceUsage(80, 100)); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // 80% is below overload threshold: verify nothing is unloaded. - verify(namespacesSpy1, Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(0)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); localBrokerData.setCpu(new ResourceUsage(90, 100)); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // Most expensive bundle will be unloaded. - verify(namespacesSpy1, Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(1)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); assertEquals(bundleReference.get(), mockBundleName(2)); + assertEquals(selectedBrokerRef.get().get(), secondaryHost); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be // unloaded, but this is not the case due to the spy's behavior). - verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(2)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); assertEquals(bundleReference.get(), mockBundleName(1)); + assertEquals(selectedBrokerRef.get().get(), secondaryHost); - primaryLoadManager.doLoadShedding(); + primaryLoadManagerSpy.doLoadShedding(); // Now both are in grace period: neither should be unloaded. - verify(namespacesSpy1, Mockito.times(2)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString()); + verify(namespacesSpy1, Mockito.times(2)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); + assertEquals(selectedBrokerRef.get().get(), secondaryHost); + + // Test bundle transfer to same broker + + loadData.getRecentlyUnloadedBundles().clear(); + primaryLoadManagerSpy.doLoadShedding(); + verify(namespacesSpy1, Mockito.times(3)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); + + doReturn(Optional.of(primaryHost)).when(primaryLoadManagerSpy).selectBroker(any()); + loadData.getRecentlyUnloadedBundles().clear(); + primaryLoadManagerSpy.doLoadShedding(); + // The bundle shouldn't be unloaded because the broker is the same. + verify(namespacesSpy1, Mockito.times(3)) + .unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()); + } // Test that ModularLoadManagerImpl will determine that writing local data to ZooKeeper is necessary if certain