Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe initial position for consumer cli #6442

Merged
merged 1 commit into from
Mar 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
Expand Down Expand Up @@ -78,9 +79,12 @@ public class CmdConsume {
@Parameter(description = "TopicName", required = true)
private List<String> mainOptions = new ArrayList<String>();

@Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type: Exclusive, Shared, Failover.")
@Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type.")
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

@Parameter(names = { "-p", "--subscription-position" }, description = "Subscription position.")
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

@Parameter(names = { "-s", "--subscription-name" }, required = true, description = "Subscription name.")
private String subscriptionName;

Expand All @@ -95,7 +99,7 @@ public class CmdConsume {
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;

@Parameter(names = { "--regex" }, description = "Indicate thetopic name is a regex pattern")
@Parameter(names = { "--regex" }, description = "Indicate the topic name is a regex pattern")
private boolean isRegex = false;

private ClientBuilder clientBuilder;
Expand Down Expand Up @@ -182,7 +186,8 @@ private int consume(String topic) {
PulsarClient client = clientBuilder.build();
ConsumerBuilder<byte[]> builder = client.newConsumer()
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType);
.subscriptionType(subscriptionType)
.subscriptionInitialPosition(subscriptionInitialPosition);

if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,9 +87,12 @@ static class Arguments {
@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix")
public String subscriberName = "sub";

@Parameter(names = { "-st", "--subscription-type" }, description = "Subscriber name prefix")
@Parameter(names = { "-st", "--subscription-type" }, description = "Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;

@Parameter(names = { "-sp", "--subscription-position" }, description = "Subscription position")
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

@Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message consumer (rate in msg/s)")
public double rate = 0;

Expand Down Expand Up @@ -257,6 +261,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
.receiverQueueSize(arguments.receiverQueueSize) //
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
.subscriptionType(arguments.subscriptionType)
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
.replicateSubscriptionState(arguments.replicatedSubscription);

if (arguments.encKeyName != null) {
Expand Down
5 changes: 4 additions & 1 deletion site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,10 @@ Options
|`--hex`|Display binary messages in hexadecimal format.|false|
|`-n`, `--num-messages`|Number of messages to consume, 0 means to consume forever.|1|
|`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0 means to consume messages as fast as possible|0.0|
|`--regex`|Indicate the topic name is a regex pattern|false|
|`-s`, `--subscription-name`|Subscription name||
|`-t`, `--subscription-type`|The type of the subscription. Possible values: Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-p`, `--subscription-position`|The position of the subscription. Possible values: Latest, Earliest.|Latest|



Expand Down Expand Up @@ -426,7 +428,8 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0|
|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
|`-st`, `--subscription-type`|Subscriber name prefix. Possible values are Exclusive, Shared, Failover.|Exclusive|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
|`--trust-cert-file`|Path for the trusted TLS certificate file||


Expand Down