From 2f2a08a74712d5c6d475a0adf8ba01e08fb83258 Mon Sep 17 00:00:00 2001 From: Marcin Milewski Date: Wed, 31 May 2023 13:09:22 +0700 Subject: [PATCH 1/2] Add support for autoStartup --- docs/src/main/asciidoc/sqs.adoc | 7 ++++ .../listener/AbstractContainerOptions.java | 19 ++++++++++ .../AbstractMessageListenerContainer.java | 5 +++ .../cloud/sqs/listener/ContainerOptions.java | 7 ++++ .../sqs/listener/ContainerOptionsBuilder.java | 9 +++++ .../sqs/integration/SqsIntegrationTests.java | 37 +++++++++++++++++++ ...DefaultListenerContainerRegistryTests.java | 19 ++++++++++ 7 files changed, 103 insertions(+) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index e7f5d80a8..dc5d1bf10 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -803,6 +803,13 @@ See AWS documentation for more information. After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured. See <>. +|`autoStartup` +|true, false +|true +|Determines wherever container should start automatically. When set to false the +container will not launch on startup, requiring manual intervention to start it. +See <>. + |`listenerShutdownTimeout` |0 - undefined |10 seconds diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java index 5a6df4050..3945b3270 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java @@ -37,6 +37,8 @@ public abstract class AbstractContainerOptions, private final int maxMessagesPerPoll; + private final boolean autoStartup; + private final Duration pollTimeout; private final Duration maxDelayBetweenPolls; @@ -71,6 +73,7 @@ public abstract class AbstractContainerOptions, protected AbstractContainerOptions(Builder builder) { this.maxConcurrentMessages = builder.maxConcurrentMessages; this.maxMessagesPerPoll = builder.maxMessagesPerPoll; + this.autoStartup = builder.autoStartup; this.pollTimeout = builder.pollTimeout; this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls; this.listenerShutdownTimeout = builder.listenerShutdownTimeout; @@ -99,6 +102,11 @@ public int getMaxMessagesPerPoll() { return this.maxMessagesPerPoll; } + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + @Override public Duration getPollTimeout() { return this.pollTimeout; @@ -176,6 +184,8 @@ protected abstract static class Builder, private static final int DEFAULT_MAX_MESSAGES_PER_POLL = 10; + private static final boolean DEFAULT_AUTO_STARTUP = true; + private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10); private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10); @@ -196,6 +206,8 @@ protected abstract static class Builder, private int maxMessagesPerPoll = DEFAULT_MAX_MESSAGES_PER_POLL; + private boolean autoStartup = DEFAULT_AUTO_STARTUP; + private Duration pollTimeout = DEFAULT_POLL_TIMEOUT; private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT; @@ -233,6 +245,7 @@ protected Builder() { protected Builder(AbstractContainerOptions options) { this.maxConcurrentMessages = options.maxConcurrentMessages; this.maxMessagesPerPoll = options.maxMessagesPerPoll; + this.autoStartup = options.autoStartup; this.pollTimeout = options.pollTimeout; this.maxDelayBetweenPolls = options.maxDelayBetweenPolls; this.listenerShutdownTimeout = options.listenerShutdownTimeout; @@ -261,6 +274,12 @@ public B maxMessagesPerPoll(int maxMessagesPerPoll) { return self(); } + @Override + public B autoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + return self(); + } + @Override public B pollTimeout(Duration pollTimeout) { Assert.notNull(pollTimeout, "pollTimeout cannot be null"); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java index 5375407d0..5307a9483 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractMessageListenerContainer.java @@ -252,6 +252,11 @@ public boolean isRunning() { return this.isRunning; } + @Override + public boolean isAutoStartup() { + return containerOptions.isAutoStartup(); + } + @Override public void start() { if (this.isRunning) { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java index 968099937..a85509eaf 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java @@ -50,6 +50,13 @@ public interface ContainerOptions, B extends Co */ int getMaxMessagesPerPoll(); + /** + * Checks whether the container should be started automatically or manually. Default is true. + * + * @return true if the container starts automatically, false if it should be started manually + */ + boolean isAutoStartup(); + /** * Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to * acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java index 8eb436473..6d706a744 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java @@ -46,6 +46,15 @@ public interface ContainerOptionsBuilder */ B maxMessagesPerPoll(int maxMessagesPerPoll); + /** + * Set whether the container should be started automatically or manually. By default, the container is set to start + * automatically. + * + * @param autoStartup true if the container is set to start automatically, false if it should be started manually + * @return this instance. + */ + B autoStartup(boolean autoStartup); + /** * Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to * acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available. diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java index 7bdc22745..74d762b37 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java @@ -64,6 +64,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -114,6 +115,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest { static final String MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "manually_create_container_test_queue"; + static final String MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME = "manually_create_inactive_container_test_queue"; + static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue"; static final String LOW_RESOURCE_FACTORY = "lowResourceFactory"; @@ -139,6 +142,7 @@ static void beforeTests() { createQueue(client, RESOLVES_PARAMETER_TYPES_QUEUE_NAME, singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")), createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME), + createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME), createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME)).join(); } @@ -151,6 +155,10 @@ static void beforeTests() { @Autowired ObjectMapper objectMapper; + @Autowired + @Qualifier("inactiveContainer") + MessageListenerContainer inactiveMessageListenerContainer; + @Test void receivesMessage() throws Exception { String messageBody = "receivesMessage-payload"; @@ -234,6 +242,17 @@ void manuallyCreatesContainer() throws Exception { assertThat(latchContainer.manuallyCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue(); } + @Test + void manuallyCreatesInactiveContainer() throws Exception { + String messageBody = "Testing manually creates inactive container"; + assertThat(inactiveMessageListenerContainer.isRunning()).isFalse(); + sqsTemplate.send(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME, messageBody); + inactiveMessageListenerContainer.start(); + logger.debug("Sent message to queue {} with messageBody {}", MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME, + messageBody); + assertThat(latchContainer.manuallyInactiveCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + // @formatter:off @Test void manuallyStartsContainerAndChangesComponent() throws Exception { @@ -421,6 +440,7 @@ static class LatchContainer { final CountDownLatch acknowledgementCallbackSuccessLatch = new CountDownLatch(1); final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch(1); final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch(1); + final CountDownLatch manuallyInactiveCreatedContainerLatch = new CountDownLatch(1); } @@ -543,6 +563,23 @@ public MessageListenerContainer manuallyCreatedContainer() throws Except .build(); } + @Bean("inactiveContainer") + public MessageListenerContainer manuallyCreatedInactiveContainer() throws Exception { + SqsAsyncClient client = BaseSqsIntegrationTest.createAsyncClient(); + String queueUrl = client.getQueueUrl(req -> req.queueName(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME)).get() + .queueUrl(); + return SqsMessageListenerContainer + .builder() + .queueNames(queueUrl) + .sqsAsyncClient(client) + .configure(options -> options + .autoStartup(false) + .maxDelayBetweenPolls(Duration.ofSeconds(1)) + .pollTimeout(Duration.ofSeconds(3))) + .messageListener(msg -> {latchContainer.manuallyInactiveCreatedContainerLatch.countDown();}) + .build(); + } + @Bean public SqsMessageListenerContainer manuallyCreatedFactory() { SqsMessageListenerContainerFactory factory = new SqsMessageListenerContainerFactory<>(); diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java index e4e7c34e3..f888dbbea 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistryTests.java @@ -20,6 +20,7 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import org.junit.jupiter.api.Test; @@ -78,8 +79,11 @@ void shouldStartAndStopAllListenerContainers() { String id2 = "test-container-id-2"; String id3 = "test-container-id-3"; given(container1.getId()).willReturn(id1); + given(container1.isAutoStartup()).willReturn(true); given(container2.getId()).willReturn(id2); + given(container2.isAutoStartup()).willReturn(true); given(container3.getId()).willReturn(id3); + given(container3.isAutoStartup()).willReturn(true); DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry(); registry.registerListenerContainer(container1); registry.registerListenerContainer(container2); @@ -96,6 +100,21 @@ void shouldStartAndStopAllListenerContainers() { then(container3).should().stop(); } + @Test + void shouldNotStartContainerWithAutoStartupFalse() { + MessageListenerContainer container1 = mock(MessageListenerContainer.class); + String id1 = "test-container-id-1"; + given(container1.getId()).willReturn(id1); + DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry(); + registry.registerListenerContainer(container1); + registry.start(); + assertThat(registry.isRunning()).isTrue(); + registry.stop(); + assertThat(registry.isRunning()).isFalse(); + then(container1).should(times(0)).start(); + then(container1).should().stop(); + } + @Test void shouldThrowIfIdAlreadyPresent() { MessageListenerContainer container = mock(MessageListenerContainer.class); From 18672144f0e1d1585f63f01b16170ed59bb423c3 Mon Sep 17 00:00:00 2001 From: marcinmilewski93 <55257568+marcinmilewski93@users.noreply.github.com> Date: Tue, 6 Jun 2023 13:08:05 +0700 Subject: [PATCH 2/2] Update DefaultListenerContainerRegistry.java --- .../sqs/listener/DefaultListenerContainerRegistry.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java index 4b3051406..df07f1413 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/DefaultListenerContainerRegistry.java @@ -18,10 +18,13 @@ import io.awspring.cloud.sqs.LifecycleHandler; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.SmartLifecycle; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -75,7 +78,9 @@ public MessageListenerContainer getContainerById(String id) { public void start() { synchronized (this.lifecycleMonitor) { logger.debug("Starting {}", getClass().getSimpleName()); - LifecycleHandler.get().start(this.listenerContainers.values()); + List> containersToStart = this.listenerContainers.values().stream() + .filter(SmartLifecycle::isAutoStartup).collect(Collectors.toList()); + LifecycleHandler.get().start(containersToStart); this.running = true; logger.debug("{} started", getClass().getSimpleName()); }