diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 755996929..d3969559a 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -636,7 +636,9 @@ Note that if there are messages available the call may return earlier than this - `messageVisibilitySeconds` - Set the minimum visibility for the messages retrieved in a poll. Note that for `FIFO` single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener. See <> for more information. - +- `acknowledgementMode` - Set the acknowledgement mode for the container. +If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options. +See <> for more information. ===== Listener Method Arguments @@ -1326,6 +1328,8 @@ NOTE: All options are available for both `single message` and `batch` message li - `ALWAYS` - Acknowledges a message or batch of messages after processing returns success or error. - `MANUAL` - The framework won't acknowledge messages automatically and `Acknowledgement` objects can be received in the listener method. +The `Acknowledgement` strategy can be configured in the `SqsContainerOptions` or in the `@SqsListener` annotation. + ==== Acknowledgement Batching The `acknowledgementInterval` and `acknowledgementThreshold` options enable acknowledgement batching. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java index f87137bd7..dc9f166c6 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/AbstractListenerAnnotationBeanPostProcessor.java @@ -22,22 +22,10 @@ import io.awspring.cloud.sqs.config.HandlerMethodEndpoint; import io.awspring.cloud.sqs.config.SqsEndpoint; import io.awspring.cloud.sqs.config.SqsListenerConfigurer; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; import io.awspring.cloud.sqs.support.resolver.AcknowledgmentHandlerMethodArgumentResolver; import io.awspring.cloud.sqs.support.resolver.BatchAcknowledgmentArgumentResolver; import io.awspring.cloud.sqs.support.resolver.BatchPayloadMethodArgumentResolver; -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -69,11 +57,26 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + /** * {@link BeanPostProcessor} implementation that scans beans for a {@link SqsListener @SqsListener} annotation, extracts * information to a {@link SqsEndpoint}, and registers it in the {@link EndpointRegistrar}. * * @author Tomaz Fernandes + * @author Joao Calassio * @since 3.0 */ public abstract class AbstractListenerAnnotationBeanPostProcessor @@ -219,6 +222,17 @@ protected Integer resolveAsInteger(String value, String propertyName) { } } + @Nullable + protected AcknowledgementMode resolveAcknowledgement(String value) { + try { + final String resolvedValue = resolveAsString(value, "acknowledgementMode"); + return StringUtils.hasText(resolvedValue) ? AcknowledgementMode.valueOf(resolvedValue) : null; + } + catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Cannot resolve " + value + " as AcknowledgementMode", e); + } + } + protected String getEndpointId(String id) { if (StringUtils.hasText(id)) { return resolveAsString(id, "id"); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java index 3ee6af09a..a110580c8 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListener.java @@ -74,6 +74,7 @@ * @author Alain Sahli * @author Matej Nedic * @author Tomaz Fernandes + * @author Joao Calassio * @since 1.1 */ @Target(ElementType.METHOD) @@ -137,4 +138,10 @@ */ String messageVisibilitySeconds() default ""; + /** + * The acknowledgement mode to be used for the provided queues. If not specified, the acknowledgement mode defined + * for the container factory will be used. + */ + String acknowledgementMode() default ""; + } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAcknowledgementMode.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAcknowledgementMode.java new file mode 100644 index 000000000..0044a10a8 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAcknowledgementMode.java @@ -0,0 +1,51 @@ +/* + * Copyright 2013-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.annotation; + +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AlwaysAcknowledgementHandler; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.NeverAcknowledgementHandler; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.OnSuccessAcknowledgementHandler; + +/** + * Acknowledgement strategies supported by the {@link SqsListener} annotation. + * + * @author Joao Calassio + * @since 3.1 + * @see OnSuccessAcknowledgementHandler + * @see AlwaysAcknowledgementHandler + * @see NeverAcknowledgementHandler + * @see io.awspring.cloud.sqs.listener.ContainerOptions + * @see SqsListener + */ +public class SqsListenerAcknowledgementMode { + + /** + * Messages will be acknowledged when message processing is successful. + */ + public static final String ON_SUCCESS = "ON_SUCCESS"; + + /** + * Messages will be acknowledged whether processing was completed successfully or with an error. + */ + public static final String ALWAYS = "ALWAYS"; + + /** + * Messages will not be acknowledged automatically by the container. + * @see io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement + */ + public static final String MANUAL = "MANUAL"; + +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java index 7d695f706..74b2fed89 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/annotation/SqsListenerAnnotationBeanPostProcessor.java @@ -30,6 +30,7 @@ * {@link AbstractListenerAnnotationBeanPostProcessor} implementation for {@link SqsListener @SqsListener}. * * @author Tomaz Fernandes + * @author Joao Calassio * @since 3.0 */ public class SqsListenerAnnotationBeanPostProcessor extends AbstractListenerAnnotationBeanPostProcessor { @@ -51,7 +52,7 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) { resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages")) .messageVisibility( resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility")) - .build(); + .acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build(); } @Override diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java index 8662211b8..908ad608d 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsEndpoint.java @@ -16,6 +16,7 @@ package io.awspring.cloud.sqs.config; import io.awspring.cloud.sqs.annotation.SqsListener; +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; import java.time.Duration; import java.util.Collection; import org.springframework.lang.Nullable; @@ -26,6 +27,7 @@ * Contains properties that should be mapped from {@link SqsListener @SqsListener} annotations. * * @author Tomaz Fernandes + * @author Joao Calassio * @since 3.0 */ public class SqsEndpoint extends AbstractEndpoint { @@ -38,12 +40,16 @@ public class SqsEndpoint extends AbstractEndpoint { private final Integer maxMessagesPerPoll; + @Nullable + private final AcknowledgementMode acknowledgementMode; + protected SqsEndpoint(SqsEndpointBuilder builder) { super(builder.queueNames, builder.factoryName, builder.id); this.maxConcurrentMessages = builder.maxConcurrentMessages; this.pollTimeoutSeconds = builder.pollTimeoutSeconds; this.messageVisibility = builder.messageVisibility; this.maxMessagesPerPoll = builder.maxMessagesPerPoll; + this.acknowledgementMode = builder.acknowledgementMode; } /** @@ -91,6 +97,15 @@ public Duration getMessageVisibility() { return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null; } + /** + * Returns the acknowledgement mode configured for this endpoint. + * @return the acknowledgement mode. + */ + @Nullable + public AcknowledgementMode getAcknowledgementMode() { + return this.acknowledgementMode; + } + public static class SqsEndpointBuilder { private Collection queueNames; @@ -107,6 +122,9 @@ public static class SqsEndpointBuilder { private Integer maxMessagesPerPoll; + @Nullable + private AcknowledgementMode acknowledgementMode; + public SqsEndpointBuilder queueNames(Collection queueNames) { this.queueNames = queueNames; return this; @@ -142,6 +160,11 @@ public SqsEndpointBuilder id(String id) { return this; } + public SqsEndpointBuilder acknowledgementMode(@Nullable AcknowledgementMode acknowledgementMode) { + this.acknowledgementMode = acknowledgementMode; + return this; + } + public SqsEndpoint build() { return new SqsEndpoint(this); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java index 85f03c11c..1b335b3c4 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/config/SqsMessageListenerContainerFactory.java @@ -122,6 +122,7 @@ * used. * * @author Tomaz Fernandes + * @author Joao Calassio * @since 3.0 * @see SqsMessageListenerContainer * @see ContainerOptions @@ -161,7 +162,8 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio ConfigUtils.INSTANCE.acceptIfNotNull(sqsEndpoint.getMaxConcurrentMessages(), options::maxConcurrentMessages) .acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll) .acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout) - .acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility); + .acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility) + .acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode); } /** diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAcknowledgementModeTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAcknowledgementModeTests.java new file mode 100644 index 000000000..a58815a8b --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/annotation/SqsListenerAcknowledgementModeTests.java @@ -0,0 +1,41 @@ +/* + * Copyright 2013-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.annotation; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode; +import java.lang.reflect.Field; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +/** + * Tests for {@link SqsListenerAcknowledgementMode} enum values + * + * @author Joao Calassio + */ +class SqsListenerAcknowledgementModeTests { + + @ParameterizedTest + @EnumSource(AcknowledgementMode.class) + void shouldHaveAllValuesOfAcknowledgementModeEnum(final AcknowledgementMode acknowledgementMode) + throws NoSuchFieldException, IllegalAccessException { + Class clz = SqsListenerAcknowledgementMode.class; + Field correspondingValue = clz.getDeclaredField(acknowledgementMode.name()); + assertEquals(acknowledgementMode.name(), correspondingValue.get(clz)); + } + +} diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsAnnotationAcknowledgementIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsAnnotationAcknowledgementIntegrationTests.java new file mode 100644 index 000000000..2784732b6 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsAnnotationAcknowledgementIntegrationTests.java @@ -0,0 +1,413 @@ +/* + * Copyright 2013-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.integration; + +import static java.util.Collections.singletonMap; +import static java.util.Map.entry; +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.sqs.CompletableFutures; +import io.awspring.cloud.sqs.MessageHeaderUtils; +import io.awspring.cloud.sqs.annotation.SqsListener; +import io.awspring.cloud.sqs.annotation.SqsListenerAcknowledgementMode; +import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.ContainerComponentFactory; +import io.awspring.cloud.sqs.listener.SqsContainerOptions; +import io.awspring.cloud.sqs.listener.SqsHeaders; +import io.awspring.cloud.sqs.listener.StandardSqsComponentFactory; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementExecutor; +import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback; +import io.awspring.cloud.sqs.listener.acknowledgement.SqsAcknowledgementExecutor; +import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; +import io.awspring.cloud.sqs.listener.source.AbstractSqsMessageSource; +import io.awspring.cloud.sqs.listener.source.MessageSource; +import io.awspring.cloud.sqs.operations.SqsTemplate; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; +import org.springframework.test.context.TestPropertySource; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; + +/** + * Integration tests for custom acknowledgement modes on SqsListener annotation + * + * @author Joao Calassio + */ +@SpringBootTest +@TestPropertySource(properties = { "test.property.acknowledgement=ALWAYS" }) +public class SqsAnnotationAcknowledgementIntegrationTests extends BaseSqsIntegrationTest { + + private static final Logger logger = LoggerFactory.getLogger(SqsAnnotationAcknowledgementIntegrationTests.class); + + static final String ACK_AFTER_SECOND_ERROR_FACTORY = "ackAfterSecondErrorFactory"; + + static final String ANNOTATION_ALWAYS_ACK_SUCCESS_QUEUE_NAME = "annotation_always_ack_success_test_queue"; + + static final String ANNOTATION_ALWAYS_ACK_ERROR_QUEUE_NAME = "annotation_always_ack_error_test_queue"; + + static final String ANNOTATION_ON_SUCCESS_ACK_SUCCESS_QUEUE_NAME = "annotation_on_success_ack_success_test_queue"; + + static final String ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME = "annotation_on_success_ack_error_test_queue"; + + static final String ANNOTATION_MANUAL_ACK_QUEUE_NAME = "annotation_ack_manual_test_queue"; + + @Autowired + LatchContainer latchContainer; + + @Autowired + SqsTemplate sqsTemplate; + + @BeforeAll + static void beforeTests() { + SqsAsyncClient client = createAsyncClient(); + CompletableFuture.allOf(createQueue(client, ANNOTATION_ALWAYS_ACK_SUCCESS_QUEUE_NAME), + createQueue(client, ANNOTATION_ALWAYS_ACK_ERROR_QUEUE_NAME), + createQueue(client, ANNOTATION_ON_SUCCESS_ACK_SUCCESS_QUEUE_NAME), + createQueue(client, ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME, + singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "1")), + createQueue(client, ANNOTATION_MANUAL_ACK_QUEUE_NAME)).join(); + } + + @Test + void annotationAlwaysAckOnSuccess() throws Exception { + String messageBody = "annotationAlwaysAckOnSuccess-payload"; + sqsTemplate.send(ANNOTATION_ALWAYS_ACK_SUCCESS_QUEUE_NAME, messageBody); + logger.debug("Sent message to queue {} with messageBody {}", ANNOTATION_ALWAYS_ACK_SUCCESS_QUEUE_NAME, + messageBody); + assertThat(latchContainer.annotationAlwaysAckSuccessLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void annotationAlwaysAckOnError() throws Exception { + String messageBody = "annotationAlwaysAckOnError-payload"; + sqsTemplate.send(ANNOTATION_ALWAYS_ACK_ERROR_QUEUE_NAME, messageBody); + logger.debug("Sent message to queue {} with messageBody {}", ANNOTATION_ALWAYS_ACK_ERROR_QUEUE_NAME, + messageBody); + assertThat(latchContainer.annotationAlwaysAckErrorLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void annotationOnSuccessAckOnSuccess() throws Exception { + String messageBody = "annotationOnSuccessAckOnSuccess-payload"; + sqsTemplate.send(ANNOTATION_ON_SUCCESS_ACK_SUCCESS_QUEUE_NAME, messageBody); + logger.debug("Sent message to queue {} with messageBody {}", ANNOTATION_ON_SUCCESS_ACK_SUCCESS_QUEUE_NAME, + messageBody); + assertThat(latchContainer.annotationOnSuccessAckSuccessLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void annotationOnSuccessAckOnError() throws Exception { + String messageBody = "annotationOnSuccessAckOnError-payload"; + sqsTemplate.send(ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME, messageBody); + logger.debug("Sent message to queue {} with messageBody {}", ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME, + messageBody); + assertThat(latchContainer.annotationOnSuccessAckErrorLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(latchContainer.annotationOnSuccessAckErrorCallbackLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void annotationManualAck() throws Exception { + String messageBody = "annotationManualAck-payload"; + sqsTemplate.send(ANNOTATION_MANUAL_ACK_QUEUE_NAME, messageBody); + logger.debug("Sent message to queue {} with messageBody {}", ANNOTATION_MANUAL_ACK_QUEUE_NAME, messageBody); + assertThat(latchContainer.annotationManualAckLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(latchContainer.annotationManualAckLatchCallback.await(10, TimeUnit.SECONDS)).isFalse(); + } + + static class AnnotationAlwaysAcknowledgeOnSuccessMessagesListener { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = ANNOTATION_ALWAYS_ACK_SUCCESS_QUEUE_NAME, id = "annotation-always-ack-success", acknowledgementMode = SqsListenerAcknowledgementMode.ALWAYS) + void listen(String message) { + logger.debug("Received message in Listener Method: " + message); + latchContainer.annotationAlwaysAckSuccessLatch.countDown(); + } + + } + + static class AnnotationAlwaysAcknowledgeOnErrorMessagesListener { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = ANNOTATION_ALWAYS_ACK_ERROR_QUEUE_NAME, id = "annotation-always-ack-error", acknowledgementMode = "${test.property.acknowledgement}") + void listen(String message) { + logger.debug("Received message in Listener Method: " + message); + latchContainer.annotationAlwaysAckErrorLatch.countDown(); + throw new RuntimeException("Expected exception from annotation-ack-always-error"); + } + + } + + static class AnnotationOnSuccessAcknowledgeOnSuccessMessagesListener { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = ANNOTATION_ON_SUCCESS_ACK_SUCCESS_QUEUE_NAME, factory = ACK_AFTER_SECOND_ERROR_FACTORY, id = "annotation-onsuccess-ack-success", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS) + void listen(String message) { + logger.debug("Received message in Listener Method: " + message); + latchContainer.annotationOnSuccessAckSuccessLatch.countDown(); + } + + } + + static class AnnotationOnSuccessAcknowledgeOnErrorMessagesListener { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME, factory = ACK_AFTER_SECOND_ERROR_FACTORY, id = "annotation-onsuccess-ack-error", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS) + void listen(String message) { + logger.debug("Received message in Listener Method: " + message); + latchContainer.annotationOnSuccessAckErrorLatch.countDown(); + throw new RuntimeException("Expected exception from annotation-onsuccess-ack-error"); + } + + } + + static class AnnotationManualAcknowledgeMessagesListener { + + @Autowired + LatchContainer latchContainer; + + @SqsListener(queueNames = ANNOTATION_MANUAL_ACK_QUEUE_NAME, id = "annotation-manual-ack", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL) + void listen(String message) { + logger.debug("Received message in Listener Method: " + message); + latchContainer.annotationManualAckLatch.countDown(); + } + + } + + static class LatchContainer { + + final CountDownLatch annotationAlwaysAckSuccessLatch = new CountDownLatch(2); + + final CountDownLatch annotationAlwaysAckErrorLatch = new CountDownLatch(2); + + final CountDownLatch annotationOnSuccessAckSuccessLatch = new CountDownLatch(2); + + final CountDownLatch annotationOnSuccessAckErrorLatch = new CountDownLatch(1); + + final CountDownLatch annotationOnSuccessAckErrorCallbackLatch = new CountDownLatch(1); + + final CountDownLatch annotationManualAckLatch = new CountDownLatch(1); + + final CountDownLatch annotationManualAckLatchCallback = new CountDownLatch(1); + + } + + @Import(SqsBootstrapConfiguration.class) + @Configuration + static class SQSConfiguration { + + LatchContainer latchContainer = new LatchContainer(); + + @Bean + LatchContainer latchContainer() { + return this.latchContainer; + } + + @Bean + SqsTemplate sqsTemplate() { + return SqsTemplate.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()).build(); + } + + @Bean + AnnotationAlwaysAcknowledgeOnSuccessMessagesListener annotationAlwaysAcknowledgeOnSuccessMessagesListener() { + return new AnnotationAlwaysAcknowledgeOnSuccessMessagesListener(); + } + + @Bean + AnnotationAlwaysAcknowledgeOnErrorMessagesListener annotationAlwaysAcknowledgeOnErrorMessagesListener() { + return new AnnotationAlwaysAcknowledgeOnErrorMessagesListener(); + } + + @Bean + AnnotationOnSuccessAcknowledgeOnSuccessMessagesListener annotationOnSuccessAcknowledgeOnSuccessMessagesListener() { + return new AnnotationOnSuccessAcknowledgeOnSuccessMessagesListener(); + } + + @Bean + AnnotationOnSuccessAcknowledgeOnErrorMessagesListener annotationOnSuccessAcknowledgeOnErrorMessagesListener() { + return new AnnotationOnSuccessAcknowledgeOnErrorMessagesListener(); + } + + @Bean + AnnotationManualAcknowledgeMessagesListener annotationManualAcknowledgeMessagesListener() { + return new AnnotationManualAcknowledgeMessagesListener(); + } + + @Bean + public SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { + return SqsMessageListenerContainerFactory.builder() + .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient) + .acknowledgementResultCallback(getAcknowledgementResultCallback()) + .configure(options -> options.maxDelayBetweenPolls(Duration.ofSeconds(5)) + .queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN)) + .pollTimeout(Duration.ofSeconds(5))) + .build(); + } + + @Bean(name = ACK_AFTER_SECOND_ERROR_FACTORY) + public SqsMessageListenerContainerFactory ackAfterSecondErrorFactory() { + return SqsMessageListenerContainerFactory.builder() + .configure(options -> options.maxConcurrentMessages(10).pollTimeout(Duration.ofSeconds(10)) + .maxMessagesPerPoll(10).maxDelayBetweenPolls(Duration.ofSeconds(1))) + .containerComponentFactories(getExceptionThrowingAckExecutor()) + .acknowledgementResultCallback(getAcknowledgementResultCallback()).errorHandler(testErrorHandler()) + .sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient).build(); + } + + private List> getExceptionThrowingAckExecutor() { + return Collections.singletonList(new StandardSqsComponentFactory() { + @Override + public MessageSource createMessageSource(SqsContainerOptions options) { + return new AbstractSqsMessageSource() { + @Override + protected AcknowledgementExecutor createAcknowledgementExecutorInstance() { + return new SqsAcknowledgementExecutor() { + + final AtomicBoolean hasThrown = new AtomicBoolean(); + + @Override + public CompletableFuture execute(Collection> messagesToAck) { + final String queueName = MessageHeaderUtils.getHeaderAsString( + messagesToAck.iterator().next(), SqsHeaders.SQS_QUEUE_NAME_HEADER); + if (queueName.equals(ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME) + && hasThrown.compareAndSet(false, true)) { + return CompletableFutures.failedFuture(new RuntimeException( + "Expected acknowledgement exception for " + queueName)); + } + return super.execute(messagesToAck); + } + }; + } + }; + } + }); + } + + private AcknowledgementResultCallback getAcknowledgementResultCallback() { + return new AcknowledgementResultCallback<>() { + @Override + public void onSuccess(Collection> messages) { + logger.debug("Invoking on success acknowledgement result callback for {}", + MessageHeaderUtils.getId(messages)); + final String queueName = MessageHeaderUtils.getHeaderAsString(messages.iterator().next(), + SqsHeaders.SQS_QUEUE_NAME_HEADER); + + final Map latches = Map.ofEntries( + entry(ANNOTATION_ALWAYS_ACK_SUCCESS_QUEUE_NAME, + latchContainer.annotationAlwaysAckSuccessLatch), + entry(ANNOTATION_ALWAYS_ACK_ERROR_QUEUE_NAME, latchContainer.annotationAlwaysAckErrorLatch), + entry(ANNOTATION_ON_SUCCESS_ACK_SUCCESS_QUEUE_NAME, + latchContainer.annotationOnSuccessAckSuccessLatch), + entry(ANNOTATION_MANUAL_ACK_QUEUE_NAME, latchContainer.annotationManualAckLatchCallback)); + + if (latches.containsKey(queueName)) { + latches.get(queueName).countDown(); + } + } + + @Override + public void onFailure(Collection> messages, Throwable t) { + logger.debug("Invoking on failure acknowledgement result callback for {}", + MessageHeaderUtils.getId(messages)); + final String queueName = MessageHeaderUtils.getHeaderAsString(messages.iterator().next(), + SqsHeaders.SQS_QUEUE_NAME_HEADER); + + final Map latches = Map + .ofEntries(entry(ANNOTATION_ON_SUCCESS_ACK_ERROR_QUEUE_NAME, + latchContainer.annotationOnSuccessAckErrorCallbackLatch)); + + if (latches.containsKey(queueName)) { + latches.get(queueName).countDown(); + } + } + }; + } + + private AsyncErrorHandler testErrorHandler() { + return new AsyncErrorHandler() { + + final List previousMessages = Collections.synchronizedList(new ArrayList<>()); + + @Override + public CompletableFuture handle(Message message, Throwable t) { + // Eventually ack to not interfere with other tests. + if (previousMessages.contains(message.getPayload())) { + return CompletableFuture.completedFuture(null); + } + previousMessages.add(message.getPayload()); + return CompletableFutures.failedFuture(t); + } + + @Override + public CompletableFuture handle(Collection> messages, Throwable t) { + // Eventually ack to not interfere with other tests. + if (previousMessages.containsAll(toPayloadList(messages))) { + return CompletableFuture.completedFuture(null); + } + previousMessages.addAll(toPayloadList(messages)); + return CompletableFutures.failedFuture(t); + } + + private List toPayloadList(Collection> messages) { + return messages.stream().map(Message::getPayload).collect(Collectors.toList()); + } + + private Collection getBatchEntries( + Collection> messages) { + return messages.stream().map(this::getBatchEntry).collect(Collectors.toList()); + } + + private DeleteMessageBatchRequestEntry getBatchEntry(Message message) { + return DeleteMessageBatchRequestEntry.builder().id(UUID.randomUUID().toString()) + .receiptHandle( + MessageHeaderUtils.getHeaderAsString(message, SqsHeaders.SQS_RECEIPT_HANDLE_HEADER)) + .build(); + } + }; + } + + } + +}