Skip to content

Commit

Permalink
[fix][broker] Avoid infinite bundle unloading (#20822)
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 committed Jul 25, 2023
1 parent 563f929 commit 3f63768
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
Expand Down Expand Up @@ -659,11 +660,24 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
return;
}
NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
Optional<String> destBroker = this.selectBroker(bundleToUnload);
if (!destBroker.isPresent()) {
log.info("[{}] No broker available to unload bundle {} from broker {}",
strategy.getClass().getSimpleName(), bundle, broker);
return;
}
if (destBroker.get().equals(broker)) {
log.warn("[{}] The destination broker {} is the same as the current owner broker for Bundle {}",
strategy.getClass().getSimpleName(), destBroker.get(), bundle);
return;
}

log.info("[{}] Unloading bundle: {} from broker {}",
strategy.getClass().getSimpleName(), bundle, broker);
log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
try {
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
pulsar.getAdminClient().namespaces()
.unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
} catch (PulsarServerException | PulsarAdminException e) {
log.warn("Error when trying to perform load shedding on {} for broker {}", bundle, broker, e);
Expand Down Expand Up @@ -837,99 +851,119 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
// If the given bundle is already in preallocated, return the selected broker.
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(),
brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);

// distribute bundles evenly to candidate-brokers if enable
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(),
brokerCandidateCache,
brokerToNamespaceToBundleRange);
if (log.isDebugEnabled()) {
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
brokerCandidateCache.size());
}
Optional<String> broker = selectBroker(serviceUnit);
if (!broker.isPresent()) {
// If no broker is selected, return empty.
return broker;
}
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
// Add new bundle to preallocated.
preallocateBundle(bundle, broker.get());
return broker;
}
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
}

// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
private void preallocateBundle(String bundle, String broker) {
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker);

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker,
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
}

if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
@VisibleForTesting
Optional<String> selectBroker(final ServiceUnitId serviceUnit) {
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
// distribute namespaces to domain and brokers according to anti-affinity-group
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, bundle,
brokerCandidateCache,
brokerToNamespaceToBundleRange, brokerToFailureDomainMap);

// distribute bundles evenly to candidate-brokers if enable
if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) {
LoadManagerShared.removeMostServicingBrokersForNamespace(bundle,
brokerCandidateCache,
brokerToNamespaceToBundleRange);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}",
brokerCandidateCache.size());
}
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
Optional<String> brokerTmp =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (brokerTmp.isPresent()) {
broker = brokerTmp;
}
// Use the filter pipeline to finalize broker candidates.
try {
for (BrokerFilter filter : filterPipeline) {
filter.filter(brokerCandidateCache, data, loadData, conf);
}
} catch (BrokerFilterException x) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Add new bundle to preallocated.
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker.get());

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
.computeIfAbsent(broker.get(),
k -> ConcurrentOpenHashMap.<String,
ConcurrentOpenHashSet<String>>newBuilder()
.build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.computeIfAbsent(namespaceName,
k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
if (brokerCandidateCache.isEmpty()) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Choose a broker among the potentially smaller filtered list, when possible
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}
} finally {
selectBrokerForAssignment.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(),
brokerTopicLoadingPredicate);
Optional<String> brokerTmp =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (brokerTmp.isPresent()) {
broker = brokerTmp;
}
}
return broker;
}
}

Expand Down
Loading

0 comments on commit 3f63768

Please sign in to comment.