本文将主要介绍在SpringBoot项目中如何集成RocketMQ以实现普通消息和事务消息的。
首先是分别创建生产者的springboot项目 springboot-rocketmq-producer,创建消费者的springboot项目 springboot-rocketmq-consumer。
1. 引入依赖
本例中使用的RocketMQ的版本是 5.1.3。所以引入的 rocketmq-spring-boot 版本要与之匹配。
可以通过mvnrepository进行查看。https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot/2.2.2
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.2</version>
</dependency>
2. 配置文件修改
在springboot-rocketmq-producer项目的application.yml文件中添加如下配置:
rocketmq:
name-server: 172.31.184.89:9876
producer:
group: feige-producer-group
consumer:
topic: my-spring-boot-topic
在springboot-rocketmq-consumer项目的application.yml文件中添加如下配置:
server:
port: 8080
rocketmq:
name-server: 172.31.184.89:9876
consumer:
group: feige-consumer-group
topic: my-spring-boot-topic
3. 实现生产者
定义一个生产者类MyProducer,在该类中引入RocketMQTemplate 操作类,然后定义发送消息的方法sendMessage,在此方法中调用 rocketMQTemplate.convertAndSend
方法进行消息发送。
@Component
public class MyProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*
* @param topic 主题
* @param message 消息
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
3.1. 编写生产者单元测试
@Autowired
private MyProducer myProducer;
@Value("${rocketmq.consumer.topic:}")
private String consumerTopic;
@Test
void sendMessage() {
myProducer.sendMessage(consumerTopic,"飞哥SpringBoot集成RocketMQ消息测试");
}
4.实现消费者
定义消费者类MyConsumer。此类实现了RocketMQListener接口并重写了onMessage方法用于接收broker推送过来的消息。
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic:}", consumerGroup = "generalConsumerGroup")
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("收到的消息是=" + s);
}
}
5. 实现事务消息
在SpringBoot中实现RocketMQ的事务消息,整体思路与 【RocketMQ系列六】RocketMQ事务消息 文中提到的思路相同。
5.1. 实现事务消息的生产者
在前面创建的MyProducer类中添加实现事务消息的方法 sendTransactionMessage。
/**
* 发送事务消息
*
* @param topic 话题
* @param msg 消息
*/
public void sendTransactionMessage(String topic, String msg) throws InterruptedException {
String[] tags = {"tagA", "tagB", "tagC", "tagD", "tagE"};
for (int i = 0; i < 10; i++) {
// 2. 将topic和tag整合在一起,以:分割,
String destination = topic + ":" + tags[i % tags.length];
// 1.注意该message是org.springframework.messaging.Message
Message<String> message = MessageBuilder.withPayload(msg + "_" + tags[i % tags.length] + "_" + i)
.setHeader("destination", destination).build();
// 第一个参数是发布的目的地,第二个参数是消息,第三个参数是额外的参数
rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
Thread.sleep(10);
}
}
这里需要注意的是传入的Message类是org.springframework.messaging.Message ,不是RocketMQ的Message。
5.2. 实现本地事务消息
接着在定义生产者本地事务实现类 MyTransactionListener,该类实现了RocketMQLocalTransactionListener接口,并重写了executeLocalTransaction方法和checkLocalTransaction方法。这里多了一步就是将 org.springframework.messaging.Message
转成 org.apache.rocketmq.common.message.Message
。
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 将消息转成rocketmq下的message
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "utf-8", (String) arg, msg);
String tags = message.getTags();
if (tags.equals("tagA")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (tags.equals("tagB")) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 将消息转成rocketmq下的message
String destination = (String) msg.getHeaders().get("destination");
org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),
"utf-8",destination, msg);
String tags = message.getTags();
if (tags.equals("tagC")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (tags.equals("tagD")) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
}