Open
Description
问题描述
在我们的项目中使用rocketmq-spring-boot-starter
时,我们消费者如下
package com.example.demo;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* RocketMQMessageListener
*/
@Service
@RocketMQMessageListener(
topic = "${rocketmq.topic}",
consumerGroup = "ConsumerGroup001",
messageModel = MessageModel.BROADCASTING,
selectorType = SelectorType.TAG,
selectorExpression = "${rocketmq.tag}"
)
public class Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("\n\n\n------- Consumer received: %s \n\n\n\n", message);
}
}
配置文件如下
rocketmq.topic=Topic001
rocketmq.tag=tag001
但是运行时,消费者却无论如何都收不到对应tag
的消息,发送端是没有问题了,消息已经正常生产并且发送了,那么问题出在哪里呢?
解决方案
当我们去掉注解中的selectorType
和selectorExpression
之后,消费者就可以收到消息了,那么一定的tag
的过滤出问题了,于是我们把"${rocketmq.tag}"
替换成了字符串"tag001"
,也可以,那么看来是tag
的占位符处理问题了,简单跟踪了一下,发现rocketmq-spring-boot-starter
并没有把tag
当作占位符求解析,而是直接赋值了,为什么这么做呢?因为除了tag
,它还需要支持 sql
式的筛选,具体代码如下:
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
相比之下,对于topic
之类,是这样的:
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
既然已经找到了问题的根源,那么解决方案也比较明显了,有两种方案:
tag
不使用Spring
的placeholder
占位符- 对
DefaultRocketMQListenerContainer
,进行如下改动:
switch (selectorType) {
case TAG:
consumer.subscribe(topic, applicationContext.getEnvironment().resolvePlaceholders(selectorExpression));
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}