Skip to content

Commit

Permalink
Fix for Infinite loop in PersistentReplicator.startProducer()
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai ASher committed Mar 4, 2017
1 parent 072fe7e commit 0169ded
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -617,15 +617,20 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return disconnectFuture;
}

if (STATE_UPDATER.get(this) == State.Stopping) {
// Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by closeProducerAsync()
// which will at some point change the state to stopped
return CompletableFuture.completedFuture(null);
}

if (producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)
|| STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) {
log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster,
remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog());
return closeProducerAsync();
} else {
// If there's already a reconnection happening, signal to close it whenever it's ready
STATE_UPDATER.set(this, State.Stopped);
}

STATE_UPDATER.set(this, State.Stopped);
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
Expand Down Expand Up @@ -67,8 +68,11 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -83,7 +87,9 @@
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -863,7 +869,7 @@ public void testAtomicReplicationRemoval() throws Exception {
PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster;
ConcurrentOpenHashMap<String, PersistentReplicator> replicatorMap = topic.getReplicators();
;

final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = PulsarClient.create(brokerUrl.toString());
Expand Down Expand Up @@ -895,4 +901,42 @@ public void testAtomicReplicationRemoval() throws Exception {
DeleteCursorCallback callback = captor.getValue();
callback.deleteCursorComplete(null);
}

@Test
public void testClosingReplicationProducerTwice() throws Exception {
final String globalTopicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice";
String localCluster = "local";
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();

PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + localCluster;

final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = spy( PulsarClient.create(brokerUrl.toString()) );
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
Field conf = PersistentReplicator.class.getDeclaredField("producerConfiguration");
conf.setAccessible(true);

ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);

doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);

replicator.startProducer();
verify(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);

replicator.disconnect(false);
replicator.disconnect(false);

replicator.startProducer();

verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
}

}

0 comments on commit 0169ded

Please sign in to comment.