diff --git a/google-cloud-pubsub/clirr-ignored-differences.xml b/google-cloud-pubsub/clirr-ignored-differences.xml
new file mode 100644
index 000000000..610c1b362
--- /dev/null
+++ b/google-cloud-pubsub/clirr-ignored-differences.xml
@@ -0,0 +1,20 @@
+
+
+
If exactly-once delivery is enabled on the subscription, the future returned by the ack/nack + * methods track the state of acknowledgement operation by the server. If the future completes + * successfully, the message is guaranteed NOT to be re-delivered. Otherwise, the future will + * contain an exception with more details about the failure and the message may be re-delivered. + * + *
If exactly-once delivery is NOT enabled on the subscription, the future returns immediately + * with an AckResponse.SUCCESS. Because re-deliveries are possible, you should ensure that your + * processing code is idempotent, as you may receive any given message more than once. + */ public interface AckReplyConsumerWithResponse { + /** + * Acknowledges that the message has been successfully processed. The service will not send the + * message again. + * + *
A future representing the server response is returned
+ */
Future A future representing the server response is returned
+ */
Future Will update the minDurationPerAckExtension if a user-provided value is not set
- */
- public Builder setExactlyOnceDeliveryEnabled(boolean exactlyOnceDeliveryEnabled) {
- // If exactlyOnceDeliveryIsEnabled we want to update the default minAckDeadlineExtension if
- // applicable
- if (exactlyOnceDeliveryEnabled && this.minDurationPerAckExtensionDefaultUsed) {
- this.minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY;
- }
-
- this.exactlyOnceDeliveryEnabled = exactlyOnceDeliveryEnabled;
- return this;
- }
-
/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java
index 7fcee6b2e..c72d52d3d 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java
@@ -209,7 +209,6 @@ public void receiveMessage(
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
- .setExactlyOnceDeliveryEnabled(true)
.build();
subscriber.addListener(
new Subscriber.Listener() {
@@ -282,7 +281,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
- .setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
@@ -360,7 +358,6 @@ public void receiveMessage(
SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build())
- .setExactlyOnceDeliveryEnabled(false)
.build();
subscriber.addListener(
new Subscriber.Listener() {
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
index 3ff13acfc..0b48e0991 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
@@ -337,8 +337,7 @@ public void testExtension_GiveUp() throws Exception {
}
@Test
- public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() {
- // EnableExactlyOnceDelivery is turned off by default
+ public void testAckExtensionDefaultsExactlyOnceDeliveryDisabledThenEnabled() {
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(mock(MessageReceiver.class))
.setAckLatencyDistribution(mockAckLatencyDistribution)
@@ -348,13 +347,17 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() {
.setMaxDurationPerAckExtensionDefaultUsed(true)
.build();
+ // ExactlyOnceDeliveryEnabled is turned off by default
+
// We should be using the Subscriber set hard deadlines
assertMinAndMaxAckDeadlines(
messageDispatcher,
Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()),
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));
- messageDispatcher.setEnableExactlyOnceDelivery(true);
+ // This would normally be set from the streaming pull response in the
+ // StreamingSubscriberConnection
+ messageDispatcher.setExactlyOnceDeliveryEnabled(true);
// Should only change min deadline
assertMinAndMaxAckDeadlines(
@@ -365,11 +368,10 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOffThenOn() {
}
@Test
- public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() {
+ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() {
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(mock(MessageReceiver.class))
.setAckLatencyDistribution(mockAckLatencyDistribution)
- .setEnableExactlyOnceDelivery(true)
.setMinDurationPerAckExtension(
Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY)
.setMinDurationPerAckExtensionDefaultUsed(true)
@@ -377,13 +379,19 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() {
.setMaxDurationPerAckExtensionDefaultUsed(true)
.build();
+ // This would normally be set from the streaming pull response in the
+ // StreamingSubscriberConnection
+ messageDispatcher.setExactlyOnceDeliveryEnabled(true);
+
assertMinAndMaxAckDeadlines(
messageDispatcher,
Math.toIntExact(
Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY.getSeconds()),
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));
- messageDispatcher.setEnableExactlyOnceDelivery(false);
+ // This would normally be set from the streaming pull response in the
+ // StreamingSubscriberConnection
+ messageDispatcher.setExactlyOnceDeliveryEnabled(false);
// Should change min deadline
assertMinAndMaxAckDeadlines(
@@ -393,7 +401,7 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryOnThenOff() {
}
@Test
- public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() {
+ public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled() {
int customMinSeconds = 30;
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(mock(MessageReceiver.class))
@@ -404,12 +412,15 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() {
.setMaxDurationPerAckExtensionDefaultUsed(true)
.build();
+ // ExactlyOnceDeliveryEnabled is turned off by default
assertMinAndMaxAckDeadlines(
messageDispatcher,
customMinSeconds,
Math.toIntExact(Subscriber.MAX_STREAM_ACK_DEADLINE.getSeconds()));
- messageDispatcher.setEnableExactlyOnceDelivery(true);
+ // This would normally be set from the streaming pull response in the
+ // StreamingSubscriberConnection
+ messageDispatcher.setExactlyOnceDeliveryEnabled(true);
// no changes should occur
assertMinAndMaxAckDeadlines(
@@ -419,7 +430,7 @@ public void testAckExtensionCustomMinExactlyOnceDeliveryOffThenOn() {
}
@Test
- public void testAckExtensionCustomMaxExactlyOnceDeliveryOffThenOn() {
+ public void testAckExtensionCustomMaxExactlyOnceDeliveryDisabledThenEnabled() {
int customMaxSeconds = 30;
MessageDispatcher messageDispatcher =
MessageDispatcher.newBuilder(mock(MessageReceiver.class))
@@ -430,12 +441,15 @@ public void testAckExtensionCustomMaxExactlyOnceDeliveryOffThenOn() {
.setMaxDurationPerAckExtensionDefaultUsed(false)
.build();
+ // ExactlyOnceDeliveryEnabled is turned off by default
assertMinAndMaxAckDeadlines(
messageDispatcher,
Math.toIntExact(Subscriber.MIN_STREAM_ACK_DEADLINE.getSeconds()),
customMaxSeconds);
- messageDispatcher.setEnableExactlyOnceDelivery(true);
+ // This would normally be set from the streaming pull response in the
+ // StreamingSubscriberConnection
+ messageDispatcher.setExactlyOnceDeliveryEnabled(true);
// Because the customMaxSeconds is above the
// DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY, we should use the customMaxSeconds
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java
index 6ad951001..d8e1878dd 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java
@@ -452,13 +452,18 @@ public void testSetFailureResponseOutstandingMessages() {
private StreamingSubscriberConnection getStreamingSubscriberConnection(
boolean exactlyOnceDeliveryEnabled) {
- return getStreamingSubscriberReceiverFromBuilder(
- StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)),
- exactlyOnceDeliveryEnabled);
+ StreamingSubscriberConnection streamingSubscriberConnection =
+ getStreamingSubscriberConnectionFromBuilder(
+ StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)));
+
+ // This would normally be set from the streaming pull response
+ streamingSubscriberConnection.setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled);
+
+ return streamingSubscriberConnection;
}
- private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder(
- StreamingSubscriberConnection.Builder builder, boolean exactlyOnceDeliveryEnabled) {
+ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilder(
+ StreamingSubscriberConnection.Builder builder) {
return builder
.setSubscription(MOCK_SUBSCRIPTION_NAME)
.setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT)
@@ -474,7 +479,6 @@ private StreamingSubscriberConnection getStreamingSubscriberReceiverFromBuilder(
.setMinDurationPerAckExtensionDefaultUsed(true)
.setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION)
.setMaxDurationPerAckExtensionDefaultUsed(true)
- .setExactlyOnceDeliveryEnabled(exactlyOnceDeliveryEnabled)
.build();
}
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java
index ab7021bba..612c244fe 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java
@@ -287,16 +287,6 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception {
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
subscriber.stopAsync().awaitTerminated();
-
- // maxDurationPerAckExtension is unset with exactly once enabled
- subscriber =
- startSubscriber(getTestSubscriberBuilder(testReceiver).setExactlyOnceDeliveryEnabled(true));
- assertEquals(
- expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
- assertEquals(
- Math.toIntExact(Subscriber.STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT.getSeconds()),
- fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());
- subscriber.stopAsync().awaitTerminated();
}
@Test
@@ -358,7 +348,6 @@ private Builder getTestSubscriberBuilder(
.setCredentialsProvider(NoCredentialsProvider.create())
.setClock(fakeExecutor.getClock())
.setParallelPullCount(1)
- .setExactlyOnceDeliveryEnabled(true)
.setFlowControlSettings(
FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build());
}