Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable receiving messages from different message groups in the same Batch for FIFO queues #1192

Merged
merged 2 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,12 @@ See <<Retrieving Attributes from SQS>>.
|Configures the `MessageSystemAttribute` that will be retrieved from SQS for each message.
See <<Retrieving Attributes from SQS>>.

|`fifoBatchGroupingStrategy`
|`PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES`, `PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH`
|`PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES`
|Specifies how messages from FIFO queues should be grouped when retrieved by the container when listener
mode is `batch`. See <<FIFO Support>>.

|`messageConverter`
|`MessagingMessageConverter`
|`SqsMessagingMessageConverter`
Expand Down Expand Up @@ -1029,10 +1035,9 @@ NOTE: Spring-managed `MessageListenerContainer` beans' lifecycle actions are alw

* Messages are polled with a `receiveRequestAttemptId`, and the received batch of messages is split according to the message`s `MessageGroupId`.
* Each message from a given group will then be processed in order, while each group is processed in parallel.
* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility`
expires.
* To receive messages from multiple groups in a `batch`, set `fifoBatchGroupingStrategy` to `PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH` in `SqsContainerOptions`.
* If processing fails for a message, the following messages from the same message group are discarded so they will be served again after their `message visibility` expires.
* Messages which were already successfully processed and acknowledged will not be served again.
* If a `batch` listener is used, each message group from a poll will be served as a batch to the listener method.
* `FIFO` queues also have different defaults for acknowledging messages, see <<Acknowledgement Defaults>> for more information.
* If a `message visibility` is set through `@SqsListener` or `SqsContainerOptions`, visibility will be extended for all messages in the message group before each message is processed.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

