Skip to content

Commit

Permalink
Add support for autoStartup (#827)
Browse files Browse the repository at this point in the history
* Add support for autoStartup

* Update DefaultListenerContainerRegistry.java

---------

Co-authored-by: Tomaz Fernandes <76525045+tomazfernandes@users.noreply.github.com>
  • Loading branch information
marcinmilewski93 and tomazfernandes committed Jun 26, 2023
1 parent 4e37bb3 commit 0bd7be9
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 1 deletion.
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,13 @@ See AWS documentation for more information.
After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured.
See <<Message Processing Throughput>>.

|`autoStartup`
|true, false
|true
|Determines wherever container should start automatically. When set to false the
container will not launch on startup, requiring manual intervention to start it.
See <<Container Lifecycle>>.

|`listenerShutdownTimeout`
|0 - undefined
|10 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final int maxMessagesPerPoll;

private final boolean autoStartup;

private final Duration pollTimeout;

private final Duration maxDelayBetweenPolls;
Expand Down Expand Up @@ -71,6 +73,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
protected AbstractContainerOptions(Builder<?, ?> builder) {
this.maxConcurrentMessages = builder.maxConcurrentMessages;
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.autoStartup = builder.autoStartup;
this.pollTimeout = builder.pollTimeout;
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
Expand Down Expand Up @@ -99,6 +102,11 @@ public int getMaxMessagesPerPoll() {
return this.maxMessagesPerPoll;
}

@Override
public boolean isAutoStartup() {
return this.autoStartup;
}

@Override
public Duration getPollTimeout() {
return this.pollTimeout;
Expand Down Expand Up @@ -176,6 +184,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final int DEFAULT_MAX_MESSAGES_PER_POLL = 10;

private static final boolean DEFAULT_AUTO_STARTUP = true;

private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10);

private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);
Expand All @@ -196,6 +206,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private int maxMessagesPerPoll = DEFAULT_MAX_MESSAGES_PER_POLL;

private boolean autoStartup = DEFAULT_AUTO_STARTUP;

private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;

private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;
Expand Down Expand Up @@ -233,6 +245,7 @@ protected Builder() {
protected Builder(AbstractContainerOptions<?, ?> options) {
this.maxConcurrentMessages = options.maxConcurrentMessages;
this.maxMessagesPerPoll = options.maxMessagesPerPoll;
this.autoStartup = options.autoStartup;
this.pollTimeout = options.pollTimeout;
this.maxDelayBetweenPolls = options.maxDelayBetweenPolls;
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
Expand Down Expand Up @@ -261,6 +274,12 @@ public B maxMessagesPerPoll(int maxMessagesPerPoll) {
return self();
}

@Override
public B autoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
return self();
}

@Override
public B pollTimeout(Duration pollTimeout) {
Assert.notNull(pollTimeout, "pollTimeout cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ public boolean isRunning() {
return this.isRunning;
}

@Override
public boolean isAutoStartup() {
return containerOptions.isAutoStartup();
}

@Override
public void start() {
if (this.isRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
*/
int getMaxMessagesPerPoll();

/**
* Checks whether the container should be started automatically or manually. Default is true.
*
* @return true if the container starts automatically, false if it should be started manually
*/
boolean isAutoStartup();

/**
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
*/
B maxMessagesPerPoll(int maxMessagesPerPoll);

/**
* Set whether the container should be started automatically or manually. By default, the container is set to start
* automatically.
*
* @param autoStartup true if the container is set to start automatically, false if it should be started manually
* @return this instance.
*/
B autoStartup(boolean autoStartup);

/**
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import io.awspring.cloud.sqs.LifecycleHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -75,7 +78,9 @@ public MessageListenerContainer<?> getContainerById(String id) {
public void start() {
synchronized (this.lifecycleMonitor) {
logger.debug("Starting {}", getClass().getSimpleName());
LifecycleHandler.get().start(this.listenerContainers.values());
List<MessageListenerContainer<?>> containersToStart = this.listenerContainers.values().stream()
.filter(SmartLifecycle::isAutoStartup).collect(Collectors.toList());
LifecycleHandler.get().start(containersToStart);
this.running = true;
logger.debug("{} started", getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -117,6 +118,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {

static final String MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "manually_create_container_test_queue";

static final String MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME = "manually_create_inactive_container_test_queue";

static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue";

static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";
Expand Down Expand Up @@ -144,6 +147,7 @@ static void beforeTests() {
createQueue(client, RESOLVES_PARAMETER_TYPES_QUEUE_NAME,
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")),
createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME),
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
}
Expand All @@ -157,6 +161,10 @@ static void beforeTests() {
@Autowired
ObjectMapper objectMapper;

@Autowired
@Qualifier("inactiveContainer")
MessageListenerContainer<Object> inactiveMessageListenerContainer;

@Test
void receivesMessage() throws Exception {
String messageBody = "receivesMessage-payload";
Expand Down Expand Up @@ -240,6 +248,17 @@ void manuallyCreatesContainer() throws Exception {
assertThat(latchContainer.manuallyCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void manuallyCreatesInactiveContainer() throws Exception {
String messageBody = "Testing manually creates inactive container";
assertThat(inactiveMessageListenerContainer.isRunning()).isFalse();
sqsTemplate.send(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME, messageBody);
inactiveMessageListenerContainer.start();
logger.debug("Sent message to queue {} with messageBody {}", MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME,
messageBody);
assertThat(latchContainer.manuallyInactiveCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

// @formatter:off
@Test
void manuallyStartsContainerAndChangesComponent() throws Exception {
Expand Down Expand Up @@ -453,6 +472,7 @@ static class LatchContainer {
final CountDownLatch acknowledgementCallbackSuccessLatch = new CountDownLatch(1);
final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch(1);
final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch(1);
final CountDownLatch manuallyInactiveCreatedContainerLatch = new CountDownLatch(1);
final CyclicBarrier maxConcurrentMessagesBarrier = new CyclicBarrier(21);

}
Expand Down Expand Up @@ -576,6 +596,23 @@ public MessageListenerContainer<Object> manuallyCreatedContainer() throws Except
.build();
}

@Bean("inactiveContainer")
public MessageListenerContainer<Object> manuallyCreatedInactiveContainer() throws Exception {
SqsAsyncClient client = BaseSqsIntegrationTest.createAsyncClient();
String queueUrl = client.getQueueUrl(req -> req.queueName(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME)).get()
.queueUrl();
return SqsMessageListenerContainer
.builder()
.queueNames(queueUrl)
.sqsAsyncClient(client)
.configure(options -> options
.autoStartup(false)
.maxDelayBetweenPolls(Duration.ofSeconds(1))
.pollTimeout(Duration.ofSeconds(3)))
.messageListener(msg -> {latchContainer.manuallyInactiveCreatedContainerLatch.countDown();})
.build();
}

@Bean
public SqsMessageListenerContainer<String> manuallyCreatedFactory() {
SqsMessageListenerContainerFactory<String> factory = new SqsMessageListenerContainerFactory<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -78,8 +79,11 @@ void shouldStartAndStopAllListenerContainers() {
String id2 = "test-container-id-2";
String id3 = "test-container-id-3";
given(container1.getId()).willReturn(id1);
given(container1.isAutoStartup()).willReturn(true);
given(container2.getId()).willReturn(id2);
given(container2.isAutoStartup()).willReturn(true);
given(container3.getId()).willReturn(id3);
given(container3.isAutoStartup()).willReturn(true);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.registerListenerContainer(container1);
registry.registerListenerContainer(container2);
Expand All @@ -96,6 +100,21 @@ void shouldStartAndStopAllListenerContainers() {
then(container3).should().stop();
}

@Test
void shouldNotStartContainerWithAutoStartupFalse() {
MessageListenerContainer<Object> container1 = mock(MessageListenerContainer.class);
String id1 = "test-container-id-1";
given(container1.getId()).willReturn(id1);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.registerListenerContainer(container1);
registry.start();
assertThat(registry.isRunning()).isTrue();
registry.stop();
assertThat(registry.isRunning()).isFalse();
then(container1).should(times(0)).start();
then(container1).should().stop();
}

@Test
void shouldThrowIfIdAlreadyPresent() {
MessageListenerContainer<Object> container = mock(MessageListenerContainer.class);
Expand Down

0 comments on commit 0bd7be9

Please sign in to comment.