diff --git a/README.md b/README.md
index c74e77759..f765b146b 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies
```Groovy
-implementation platform('com.google.cloud:libraries-bom:24.3.0')
+implementation platform('com.google.cloud:libraries-bom:24.4.0')
implementation 'com.google.cloud:google-cloud-pubsub'
```
diff --git a/google-cloud-pubsub/pom.xml b/google-cloud-pubsub/pom.xml
index 27e436e44..22fe71e2d 100644
--- a/google-cloud-pubsub/pom.xml
+++ b/google-cloud-pubsub/pom.xml
@@ -95,6 +95,11 @@
+
+ org.mockito
+ mockito-core
+ test
+ junitjunit
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java
new file mode 100644
index 000000000..199186004
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumerWithResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import java.util.concurrent.Future;
+
+public interface AckReplyConsumerWithResponse {
+ Future ack();
+
+ Future nack();
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
new file mode 100644
index 000000000..3b67ce219
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckRequestData.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import com.google.api.core.SettableApiFuture;
+import java.util.Optional;
+
+public class AckRequestData {
+ private final String ackId;
+ private final Optional> messageFuture;
+
+ protected AckRequestData(Builder builder) {
+ this.ackId = builder.ackId;
+ this.messageFuture = builder.messageFuture;
+ }
+
+ public String getAckId() {
+ return ackId;
+ }
+
+ public SettableApiFuture getMessageFutureIfExists() {
+ return this.messageFuture.orElse(null);
+ }
+
+ public AckRequestData setResponse(AckResponse ackResponse, boolean setResponseOnSuccess) {
+ if (this.messageFuture.isPresent() && !this.messageFuture.get().isDone()) {
+ switch (ackResponse) {
+ case SUCCESSFUL:
+ if (setResponseOnSuccess) {
+ this.messageFuture.get().set(ackResponse);
+ }
+ break;
+ case INVALID:
+ case OTHER:
+ case PERMISSION_DENIED:
+ case FAILED_PRECONDITION:
+ // Non-succesful messages will get set for both acks, nacks, and modacks
+ this.messageFuture.get().set(ackResponse);
+ break;
+ }
+ }
+ return this;
+ }
+
+ public boolean hasMessageFuture() {
+ return this.messageFuture.isPresent();
+ }
+
+ public static Builder newBuilder(String ackId) {
+ return new Builder(ackId);
+ }
+
+ /** Builder of {@link AckRequestData AckRequestData}. */
+ protected static final class Builder {
+ private final String ackId;
+ private Optional> messageFuture = Optional.empty();
+
+ protected Builder(String ackId) {
+ this.ackId = ackId;
+ }
+
+ public Builder setMessageFuture(SettableApiFuture messageFuture) {
+ this.messageFuture = Optional.of(messageFuture);
+ return this;
+ }
+
+ public AckRequestData build() {
+ return new AckRequestData(this);
+ }
+ }
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java
new file mode 100644
index 000000000..162d87bc1
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+public enum AckResponse {
+ PERMISSION_DENIED,
+ FAILED_PRECONDITION,
+ SUCCESSFUL,
+ INVALID,
+ OTHER
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
index 4177c6e01..a9f73d5c3 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java
@@ -28,19 +28,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -57,8 +46,8 @@
*/
class MessageDispatcher {
private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName());
- private static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
+ @InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
private final Executor executor;
@@ -68,24 +57,31 @@ class MessageDispatcher {
private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
- private final int maxSecondsPerAckExtension;
- private final MessageReceiver receiver;
+ private int minDurationPerAckExtensionSeconds;
+ private final boolean minDurationPerAckExtensionDefaultUsed;
+ private final int maxDurationPerAckExtensionSeconds;
+ private final boolean maxDurationPerAckExtensionDefaultUsed;
+
+ // Only one of receiver or receiverWithAckResponse will be set
+ private MessageReceiver receiver;
+ private MessageReceiverWithAckResponse receiverWithAckResponse;
+
private final AckProcessor ackProcessor;
private final FlowController flowController;
+
+ private AtomicBoolean enableExactlyOnceDelivery;
+
private final Waiter messagesWaiter;
// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>();
- private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>();
- private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>();
- private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>();
+ private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>();
- // Start the deadline at the minimum ack deadline so messages which arrive before this is
- // updated will not have a long ack deadline.
- private final AtomicInteger messageDeadlineSeconds =
- new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
+ private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
private ScheduledFuture> backgroundJob;
@@ -94,28 +90,6 @@ class MessageDispatcher {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;
- /** Stores the data needed to asynchronously modify acknowledgement deadlines. */
- static class PendingModifyAckDeadline {
- final List ackIds;
- final int deadlineExtensionSeconds;
-
- PendingModifyAckDeadline(int deadlineExtensionSeconds, String... ackIds) {
- this(deadlineExtensionSeconds, Arrays.asList(ackIds));
- }
-
- private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection ackIds) {
- this.ackIds = new ArrayList(ackIds);
- this.deadlineExtensionSeconds = deadlineExtensionSeconds;
- }
-
- @Override
- public String toString() {
- return String.format(
- "PendingModifyAckDeadline{extension: %d sec, ackIds: %s}",
- deadlineExtensionSeconds, ackIds);
- }
- }
-
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
ACK,
@@ -124,21 +98,30 @@ public enum AckReply {
/** Handles callbacks for acking/nacking messages from the {@link MessageReceiver}. */
private class AckHandler implements ApiFutureCallback {
- private final String ackId;
+ private final AckRequestData ackRequestData;
private final int outstandingBytes;
private final long receivedTimeMillis;
private final Instant totalExpiration;
- private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) {
- this.ackId = ackId;
+ private AckHandler(
+ AckRequestData ackRequestData, int outstandingBytes, Instant totalExpiration) {
+ this.ackRequestData = ackRequestData;
this.outstandingBytes = outstandingBytes;
this.receivedTimeMillis = clock.millisTime();
this.totalExpiration = totalExpiration;
}
+ public AckRequestData getAckRequestData() {
+ return ackRequestData;
+ }
+
+ public SettableApiFuture getMessageFutureIfExists() {
+ return this.ackRequestData.getMessageFutureIfExists();
+ }
+
/** Stop extending deadlines for this message and free flow control. */
private void forget() {
- if (pendingMessages.remove(ackId) == null) {
+ if (pendingMessages.remove(this.ackRequestData.getAckId()) == null) {
/*
* We're forgetting the message for the second time. Probably because we ran out of total
* expiration, forget the message, then the user finishes working on the message, and forget
@@ -154,64 +137,77 @@ private void forget() {
public void onFailure(Throwable t) {
logger.log(
Level.WARNING,
- "MessageReceiver failed to process ack ID: " + ackId + ", the message will be nacked.",
+ "MessageReceiver failed to process ack ID: "
+ + this.ackRequestData.getAckId()
+ + ", the message will be nacked.",
t);
- pendingNacks.add(ackId);
+ this.ackRequestData.setResponse(AckResponse.OTHER, false);
+ pendingNacks.add(this.ackRequestData);
forget();
}
@Override
public void onSuccess(AckReply reply) {
- LinkedBlockingQueue destination;
switch (reply) {
case ACK:
- destination = pendingAcks;
+ pendingAcks.add(this.ackRequestData);
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
break;
case NACK:
- destination = pendingNacks;
+ pendingNacks.add(this.ackRequestData);
break;
default:
throw new IllegalArgumentException(String.format("AckReply: %s not supported", reply));
}
- destination.add(ackId);
forget();
}
}
interface AckProcessor {
- void sendAckOperations(
- List acksToSend, List ackDeadlineExtensions);
+ public void sendAckOperations(List ackRequestDataList);
+
+ public void sendModackOperations(List modackRequestDataList);
}
- MessageDispatcher(
- MessageReceiver receiver,
- AckProcessor ackProcessor,
- Duration ackExpirationPadding,
- Duration maxAckExtensionPeriod,
- Duration maxDurationPerAckExtension,
- Distribution ackLatencyDistribution,
- FlowController flowController,
- Executor executor,
- ScheduledExecutorService systemExecutor,
- ApiClock clock) {
- this.executor = executor;
- this.systemExecutor = systemExecutor;
- this.ackExpirationPadding = ackExpirationPadding;
- this.maxAckExtensionPeriod = maxAckExtensionPeriod;
- this.maxSecondsPerAckExtension = Math.toIntExact(maxDurationPerAckExtension.getSeconds());
- this.receiver = receiver;
- this.ackProcessor = ackProcessor;
- this.flowController = flowController;
- // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
- this.ackLatencyDistribution = ackLatencyDistribution;
+ private MessageDispatcher(Builder builder) {
+ executor = builder.executor;
+ systemExecutor = builder.systemExecutor;
+ ackExpirationPadding = builder.ackExpirationPadding;
+ maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
+
+ minDurationPerAckExtensionSeconds =
+ Math.toIntExact(builder.minDurationPerAckExtension.getSeconds());
+ minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed;
+ maxDurationPerAckExtensionSeconds =
+ Math.toIntExact(builder.maxDurationPerAckExtension.getSeconds());
+ maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed;
+
+ // Start the deadline at the minimum ack deadline so messages which arrive before this is
+ // updated will not have a long ack deadline.
+ if (minDurationPerAckExtensionDefaultUsed) {
+ messageDeadlineSeconds.set(Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()));
+ } else {
+ messageDeadlineSeconds.set(minDurationPerAckExtensionSeconds);
+ }
+
+ receiver = builder.receiver;
+ receiverWithAckResponse = builder.receiverWithAckResponse;
+
+ ackProcessor = builder.ackProcessor;
+ flowController = builder.flowController;
+ enableExactlyOnceDelivery = new AtomicBoolean(builder.enableExactlyOnceDelivery);
+ ackLatencyDistribution = builder.ackLatencyDistribution;
+ clock = builder.clock;
jobLock = new ReentrantLock();
messagesWaiter = new Waiter();
- this.clock = clock;
- this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
+ sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);
+ }
+
+ private boolean shouldSetMessageFuture() {
+ return receiverWithAckResponse != null;
}
void start() {
@@ -256,7 +252,7 @@ public void run() {
newDeadlineSec - ackExpirationPadding.getSeconds(),
TimeUnit.SECONDS);
}
- processOutstandingAckOperations();
+ processOutstandingOperations();
} catch (Throwable t) {
// Catch everything so that one run failing doesn't prevent subsequent runs.
logger.log(Level.WARNING, "failed to run periodic job", t);
@@ -286,7 +282,7 @@ void stop() {
} finally {
jobLock.unlock();
}
- processOutstandingAckOperations();
+ processOutstandingOperations();
}
@InternalApi
@@ -299,6 +295,43 @@ int getMessageDeadlineSeconds() {
return messageDeadlineSeconds.get();
}
+ @InternalApi
+ void setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
+ // Sanity check that we are changing the enableExactlyOnceDelivery state
+ if (enableExactlyOnceDelivery == this.enableExactlyOnceDelivery.get()) {
+ return;
+ }
+
+ this.enableExactlyOnceDelivery.set(enableExactlyOnceDelivery);
+
+ // If a custom value for minDurationPerAckExtension, we should respect that
+ if (!minDurationPerAckExtensionDefaultUsed) {
+ return;
+ }
+
+ // We just need to update the minDurationPerAckExtensionSeconds as the
+ // maxDurationPerAckExtensionSeconds does not change
+ int possibleNewMinAckDeadlineExtensionSeconds;
+
+ if (enableExactlyOnceDelivery) {
+ possibleNewMinAckDeadlineExtensionSeconds =
+ Math.toIntExact(
+ Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds());
+ } else {
+ possibleNewMinAckDeadlineExtensionSeconds =
+ Math.toIntExact(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION.getSeconds());
+ }
+
+ // If we are not using the default maxDurationAckExtension, check if the
+ // minAckDeadlineExtensionExactlyOnce needs to be bounded by the set max
+ if (!maxDurationPerAckExtensionDefaultUsed
+ && (possibleNewMinAckDeadlineExtensionSeconds > maxDurationPerAckExtensionSeconds)) {
+ minDurationPerAckExtensionSeconds = maxDurationPerAckExtensionSeconds;
+ } else {
+ minDurationPerAckExtensionSeconds = possibleNewMinAckDeadlineExtensionSeconds;
+ }
+ }
+
private static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;
@@ -313,9 +346,13 @@ void processReceivedMessages(List messages) {
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
List outstandingBatch = new ArrayList<>(messages.size());
for (ReceivedMessage message : messages) {
+ AckRequestData.Builder builder = AckRequestData.newBuilder(message.getAckId());
+ if (shouldSetMessageFuture()) {
+ builder.setMessageFuture(SettableApiFuture.create());
+ }
+ AckRequestData ackRequestData = builder.build();
AckHandler ackHandler =
- new AckHandler(
- message.getAckId(), message.getMessage().getSerializedSize(), totalExpiration);
+ new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
// previously-mapped element.
@@ -328,7 +365,7 @@ void processReceivedMessages(List messages) {
continue;
}
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
- pendingReceipts.add(message.getAckId());
+ pendingReceipts.add(ackRequestData);
}
processBatch(outstandingBatch);
@@ -363,20 +400,11 @@ private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
}
private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
- final SettableApiFuture response = SettableApiFuture.create();
- final AckReplyConsumer consumer =
- new AckReplyConsumer() {
- @Override
- public void ack() {
- response.set(AckReply.ACK);
- }
+ // This future is for internal bookkeeping to be sent to the StreamingSubscriberConnection
+ // use below in the consumers
+ SettableApiFuture ackReplySettableApiFuture = SettableApiFuture.create();
+ ApiFutures.addCallback(ackReplySettableApiFuture, ackHandler, MoreExecutors.directExecutor());
- @Override
- public void nack() {
- response.set(AckReply.NACK);
- }
- };
- ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
Runnable deliverMessageTask =
new Runnable() {
@Override
@@ -392,10 +420,42 @@ public void run() {
ackHandler.forget();
return;
}
+ if (shouldSetMessageFuture()) {
+ // This is the message future that is propagated to the user
+ SettableApiFuture messageFuture =
+ ackHandler.getMessageFutureIfExists();
+ final AckReplyConsumerWithResponse ackReplyConsumerWithResponse =
+ new AckReplyConsumerWithResponse() {
+ @Override
+ public Future ack() {
+ ackReplySettableApiFuture.set(AckReply.ACK);
+ return messageFuture;
+ }
+
+ @Override
+ public Future nack() {
+ ackReplySettableApiFuture.set(AckReply.NACK);
+ return messageFuture;
+ }
+ };
+ receiverWithAckResponse.receiveMessage(message, ackReplyConsumerWithResponse);
+ } else {
+ final AckReplyConsumer ackReplyConsumer =
+ new AckReplyConsumer() {
+ @Override
+ public void ack() {
+ ackReplySettableApiFuture.set(AckReply.ACK);
+ }
- receiver.receiveMessage(message, consumer);
+ @Override
+ public void nack() {
+ ackReplySettableApiFuture.set(AckReply.NACK);
+ }
+ };
+ receiver.receiveMessage(message, ackReplyConsumer);
+ }
} catch (Exception e) {
- response.setException(e);
+ ackReplySettableApiFuture.setException(e);
}
}
};
@@ -409,26 +469,32 @@ public void run() {
/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
@InternalApi
int computeDeadlineSeconds() {
- int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
-
- if ((maxSecondsPerAckExtension > 0) && (sec > maxSecondsPerAckExtension)) {
- sec = maxSecondsPerAckExtension;
+ int deadlineSeconds = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
+
+ // Bound deadlineSeconds by extensions
+ if (!maxDurationPerAckExtensionDefaultUsed
+ && (deadlineSeconds > maxDurationPerAckExtensionSeconds)) {
+ deadlineSeconds = maxDurationPerAckExtensionSeconds;
+ } else if (deadlineSeconds < minDurationPerAckExtensionSeconds) {
+ deadlineSeconds = minDurationPerAckExtensionSeconds;
}
- // Use Ints.constrainToRange when we get guava 21.
- if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) {
- sec = Subscriber.MIN_ACK_DEADLINE_SECONDS;
- } else if (sec > Subscriber.MAX_ACK_DEADLINE_SECONDS) {
- sec = Subscriber.MAX_ACK_DEADLINE_SECONDS;
+ // Bound deadlineSeconds by hard limits in subscriber
+ if (deadlineSeconds < Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()) {
+ deadlineSeconds = Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds());
+ } else if (deadlineSeconds > Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()) {
+ deadlineSeconds = Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds());
}
- return sec;
+
+ return deadlineSeconds;
}
@InternalApi
void extendDeadlines() {
int extendSeconds = getMessageDeadlineSeconds();
- List modacks = new ArrayList<>();
- PendingModifyAckDeadline modack = new PendingModifyAckDeadline(extendSeconds);
+ int numAckIdToSend = 0;
+ Map deadlineExtensionModacks =
+ new HashMap();
Instant now = now();
Instant extendTo = now.plusSeconds(extendSeconds);
@@ -436,7 +502,12 @@ void extendDeadlines() {
String ackId = entry.getKey();
Instant totalExpiration = entry.getValue().totalExpiration;
if (totalExpiration.isAfter(extendTo)) {
- modack.ackIds.add(ackId);
+ ModackRequestData modackRequestData =
+ deadlineExtensionModacks.computeIfAbsent(
+ extendSeconds,
+ deadlineExtensionSeconds -> new ModackRequestData(deadlineExtensionSeconds));
+ modackRequestData.addAckRequestData(entry.getValue().getAckRequestData());
+ numAckIdToSend++;
continue;
}
@@ -445,43 +516,161 @@ void extendDeadlines() {
entry.getValue().forget();
if (totalExpiration.isAfter(now)) {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
- modacks.add(new PendingModifyAckDeadline(sec, ackId));
+ ModackRequestData modackRequestData =
+ deadlineExtensionModacks.computeIfAbsent(
+ sec, extensionSeconds -> new ModackRequestData(extensionSeconds));
+ modackRequestData.addAckRequestData(entry.getValue().getAckRequestData());
+ numAckIdToSend++;
}
}
- logger.log(Level.FINER, "Sending {0} modacks", modack.ackIds.size() + modacks.size());
- modacks.add(modack);
- List acksToSend = Collections.emptyList();
- ackProcessor.sendAckOperations(acksToSend, modacks);
+ if (numAckIdToSend > 0) {
+ logger.log(Level.FINER, "Sending {0} modacks", numAckIdToSend);
+ ackProcessor.sendModackOperations(
+ new ArrayList(deadlineExtensionModacks.values()));
+ }
}
@InternalApi
- void processOutstandingAckOperations() {
- List modifyAckDeadlinesToSend = new ArrayList<>();
+ void processOutstandingOperations() {
+ List modackRequestData = new ArrayList();
- List acksToSend = new ArrayList<>();
- pendingAcks.drainTo(acksToSend);
- logger.log(Level.FINER, "Sending {0} acks", acksToSend.size());
+ // Nacks are modacks with an expiration of 0
+ List nackRequestDataList = new ArrayList();
+ pendingNacks.drainTo(nackRequestDataList);
- PendingModifyAckDeadline nacksToSend = new PendingModifyAckDeadline(0);
- pendingNacks.drainTo(nacksToSend.ackIds);
- logger.log(Level.FINER, "Sending {0} nacks", nacksToSend.ackIds.size());
- if (!nacksToSend.ackIds.isEmpty()) {
- modifyAckDeadlinesToSend.add(nacksToSend);
+ if (!nackRequestDataList.isEmpty()) {
+ modackRequestData.add(new ModackRequestData(0, nackRequestDataList));
}
+ logger.log(Level.FINER, "Sending {0} nacks", nackRequestDataList.size());
- PendingModifyAckDeadline receiptsToSend =
- new PendingModifyAckDeadline(getMessageDeadlineSeconds());
- pendingReceipts.drainTo(receiptsToSend.ackIds);
- logger.log(Level.FINER, "Sending {0} receipts", receiptsToSend.ackIds.size());
- if (!receiptsToSend.ackIds.isEmpty()) {
- modifyAckDeadlinesToSend.add(receiptsToSend);
+ List ackRequestDataReceipts = new ArrayList();
+ pendingReceipts.drainTo(ackRequestDataReceipts);
+ if (!ackRequestDataReceipts.isEmpty()) {
+ modackRequestData.add(
+ new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts));
}
+ logger.log(Level.FINER, "Sending {0} receipts", ackRequestDataReceipts.size());
+
+ ackProcessor.sendModackOperations(modackRequestData);
+
+ List ackRequestDataList = new ArrayList();
+ pendingAcks.drainTo(ackRequestDataList);
+ logger.log(Level.FINER, "Sending {0} acks", ackRequestDataList.size());
- ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
+ ackProcessor.sendAckOperations(ackRequestDataList);
}
private Instant now() {
return Instant.ofEpochMilli(clock.millisTime());
}
+
+ /** Builder of {@link MessageDispatcher MessageDispatchers}. */
+ public static final class Builder {
+ private MessageReceiver receiver;
+ private MessageReceiverWithAckResponse receiverWithAckResponse;
+
+ private AckProcessor ackProcessor;
+ private Duration ackExpirationPadding;
+ private Duration maxAckExtensionPeriod;
+ private Duration minDurationPerAckExtension;
+ private boolean minDurationPerAckExtensionDefaultUsed;
+ private Duration maxDurationPerAckExtension;
+ private boolean maxDurationPerAckExtensionDefaultUsed;
+
+ private Distribution ackLatencyDistribution;
+ private FlowController flowController;
+ private boolean enableExactlyOnceDelivery;
+
+ private Executor executor;
+ private ScheduledExecutorService systemExecutor;
+ private ApiClock clock;
+
+ protected Builder(MessageReceiver receiver) {
+ this.receiver = receiver;
+ }
+
+ protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) {
+ this.receiverWithAckResponse = receiverWithAckResponse;
+ }
+
+ public Builder setAckProcessor(AckProcessor ackProcessor) {
+ this.ackProcessor = ackProcessor;
+ return this;
+ }
+
+ public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
+ this.ackExpirationPadding = ackExpirationPadding;
+ return this;
+ }
+
+ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
+ this.maxAckExtensionPeriod = maxAckExtensionPeriod;
+ return this;
+ }
+
+ public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
+ this.minDurationPerAckExtension = minDurationPerAckExtension;
+ return this;
+ }
+
+ public Builder setMinDurationPerAckExtensionDefaultUsed(
+ boolean minDurationPerAckExtensionDefaultUsed) {
+ this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed;
+ return this;
+ }
+
+ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
+ this.maxDurationPerAckExtension = maxDurationPerAckExtension;
+ return this;
+ }
+
+ public Builder setMaxDurationPerAckExtensionDefaultUsed(
+ boolean maxDurationPerAckExtensionDefaultUsed) {
+ this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed;
+ return this;
+ }
+
+ public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) {
+ this.ackLatencyDistribution = ackLatencyDistribution;
+ return this;
+ }
+
+ public Builder setFlowController(FlowController flowController) {
+ this.flowController = flowController;
+ return this;
+ }
+
+ public Builder setEnableExactlyOnceDelivery(boolean enableExactlyOnceDelivery) {
+ this.enableExactlyOnceDelivery = enableExactlyOnceDelivery;
+ return this;
+ }
+
+ public Builder setExecutor(Executor executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
+ this.systemExecutor = systemExecutor;
+ return this;
+ }
+
+ public Builder setApiClock(ApiClock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ public MessageDispatcher build() {
+ return new MessageDispatcher(this);
+ }
+ }
+
+ public static Builder newBuilder(MessageReceiver receiver) {
+ return new Builder(receiver);
+ }
+
+ public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) {
+ return new Builder(receiverWithAckResponse);
+ }
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java
new file mode 100644
index 000000000..49792be07
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageReceiverWithAckResponse.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import com.google.pubsub.v1.PubsubMessage;
+
+public interface MessageReceiverWithAckResponse {
+ void receiveMessage(PubsubMessage message, AckReplyConsumerWithResponse consumer);
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
new file mode 100644
index 000000000..b4d2dae0f
--- /dev/null
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModackRequestData.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub.v1;
+
+import java.util.*;
+
+class ModackRequestData {
+ private final int deadlineExtensionSeconds;
+ private List ackRequestData;
+
+ ModackRequestData(int deadlineExtensionSeconds) {
+ this.deadlineExtensionSeconds = deadlineExtensionSeconds;
+ this.ackRequestData = new ArrayList();
+ }
+
+ ModackRequestData(int deadlineExtensionSeconds, AckRequestData... ackRequestData) {
+ this.deadlineExtensionSeconds = deadlineExtensionSeconds;
+ this.ackRequestData = Arrays.asList(ackRequestData);
+ }
+
+ ModackRequestData(int deadlineExtensionSeconds, List ackRequestData) {
+ this.deadlineExtensionSeconds = deadlineExtensionSeconds;
+ this.ackRequestData = ackRequestData;
+ }
+
+ public int getDeadlineExtensionSeconds() {
+ return deadlineExtensionSeconds;
+ }
+
+ public List getAckRequestData() {
+ return ackRequestData;
+ }
+
+ public ModackRequestData addAckRequestData(AckRequestData ackRequestData) {
+ this.ackRequestData.add(ackRequestData);
+ return this;
+ }
+}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
index 249d896b7..d1af3a3e9 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
@@ -16,41 +16,31 @@
package com.google.cloud.pubsub.v1;
-import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import com.google.api.core.AbstractApiService;
-import com.google.api.core.ApiClock;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutureCallback;
-import com.google.api.core.ApiFutures;
-import com.google.api.core.InternalApi;
-import com.google.api.core.SettableApiFuture;
+import com.google.api.core.*;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.ApiExceptionFactory;
-import com.google.api.gax.rpc.ClientStream;
-import com.google.api.gax.rpc.ResponseObserver;
-import com.google.api.gax.rpc.StreamController;
+import com.google.api.gax.rpc.*;
import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor;
-import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.Any;
import com.google.protobuf.Empty;
-import com.google.pubsub.v1.AcknowledgeRequest;
-import com.google.pubsub.v1.ModifyAckDeadlineRequest;
-import com.google.pubsub.v1.StreamingPullRequest;
-import com.google.pubsub.v1.StreamingPullResponse;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.pubsub.v1.*;
+import com.google.rpc.ErrorInfo;
import io.grpc.Status;
-import java.util.List;
-import java.util.UUID;
+import io.grpc.protobuf.StatusProto;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -64,15 +54,21 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Logger logger =
Logger.getLogger(StreamingSubscriberConnection.class.getName());
- @InternalApi static final Duration DEFAULT_STREAM_ACK_DEADLINE = Duration.ofSeconds(60);
- @InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
- @InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
+
+ private static final long INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS = 100;
+ private static final long MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS =
+ Duration.ofSeconds(10).toMillis();
private static final int MAX_PER_REQUEST_CHANGES = 1000;
- private final Duration streamAckDeadline;
- private final SubscriberStub stub;
+ private final String PERMANENT_FAILURE_INVALID_ACK_ID_METADATA =
+ "PERMANENT_FAILURE_INVALID_ACK_ID";
+ private final String TRANSIENT_FAILURE_METADATA_PREFIX = "TRANSIENT_";
+
+ private Duration inititalStreamAckDeadline;
+
+ private final SubscriberStub subscriberStub;
private final int channelAffinity;
private final String subscription;
private final ScheduledExecutorService systemExecutor;
@@ -81,6 +77,9 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
+ // Keeps track of requests without closed futures
+ private final Set pendingRequests = ConcurrentHashMap.newKeySet();
+
private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
private final Waiter ackOperationsWaiter = new Waiter();
@@ -88,6 +87,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final Lock lock = new ReentrantLock();
private ClientStream clientStream;
+ private AtomicBoolean exactlyOnceDeliveryEnabled = new AtomicBoolean(false);
+
/**
* The same clientId is used across all streaming pull connections that are created. This is
* intentional, as it indicates to the server that any guarantees made for a stream that
@@ -95,48 +96,71 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
*/
private final String clientId = UUID.randomUUID().toString();
- public StreamingSubscriberConnection(
- String subscription,
- MessageReceiver receiver,
- Duration ackExpirationPadding,
- Duration maxAckExtensionPeriod,
- Duration maxDurationPerAckExtension,
- Distribution ackLatencyDistribution,
- SubscriberStub stub,
- int channelAffinity,
- FlowControlSettings flowControlSettings,
- boolean useLegacyFlowControl,
- FlowController flowController,
- ScheduledExecutorService executor,
- ScheduledExecutorService systemExecutor,
- ApiClock clock) {
- this.subscription = subscription;
- this.systemExecutor = systemExecutor;
- if (maxDurationPerAckExtension.compareTo(DEFAULT_MAX_DURATION_PER_ACK_EXTENSION) == 0) {
- this.streamAckDeadline = DEFAULT_STREAM_ACK_DEADLINE;
- } else if (maxDurationPerAckExtension.compareTo(MIN_STREAM_ACK_DEADLINE) < 0) {
- this.streamAckDeadline = MIN_STREAM_ACK_DEADLINE;
- } else if (maxDurationPerAckExtension.compareTo(MAX_STREAM_ACK_DEADLINE) > 0) {
- this.streamAckDeadline = MAX_STREAM_ACK_DEADLINE;
+ private StreamingSubscriberConnection(Builder builder) {
+ subscription = builder.subscription;
+ systemExecutor = builder.systemExecutor;
+
+ // We need to set the default stream ack deadline on the initial request, this will be
+ // updated by modack requests in the message dispatcher
+ if (builder.maxDurationPerAckExtensionDefaultUsed) {
+ // If the default is used, check if exactly once is enabled and set appropriately
+ if (builder.exactlyOnceDeliveryEnabled) {
+ inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT;
+ } else {
+ inititalStreamAckDeadline = Subscriber.STREAM_ACK_DEADLINE_DEFAULT;
+ }
+ } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MIN_STREAM_ACK_DEADLINE)
+ < 0) {
+ // We will not be able to extend more than the default minimum
+ inititalStreamAckDeadline = Subscriber.MIN_STREAM_ACK_DEADLINE;
+ } else if (builder.maxDurationPerAckExtension.compareTo(Subscriber.MAX_STREAM_ACK_DEADLINE)
+ > 0) {
+ // Will not be able to extend past the max
+ inititalStreamAckDeadline = Subscriber.MAX_STREAM_ACK_DEADLINE;
+ } else {
+ inititalStreamAckDeadline = builder.maxDurationPerAckExtension;
+ }
+
+ subscriberStub = builder.subscriberStub;
+ channelAffinity = builder.channelAffinity;
+ exactlyOnceDeliveryEnabled.set(builder.exactlyOnceDeliveryEnabled);
+
+ MessageDispatcher.Builder messageDispatcherBuilder;
+ if (builder.receiver != null) {
+ messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiver);
} else {
- this.streamAckDeadline = maxDurationPerAckExtension;
- }
- this.stub = stub;
- this.channelAffinity = channelAffinity;
- this.messageDispatcher =
- new MessageDispatcher(
- receiver,
- this,
- ackExpirationPadding,
- maxAckExtensionPeriod,
- maxDurationPerAckExtension,
- ackLatencyDistribution,
- flowController,
- executor,
- systemExecutor,
- clock);
- this.flowControlSettings = flowControlSettings;
- this.useLegacyFlowControl = useLegacyFlowControl;
+ messageDispatcherBuilder = MessageDispatcher.newBuilder(builder.receiverWithAckResponse);
+ }
+
+ messageDispatcher =
+ messageDispatcherBuilder
+ .setAckProcessor(this)
+ .setAckExpirationPadding(builder.ackExpirationPadding)
+ .setMaxAckExtensionPeriod(builder.maxAckExtensionPeriod)
+ .setMinDurationPerAckExtension(builder.minDurationPerAckExtension)
+ .setMinDurationPerAckExtensionDefaultUsed(builder.minDurationPerAckExtensionDefaultUsed)
+ .setMaxDurationPerAckExtension(builder.maxDurationPerAckExtension)
+ .setMaxDurationPerAckExtensionDefaultUsed(builder.maxDurationPerAckExtensionDefaultUsed)
+ .setAckLatencyDistribution(builder.ackLatencyDistribution)
+ .setFlowController(builder.flowController)
+ .setEnableExactlyOnceDelivery(builder.exactlyOnceDeliveryEnabled)
+ .setExecutor(builder.executor)
+ .setSystemExecutor(builder.systemExecutor)
+ .setApiClock(builder.clock)
+ .build();
+
+ flowControlSettings = builder.flowControlSettings;
+ useLegacyFlowControl = builder.useLegacyFlowControl;
+ }
+
+ public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(
+ boolean isExactlyOnceDeliveryEnabled) {
+ exactlyOnceDeliveryEnabled.set(isExactlyOnceDeliveryEnabled);
+ return this;
+ }
+
+ public boolean isExactlyOnceDeliveryEnabled() {
+ return exactlyOnceDeliveryEnabled.get();
}
@Override
@@ -192,7 +216,14 @@ public void onStart(StreamController controller) {
@Override
public void onResponse(StreamingPullResponse response) {
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
+
+ boolean exactlyOnceDeliveryEnabledResponse =
+ response.getSubscriptionProperties().getExactlyOnceDeliveryEnabled();
+
+ setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabledResponse);
+ messageDispatcher.setEnableExactlyOnceDelivery(exactlyOnceDeliveryEnabledResponse);
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
+
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
@@ -222,10 +253,13 @@ public void onComplete() {
private void initialize() {
final SettableApiFuture errorFuture = SettableApiFuture.create();
+
final ResponseObserver responseObserver =
new StreamingPullResponseObserver(errorFuture);
+
ClientStream initClientStream =
- stub.streamingPullCallable()
+ subscriberStub
+ .streamingPullCallable()
.splitCall(
responseObserver,
GrpcCallContext.createDefault().withChannelAffinity(channelAffinity));
@@ -236,7 +270,7 @@ private void initialize() {
initClientStream.send(
StreamingPullRequest.newBuilder()
.setSubscription(subscription)
- .setStreamAckDeadlineSeconds((int) streamAckDeadline.getSeconds())
+ .setStreamAckDeadlineSeconds(Math.toIntExact(inititalStreamAckDeadline.getSeconds()))
.setClientId(clientId)
.setMaxOutstandingMessages(
this.useLegacyFlowControl
@@ -287,6 +321,7 @@ public void onFailure(Throwable cause) {
cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false);
logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
runShutdown();
+ setFailureFutureOutstandingMessages(cause);
notifyFailed(gaxException);
return;
}
@@ -319,52 +354,372 @@ private boolean isAlive() {
return state == State.RUNNING || state == State.STARTING;
}
+ public void setResponseOutstandingMessages(AckResponse ackResponse) {
+ // We will close the futures with ackResponse - if there are multiple references to the same
+ // future they will be handled appropriately
+ logger.log(
+ Level.WARNING, "Setting response: {0} on outstanding messages", ackResponse.toString());
+ for (AckRequestData ackRequestData : pendingRequests) {
+ ackRequestData.setResponse(ackResponse, false);
+ }
+
+ // Clear our pending requests
+ pendingRequests.clear();
+ }
+
+ private void setFailureFutureOutstandingMessages(Throwable t) {
+ AckResponse ackResponse;
+
+ if (isExactlyOnceDeliveryEnabled()) {
+ if (!(t instanceof ApiException)) {
+ ackResponse = AckResponse.OTHER;
+ }
+
+ ApiException apiException = (ApiException) t;
+ switch (apiException.getStatusCode().getCode()) {
+ case FAILED_PRECONDITION:
+ ackResponse = AckResponse.FAILED_PRECONDITION;
+ break;
+ case PERMISSION_DENIED:
+ ackResponse = AckResponse.PERMISSION_DENIED;
+ break;
+ default:
+ ackResponse = AckResponse.OTHER;
+ }
+ } else {
+ // We should set success regardless if ExactlyOnceDelivery is not enabled
+ ackResponse = AckResponse.SUCCESSFUL;
+ }
+
+ setResponseOutstandingMessages(ackResponse);
+ }
+
@Override
- public void sendAckOperations(
- List acksToSend, List ackDeadlineExtensions) {
- ApiFutureCallback loggingCallback =
- new ApiFutureCallback() {
- @Override
- public void onSuccess(Empty empty) {
- ackOperationsWaiter.incrementPendingCount(-1);
- }
+ public void sendAckOperations(List ackRequestDataList) {
+ sendAckOperations(ackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
+ }
- @Override
- public void onFailure(Throwable t) {
- ackOperationsWaiter.incrementPendingCount(-1);
- Level level = isAlive() ? Level.WARNING : Level.FINER;
- logger.log(level, "failed to send operations", t);
- }
- };
+ @Override
+ public void sendModackOperations(List modackRequestDataList) {
+ sendModackOperations(modackRequestDataList, INITIAL_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
+ }
+ private void sendAckOperations(
+ List ackRequestDataList, long currentBackoffMillis) {
int pendingOperations = 0;
- for (PendingModifyAckDeadline modack : ackDeadlineExtensions) {
- for (List idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
- ApiFuture future =
- stub.modifyAckDeadlineCallable()
+ for (List ackRequestDataInRequestList :
+ Lists.partition(ackRequestDataList, MAX_PER_REQUEST_CHANGES)) {
+ List ackIdsInRequest = new ArrayList<>();
+ for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
+ ackIdsInRequest.add(ackRequestData.getAckId());
+ if (ackRequestData.hasMessageFuture()) {
+ // Add to our pending requests if we care about the response
+ pendingRequests.add(ackRequestData);
+ }
+ }
+ ApiFutureCallback callback =
+ getCallback(ackRequestDataInRequestList, 0, false, currentBackoffMillis);
+ ApiFuture ackFuture =
+ subscriberStub
+ .acknowledgeCallable()
+ .futureCall(
+ AcknowledgeRequest.newBuilder()
+ .setSubscription(subscription)
+ .addAllAckIds(ackIdsInRequest)
+ .build());
+ ApiFutures.addCallback(ackFuture, callback, directExecutor());
+ pendingOperations++;
+ }
+ ackOperationsWaiter.incrementPendingCount(pendingOperations);
+ }
+
+ private void sendModackOperations(
+ List modackRequestDataList, long currentBackoffMillis) {
+ // Send modacks
+ int pendingOperations = 0;
+ for (ModackRequestData modackRequestData : modackRequestDataList) {
+ List ackIdsInRequest = new ArrayList<>();
+ for (List ackRequestDataInRequestList :
+ Lists.partition(modackRequestData.getAckRequestData(), MAX_PER_REQUEST_CHANGES)) {
+ for (AckRequestData ackRequestData : ackRequestDataInRequestList) {
+ ackIdsInRequest.add(ackRequestData.getAckId());
+ if (ackRequestData.hasMessageFuture()) {
+ // Add to our pending requests if we care about the response
+ pendingRequests.add(ackRequestData);
+ }
+ }
+ ApiFutureCallback callback =
+ getCallback(
+ modackRequestData.getAckRequestData(),
+ modackRequestData.getDeadlineExtensionSeconds(),
+ true,
+ currentBackoffMillis);
+ ApiFuture modackFuture =
+ subscriberStub
+ .modifyAckDeadlineCallable()
.futureCall(
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscription)
- .addAllAckIds(idChunk)
- .setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
+ .addAllAckIds(ackIdsInRequest)
+ .setAckDeadlineSeconds(modackRequestData.getDeadlineExtensionSeconds())
.build());
- ApiFutures.addCallback(future, loggingCallback, directExecutor());
+ ApiFutures.addCallback(modackFuture, callback, directExecutor());
pendingOperations++;
}
}
+ ackOperationsWaiter.incrementPendingCount(pendingOperations);
+ }
- for (List idChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) {
- ApiFuture future =
- stub.acknowledgeCallable()
- .futureCall(
- AcknowledgeRequest.newBuilder()
- .setSubscription(subscription)
- .addAllAckIds(idChunk)
- .build());
- ApiFutures.addCallback(future, loggingCallback, directExecutor());
- pendingOperations++;
+ private Map getMetadataMapFromThrowable(Throwable t)
+ throws InvalidProtocolBufferException {
+ // This converts a Throwable (from a "OK" grpc response) to a map of metadata
+ // will be of the format:
+ // {
+ // "ACK-ID-1": "PERMANENT_*",
+ // "ACK-ID-2": "TRANSIENT_*"
+ // }
+ com.google.rpc.Status status = StatusProto.fromThrowable(t);
+ Map metadataMap = new HashMap<>();
+ if (status != null) {
+ for (Any any : status.getDetailsList()) {
+ if (any.is(ErrorInfo.class)) {
+ ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
+ metadataMap = errorInfo.getMetadataMap();
+ }
+ }
}
+ return metadataMap;
+ }
- ackOperationsWaiter.incrementPendingCount(pendingOperations);
+ private ApiFutureCallback getCallback(
+ List ackRequestDataList,
+ int deadlineExtensionSeconds,
+ boolean isModack,
+ long currentBackoffMillis) {
+ // This callback handles retries, and sets message futures
+
+ // Check if ack or nack
+ boolean setResponseOnSuccess = (!isModack || (deadlineExtensionSeconds == 0)) ? true : false;
+
+ return new ApiFutureCallback() {
+ @Override
+ public void onSuccess(Empty empty) {
+ ackOperationsWaiter.incrementPendingCount(-1);
+ for (AckRequestData ackRequestData : ackRequestDataList) {
+ // This will check if a response is needed, and if it has already been set
+ ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
+ // Remove from our pending operations
+ pendingRequests.remove(ackRequestData);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Remove from our pending operations
+ ackOperationsWaiter.incrementPendingCount(-1);
+
+ if (!isExactlyOnceDeliveryEnabled()) {
+ Level level = isAlive() ? Level.WARNING : Level.FINER;
+ logger.log(level, "failed to send operations", t);
+ return;
+ }
+
+ List ackRequestDataArrayRetryList = new ArrayList<>();
+ try {
+ Map metadataMap = getMetadataMapFromThrowable(t);
+ ackRequestDataList.forEach(
+ ackRequestData -> {
+ String ackId = ackRequestData.getAckId();
+ if (metadataMap.containsKey(ackId)) {
+ // An error occured
+ String errorMessage = metadataMap.get(ackId);
+ if (errorMessage.startsWith(TRANSIENT_FAILURE_METADATA_PREFIX)) {
+ // Retry all "TRANSIENT_*" error messages - do not set message future
+ logger.log(Level.WARNING, "Transient error message, will resend", errorMessage);
+ ackRequestDataArrayRetryList.add(ackRequestData);
+ } else if (errorMessage.equals(PERMANENT_FAILURE_INVALID_ACK_ID_METADATA)) {
+ // Permanent failure, send
+ logger.log(
+ Level.WARNING,
+ "Permanent error invalid ack id message, will not resend",
+ errorMessage);
+ ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
+ } else {
+ logger.log(
+ Level.WARNING, "Unknown error message, will not resend", errorMessage);
+ ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
+ }
+ } else {
+ ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
+ }
+ // Remove from our pending
+ pendingRequests.remove(ackRequestData);
+ });
+ } catch (InvalidProtocolBufferException e) {
+ // If we fail to parse out the errorInfo, we should retry all
+ logger.log(
+ Level.WARNING, "Exception occurred when parsing throwable {0} for errorInfo", t);
+ ackRequestDataArrayRetryList.addAll(ackRequestDataList);
+ }
+
+ // Handle retries
+ if (!ackRequestDataArrayRetryList.isEmpty()) {
+ long newBackoffMillis =
+ Math.min(currentBackoffMillis * 2, MAX_ACK_OPERATIONS_RECONNECT_BACKOFF_MILLIS);
+ systemExecutor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ if (isModack) {
+ // Create a new modackRequest with only the retries
+ ModackRequestData modackRequestData =
+ new ModackRequestData(
+ deadlineExtensionSeconds, ackRequestDataArrayRetryList);
+ sendModackOperations(
+ Collections.singletonList(modackRequestData), newBackoffMillis);
+ } else {
+ sendAckOperations(ackRequestDataArrayRetryList, newBackoffMillis);
+ }
+ }
+ },
+ currentBackoffMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ Level level = isAlive() ? Level.WARNING : Level.FINER;
+ logger.log(level, "failed to send operations", t);
+ }
+ };
+ }
+
+ /** Builder of {@link StreamingSubscriberConnection StreamingSubscriberConnections}. */
+ public static final class Builder {
+ private MessageReceiver receiver;
+ private MessageReceiverWithAckResponse receiverWithAckResponse;
+ private String subscription;
+ private Duration ackExpirationPadding;
+ private Duration maxAckExtensionPeriod;
+ private Duration minDurationPerAckExtension;
+ private boolean minDurationPerAckExtensionDefaultUsed;
+ private Duration maxDurationPerAckExtension;
+ private boolean maxDurationPerAckExtensionDefaultUsed;
+
+ private Distribution ackLatencyDistribution;
+ private SubscriberStub subscriberStub;
+ private int channelAffinity;
+ private FlowController flowController;
+ private FlowControlSettings flowControlSettings;
+ private boolean exactlyOnceDeliveryEnabled;
+ private boolean useLegacyFlowControl;
+ private ScheduledExecutorService executor;
+ private ScheduledExecutorService systemExecutor;
+ private ApiClock clock;
+
+ protected Builder(MessageReceiver receiver) {
+ this.receiver = receiver;
+ }
+
+ protected Builder(MessageReceiverWithAckResponse receiverWithAckResponse) {
+ this.receiverWithAckResponse = receiverWithAckResponse;
+ }
+
+ public Builder setSubscription(String subscription) {
+ this.subscription = subscription;
+ return this;
+ }
+
+ public Builder setAckExpirationPadding(Duration ackExpirationPadding) {
+ this.ackExpirationPadding = ackExpirationPadding;
+ return this;
+ }
+
+ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
+ this.maxAckExtensionPeriod = maxAckExtensionPeriod;
+ return this;
+ }
+
+ public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) {
+ this.minDurationPerAckExtension = minDurationPerAckExtension;
+ return this;
+ }
+
+ public Builder setMinDurationPerAckExtensionDefaultUsed(
+ boolean minDurationPerAckExtensionDefaultUsed) {
+ this.minDurationPerAckExtensionDefaultUsed = minDurationPerAckExtensionDefaultUsed;
+ return this;
+ }
+
+ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
+ this.maxDurationPerAckExtension = maxDurationPerAckExtension;
+ return this;
+ }
+
+ public Builder setMaxDurationPerAckExtensionDefaultUsed(
+ boolean maxDurationPerAckExtensionDefaultUsed) {
+ this.maxDurationPerAckExtensionDefaultUsed = maxDurationPerAckExtensionDefaultUsed;
+ return this;
+ }
+
+ public Builder setAckLatencyDistribution(Distribution ackLatencyDistribution) {
+ this.ackLatencyDistribution = ackLatencyDistribution;
+ return this;
+ }
+
+ public Builder setSubscriberStub(SubscriberStub subscriberStub) {
+ this.subscriberStub = subscriberStub;
+ return this;
+ }
+
+ public Builder setChannelAffinity(int channelAffinity) {
+ this.channelAffinity = channelAffinity;
+ return this;
+ }
+
+ public Builder setFlowController(FlowController flowController) {
+ this.flowController = flowController;
+ return this;
+ }
+
+ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
+ this.flowControlSettings = flowControlSettings;
+ return this;
+ }
+
+ public Builder setUseLegacyFlowControl(boolean useLegacyFlowControl) {
+ this.useLegacyFlowControl = useLegacyFlowControl;
+ return this;
+ }
+
+ public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
+ this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
+ return this;
+ }
+
+ public Builder setExecutor(ScheduledExecutorService executor) {
+ this.executor = executor;
+ return this;
+ }
+
+ public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
+ this.systemExecutor = systemExecutor;
+ return this;
+ }
+
+ public Builder setClock(ApiClock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ public StreamingSubscriberConnection build() {
+ return new StreamingSubscriberConnection(this);
+ }
+ }
+
+ public static Builder newBuilder(MessageReceiver receiver) {
+ return new Builder(receiver);
+ }
+
+ public static Builder newBuilder(MessageReceiverWithAckResponse receiverWithAckResponse) {
+ return new Builder(receiverWithAckResponse);
}
}
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
index 656a19fdd..4ee66b031 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java
@@ -89,13 +89,28 @@
* details.
*/
public class Subscriber extends AbstractApiService implements SubscriberInterface {
- @InternalApi static final Duration DEFAULT_MAX_DURATION_PER_ACK_EXTENSION = Duration.ofMillis(0);
private static final int THREADS_PER_CHANNEL = 5;
private static final int MAX_INBOUND_MESSAGE_SIZE =
20 * 1024 * 1024; // 20MB API maximum message size.
- @InternalApi static final int MAX_ACK_DEADLINE_SECONDS = 600;
- @InternalApi static final int MIN_ACK_DEADLINE_SECONDS = 10;
- private static final Duration ACK_EXPIRATION_PADDING = Duration.ofSeconds(5);
+
+ @InternalApi static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
+
+ @InternalApi
+ static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY =
+ Duration.ofMinutes(1);
+
+ @InternalApi static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = Duration.ofMinutes(0);
+ @InternalApi static final Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = Duration.ofSeconds(0);
+
+ @InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
+ @InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
+
+ @InternalApi static final Duration STREAM_ACK_DEADLINE_DEFAULT = Duration.ofSeconds(60);
+
+ @InternalApi
+ static final Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = Duration.ofSeconds(60);
+
+ @InternalApi static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(5);
private static final Logger logger = Logger.getLogger(Subscriber.class.getName());
@@ -104,32 +119,43 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
+ private final boolean maxDurationPerAckExtensionDefaultUsed;
+ private final Duration minDurationPerAckExtension;
+ private final boolean minDurationPerAckExtensionDefaultUsed;
+
// The ExecutorProvider used to generate executors for processing messages.
private final ExecutorProvider executorProvider;
// An instantiation of the SystemExecutorProvider used for processing acks
// and other system actions.
@Nullable private final ScheduledExecutorService alarmsExecutor;
private final Distribution ackLatencyDistribution =
- new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
+ new Distribution(Math.toIntExact(MAX_STREAM_ACK_DEADLINE.getSeconds()) + 1);
- private SubscriberStub subStub;
+ private SubscriberStub subscriberStub;
private final SubscriberStubSettings subStubSettings;
private final FlowController flowController;
+ private boolean exactlyOnceDeliveryEnabled = false;
private final int numPullers;
private final MessageReceiver receiver;
+ private final MessageReceiverWithAckResponse receiverWithAckResponse;
private final List streamingSubscriberConnections;
private final ApiClock clock;
private final List backgroundResources = new ArrayList<>();
private Subscriber(Builder builder) {
receiver = builder.receiver;
+ receiverWithAckResponse = builder.receiverWithAckResponse;
flowControlSettings = builder.flowControlSettings;
useLegacyFlowControl = builder.useLegacyFlowControl;
- subscriptionName = builder.subscriptionName;
+ subscriptionName = builder.subscription;
maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
maxDurationPerAckExtension = builder.maxDurationPerAckExtension;
+ maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed;
+ minDurationPerAckExtension = builder.minDurationPerAckExtension;
+ minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed;
+
clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();
flowController =
@@ -140,6 +166,8 @@ private Subscriber(Builder builder) {
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.build());
+ exactlyOnceDeliveryEnabled = builder.exactlyOnceDeliveryEnabled;
+
this.numPullers = builder.parallelPullCount;
executorProvider = builder.executorProvider;
@@ -175,7 +203,7 @@ private Subscriber(Builder builder) {
// We regularly look up the distribution for a good subscription deadline.
// So we seed the distribution with the minimum value to start with.
// Distribution is percentile-based, so this value will eventually lose importance.
- ackLatencyDistribution.record(MIN_ACK_DEADLINE_SECONDS);
+ ackLatencyDistribution.record(Math.toIntExact(MIN_STREAM_ACK_DEADLINE.getSeconds()));
}
/**
@@ -189,6 +217,11 @@ public static Builder newBuilder(ProjectSubscriptionName subscription, MessageRe
return newBuilder(subscription.toString(), receiver);
}
+ public static Builder newBuilder(
+ ProjectSubscriptionName subscription, MessageReceiverWithAckResponse receiver) {
+ return newBuilder(subscription.toString(), receiver);
+ }
+
/**
* Constructs a new {@link Builder}.
*
@@ -200,6 +233,10 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
return new Builder(subscription, receiver);
}
+ public static Builder newBuilder(String subscription, MessageReceiverWithAckResponse receiver) {
+ return new Builder(subscription, receiver);
+ }
+
/** Returns the delivery attempt count for a received {@link PubsubMessage} */
public static Integer getDeliveryAttempt(PubsubMessage message) {
if (!message.containsAttributes("googclient_deliveryattempt")) {
@@ -262,7 +299,7 @@ protected void doStart() {
logger.log(Level.FINE, "Starting subscriber group.");
try {
- this.subStub = GrpcSubscriberStub.create(subStubSettings);
+ this.subscriberStub = GrpcSubscriberStub.create(subStubSettings);
} catch (IOException e) {
// doesn't matter what we throw, the Service will just catch it and fail to start.
throw new IllegalStateException(e);
@@ -310,7 +347,7 @@ public void run() {
private void runShutdown() {
stopAllStreamingConnections();
shutdownBackgroundResources();
- subStub.shutdownNow();
+ subscriberStub.shutdownNow();
}
private void startStreamingConnections() {
@@ -321,22 +358,37 @@ private void startStreamingConnections() {
backgroundResources.add(new ExecutorAsBackgroundResource((executor)));
}
- streamingSubscriberConnections.add(
- new StreamingSubscriberConnection(
- subscriptionName,
- receiver,
- ACK_EXPIRATION_PADDING,
- maxAckExtensionPeriod,
- maxDurationPerAckExtension,
- ackLatencyDistribution,
- subStub,
- i,
- flowControlSettings,
- useLegacyFlowControl,
- flowController,
- executor,
- alarmsExecutor,
- clock));
+ StreamingSubscriberConnection.Builder streamingSubscriberConnectionBuilder;
+
+ if (receiverWithAckResponse != null) {
+ streamingSubscriberConnectionBuilder =
+ StreamingSubscriberConnection.newBuilder(receiverWithAckResponse);
+ } else {
+ streamingSubscriberConnectionBuilder = StreamingSubscriberConnection.newBuilder(receiver);
+ }
+
+ StreamingSubscriberConnection streamingSubscriberConnection =
+ streamingSubscriberConnectionBuilder
+ .setSubscription(subscriptionName)
+ .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
+ .setMaxAckExtensionPeriod(maxAckExtensionPeriod)
+ .setMinDurationPerAckExtension(minDurationPerAckExtension)
+ .setMinDurationPerAckExtensionDefaultUsed(minDurationPerAckExtensionDefaultUsed)
+ .setMaxDurationPerAckExtension(maxDurationPerAckExtension)
+ .setMaxDurationPerAckExtensionDefaultUsed(maxDurationPerAckExtensionDefaultUsed)
+ .setAckLatencyDistribution(ackLatencyDistribution)
+ .setSubscriberStub(subscriberStub)
+ .setChannelAffinity(i)
+ .setFlowControlSettings(flowControlSettings)
+ .setFlowController(flowController)
+ .setUseLegacyFlowControl(useLegacyFlowControl)
+ .setExecutor(executor)
+ .setSystemExecutor(alarmsExecutor)
+ .setClock(clock)
+ .setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled)
+ .build();
+
+ streamingSubscriberConnections.add(streamingSubscriberConnection);
}
startConnections(
streamingSubscriberConnections,
@@ -402,7 +454,6 @@ private void stopConnections(List extends ApiService> connections) {
/** Builder of {@link Subscriber Subscribers}. */
public static final class Builder {
- private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
@@ -415,15 +466,21 @@ public static final class Builder {
.build();
private static final AtomicInteger SYSTEM_EXECUTOR_COUNTER = new AtomicInteger();
- private String subscriptionName;
+ private String subscription;
private MessageReceiver receiver;
+ private MessageReceiverWithAckResponse receiverWithAckResponse;
private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
- private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;
+ private Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION;
+ private boolean minDurationPerAckExtensionDefaultUsed = true;
+ private Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION;
+ private boolean maxDurationPerAckExtensionDefaultUsed = true;
private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS;
+ private boolean exactlyOnceDeliveryEnabled = false;
+
private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
private ExecutorProvider systemExecutorProvider = null;
private TransportChannelProvider channelProvider =
@@ -438,11 +495,16 @@ public static final class Builder {
private int parallelPullCount = 1;
private String endpoint = SubscriberStubSettings.getDefaultEndpoint();
- Builder(String subscriptionName, MessageReceiver receiver) {
- this.subscriptionName = subscriptionName;
+ Builder(String subscription, MessageReceiver receiver) {
+ this.subscription = subscription;
this.receiver = receiver;
}
+ Builder(String subscription, MessageReceiverWithAckResponse receiverWithAckResponse) {
+ this.subscription = subscription;
+ this.receiverWithAckResponse = receiverWithAckResponse;
+ }
+
/**
* {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
* endpoint.
@@ -511,6 +573,22 @@ public Builder setUseLegacyFlowControl(boolean value) {
return this;
}
+ /**
+ * Enables/Disabled ExactlyOnceDelivery
+ *
+ *
Will update the minDurationPerAckExtension if a user-provided value is not set
+ */
+ public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
+ // If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if
+ // applicable
+ if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) {
+ this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY;
+ }
+
+ this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
+ return this;
+ }
+
/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
@@ -537,8 +615,37 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
*
MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
- Preconditions.checkArgument(maxDurationPerAckExtension.toMillis() >= 0);
+ // If a non-default min is set, make sure min is less than max
+ Preconditions.checkArgument(
+ maxDurationPerAckExtension.toMillis() >= 0
+ && (this.minDurationPerAckExtensionDefaultUsed
+ || (this.minDurationPerAckExtension.toMillis()
+ < maxDurationPerAckExtension.toMillis())));
this.maxDurationPerAckExtension = maxDurationPerAckExtension;
+ this.maxDurationPerAckExtensionDefaultUsed = false;
+ return this;
+ }
+
+ /**
+ * Set the lower bound for a single mod ack extention period.
+ *
+ *
The ack deadline will continue to be extended by up to this duration until
+ * MinAckExtensionPeriod is reached. Setting MinDurationPerAckExtension bounds the minimum
+ * amount of time before a mesage re-delivery in the event the Subscriber fails to extend the
+ * deadline.
+ *
+ *