/**
* Grouping strategy for Fifo SQS with batch listener mode.
*
* @author Alexis SEGURA
* @since 3.1.2
*/
public enum FifoBatchGroupingStrategy {

/**
* Default strategy. Group messages in batches by message group. Each batch contains messages from a single message group.
* The order of messages within the group is preserved. As message groups are processed in parallel, this strategy
* provides the maximal throughput.
*/
PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES,

/**
* Each batch contains messages originating from multiple message groups. The order of messages within each group is
* preserved. Note that FIFO queues do not serve new messages from a group until all the messages for that group in
* the previous batch have been acknowledged or their visibility expired.
*/
PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ public MessageSource<T> createMessageSource(SqsContainerOptions options) {
@Override
public MessageSink<T> createMessageSink(SqsContainerOptions options) {
MessageSink<T> deliverySink = createDeliverySink(options.getListenerMode());
return new MessageGroupingSinkAdapter<>(
maybeWrapWithVisibilityAdapter(deliverySink, options.getMessageVisibility()),
getMessageGroupingFunction());
MessageSink<T> wrappedDeliverySink = maybeWrapWithVisibilityAdapter(deliverySink,
options.getMessageVisibility());
return maybeWrapWithMessageGroupingAdapter(options, wrappedDeliverySink);
}

private MessageSink<T> maybeWrapWithMessageGroupingAdapter(SqsContainerOptions options, MessageSink<T> wrappedDeliverySink) {
return FifoBatchGroupingStrategy.PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES
.equals(options.getFifoBatchGroupingStrategy())
? new MessageGroupingSinkAdapter<>(wrappedDeliverySink, getMessageGroupingFunction())
: wrappedDeliverySink;
}

// @formatter:off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class SqsContainerOptions extends AbstractContainerOptions<SqsContainerOp
@Nullable
private final Duration messageVisibility;

private final FifoBatchGroupingStrategy fifoBatchGroupingStrategy;

private final Collection<QueueAttributeName> queueAttributeNames;

private final Collection<String> messageAttributeNames;
Expand All @@ -57,6 +59,7 @@ protected SqsContainerOptions(BuilderImpl builder) {
this.messageSystemAttributeNames = builder.messageSystemAttributeNames;
this.messageVisibility = builder.messageVisibility;
this.queueNotFoundStrategy = builder.queueNotFoundStrategy;
this.fifoBatchGroupingStrategy = builder.fifoBatchGroupingStrategy;
}

/**
Expand Down Expand Up @@ -100,6 +103,15 @@ public Duration getMessageVisibility() {
return this.messageVisibility;
}

/**
* Get messages grouping strategy in FIFO queues when retrieved by the container in listener mode
* {@link ListenerMode#BATCH}.
* @return the fifo batch message grouping strategy.
*/
public FifoBatchGroupingStrategy getFifoBatchGroupingStrategy() {
return this.fifoBatchGroupingStrategy;
}

/**
* Get the {@link QueueNotFoundStrategy} for the container.
* @return the strategy.
Expand Down Expand Up @@ -135,6 +147,8 @@ private static class BuilderImpl

private QueueNotFoundStrategy queueNotFoundStrategy = DEFAULT_QUEUE_NOT_FOUND_STRATEGY;

private FifoBatchGroupingStrategy fifoBatchGroupingStrategy = FifoBatchGroupingStrategy.PROCESS_MESSAGE_GROUPS_IN_PARALLEL_BATCHES;

@Nullable
private Duration messageVisibility;

Expand All @@ -148,6 +162,7 @@ protected BuilderImpl(SqsContainerOptions options) {
this.messageAttributeNames = options.messageAttributeNames;
this.messageSystemAttributeNames = options.messageSystemAttributeNames;
this.messageVisibility = options.messageVisibility;
this.fifoBatchGroupingStrategy = options.fifoBatchGroupingStrategy;
this.queueNotFoundStrategy = options.queueNotFoundStrategy;
}

Expand Down Expand Up @@ -181,6 +196,14 @@ public SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility)
return this;
}

@Override
public SqsContainerOptionsBuilder fifoBatchGroupingStrategy(
FifoBatchGroupingStrategy fifoBatchGroupingStrategy) {
Assert.notNull(fifoBatchGroupingStrategy, "fifoBatchGroupingStrategy cannot be null");
this.fifoBatchGroupingStrategy = fifoBatchGroupingStrategy;
return this;
}

@Override
public SqsContainerOptionsBuilder queueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy) {
Assert.notNull(queueNotFoundStrategy, "queueNotFoundStrategy cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ SqsContainerOptionsBuilder messageSystemAttributeNames(
*/
SqsContainerOptionsBuilder messageVisibility(Duration messageVisibility);

/**
* Set how the messages from FIFO queues should be grouped when container listener mode is
* {@link ListenerMode#BATCH}. By default, messages are grouped in batches by message group,
* which are processed in parallel, maintaining order within each message group.
* @param fifoBatchGroupingStrategy the strategy to batch FIFO messages.
* @return this instance.
*/
SqsContainerOptionsBuilder fifoBatchGroupingStrategy(FifoBatchGroupingStrategy fifoBatchGroupingStrategy);

/**
* Set the {@link QueueNotFoundStrategy} for the container.
* @param queueNotFoundStrategy the strategy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.FifoBatchGroupingStrategy;
import io.awspring.cloud.sqs.listener.FifoSqsComponentFactory;
import io.awspring.cloud.sqs.listener.ListenerMode;
import io.awspring.cloud.sqs.listener.MessageListener;
Expand Down Expand Up @@ -92,6 +93,8 @@ class SqsFifoIntegrationTests extends BaseSqsIntegrationTest {

static final String FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME = "fifo_receives_batches_many_groups.fifo";

static final String FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME = "fifo_receives_batch_grouping_strategy_multiple_groups_in_same_batch.fifo";

static final String FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME = "fifo_manually_create_container_test_queue.fifo";

static final String FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME = "fifo_manually_create_factory_test_queue.fifo";
Expand Down Expand Up @@ -141,6 +144,7 @@ static void beforeTests() {
createFifoQueue(client, FIFO_STOPS_PROCESSING_ON_ERROR_QUEUE_NAME, getVisibilityAttribute("2")),
createFifoQueue(client, FIFO_STOPS_PROCESSING_ON_ACK_ERROR_ERROR_QUEUE_NAME),
createFifoQueue(client, FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME),
createFifoQueue(client, FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME),
createFifoQueue(client, FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME),
createFifoQueue(client, FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME),
createFifoQueue(client, FIFO_MANUALLY_CREATE_BATCH_CONTAINER_QUEUE_NAME),
Expand Down Expand Up @@ -315,6 +319,55 @@ void receivesBatchesManyGroups() throws Exception {
.containsExactlyElementsOf(values);
}

@Test
void receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatch() throws Exception {
List<String> values = IntStream.range(0, 2).mapToObj(String::valueOf)
.collect(toList());
String messageGroupId1 = UUID.randomUUID().toString();
String messageGroupId2 = UUID.randomUUID().toString();
List<Message<String>> messages = new ArrayList<>();
messages.addAll(createMessagesFromValues(messageGroupId1, values));
messages.addAll(createMessagesFromValues(messageGroupId2, values));
sqsTemplate.sendMany(FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME, messages);

SqsMessageListenerContainer<String> container = SqsMessageListenerContainer
.<String>builder()
.queueNames(FIFO_RECEIVES_BATCH_GROUPING_STRATEGY_MULTIPLE_GROUPS_IN_SAME_BATCH_QUEUE_NAME)
.messageListener(new MessageListener<>() {
@Override
public void onMessage(Message<String> message) {
throw new UnsupportedOperationException("Batch listener");
}

@Override
public void onMessage(Collection<Message<String>> messages) {
assertThat(MessageHeaderUtils
.getHeader(messages, SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, String.class)
.stream().distinct().count()).isEqualTo(2);
latchContainer.receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch.countDown();
}
})
.configure(options -> options
.maxConcurrentMessages(10)
.pollTimeout(Duration.ofSeconds(10))
.maxMessagesPerPoll(10)
.maxDelayBetweenPolls(Duration.ofSeconds(1))
.fifoBatchGroupingStrategy(FifoBatchGroupingStrategy.PROCESS_MULTIPLE_GROUPS_IN_SAME_BATCH)
.listenerMode(ListenerMode.BATCH))
.sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient())
.build();

try {
container.start();
assertThat(latchContainer.receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch
.await(settings.latchTimeoutSeconds, TimeUnit.SECONDS)).isTrue();
}
finally {
container.stop();
}

}

@Test
void manuallyCreatesContainer() throws Exception {
List<String> values = IntStream.range(0, this.settings.messagesPerTest).mapToObj(String::valueOf)
Expand Down Expand Up @@ -511,6 +564,7 @@ static class LatchContainer {
CountDownLatch stopsProcessingOnAckErrorLatch2;
CountDownLatch stopsProcessingOnAckErrorHasThrown;
CountDownLatch receivesBatchManyGroupsLatch;
CountDownLatch receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch;

LatchContainer(Settings settings) {
this.settings = settings;
Expand All @@ -528,6 +582,7 @@ static class LatchContainer {
this.stopsProcessingOnAckErrorLatch1 = new CountDownLatch(1);
this.stopsProcessingOnAckErrorLatch2 = new CountDownLatch(1);
this.receivesBatchManyGroupsLatch = new CountDownLatch(1);
this.receivesFifoBatchGroupingStrategyMultipleGroupsInSameBatchLatch = new CountDownLatch(1);
this.stopsProcessingOnAckErrorHasThrown = new CountDownLatch(1);
}

Expand Down
Loading