Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

777 fix missing headers #781

Merged
merged 2 commits into from
Apr 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,14 @@ public MessageConversionContext createMessageConversionContext() {

@Override
public S fromMessagingMessage(Message<?> message, @Nullable MessageConversionContext context) {
// We must make sure the message id stays consistent throughout this process
MessageHeaders headers = getMessageHeaders(message);
S messageWithHeaders = this.headerMapper.fromHeaders(headers);
Object payload = Objects
.requireNonNull(this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()),
() -> "payloadMessageConverter returned null message for message " + message)
.getPayload();
return doConvertMessage(messageWithHeaders, payload);
Message<?> convertedMessage = Objects.requireNonNull(
this.payloadMessageConverter.toMessage(message.getPayload(), message.getHeaders()),
() -> "payloadMessageConverter returned null message for message " + message);
MessageHeaders completeHeaders = MessageHeaderUtils.addHeadersIfAbsent(headers, convertedMessage.getHeaders());
S messageWithHeaders = this.headerMapper.fromHeaders(completeHeaders);
return doConvertMessage(messageWithHeaders, convertedMessage.getPayload());
}

private MessageHeaders getMessageHeaders(Message<?> message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.annotation.SqsListener;
Expand Down Expand Up @@ -89,9 +88,6 @@ class SqsLoadIntegrationTests extends BaseSqsIntegrationTest {
@Autowired
SqsTemplate sqsTemplate;

@Autowired
ObjectMapper objectMapper;

@Autowired
Settings settings;

Expand Down Expand Up @@ -202,11 +198,11 @@ private void sendMessageBatchAsync(String queueName) {
if (!settings.sendMessages) {
return;
}
Collection<Message<String>> messages = getMessages();
Collection<Message<Object>> messages = getMessages();
doSendMessageBatch(queueName, messages);
}

private void doSendMessageBatch(String queueName, Collection<Message<String>> messages) {
private void doSendMessageBatch(String queueName, Collection<Message<Object>> messages) {
sqsTemplate.sendManyAsync(queueName, messages).thenRun(this::logSend).exceptionally(t -> {
logger.error("Error sending messages - retrying", t);
doSendMessageBatch(queueName, messages);
Expand All @@ -221,22 +217,16 @@ private void logSend() {
}
}

private Collection<Message<String>> getMessages() {
private Collection<Message<Object>> getMessages() {
return IntStream.range(0, Math.min(settings.totalMessages / 2, 10)).mapToObj(index -> {
Message<String> message = MessageBuilder.withPayload(getBody()).build();
Message<Object> message = MessageBuilder.withPayload(getBody()).build();
logger.trace("Sending message with id {}", message.getHeaders().get("id"));
return message;
}).collect(Collectors.toList());
}

private String getBody() {
try {
return this.objectMapper.writeValueAsString(
new MyPojo("MyPojo - " + bodyInteger.incrementAndGet(), "MyPojo - secondValue"));
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
private Object getBody() {
return new MyPojo("MyPojo - " + bodyInteger.incrementAndGet(), "MyPojo - secondValue");
}

static class MessageContainer {
Expand Down Expand Up @@ -266,7 +256,7 @@ static class ReceiveManyFromTwoQueuesListener {

@SqsListener(queueNames = { RECEIVE_FROM_MANY_1_QUEUE_NAME,
RECEIVE_FROM_MANY_2_QUEUE_NAME }, factory = HIGH_THROUGHPUT_FACTORY_NAME, id = "many-from-two-queues")
void listen(Message<String> message) throws Exception {
void listen(Message<MyPojo> message) throws Exception {
logger.trace("Started processing {}", MessageHeaderUtils.getId(message));
if (this.messageContainer.receivedByListener.contains(MessageHeaderUtils.getId(message))) {
logger.warn("Received duplicated message: {}", message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ class SqsMessageConversionIntegrationTests extends BaseSqsIntegrationTest {
@Autowired
SqsTemplate sqsTemplate;

@Autowired
ObjectMapper objectMapper;

@BeforeAll
static void beforeTests() {
SqsAsyncClient client = createAsyncClient();
Expand All @@ -88,31 +85,31 @@ static void beforeTests() {

@Test
void resolvesPojoParameterTypes() throws Exception {
String messageBody = objectMapper.writeValueAsString(new MyPojo("pojoParameterType", "secondValue"));
MyPojo messageBody = new MyPojo("pojoParameterType", "secondValue");
sqsTemplate.send(RESOLVES_POJO_TYPES_QUEUE_NAME, messageBody);
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_TYPES_QUEUE_NAME, messageBody);
assertThat(latchContainer.resolvesPojoLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void resolvesPojoMessage() throws Exception {
String messageBody = objectMapper.writeValueAsString(new MyPojo("resolvesPojoMessage", "secondValue"));
MyPojo messageBody = new MyPojo("resolvesPojoMessage", "secondValue");
sqsTemplate.send(RESOLVES_POJO_MESSAGE_QUEUE_NAME, messageBody);
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_MESSAGE_QUEUE_NAME, messageBody);
assertThat(latchContainer.resolvesPojoMessageLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void resolvesPojoList() throws Exception {
String messageBody = objectMapper.writeValueAsString(new MyPojo("resolvesPojoList", "secondValue"));
sqsTemplate.send(RESOLVES_POJO_LIST_QUEUE_NAME, messageBody);
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_LIST_QUEUE_NAME, messageBody);
MyPojo payload = new MyPojo("resolvesPojoList", "secondValue");
sqsTemplate.send(RESOLVES_POJO_LIST_QUEUE_NAME, payload);
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_LIST_QUEUE_NAME, payload);
assertThat(latchContainer.resolvesPojoListLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void resolvesPojoMessageList() throws Exception {
String messageBody = objectMapper.writeValueAsString(new MyPojo("resolvesPojoMessageList", "secondValue"));
MyPojo messageBody = new MyPojo("resolvesPojoMessageList", "secondValue");
sqsTemplate.send(RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME, messageBody);
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_MESSAGE_LIST_QUEUE_NAME,
messageBody);
Expand All @@ -121,20 +118,20 @@ void resolvesPojoMessageList() throws Exception {

@Test
void resolvesPojoFromHeader() throws Exception {
String messageBody = objectMapper.writeValueAsString(new MyPojo("pojoParameterType", "secondValue"));
MyPojo payload = new MyPojo("pojoParameterType", "secondValue");
sqsTemplate.send(RESOLVES_POJO_FROM_HEADER_QUEUE_NAME,
MessageBuilder.createMessage(messageBody, new MessagingMessageHeaders(getHeaderMapping(MyPojo.class))));
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_FROM_HEADER_QUEUE_NAME, messageBody);
MessageBuilder.createMessage(payload, new MessagingMessageHeaders(getHeaderMapping(MyPojo.class))));
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_POJO_FROM_HEADER_QUEUE_NAME, payload);
assertThat(latchContainer.resolvesPojoFromMappingLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void resolvesMyOtherPojoFromHeader() throws Exception {
String messageBody = objectMapper.writeValueAsString(new MyOtherPojo("pojoParameterType", "secondValue"));
sqsTemplate.send(RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME, MessageBuilder.createMessage(messageBody,
MyOtherPojo payload = new MyOtherPojo("pojoParameterType", "secondValue");
sqsTemplate.send(RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME, MessageBuilder.createMessage(payload,
new MessagingMessageHeaders(getHeaderMapping(MyOtherPojo.class))));
logger.debug("Sent message to queue {} with messageBody {}", RESOLVES_MY_OTHER_POJO_FROM_HEADER_QUEUE_NAME,
messageBody);
payload);
assertThat(latchContainer.resolvesMyOtherPojoFromMappingLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

Expand Down Expand Up @@ -114,6 +115,26 @@ void shouldUseProvidedPayloadConverter() throws Exception {
assertThat(resultMessage.getPayload()).isEqualTo(myPojo);
}

@Test
void shouldUseHeadersFromPayloadConverter() {
MessageConverter payloadConverter = mock(MessageConverter.class);
org.springframework.messaging.Message convertedMessageWithContentType = MessageBuilder.withPayload("example")
.setHeader("contentType", "application/json").build();
when(payloadConverter.toMessage(any(MyPojo.class), any())).thenReturn(convertedMessageWithContentType);

SqsMessagingMessageConverter converter = new SqsMessagingMessageConverter();
converter.setPayloadMessageConverter(payloadConverter);
converter.setPayloadTypeMapper(msg -> MyPojo.class);

org.springframework.messaging.Message<MyPojo> message = MessageBuilder.createMessage(new MyPojo(),
new MessageHeaders(null));
Message resultMessage = converter.fromMessagingMessage(message);

assertThat(resultMessage.messageId()).isEqualTo(message.getHeaders().getId().toString());
assertThat(resultMessage.messageAttributes()).containsEntry("contentType",
MessageAttributeValue.builder().stringValue("application/json").dataType("String").build());
}

static class MyPojo {

private String myProperty = "myValue";
Expand Down