Skip to content

Commit

Permalink
[pulsar-broker] recover zk-badversion while updating cursor metadata (a…
Browse files Browse the repository at this point in the history
…pache#5604)

fix test
  • Loading branch information
rdhabalia authored and huangdx0726 committed Aug 24, 2020
1 parent f59d8b3 commit 25c3a4c
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.bookkeeper.mledger;

import com.google.common.annotations.Beta;

import java.util.function.Supplier;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
Expand Down Expand Up @@ -77,10 +80,13 @@ ManagedLedger open(String name, ManagedLedgerConfig config)
* managed ledger configuration
* @param callback
* callback object
* @param mlOwnershipChecker
* checks ml-ownership in case updating ml-metadata fails due to ownership conflict
* @param ctx
* opaque context
*/
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, Object ctx);
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
Supplier<Boolean> mlOwnershipChecker, Object ctx);

/**
* Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
* @param properties
* @param callback
*/
private void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
final AsyncCallbacks.CloseCallback callback, final Object ctx) {

if (shouldPersistUnackRangesToLedger()) {
Expand Down Expand Up @@ -2053,6 +2053,30 @@ public void operationComplete(Void result, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
if (e instanceof MetaStoreException.BadVersionException) {
log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
ledger.name, name, e.getMessage());
// it means previous owner of the ml might have updated the version incorrectly. So, check
// the ownership and refresh the version again.
if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
}

@Override
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed to refresh cursor metadata-version for {} due to {}",
ledger.name, name, e.getMessage());
}
}
});
}
}
callback.operationFailed(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -280,7 +281,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
r.e = exception;
latch.countDown();
}
}, null);
}, null, null);

latch.await();

Expand All @@ -292,12 +293,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) {
asyncOpen(name, new ManagedLedgerConfig(), callback, ctx);
asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx);
}

@Override
public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
final Object ctx) {
Supplier<Boolean> mlOwnershipChecker, final Object ctx) {

// If the ledger state is bad, remove it from the map.
CompletableFuture<ManagedLedgerImpl> existingFuture = ledgers.get(name);
Expand Down Expand Up @@ -325,7 +326,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor,
orderedExecutor, name);
orderedExecutor, name, mlOwnershipChecker);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
Expand Down Expand Up @@ -188,6 +189,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;

Expand Down Expand Up @@ -252,6 +254,11 @@ public enum PositionBound {
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
final String name) {
this(factory, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name, null);
}
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
final String name, final Supplier<Boolean> mlOwnershipChecker) {
this.factory = factory;
this.bookKeeper = bookKeeper;
this.config = config;
Expand All @@ -275,6 +282,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
}

synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
Expand All @@ -116,6 +118,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Expand All @@ -124,6 +127,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {

private static final Charset Encoding = Charsets.UTF_8;

@DataProvider(name = "checkOwnershipFlag")
public Object[][] checkOwnershipFlagProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test
public void managedLedgerApi() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
Expand Down Expand Up @@ -355,7 +363,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, null);
}, null, null);

counter.await();

Expand Down Expand Up @@ -1980,7 +1988,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti
}

// (3) Validate: cache should remove all entries read by both active cursors
log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
assertEquals((5 * totalInsertedEntries), entryCache.getSize());

final int remainingEntries = totalInsertedEntries - readEntries;
Expand Down Expand Up @@ -2528,6 +2536,100 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {
}
}

/**
* It verifies that managed-cursor can recover metadata-version if it fails to update due to version conflict. This
* test verifies that version recovery happens if checkOwnership supplier is passed while creating managed-ledger.
*
* @param checkOwnershipFlag
* @throws Exception
*/
@Test(dataProvider = "checkOwnershipFlag")
public void recoverMLWithBadVersion(boolean checkOwnershipFlag) throws Exception {

ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, zkc, conf);

final MutableObject<ManagedLedger> ledger1 = new MutableObject<>(), ledger2 = new MutableObject<>();
final MutableObject<ManagedCursorImpl> cursor1 = new MutableObject<>(), cursor2 = new MutableObject<>();

createLedger(factory1, ledger1, cursor1, checkOwnershipFlag);
ledger1.getValue().addEntry("test1".getBytes(Encoding));
ledger1.getValue().addEntry("test2".getBytes(Encoding));
Entry entry = cursor1.getValue().readEntries(1).get(0);
cursor1.getValue().delete(entry.getPosition());

createLedger(factory2, ledger2, cursor2, checkOwnershipFlag);
entry = cursor2.getValue().readEntries(1).get(0);

// 1. closing cursor will change the zk-version
cursor1.getValue().close();

// 2. try to creatCursorLedger which should fail first time because of BadVersionException
// However, if checkOwnershipFlag is eanbled the managed-cursor will reover from that exception.
boolean isFailed = updateCusorMetadataByCreatingMetadataLedger(cursor2);
Assert.assertTrue(isFailed);

isFailed = updateCusorMetadataByCreatingMetadataLedger(cursor2);
if (checkOwnershipFlag) {
Assert.assertFalse(isFailed);
} else {
Assert.assertTrue(isFailed);
}

log.info("Test completed");
}

private boolean updateCusorMetadataByCreatingMetadataLedger(MutableObject<ManagedCursorImpl> cursor2)
throws InterruptedException {
MutableObject<Boolean> failed = new MutableObject<>();
failed.setValue(false);
CountDownLatch createLedgerDoneLatch = new CountDownLatch(1);
cursor2.getValue().createNewMetadataLedger(new VoidCallback() {

@Override
public void operationComplete() {
createLedgerDoneLatch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
failed.setValue(true);
createLedgerDoneLatch.countDown();
}

});
createLedgerDoneLatch.await();
return failed.getValue();
}

private void createLedger(ManagedLedgerFactoryImpl factory, MutableObject<ManagedLedger> ledger1,
MutableObject<ManagedCursorImpl> cursor1, boolean checkOwnershipFlag) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
factory.asyncOpen("my_test_ledger", new ManagedLedgerConfig(), new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger1.setValue(ledger);
ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
cursor1.setValue((ManagedCursorImpl) cursor);
latch.countDown();
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
}
}, null);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
}, checkOwnershipFlag ? () -> true : null, null);
latch.await();
}

private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
Expand All @@ -2543,4 +2645,4 @@ public static void retryStrategically(Predicate<Void> predicate, int retryCount,
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, null);
}, () -> isTopicNsOwnedByBroker(topicName), null);

}).exceptionally((exception) -> {
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
Expand Down Expand Up @@ -1217,6 +1217,15 @@ public void monitorBacklogQuota() {
});
}

public boolean isTopicNsOwnedByBroker(TopicName topicName) throws RuntimeException {
try {
return pulsar.getNamespaceService().isServiceUnitOwned(topicName);
} catch (Exception e) {
log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
}
return false;
}

public void checkTopicNsOwnership(final String topic) throws RuntimeException {
TopicName topicName = TopicName.get(topic);
boolean ownedByThisInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.function.Supplier;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -208,7 +209,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any());
any(OpenLedgerCallback.class), any(Supplier.class), any());

// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
Expand All @@ -219,7 +220,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any());
any(OpenLedgerCallback.class), any(Supplier.class), any());

// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
Expand Down
Loading

0 comments on commit 25c3a4c

Please sign in to comment.