Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for autoStartup #827

Merged
merged 3 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,13 @@ See AWS documentation for more information.
After that period, the framework will try to perform a partial acquire with the available permits, resulting in a poll for less than `maxMessagesPerPoll` messages, unless otherwise configured.
See <<Message Processing Throughput>>.

|`autoStartup`
|true, false
|true
|Determines wherever container should start automatically. When set to false the
container will not launch on startup, requiring manual intervention to start it.
See <<Container Lifecycle>>.

|`listenerShutdownTimeout`
|0 - undefined
|10 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final int maxMessagesPerPoll;

private final boolean autoStartup;

private final Duration pollTimeout;

private final Duration maxDelayBetweenPolls;
Expand Down Expand Up @@ -71,6 +73,7 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,
protected AbstractContainerOptions(Builder<?, ?> builder) {
this.maxConcurrentMessages = builder.maxConcurrentMessages;
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.autoStartup = builder.autoStartup;
this.pollTimeout = builder.pollTimeout;
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
Expand Down Expand Up @@ -99,6 +102,11 @@ public int getMaxMessagesPerPoll() {
return this.maxMessagesPerPoll;
}

@Override
public boolean isAutoStartup() {
return this.autoStartup;
}

@Override
public Duration getPollTimeout() {
return this.pollTimeout;
Expand Down Expand Up @@ -176,6 +184,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final int DEFAULT_MAX_MESSAGES_PER_POLL = 10;

private static final boolean DEFAULT_AUTO_STARTUP = true;

private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(10);

private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);
Expand All @@ -196,6 +206,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private int maxMessagesPerPoll = DEFAULT_MAX_MESSAGES_PER_POLL;

private boolean autoStartup = DEFAULT_AUTO_STARTUP;

private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;

private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;
Expand Down Expand Up @@ -233,6 +245,7 @@ protected Builder() {
protected Builder(AbstractContainerOptions<?, ?> options) {
this.maxConcurrentMessages = options.maxConcurrentMessages;
this.maxMessagesPerPoll = options.maxMessagesPerPoll;
this.autoStartup = options.autoStartup;
this.pollTimeout = options.pollTimeout;
this.maxDelayBetweenPolls = options.maxDelayBetweenPolls;
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
Expand Down Expand Up @@ -261,6 +274,12 @@ public B maxMessagesPerPoll(int maxMessagesPerPoll) {
return self();
}

@Override
public B autoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
return self();
}

@Override
public B pollTimeout(Duration pollTimeout) {
Assert.notNull(pollTimeout, "pollTimeout cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ public boolean isRunning() {
return this.isRunning;
}

@Override
public boolean isAutoStartup() {
return containerOptions.isAutoStartup();
}

@Override
public void start() {
if (this.isRunning) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
*/
int getMaxMessagesPerPoll();

/**
* Checks whether the container should be started automatically or manually. Default is true.
*
* @return true if the container starts automatically, false if it should be started manually
*/
boolean isAutoStartup();

/**
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
*/
B maxMessagesPerPoll(int maxMessagesPerPoll);

/**
* Set whether the container should be started automatically or manually. By default, the container is set to start
* automatically.
*
* @param autoStartup true if the container is set to start automatically, false if it should be started manually
* @return this instance.
*/
B autoStartup(boolean autoStartup);

/**
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
import io.awspring.cloud.sqs.LifecycleHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -75,7 +78,9 @@ public MessageListenerContainer<?> getContainerById(String id) {
public void start() {
synchronized (this.lifecycleMonitor) {
logger.debug("Starting {}", getClass().getSimpleName());
LifecycleHandler.get().start(this.listenerContainers.values());
List<MessageListenerContainer<?>> containersToStart = this.listenerContainers.values().stream()
.filter(SmartLifecycle::isAutoStartup).collect(Collectors.toList());
LifecycleHandler.get().start(containersToStart);
this.running = true;
logger.debug("{} started", getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -117,6 +118,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {

static final String MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "manually_create_container_test_queue";

static final String MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME = "manually_create_inactive_container_test_queue";

static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue";

static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";
Expand Down Expand Up @@ -144,6 +147,7 @@ static void beforeTests() {
createQueue(client, RESOLVES_PARAMETER_TYPES_QUEUE_NAME,
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "20")),
createQueue(client, MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME),
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
}
Expand All @@ -157,6 +161,10 @@ static void beforeTests() {
@Autowired
ObjectMapper objectMapper;

@Autowired
@Qualifier("inactiveContainer")
MessageListenerContainer<Object> inactiveMessageListenerContainer;

@Test
void receivesMessage() throws Exception {
String messageBody = "receivesMessage-payload";
Expand Down Expand Up @@ -240,6 +248,17 @@ void manuallyCreatesContainer() throws Exception {
assertThat(latchContainer.manuallyCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void manuallyCreatesInactiveContainer() throws Exception {
String messageBody = "Testing manually creates inactive container";
assertThat(inactiveMessageListenerContainer.isRunning()).isFalse();
sqsTemplate.send(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME, messageBody);
inactiveMessageListenerContainer.start();
logger.debug("Sent message to queue {} with messageBody {}", MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME,
messageBody);
assertThat(latchContainer.manuallyInactiveCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

// @formatter:off
@Test
void manuallyStartsContainerAndChangesComponent() throws Exception {
Expand Down Expand Up @@ -453,6 +472,7 @@ static class LatchContainer {
final CountDownLatch acknowledgementCallbackSuccessLatch = new CountDownLatch(1);
final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch(1);
final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch(1);
final CountDownLatch manuallyInactiveCreatedContainerLatch = new CountDownLatch(1);
final CyclicBarrier maxConcurrentMessagesBarrier = new CyclicBarrier(21);

}
Expand Down Expand Up @@ -576,6 +596,23 @@ public MessageListenerContainer<Object> manuallyCreatedContainer() throws Except
.build();
}

@Bean("inactiveContainer")
public MessageListenerContainer<Object> manuallyCreatedInactiveContainer() throws Exception {
SqsAsyncClient client = BaseSqsIntegrationTest.createAsyncClient();
String queueUrl = client.getQueueUrl(req -> req.queueName(MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME)).get()
.queueUrl();
return SqsMessageListenerContainer
.builder()
.queueNames(queueUrl)
.sqsAsyncClient(client)
.configure(options -> options
.autoStartup(false)
.maxDelayBetweenPolls(Duration.ofSeconds(1))
.pollTimeout(Duration.ofSeconds(3)))
.messageListener(msg -> {latchContainer.manuallyInactiveCreatedContainerLatch.countDown();})
.build();
}

@Bean
public SqsMessageListenerContainer<String> manuallyCreatedFactory() {
SqsMessageListenerContainerFactory<String> factory = new SqsMessageListenerContainerFactory<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -78,8 +79,11 @@ void shouldStartAndStopAllListenerContainers() {
String id2 = "test-container-id-2";
String id3 = "test-container-id-3";
given(container1.getId()).willReturn(id1);
given(container1.isAutoStartup()).willReturn(true);
given(container2.getId()).willReturn(id2);
given(container2.isAutoStartup()).willReturn(true);
given(container3.getId()).willReturn(id3);
given(container3.isAutoStartup()).willReturn(true);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.registerListenerContainer(container1);
registry.registerListenerContainer(container2);
Expand All @@ -96,6 +100,21 @@ void shouldStartAndStopAllListenerContainers() {
then(container3).should().stop();
}

@Test
void shouldNotStartContainerWithAutoStartupFalse() {
MessageListenerContainer<Object> container1 = mock(MessageListenerContainer.class);
String id1 = "test-container-id-1";
given(container1.getId()).willReturn(id1);
DefaultListenerContainerRegistry registry = new DefaultListenerContainerRegistry();
registry.registerListenerContainer(container1);
registry.start();
assertThat(registry.isRunning()).isTrue();
registry.stop();
assertThat(registry.isRunning()).isFalse();
then(container1).should(times(0)).start();
then(container1).should().stop();
}

@Test
void shouldThrowIfIdAlreadyPresent() {
MessageListenerContainer<Object> container = mock(MessageListenerContainer.class);
Expand Down