springBoot整合RocketMq
项目地址:https://github.com/apache/rocketmq-spring
在项目wiki界面你可获取相关示例,或者直接下载示例项目
部分概念在我的上一篇文章中有说明,这里仅仅只演示示例代码,不做概念性说明
引入依赖
写文章时rocketmq-spring-boot-starter最新版本是2.2.2 对应的rocketmq-client版本是4.9.3
这里我服务端安装的是5.0.0版本,不过5.0.0版本服务端按照官方的说法是兼容4.x版本的客户端的
如果一定要使用5.0.0版本sdk,可以排除相关依赖,再自行引入相应版本(似乎不太靠谱,我这边换成2.6.13版本启动报错了,不管了,先不换版本测试,使用其默认的2.5.9版本spring-boot-starter)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<!-- 如果和当前项目使用的spring版本还有springBoot版本不一致,建议排除-->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
配置文件
仅做参考,如果未做解释的配置请查看
org.apache.rocketmq.spring.autoconfigure.RocketMQProperties
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: k8s-master:9876 # RocketMQ Namesrv,多个用;间隔
# Producer 配置项
producer:
group: koala-dev-event-centre-group # 生产者分组
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
erbadagang-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
但通常一个服务内会有多个consumer,建议在代码中实现。而producer如果只有一个,可以配置。
封装类RocketMQTemplate:
rocketmq-spring-boot-starter提供的用于收发消息的对象,后续演示的代码默认是已经引入了这个对象
@Resource
private RocketMQTemplate rocketMQTemplate;
封装另外的RocketMQTemplate 对象
事务监听器是和rocketMQTemplate一对一绑定的,如果我们需要再次开启一个单独的事务,就需要另外的rocketMQTemplate对象
@ExtRocketMQConsumerConfiguration注解获取的配置项和RocketMQTemplate的配置项相同,可以参照以下示例进行单独指定,似乎可以用来单独连接另外一个mq
@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
rocketMQTemplate.getProducer().setCallbackExecutor();
rocketMQTemplate.getProducer().setAsyncSenderExecutor();
设置发送消息和消息回查的线程池
可以在配置类中执行,紧记录
rocketMQTemplate.getProducer().setCallbackExecutor(-);
rocketMQTemplate.getProducer().setAsyncSenderExecutor(-);
消息发送
普通消息发送
org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer
1、同步消息
String springTopic="springboot_topic";
// Send string
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "sync message");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
// 支持发送对象
sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
MessageBuilder 构建消息发送对象
// MessageBuilder 构建消息发送对象
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
2、异步消息
rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
3、单向消息
rocketMQTemplate.sendOneWay(msgExtTopic, "I'm from tag0");
4、给消息添加tag
convertAndSend类似于单向发送消息,这里用于延时怎么给消息添加tag,sendOneWay,asyncSend,syncSend等方法都是通过如下方式给消息添加tag,他们的第一个入参都是destination,是topic和tag的结合
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selected
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag0");
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");
批量消息
List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
System.out.printf("--- Batch messages send result :" + sr);
顺序消息
应该是会以hashKey进行取模,让需要保证顺序的消息坐落在同一个messagequeue上,
#核心代码,只演示同步发送消息的实例,异步发送消息代码请自行寻找,
SendResult sr = rocketMQTemplate.syncSendOrderly(springTopic, msgs, "hashKey", 60000);
事务消息
事务消息发送关键代码
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
springTransTopic + ":" + tags[i % tags.length], msg, null);
事务监听器和rocketMQTemplate绑定(一对一关系,所以这里如果你需要发送多个事务消息那就需要额外的rocketMQTemplate,rocketMQTemplateBeanName属性用来完成关系的绑定)
事务监听器作用不再赘述,虽然和roketmq自带的监听器类不是一个,但是模式是一样的
/**
* 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
* 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
* 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
**/
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
String destination = arg.toString();
localTrans.put(transId,msg);
//这个msg的实现类是GenericMessage,里面实现了toString方法
//在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
//而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
System.out.println("executeLocalTransaction msg = "+msg);
//转成RocketMQ的Message对象
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
String tags = message.getTags();
if(StringUtils.contains(tags,"TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
//延迟检查的时间间隔要有点奇怪。
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
Message originalMessage = localTrans.get(transId);
//这里能够获取到自定义的transaction_id属性
System.out.println("checkLocalTransaction msg = "+originalMessage);
//获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
// String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
if(StringUtils.contains(tags,"TagC")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags,"TagD")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
需要注意的是,我们拿到的不再是rocketmq的messageExt对象,不过我们一样可以获取到消息id和事务id
事务id是RMQ_SYS_TRANS_HALF_TOPIC这个主题下的消息id,在ui界面,你可以直接根据主题(发消息的主题)和消息id搜索到对应的事务消息
有回执的消息
实在没看出有啥用,我只想说既然这样,直接接口调用不就行了吗
生产者发送消息后,等待消费者进行消费,并给回执,回执可以是多种类型,并且支持嵌套的复杂类型,具体可以到示例项目中找
生产者
String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
System.out.printf("send %s and receive %s %n", "request string", replyString);
消费者是一个给回执的监听器
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
return "reply string";
}
}
异步接收回执信息
rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
@Override public void onSuccess(String message) {
System.out.printf("send %s and receive %s %n", "request string", message);
}
@Override public void onException(Throwable e) {
e.printStackTrace();
}
});
延时消息
同步异步单向等消息发送的方法中支持传入delayLevel参数,
5.0.0版本已经支持延时到具体是时刻,而不是指定延时级别,这里可能需要等rocketmq-spring-boot-starter的下一个版本了
ACL权限控制
Producer 端要想使用 ACL 功能,需要多配置两个配置项:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置
@Service
@RocketMQMessageListener(
topic = "test-topic-1",
consumerGroup = "my-consumer_test-topic-1",
accessKey = "AK",
secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
...
}
消费者消费消息
普通消费者
@Service
@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer_newns")
public class StringConsumerNewNS implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumerNewNS received: %s \n", message);
}
}
拿到原始messageExt消息对象消费者
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(MessageExt message) {
System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// set consumer consume message from now
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
}
}
消费者过滤消息
通过注解的属性设置多虑消息的条件
@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
# 主要是这两个属性
SelectorType selectorType() default SelectorType.TAG; // 还可以指定SQL92
/**
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";
顺序消息消费
@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY
同样是指定这个注解的属性
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; # 需要指定为 ORDERLY
消息返回消费失败
原生的写法是可以在对应的监听器中返回消息是否消费成功,在springBoot对应的监听器中,抛出异常就算是消费失败,会触发消息重试机制
不使用rocketmq-spring-boot-starter的方式
rocketmq-spring-boot-starter使用的话,依赖冲途是个问题,而且还不支持5.0.0版本
这里直接使用配置类的方法使用原函数呢个
消费者端
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
@Slf4j
public class ConsumerClient {
@Value("${rocketmq.name-server}")
private String namesrvAddr;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Bean(destroyMethod = "shutdown",value = "rocketMqConsumer")
public DefaultMQPushConsumer buildConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe("TopicTest", "*");
/*定义消费Client从那个位置消费消息,分别为:
CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.debug("{} Receive New Messages: {}", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
return consumer;
}
}
生产者
//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
@Slf4j
public class ProducerClient {
@Value("${rocketmq.name-server}")
private String namesrvAddr;
@Value("${rocketmq.producer.group}")
private String producerGroup;
public static final String TOPIC = "TopicTest";
public static final String TAG = "TagA";
@Bean(destroyMethod = "shutdown",value = "rocketMqProducer")
public DefaultMQProducer buildProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.setSendMsgTimeout(6000);
producer.start();
return producer;
}
}
其他
springcloudalibaba的github页面可以找到相关使用示例
关联信息
- 关联的主题:
- 上一篇:
- 下一篇:
- image: 20221021/1
- 转载自: