Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fail fast if the extensible load manager failed to start #23297

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker;

import java.io.IOException;
import java.util.concurrent.CompletionException;

public class PulsarServerException extends IOException {
private static final long serialVersionUID = 1;
Expand All @@ -44,4 +45,20 @@ public NotFoundException(Throwable t) {
super(t);
}
}

public static PulsarServerException from(Throwable throwable) {
if (throwable instanceof CompletionException) {
return from(throwable.getCause());
}
if (throwable instanceof PulsarServerException pulsarServerException) {
return pulsarServerException;
} else {
return new PulsarServerException(throwable);
}
}

// Wrap this checked exception into a specific unchecked exception
public static CompletionException toUncheckedException(PulsarServerException e) {
return new CompletionException(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ public void start() throws PulsarServerException {
state = State.Started;
} catch (Exception e) {
LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e);
PulsarServerException startException = new PulsarServerException(e);
PulsarServerException startException = PulsarServerException.from(e);
readyForIncomingRequestsFuture.completeExceptionally(startException);
throw startException;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
Expand All @@ -99,10 +98,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;

Expand All @@ -125,10 +121,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

public static final int STARTUP_TIMEOUT_SECONDS = 30;

public static final int MAX_RETRY = 5;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

public static final Set<String> INTERNAL_TOPICS =
Expand Down Expand Up @@ -212,7 +204,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private final ConcurrentHashMap<String, CompletableFuture<Optional<BrokerLookupData>>>
lookupRequests = new ConcurrentHashMap<>();
private final CompletableFuture<Void> initWaiter = new CompletableFuture<>();
private final CompletableFuture<Boolean> initWaiter = new CompletableFuture<>();

/**
* Get all the bundles that are owned by this broker.
Expand Down Expand Up @@ -385,7 +377,7 @@ public void start() throws PulsarServerException {
return;
}
try {
this.brokerRegistry = new BrokerRegistryImpl(pulsar);
this.brokerRegistry = createBrokerRegistry(pulsar);
this.leaderElectionService = new LeaderElectionService(
pulsar.getCoordinationService(), pulsar.getBrokerId(),
pulsar.getSafeWebServiceAddress(), ELECTION_ROOT,
Expand All @@ -400,53 +392,14 @@ public void start() throws PulsarServerException {
});
});
});
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId());
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
pulsar.runWhenReadyForIncomingRequests(() -> {
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.create();
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
brokerRegistry.register();
this.serviceUnitStateChannel.start();
break;
} catch (Exception e) {
log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(backoff.next());
} catch (InterruptedException ex) {
log.warn("Interrupted while sleeping.");
// preserve thread's interrupt status
Thread.currentThread().interrupt();
try {
pulsar.close();
} catch (PulsarServerException exc) {
log.error("Failed to close pulsar service.", exc);
}
return;
}
failStarting(e);
if (retry >= MAX_RETRY) {
log.error("Failed to start the service unit state channel after retry {} th. "
+ "Closing pulsar service.", retry, e);
try {
pulsar.close();
} catch (PulsarServerException ex) {
log.error("Failed to close pulsar service.", ex);
}
}
}
}
});

this.antiAffinityGroupPolicyHelper =
new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
Expand All @@ -455,15 +408,10 @@ public void start() throws PulsarServerException {
SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies);
this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper));

try {
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
} catch (LoadDataStoreException e) {
throw new PulsarServerException(e);
}
this.brokerLoadDataStore = LoadDataStoreFactory
.create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
this.topBundlesLoadDataStore = LoadDataStoreFactory
.create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);

this.context = LoadManagerContextImpl.builder()
.configuration(conf)
Expand All @@ -487,6 +435,7 @@ public void start() throws PulsarServerException {

pulsar.runWhenReadyForIncomingRequests(() -> {
try {
this.serviceUnitStateChannel.start();
var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis();

this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor()
Expand Down Expand Up @@ -521,38 +470,33 @@ public void start() throws PulsarServerException {
MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS);

this.splitScheduler.start();
this.initWaiter.complete(null);
this.initWaiter.complete(true);
this.started = true;
log.info("Started load manager.");
} catch (Exception ex) {
failStarting(ex);
} catch (Throwable e) {
failStarting(e);
}
});
} catch (Exception ex) {
} catch (Throwable ex) {
failStarting(ex);
}
}

private void failStarting(Exception ex) {
log.error("Failed to start the extensible load balance and close broker registry {}.",
this.brokerRegistry, ex);
private void failStarting(Throwable throwable) {
if (this.brokerRegistry != null) {
try {
brokerRegistry.unregister();
} catch (MetadataStoreException e) {
// ignore
}
}
if (this.serviceUnitStateChannel != null) {
try {
serviceUnitStateChannel.close();
} catch (IOException e) {
// ignore
brokerRegistry.close();
} catch (PulsarServerException e) {
// If close failed, this broker might still exist in the metadata store. Then it could be found by other
// brokers as an available broker. Hence, print a warning log for it.
log.warn("Failed to close the broker registry: {}", e.getMessage());
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
}
}
initWaiter.completeExceptionally(ex);
initWaiter.complete(false); // exit the background thread gracefully
throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable));
}


@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
Expand Down Expand Up @@ -897,7 +841,9 @@ synchronized void playLeader() {
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
initWaiter.get();
if (!initWaiter.get()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
Expand Down Expand Up @@ -947,7 +893,9 @@ synchronized void playFollower() {
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
initWaiter.get();
if (!initWaiter.get()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
Expand Down Expand Up @@ -1018,7 +966,9 @@ private List<Metrics> getIgnoredCommandMetrics(String advertisedBrokerAddress) {
@VisibleForTesting
protected void monitor() {
try {
initWaiter.get();
if (!initWaiter.get()) {
return;
}

// Monitor role
// Periodically check the role in case ZK watcher fails.
Expand Down Expand Up @@ -1073,4 +1023,14 @@ private void closeInternalTopics() {
log.warn("Failed to wait for closing internal topics", e);
}
}

@VisibleForTesting
protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
return new BrokerRegistryImpl(pulsar);
}

@VisibleForTesting
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import java.util.Optional;
import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class LoadManagerFailFastTest {

private static final String cluster = "test";
private final int zkPort = PortManager.nextLockedFreePort();
private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort);
private final ServiceConfiguration config = new ServiceConfiguration();

@BeforeClass
protected void setup() throws Exception {
bk.start();
config.setClusterName(cluster);
config.setAdvertisedAddress("localhost");
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:localhost:" + zkPort);
}

@AfterClass
protected void cleanup() throws Exception {
bk.stop();
}

@Test(timeOut = 30000)
public void testBrokerRegistryFailure() throws Exception {
config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName());
@Cleanup final var pulsar = new PulsarService(config);
try {
pulsar.start();
Assert.fail();
} catch (PulsarServerException e) {
Assert.assertNull(e.getCause());
Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry");
}
Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get()
.isEmpty());
}

@Test(timeOut = 30000)
public void testServiceUnitStateChannelFailure() throws Exception {
config.setLoadManagerClassName(ChannelLoadManager.class.getName());
@Cleanup final var pulsar = new PulsarService(config);
try {
pulsar.start();
Assert.fail();
} catch (PulsarServerException e) {
Assert.assertNull(e.getCause());
Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel");
}
Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore()
.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty()));
}

private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl {

@Override
protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class);
try {
Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry)
.start();
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
return mockBrokerRegistry;
}
}

private static class ChannelLoadManager extends ExtensibleLoadManagerImpl {

@Override
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class);
try {
Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel)
.start();
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any());
return channel;
}
}
}
Loading