diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index 895f59516dc30..673a45e132985 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; @@ -78,9 +79,12 @@ public class CmdConsume { @Parameter(description = "TopicName", required = true) private List mainOptions = new ArrayList(); - @Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type: Exclusive, Shared, Failover.") + @Parameter(names = { "-t", "--subscription-type" }, description = "Subscription type.") private SubscriptionType subscriptionType = SubscriptionType.Exclusive; + @Parameter(names = { "-p", "--subscription-position" }, description = "Subscription position.") + private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; + @Parameter(names = { "-s", "--subscription-name" }, required = true, description = "Subscription name.") private String subscriptionName; @@ -95,7 +99,7 @@ public class CmdConsume { + "value 0 means to consume messages as fast as possible.") private double consumeRate = 0; - @Parameter(names = { "--regex" }, description = "Indicate thetopic name is a regex pattern") + @Parameter(names = { "--regex" }, description = "Indicate the topic name is a regex pattern") private boolean isRegex = false; private ClientBuilder clientBuilder; @@ -182,7 +186,8 @@ private int consume(String topic) { PulsarClient client = clientBuilder.build(); ConsumerBuilder builder = client.newConsumer() .subscriptionName(this.subscriptionName) - .subscriptionType(subscriptionType); + .subscriptionType(subscriptionType) + .subscriptionInitialPosition(subscriptionInitialPosition); if (isRegex) { builder.topicsPattern(Pattern.compile(topic)); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 5c569fd0bd64e..cf38f271c8779 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.slf4j.Logger; @@ -86,9 +87,12 @@ static class Arguments { @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix") public String subscriberName = "sub"; - @Parameter(names = { "-st", "--subscription-type" }, description = "Subscriber name prefix") + @Parameter(names = { "-st", "--subscription-type" }, description = "Subscription type") public SubscriptionType subscriptionType = SubscriptionType.Exclusive; + @Parameter(names = { "-sp", "--subscription-position" }, description = "Subscription position") + private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest; + @Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message consumer (rate in msg/s)") public double rate = 0; @@ -257,6 +261,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe .receiverQueueSize(arguments.receiverQueueSize) // .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // .subscriptionType(arguments.subscriptionType) + .subscriptionInitialPosition(arguments.subscriptionInitialPosition) .replicateSubscriptionState(arguments.replicatedSubscription); if (arguments.encKeyName != null) { diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md index cd737cd3dcc7e..9a87c8c92c5b7 100644 --- a/site2/docs/reference-cli-tools.md +++ b/site2/docs/reference-cli-tools.md @@ -325,8 +325,10 @@ Options |`--hex`|Display binary messages in hexadecimal format.|false| |`-n`, `--num-messages`|Number of messages to consume, 0 means to consume forever.|1| |`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0 means to consume messages as fast as possible|0.0| +|`--regex`|Indicate the topic name is a regex pattern|false| |`-s`, `--subscription-name`|Subscription name|| |`-t`, `--subscription-type`|The type of the subscription. Possible values: Exclusive, Shared, Failover, Key_Shared.|Exclusive| +|`-p`, `--subscription-position`|The position of the subscription. Possible values: Latest, Earliest.|Latest| @@ -426,7 +428,8 @@ Options |`-u`, `--service-url`|Pulsar service URL|| |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled|0| |`-s`, `--subscriber-name`|Subscriber name prefix|sub| -|`-st`, `--subscription-type`|Subscriber name prefix. Possible values are Exclusive, Shared, Failover.|Exclusive| +|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive| +|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest| |`--trust-cert-file`|Path for the trusted TLS certificate file||