Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP 68: Exclusive Producer #8685

Merged
merged 6 commits into from
Dec 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@
*/
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.google.common.base.MoreObjects;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -101,6 +109,13 @@ public abstract class AbstractTopic implements Topic {
protected CompletableFuture<TransactionBuffer> transactionBuffer;
protected ReentrantLock transactionBufferLock = new ReentrantLock();

// Current epoch of the topic. Starts empty and is replaced in incrementTopicEpochIfNeeded()
// once an exclusive producer has successfully set or incremented the epoch.
protected volatile Optional<Long> topicEpoch = Optional.empty();
// Set to true in incrementTopicEpochIfNeeded() when an Exclusive producer claims the topic;
// cleared again when the epoch update fails or in handleProducerRemoved().
private volatile boolean hasExclusiveProducer;

// Atomic updater used for every modification of the volatile usageCount field below.
private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
// Combined count of attached producers and consumers: incremented by addProducer() and
// handleConsumerAdded(), decremented by handleProducerRemoved() and decrementUsageCount().
private volatile long usageCount = 0;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -316,6 +331,106 @@ public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schem
.checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
}

@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
checkArgument(producer.getTopic() == this);

CompletableFuture<Optional<Long>> future = new CompletableFuture<>();

incrementTopicEpochIfNeeded(producer)
.thenAccept(epoch -> {
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);
merlimat marked this conversation as resolved.
Show resolved Hide resolved

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

future.complete(epoch);
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
lock.readLock().unlock();
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});

return future;
}

/**
 * Decides, under the write lock, whether {@code producer} may attach given its access mode and
 * updates the topic epoch when an exclusive producer takes over.
 *
 * <ul>
 * <li>Shared: fails with ProducerBusyException if an exclusive producer is present, otherwise
 * returns the current topic epoch unchanged.</li>
 * <li>Exclusive: fails with ProducerFencedException if an exclusive producer is present, if any
 * producer is already connected, or if the producer's epoch is older than the topic's epoch.
 * Otherwise marks the topic as exclusively held and sets the epoch to the producer's epoch, or
 * increments the current one when the producer supplied none.</li>
 * <li>Any other mode: fails with BrokerServiceException.</li>
 * </ul>
 *
 * @param producer the producer asking to be added
 * @return a future holding the topic epoch to use for this producer
 */
protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
lock.writeLock().lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid locking for normal producer usecase?

if (producers.isEmpty() && producer.getAccessMode() == Shared) {
return CompletableFuture.completedFuture(topicEpoch);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to @rdhabalia suggestion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that we can avoid the locking, and I'm not sure that it's something to worry about in the context of adding a producer.
Actually, in the specific case there's still a race condition between updating the producers map, since the update to hasExclusiveProducer and the insertion into producers map should be atomic as well.

try {
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer) {
return FutureUtil.failedFuture(new ProducerBusyException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else {
// Normal producer getting added, we don't need a new epoch
return CompletableFuture.completedFuture(topicEpoch);
}

case Exclusive:
if (hasExclusiveProducer) {
return FutureUtil.failedFuture(new ProducerFencedException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else if (!producers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
} else if (producer.getTopicEpoch().isPresent()
&& producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
// If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
// to be fenced, because a new producer had been present in between.
return FutureUtil.failedFuture(new ProducerFencedException(
String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
topicEpoch.get(), producer.getTopicEpoch().get())));
} else {
// There are currently no existing producers
// The flag is raised here, before the producer is inserted into the producers map by the caller.
hasExclusiveProducer = true;

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(producer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}
// NOTE(review): exceptionally() below turns a failed epoch update into a future that completes
// normally with null, so the failure is not propagated and the caller receives a null Optional.
// NOTE(review): these callbacks may run after the write lock has been released in the finally
// block, if the epoch future is not yet complete when this method returns - confirm.
return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
}).exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});
}

// case WaitForExclusive:
// TODO: Implementation

default:
return FutureUtil.failedFuture(
new BrokerServiceException("Invalid producer access mode: " + producer.getAccessMode()));
}

} finally {
lock.writeLock().unlock();
}
}

/**
 * Sets the topic epoch to {@code newEpoch}. Called by incrementTopicEpochIfNeeded() with the epoch
 * supplied by an exclusive producer.
 *
 * @param newEpoch the epoch value to apply
 * @return a future holding the epoch, which the caller stores as the new topic epoch
 */
protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch);

/**
 * Produces the next topic epoch. Called by incrementTopicEpochIfNeeded() when an exclusive
 * producer connects without supplying an epoch of its own.
 *
 * @param currentEpoch the topic epoch before the update; empty if none has been set yet
 * @return a future holding the epoch, which the caller stores as the new topic epoch
 */
protected abstract CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch);

@Override
public void recordAddLatency(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
Expand Down Expand Up @@ -450,7 +565,44 @@ private boolean isUserProvidedProducerName(Producer producer){
return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(replicatorPrefix);
}

protected abstract void handleProducerRemoved(Producer producer);

/**
 * Detaches {@code producer} from this topic.
 *
 * @param producer the producer to remove; its topic must be this instance
 */
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);

// Presumably a conditional remove(key, value): the bookkeeping below only runs when the map
// still held this exact producer under its name - confirm against the producers map type.
if (producers.remove(producer.getProducerName(), producer)) {
handleProducerRemoved(producer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we need lock here else it may create a race condition and producer with WaitForExclusive may wait forever.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No lock needed there so far. Other changes are needed for WaitForExclusive, it's not implemented in this PR

}
}

