Skip to content

Commit

Permalink
client: make SubscriptionMode a member of ConsumerConfigurationData (a…
Browse files Browse the repository at this point in the history
…pache#6337)

Currently, SubscriptionMode is a parameter to create ConsumerImpl, but it is not exported out, and user could not set this value for consumer.  This change tries to make SubscriptionMode a member of ConsumerConfigurationData, so user could set this parameter when create consumer.  

(cherry picked from commit 208af7c)
  • Loading branch information
jiazhai authored and tuteng committed Mar 21, 2020
1 parent aaf87f6 commit 7112e21
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
TopicName.getPartitionIndex(conf.getSingleTopic()),
false,
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);

/**
* Select the subscription mode to be used when subscribing to the topic.
*
* <p>Options are:
* <ul>
* <li>{@link SubscriptionMode#Durable} (Default)</li>
* <li>{@link SubscriptionMode#NonDurable}</li>
* </ul>
*
* @param subscriptionMode
* the subscription mode value
* @return the consumer builder instance
*/
ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);

/**
* Sets a {@link MessageListener} for the consumer
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Types of subscription mode supported by Pulsar.
*/
public enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Durable,

// Lightweight subscription mode that doesn't have a durable cursor associated
NonDurable
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand Down Expand Up @@ -191,6 +192,13 @@ public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType subscriptio
return this;
}

@Override
public ConsumerBuilder<T> subscriptionMode(@NonNull SubscriptionMode subscriptionMode) {
conf.setSubscriptionMode(subscriptionMode);
return this;
}


@Override
public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T> messageListener) {
conf.setMessageListener(messageListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand Down Expand Up @@ -150,39 +151,38 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final boolean createTopicIfDoesNotExist;

enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Durable,

// Lightweight subscription mode that doesn't have a durable cursor associated
NonDurable
}

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor,
int partitionIndex,
boolean hasParentConsumer,
CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors,
startMessageId, schema, interceptors,
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture, subscriptionMode, startMessageId, 0 /* rollback time in sec to start msgId */,
subscribeFuture, startMessageId, 0 /* rollback time in sec to start msgId */,
schema, interceptors, createTopicIfDoesNotExist);
}
}

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer,
CompletableFuture<Consumer<T>> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId,
CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.subscriptionMode = conf.getSubscriptionMode();
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId;
this.initialStartMessageId = this.startMessageId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
Expand Down Expand Up @@ -833,10 +832,10 @@ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult,
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
SubscriptionMode.Durable, null, schema, interceptors,
createIfDoesNotExist);
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
null, schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
Expand All @@ -847,7 +846,7 @@ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult,

CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, null,
schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
Expand Down Expand Up @@ -1118,7 +1117,7 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
partitionIndex, true, subFuture, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand Down Expand Up @@ -354,7 +353,7 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -47,6 +46,7 @@ public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConf
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());

Expand Down Expand Up @@ -83,7 +83,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {

final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration,
listenerExecutor, partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable,
listenerExecutor, partitionIdx, false, consumerFuture,
readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(),
schema, null, true /* createTopicIfDoesNotExist */);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {

public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
subscriptionMode, startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
createTopicIfDoesNotExist);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

@Data
Expand All @@ -59,6 +60,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

@JsonIgnore
private MessageListener<T> messageListener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,32 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.*;

public class ConsumerImplTest {


Expand All @@ -62,7 +69,7 @@ public void setUp() {

consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
executorService, -1, false, subscribeFuture, null, null, null,
true);
}

Expand Down

0 comments on commit 7112e21

Please sign in to comment.