diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index 49197c51242d5..914b3df67299b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -19,6 +19,9 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.Beta; + +import java.util.function.Supplier; + import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback; @@ -77,10 +80,13 @@ ManagedLedger open(String name, ManagedLedgerConfig config) * managed ledger configuration * @param callback * callback object + * @param mlOwnershipChecker + * checks ml-ownership in case updating ml-metadata fails due to ownership conflict * @param ctx * opaque context */ - void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, Object ctx); + void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, + Supplier mlOwnershipChecker, Object ctx); /** * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 05f6f5c762133..411279dfd6819 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1958,7 +1958,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { * @param properties * @param callback */ - private void persistPositionWhenClosing(PositionImpl position, Map properties, + void persistPositionWhenClosing(PositionImpl position, Map properties, final AsyncCallbacks.CloseCallback callback, final Object ctx) { if (shouldPersistUnackRangesToLedger()) { @@ -2053,6 +2053,30 @@ public void operationComplete(Void result, Stat stat) { @Override public void operationFailed(MetaStoreException e) { + if (e instanceof MetaStoreException.BadVersionException) { + log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}", + ledger.name, name, e.getMessage()); + // it means previous owner of the ml might have updated the version incorrectly. So, check + // the ownership and refresh the version again. + if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) { + ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, + new MetaStoreCallback() { + @Override + public void operationComplete(ManagedCursorInfo info, Stat stat) { + cursorLedgerStat = stat; + } + + @Override + public void operationFailed(MetaStoreException e) { + if (log.isDebugEnabled()) { + log.debug( + "[{}] Failed to refresh cursor metadata-version for {} due to {}", + ledger.name, name, e.getMessage()); + } + } + }); + } + } callback.operationFailed(e); } }); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index b88bc7f3444f1..6f037ceb155a3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -40,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -280,7 +281,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { r.e = exception; latch.countDown(); } - }, null); + }, null, null); latch.await(); @@ -292,12 +293,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { @Override public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) { - asyncOpen(name, new ManagedLedgerConfig(), callback, ctx); + asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx); } @Override public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback, - final Object ctx) { + Supplier mlOwnershipChecker, final Object ctx) { // If the ledger state is bad, remove it from the map. CompletableFuture existingFuture = ledgers.get(name); @@ -325,7 +326,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(), config.getBookKeeperEnsemblePlacementPolicyProperties())), store, config, scheduledExecutor, - orderedExecutor, name); + orderedExecutor, name, mlOwnershipChecker); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @Override public void initializeComplete() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a1d41b6c6a000..38ed3ea12995b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; @@ -188,6 +189,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private static final Random random = new Random(System.currentTimeMillis()); private long maximumRolloverTimeMs; + protected final Supplier mlOwnershipChecker; volatile PositionImpl lastConfirmedEntry; @@ -252,6 +254,11 @@ public enum PositionBound { public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor, final String name) { + this(factory, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name, null); + } + public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, + ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor, + final String name, final Supplier mlOwnershipChecker) { this.factory = factory; this.bookKeeper = bookKeeper; this.config = config; @@ -275,6 +282,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); + this.mlOwnershipChecker = mlOwnershipChecker; } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index a31416f8dbd03..d38196317857c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -96,11 +96,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; @@ -116,6 +118,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class ManagedLedgerTest extends MockedBookKeeperTestCase { @@ -124,6 +127,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { private static final Charset Encoding = Charsets.UTF_8; + @DataProvider(name = "checkOwnershipFlag") + public Object[][] checkOwnershipFlagProvider() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + @Test public void managedLedgerApi() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); @@ -355,7 +363,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { fail(exception.getMessage()); } - }, null); + }, null, null); counter.await(); @@ -1980,7 +1988,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti } // (3) Validate: cache should remove all entries read by both active cursors - log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize()); + log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize()); assertEquals((5 * totalInsertedEntries), entryCache.getSize()); final int remainingEntries = totalInsertedEntries - readEntries; @@ -2528,6 +2536,100 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception { } } + /** + * It verifies that managed-cursor can recover metadata-version if it fails to update due to version conflict. This + * test verifies that version recovery happens if checkOwnership supplier is passed while creating managed-ledger. + * + * @param checkOwnershipFlag + * @throws Exception + */ + @Test(dataProvider = "checkOwnershipFlag") + public void recoverMLWithBadVersion(boolean checkOwnershipFlag) throws Exception { + + ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig(); + ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl(bkc, zkc, conf); + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, zkc, conf); + + final MutableObject ledger1 = new MutableObject<>(), ledger2 = new MutableObject<>(); + final MutableObject cursor1 = new MutableObject<>(), cursor2 = new MutableObject<>(); + + createLedger(factory1, ledger1, cursor1, checkOwnershipFlag); + ledger1.getValue().addEntry("test1".getBytes(Encoding)); + ledger1.getValue().addEntry("test2".getBytes(Encoding)); + Entry entry = cursor1.getValue().readEntries(1).get(0); + cursor1.getValue().delete(entry.getPosition()); + + createLedger(factory2, ledger2, cursor2, checkOwnershipFlag); + entry = cursor2.getValue().readEntries(1).get(0); + + // 1. closing cursor will change the zk-version + cursor1.getValue().close(); + + // 2. try to creatCursorLedger which should fail first time because of BadVersionException + // However, if checkOwnershipFlag is eanbled the managed-cursor will reover from that exception. + boolean isFailed = updateCusorMetadataByCreatingMetadataLedger(cursor2); + Assert.assertTrue(isFailed); + + isFailed = updateCusorMetadataByCreatingMetadataLedger(cursor2); + if (checkOwnershipFlag) { + Assert.assertFalse(isFailed); + } else { + Assert.assertTrue(isFailed); + } + + log.info("Test completed"); + } + + private boolean updateCusorMetadataByCreatingMetadataLedger(MutableObject cursor2) + throws InterruptedException { + MutableObject failed = new MutableObject<>(); + failed.setValue(false); + CountDownLatch createLedgerDoneLatch = new CountDownLatch(1); + cursor2.getValue().createNewMetadataLedger(new VoidCallback() { + + @Override + public void operationComplete() { + createLedgerDoneLatch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.setValue(true); + createLedgerDoneLatch.countDown(); + } + + }); + createLedgerDoneLatch.await(); + return failed.getValue(); + } + + private void createLedger(ManagedLedgerFactoryImpl factory, MutableObject ledger1, + MutableObject cursor1, boolean checkOwnershipFlag) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + factory.asyncOpen("my_test_ledger", new ManagedLedgerConfig(), new OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledger1.setValue(ledger); + ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + cursor1.setValue((ManagedCursorImpl) cursor); + latch.countDown(); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + } + }, null); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + } + }, checkOwnershipFlag ? () -> true : null, null); + latch.await(); + } + private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception { Field field = clazz.getDeclaredField(fieldName); field.setAccessible(true); @@ -2543,4 +2645,4 @@ public static void retryStrategically(Predicate predicate, int retryCount, Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i)); } } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 498f1ea504b5d..a050b47d350b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -916,7 +916,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.completeExceptionally(new PersistenceException(exception)); } } - }, null); + }, () -> isTopicNsOwnedByBroker(topicName), null); }).exceptionally((exception) -> { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); @@ -1217,6 +1217,15 @@ public void monitorBacklogQuota() { }); } + public boolean isTopicNsOwnedByBroker(TopicName topicName) throws RuntimeException { + try { + return pulsar.getNamespaceService().isServiceUnitOwned(topicName); + } catch (Exception e) { + log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage()); + } + return false; + } + public void checkTopicNsOwnership(final String topic) throws RuntimeException { TopicName topicName = TopicName.get(topic); boolean ownedByThisInstance; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 62c8aff9cfa28..2b4976bc0713b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -42,6 +42,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.function.Supplier; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -208,7 +209,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // call openLedgerFailed on ML factory asyncOpen doAnswer(new Answer() { @@ -219,7 +220,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // call addComplete on ledger asyncAddEntry doAnswer(new Answer() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index bc92ee492c230..d717a5a9e7b7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; @@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -223,7 +225,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), - any()); + any(Supplier.class), any()); CompletableFuture future = brokerService.getOrCreateTopic(topicName).thenAccept(topic -> { assertTrue(topic.toString().contains(topicName)); @@ -254,7 +256,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), - any()); + any(Supplier.class), any()); CompletableFuture future = brokerService.getOrCreateTopic(jinxedTopicName); @@ -1095,7 +1097,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // call openLedgerFailed on ML factory asyncOpen doAnswer(new Answer() { @@ -1106,7 +1108,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // call addComplete on ledger asyncAddEntry doAnswer(new Answer() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index f01f771149a1c..ba4cf5bee2815 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.any; import static org.mockito.Mockito.matches; import static org.mockito.Mockito.doAnswer; @@ -50,6 +51,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -767,7 +769,7 @@ public void testCreateProducerTimeout() throws Exception { }); return null; }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // In a create producer timeout from client side we expect to see this sequence of commands : // 1. create producer @@ -824,7 +826,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // In a create producer timeout from client side we expect to see this sequence of commands : // 1. create producer @@ -902,7 +904,7 @@ public void testCreateProducerBookieTimeout() throws Exception { }); return null; }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // In a create producer timeout from client side we expect to see this sequence of commands : // 1. create a failure producer which will timeout creation after 100msec @@ -973,7 +975,7 @@ public void testSubscribeTimeout() throws Exception { return null; }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // In a subscribe timeout from client side we expect to see this sequence of commands : // 1. Subscribe @@ -1046,7 +1048,7 @@ public void testSubscribeBookieTimeout() throws Exception { }); return null; }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); CompletableFuture openTopicFail = new CompletableFuture<>(); doAnswer(invocationOnMock -> { @@ -1056,7 +1058,7 @@ public void testSubscribeBookieTimeout() throws Exception { }); return null; }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // In a subscribe timeout from client side we expect to see this sequence of commands : // 1. Subscribe against failtopic which will fail after 100msec @@ -1455,7 +1457,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // call openLedgerFailed on ML factory asyncOpen doAnswer(new Answer() { @@ -1470,7 +1472,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), - any(OpenLedgerCallback.class), any()); + any(OpenLedgerCallback.class), any(Supplier.class), any()); // call addComplete on ledger asyncAddEntry doAnswer(new Answer() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java index 75781745e6e60..4bb508c925b4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.bookkeeper.client.PulsarMockBookKeeper; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -246,7 +247,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock) - .asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), any()); + .asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), + any(Supplier.class), any()); // call openLedgerFailed on ML factory asyncOpen doAnswer(new Answer() { @@ -257,7 +259,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { return null; } }).when(mlFactoryMock) - .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), any()); + .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), + any(Supplier.class), any()); // call addComplete on ledger asyncAddEntry doAnswer(new Answer() { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 00bdf0320fadb..7cb36330a5a8a 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -197,7 +197,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { future.completeExceptionally(exception); } - }, null); + }, null, null); } List managedLedgers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());