/**
 * Updates the topic bookkeeping after {@code producer} has been removed from the producers map:
 * decrements the usage count, releases the exclusive-producer flag and refreshes the
 * last-active timestamp.
 *
 * @param producer the producer that was just removed
 */
protected void handleProducerRemoved(Producer producer) {
    // decrement usage only if this was a valid producer close
    long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);

    // usageCount also counts consumers (see handleConsumerAdded), so waiting for it to reach zero
    // would leave the topic marked as exclusively held while any consumer is attached, and every
    // later producer would be rejected. Release the flag as soon as the exclusive producer leaves.
    // The access mode is non-null here: it was already switched on when the producer was added.
    boolean removedExclusive;
    switch (producer.getAccessMode()) {
    case Exclusive:
        removedExclusive = true;
        break;
    default:
        removedExclusive = false;
        break;
    }
    if (removedExclusive || newCount == 0) {
        hasExclusiveProducer = false;
    }

    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
                USAGE_COUNT_UPDATER.get(this));
    }
    lastActive = System.nanoTime();
}

/**
 * Accounts for a newly attached consumer by incrementing the topic usage count.
 *
 * @param subscriptionName name of the subscription the consumer joined (used for logging only)
 * @param consumerName name of the consumer (used for logging only)
 */
public void handleConsumerAdded(String subscriptionName, String consumerName) {
    USAGE_COUNT_UPDATER.incrementAndGet(this);
    if (!log.isDebugEnabled()) {
        return;
    }
    // Re-read the counter for the log line, as a separate read from the increment above.
    long currentCount = USAGE_COUNT_UPDATER.get(this);
    log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName,
            consumerName, currentCount);
}

/** Atomically lowers the topic usage count by one. */
public void decrementUsageCount() {
    USAGE_COUNT_UPDATER.getAndDecrement(this);
}

/**
 * @return the current value of the topic usage count
 */
public long currentUsageCount() {
    return USAGE_COUNT_UPDATER.get(this);
}

@Override
public boolean isPublishRateExceeded() {
Expand Down Expand Up @@ -536,6 +688,8 @@ public void setDeleteWhileInactive(boolean deleteWhileInactive) {
this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}

/**
 * @return true if the topic has been terminated; addProducer() rejects new producers with a
 *         TopicTerminatedException when this returns true
 */
protected abstract boolean isTerminated();

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

public InactiveTopicPolicies getInactiveTopicPolicies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public ProducerBusyException(String msg) {
}
}

/**
 * Signals that a producer has been fenced off a topic. AbstractTopic raises it when an
 * Exclusive producer cannot attach (another exclusive producer exists, other producers are
 * connected, or the producer's topic epoch is stale). getClientErrorCode() maps it to
 * ServerError.ProducerFenced.
 */
public static class ProducerFencedException extends BrokerServiceException {
/**
 * @param msg human-readable reason the producer was fenced
 */
public ProducerFencedException(String msg) {
super(msg);
}
}

public static class ServiceUnitNotReadyException extends BrokerServiceException {
public ServiceUnitNotReadyException(String msg) {
super(msg);
Expand Down Expand Up @@ -217,6 +223,8 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che
return ServerError.InvalidTxnStatus;
} else if (t instanceof NotAllowedException) {
return ServerError.NotAllowedError;
} else if (t instanceof ProducerFencedException) {
return ServerError.ProducerFenced;
} else if (t instanceof TransactionConflictException) {
return ServerError.TransactionConflict;
} else if (t instanceof CoordinatorException.TransactionNotFoundException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -42,6 +43,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
Expand Down Expand Up @@ -83,13 +85,18 @@ public class Producer {
private final boolean isNonPersistentTopic;
private final boolean isEncrypted;

// Access mode passed to the constructor; exposed through getAccessMode().
private final ProducerAccessMode accessMode;
// Topic epoch passed to the constructor (may be empty); exposed through getTopicEpoch().
private Optional<Long> topicEpoch;

private final Map<String, String> metadata;

private final SchemaVersion schemaVersion;

public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
boolean userProvidedProducerName,
ProducerAccessMode accessMode,
Optional<Long> topicEpoch) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand Down Expand Up @@ -117,13 +124,16 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
stats.setProducerName(producerName);
stats.producerId = producerId;
stats.metadata = this.metadata;
stats.accessMode = Commands.convertProducerAccessMode(accessMode);

this.isRemote = producerName
.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
this.remoteCluster = isRemote ? producerName.split("\\.")[2].split(REPL_PRODUCER_NAME_DELIMITER)[0] : null;

this.isEncrypted = isEncrypted;
this.schemaVersion = schemaVersion;
this.accessMode = accessMode;
this.topicEpoch = topicEpoch;
}

@Override
Expand Down Expand Up @@ -635,6 +645,14 @@ public SchemaVersion getSchemaVersion() {
return schemaVersion;
}

/**
 * @return the access mode this producer was created with
 */
public ProducerAccessMode getAccessMode() {
    return this.accessMode;
}

/**
 * @return the topic epoch held by this producer; the value is whatever was passed to the constructor
 */
public Optional<Long> getTopicEpoch() {
    return this.topicEpoch;
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.pulsar.broker.service;

import io.netty.util.concurrent.Future;

import java.util.List;
import java.util.Optional;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand All @@ -38,7 +41,7 @@ public interface PulsarCommandSender {
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion);
SchemaVersion schemaVersion, Optional<Long> topicEpoch);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.List;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
Expand Down Expand Up @@ -97,9 +101,9 @@ public void sendProducerSuccessResponse(long requestId, String producerName, Sch

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion) {
SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
schemaVersion);
schemaVersion, topicEpoch);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
command.getProducerSuccess().recycle();
Expand Down
Loading