RocketMQ学习笔记
参考资料:
安装启动(非集群模式)
官网下载二进制安装包(当然也可下载源码包后自己编译):下载地址
解压
unzip rocketmq-all-4.3.0-bin-release.zip
修改配置
在
conf/broker.conf
中新增brokerIP1 = 192.168.195.88 autoCreateTopicEnable = true # 线上环境应该设为false
在
bin/runbroker.sh
中修改JVM内存大小,默认是8G,一般自己电脑的上虚拟机可能没这么大
启动
# 后台启动NameServer nohup sh bin/mqnamesrv -n 192.168.195.88:9876 & # 查看日志,看是否启动成功 tail -f ~/logs/rocketmqlogs/namesrv.log # 后台启动Broker nohup sh bin/mqbroker -n 192.168.195.88:9876 -c conf/broker.conf & # 查看日志,看是否启动成功 tail -f ~/logs/rocketmqlogs/broker.log
停止
sh bin/mqshutdown broker sh bin/mqshutdown namesrv
注意:
- 记得在防火墙中开启9876(nameServer用)、10909(生产者用)、10911(消费者用)端口
- 上面的192.168.195.88为我自己的机器的IP,不要使用localhost
常用命令
在RocketMQ的bin目录下有一个mqadmin脚本,它充当着控制台的角色,可以用来完成我们常用的操作。如不喜欢命令可安装第三方的可视化操控界面工具
创建topic
sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t testTopic1 # 参数 -n为nameServe服务地址 -b为broker服务地址 -t为topic的名字
查询所有topic
sh mqadmin topicList -n localhost:9876 # 参数 -n为nameServe服务地址
查看Topic统计信息
sh mqadmin topicStatus -n localhost:9876 -t testTopic1 # 参数 -n为nameServe服务地址 -t为topic的名字
查看消费组信息
sh mqadmin consumerProgress -n localhost:9876 -g simple_push_consumer_group_01 # 参数 -n为nameServe服务地址 -g为消费组的名字,无则表示查看所有的
查看所有命令:
sh mqadmin
原理概述
系统架构
注意点:
- NameServer提供服务发现和路由。 每个 NameServer 记录完整的路由信息,提供等效的读写服务,Broker启动后将自己注册至NameServer;随后每隔30s定期向NameServer上报Topic路由信息。每个 Broker 与NameServer 集群中的所有节点都会建立长连接。
- Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master。
- Consumer 可同时和提供Topic服务的master和Slave建立长连接,即在master节点宕机时,消费者可以从slave节点读取消息。
- Broker的主从切换问题,暂未研究
数据存储
RocketMQ的数据存储主要有三个内容:ConsumeQueue、CommitLog和IndexFile
ConsumeQueue是消息的逻辑队列,是由20字节定长的二进制数据单元组成,其中commitLogOffset(8 byte)、msgSize(4 byte)、tagsHashCode(8 byte);每个Topic和QueuId对应一个ConsumeQueue;单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件。
CommitLog是消息存放的实际物理位置,每个Broker下所有的Topic下的消息队列共用同一个CommitLog的日志数据文件来存储,所有RocketMQ的写入是顺序的。单个CommitLog文件的默认大小为1G。
IndexFile即消息索引,如果一个消息包含key值的话,会使用IndexFile存储消息索引,其每个单元的数据构成为keyHash(4 byte)、commitLogOffset(8 byte)、timestamp(4 byte)、nextIndexOffset(4byte)。IndexFile主要是用来根据key来查询消息。
Producer端发送消息最终写入的是CommitLog,写入CommitLog有同步刷盘和异步刷盘两种方式:
同步刷盘:只有在消息真正持久化至磁盘后,Broker端才会真正地返回给Producer端一个成功的ACK响应。
异步刷盘:只要消息写入PageCache即可将成功的ACK返回给Producer端。
Consumer端先从ConsumeQueue读取持久化消息的offset,随后再从CommitLog中进行读取消息的真正实体内容。所以实际上读取操作是随机而不是顺序的,所以这也是消费速度是比Kafka低的原因。
生产者
RocketMQ发送消息有三种方式
同步
消息发送后,等待服务端的ack响应,这种方式最可靠,但效率最低
异步
消息发送注册回调函数,不需等待服务端的响应
单向
消息发送后,不关心服务端是否成功接受
如果Producer发送消息失败,会自动重试,重试的策略:
- 重试次数 < retryTimesWhenSendFailed(可配置)
- 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
- 同时满足上面两个条件后,Producer会选择另外一个队列发送消息
消费者
RocketMQ消费消息主要有两种方式:
pull模式
由消费者客户端主动向服务端拉取消息。
一般情况下,如果我们没有控制好pull的频率,频率过低时,则可能消费速度太低导致消息的积压,频率过高时,则可能发送过多无效或低效pull请求,增加了服务端负载。
为了解决这个问题,RocketMQ在没有足够的消息时(如服务端没有可消费的消息),并不会立即返回响应,而是保持并挂起当前请求,待有足够的消息时在返回。并且我们需要指定offset的起点和终点,并且需要我们自己保存好本次消费的offset点,下次消费的时候好从上次的offset点开始拉取消息。
pull模式我们并不经常使用。
push模式
由服务端主动地将消息推送给消费者。
push模式下,慢消费的情况可能导致消费者端的缓冲区溢出。
但是在RocketMQ中并不是真正的push,而是基于长轮训的pull模式的来实现的伪push。具体的实现是:Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。
消费重试
即消费失败后,隔一段时间重新消费该消息。
重试队列
RocketMQ会为每个消费组都设置一个Topic名称为
%RETRY%+consumerGroup
的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。Consumer端出现异常失败时,失败的消息会重新发送给服务端的重试队列。死信队列
重试队列中超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个Topic命名为
%DLQ%+consumerGroup
的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理。
注意点:
RocketMQ的的默认延迟级别分为16个,所以一条消息最大的重试次数为16;
// 源码位置:org.apache.rocketmq.store.config.MessageStoreConfig.class // 如需修改,则需要修改broker的配置,官方并不建议修改 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; //如果我们在发送消息时设置消息的延迟级别为3,则表示消息10s后才能被消费者发现 msg.setDelayTimeLevel(3);
RocketMQ的Message中的
reconsumeTimes
属性,表示该消息当前已重试的的次数,我们可以通过如下方法来控制重试的次数int reconsumeTimes = msg.getReconsumeTimes(); // 只重试三次 if(reconsumeTimes >= 3){ // 表示消息已成功消费,不在放入重试队列中 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { // 表示消息消费失败,放入重试队列中,指定延迟时间后重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
RocketMQ自身的重试机制,默认消息的初始延迟级别就为3,好像并没法修改。源码如下:
// 源码位置:org.apache.rocketmq.client.impl.consumer.ProcessQueue.class public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { ... pushConsumer.sendMessageBack(msg, 3); ... }
简单的代码示例
Producer
public class Producer {
public static void main(String[] args) {
// 初始化一个生产者,生产组为simple_producer_group
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("simple_producer_group_01");
// 设置NameServer地址
defaultMQProducer.setNamesrvAddr("192.168.195.88:9876");
try {
// 启动
defaultMQProducer.start();
Producer producer = new Producer();
producer.syncSend(defaultMQProducer);
producer.asyncSend(defaultMQProducer);
producer.onewaySend(defaultMQProducer);
} catch (Exception e) {
log.error("异常:", e);
} finally {
defaultMQProducer.shutdown();
}
}
/**
* 同步方式
*/
private void syncSend(DefaultMQProducer producer) throws InterruptedException, RemotingException,MQClientException, MQBrokerException, UnsupportedEncodingException {
for(int i=0; i<100; i++){
String messageBody = "syncSend message" + i ;
Message msg = new Message("testTopic1", "*",
messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
/*
设置消息延迟级别,默认有16个级别:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
注意,如果设置为3,意思是10s再被消费
*/
//msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg, 10000);
log.info("onewaySend发送结果:{}", sendResult);
}
}
/**
* 异步方式
*/
private void asyncSend(DefaultMQProducer producer) throws UnsupportedEncodingException, RemotingException, MQClientException, InterruptedException {
for(int i=0; i<100; i++){
String messageBody = "asyncSend message" + i ;
Message msg = new Message("testTopic1", "*",
messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("asyncSend发送成功:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.info("asyncSend发送异常:{}", e);
}
});
}
}
/**
* 单向方式
*/
private void onewaySend(DefaultMQProducer producer) throws RemotingException, MQClientException, InterruptedException, UnsupportedEncodingException {
for(int i=0; i<100; i++){
String messageBody = "onewaySend message" + i ;
Message msg = new Message("testTopic1", "*",
messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
log.info("onewaySend发送完成");
}
}
}
Consumer
public class Consumer {
public static void main(String[] args) throws Exception {
new Consumer().pushMode();
//new Consumer().pullMode();
}
/**
* push 消费模式
*/
private void pushMode() throws MQClientException {
// 设置消费组
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("simple_push_consumer_group_01");
// 设置nameServer地址
consumer.setNamesrvAddr("192.168.195.88:9876");
// 设置从头开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定topic
consumer.subscribe("testTopic1", "*");
// 设置批量消费消息的数量,默认1
consumer.setConsumeMessageBatchMaxSize(1);
// 注册消息监听
consumer.registerMessageListener((List<MessageExt> msgs,
ConsumeConcurrentlyContext context) -> {
MessageExt msg = msgs.get(0);
try {
log.info("收到消息:{}",
new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
log.info("异常:", e);
}
// 消息的重试次数
int reconsumeTimes = msg.getReconsumeTimes();
log.info("reconsumeTimes={}, delayTimeLevel={}", reconsumeTimes,
msg.getDelayTimeLevel());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 只重试三次
/* if(reconsumeTimes >= 3){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}*/
});
// 运行
consumer.start();
log.info("消费者启动成功");
}
/** 用于pull模式中记录offset使用*/
private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();
/**
* pull 消费模式, 不建议使用
*/
private void pullMode() throws MQClientException {
DefaultMQPullConsumer consumer
= new DefaultMQPullConsumer("simple_pull_consumer_group_01");
consumer.setNamesrvAddr("192.168.195.88:9876");
consumer.start();
// 获取topic中的所有队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("testTopic1");
for (MessageQueue mq : mqs) {
log.info("消费队列:{}", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
log.info("pullResult:{}", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
pullResult.getMsgFoundList().forEach(msgExt -> {
try {
log.info("收到消息:{}", new String(msgExt.getBody(),
RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
log.info("异常:", e);
}
});
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case NO_MATCHED_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
log.info("异常:", e);
}
}
}
}
/**
* 获取偏移量
*/
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSET_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
/**
* 保存偏移量
*/
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSET_TABLE.put(mq, offset);
}
}
分布式事务解决方案
逻辑是这样的:
- 事务发起方发送事务消息到MQ中,但此时该消息对方不可见
- 消息发送成功后,执行本地事务
- 根据本地事务执行结果,向MQ发送确认消息commit 或rollback;如果为rollbalke,MQ则删除事务消息不在下发,如为commit,则事务消息变为对方可见
- 对方消费事务消息,如果消费失败则进行重试,重试仍未成功则需要人工处理
- 如果确认消息发送失败,或者执行本地事务时事务发起方宕掉,这时MQ 将会不停的询问其同组的其他producer来获取状态。
实例代码(Springboot项目):
@Component
@Slf4j
public class TransactionProducer implements InitializingBean {
@Autowired
private TransactionListener transactionListener;
private TransactionMQProducer producer;
@Override
public void afterPropertiesSet() throws Exception {
producer = new TransactionMQProducer("transaction_producer_group_01");
producer.setNamesrvAddr("192.168.195.88:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
try {
producer.start();
} catch (MQClientException e) {
log.error("TransactionProducer 启动异常:", e);
}
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.shutdown()));
}
/**
* 发送事务消息
*/
public void produce() {
try {
String body= "transaction_test_1";
Message msg = new Message("transactionTopicTest1", "user",
UUID.randomUUID().toString(),
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
log.info("发送事务消息结果:{}", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
log.info("发送事务消息异常:{}", e);
}
}
}
@Component
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
@Resource(name = "jdbcTemplate1")
private JdbcTemplate jdbcTemplate;
/** 生产中可以用redis或数据库代替 */
private ConcurrentHashMap<String, LocalTransactionState> localTrans
= new ConcurrentHashMap<>();
/**
* 执行本地事务
* @param msg 事务消息
* @param arg 自定义参数
* @return 执行结果
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
jdbcTemplate.execute("insert into user(name) values('transaction_test_1')");
localTrans.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
log.info("本地事务执行成功");
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e){
localTrans.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
log.error("本地事务执行失败");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
LocalTransactionState state = localTrans.get(msg.getTransactionId());
log.info("执行事务回查,transactionId:{}, transactionState:{}",
msg.getTransactionId(), state);
if (null != state) {
return state;
}
return LocalTransactionState.UNKNOW;
}
}
@Component
@Slf4j
public class TransactionConsumer implements InitializingBean {
@Resource(name = "jdbcTemplate2")
private JdbcTemplate jdbcTemplate;
@Override
public void afterPropertiesSet() throws Exception {
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("transaction_consumer_group_01");
consumer.setNamesrvAddr("192.168.195.88:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("transactionTopicTest1", "user");
consumer.registerMessageListener((List<MessageExt> msgs,
ConsumeConcurrentlyContext context) -> {
try {
MessageExt msg = msgs.get(0);
log.info("收到消息:{}", msg);
// 正式项目中,我们应该保存该msgId,防止消息的重复消费
// String msgId = msg.getMsgId();
String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
jdbcTemplate.execute("insert into user(name) values('"+body+"')");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.info("异常:", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
log.info("TransactionConsumer启动成功");
}
}
/**
* 数据源配置,用的Mysql和Druid
*/
@Configuration
public class DbConfig {
@Bean(name = "dataSource1")
DataSource dataSource1() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta1?useUnicode=true&characterEncoding=UTF8&useSSL=false");
dataSource.setUsername("root");
dataSource.setPassword("root");
return dataSource;
}
@Bean(name = "dataSource2")
DataSource dataSource2() {
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mysql://192.168.195.88:3306/jta2?useUnicode=true&characterEncoding=UTF8&useSSL=false");
dataSource.setUsername("root");
dataSource.setPassword("root");
return dataSource;
}
@Bean("jdbcTemplate1")
JdbcTemplate first(@Qualifier("dataSource1") DataSource dataSource) {
JdbcTemplate jdbcTemplate = new JdbcTemplate();
jdbcTemplate.setDataSource(dataSource);
return jdbcTemplate;
}
@Bean("jdbcTemplate2")
JdbcTemplate second(@Qualifier("dataSource2") DataSource dataSource) {
JdbcTemplate jdbcTemplate = new JdbcTemplate();
jdbcTemplate.setDataSource(dataSource);
return jdbcTemplate;
}
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
@Autowired
TransactionProducer transactionProducer;
/**
* 测试分布式事务
*/
@Test
public void testTransactionProducer() throws InterruptedException {
transactionProducer.produce();
Thread.sleep(10000000);
}
}
其他的一些东西
以下内容没有主要研究,主要摘自官网,有需要时再详细研究。
有序消息
原理和Kafka要做到有序差不多,即把消息放入一个队列中,然后使用一个消费者去进行消费。
广播消息
即每个消费者消费都所有的消息
//设置广播模式属性即可
consumer.setMessageModel(MessageModel.BROADCASTING);
批量发送消息
批量发送消息可提高生产者的性能。注意的是:同一批次的消息应该具有:相同的topic,相同的waitStoreMsgOK和没有延时计划,并且一批消息的总大小不应超过1M。
消息过滤器
RocketMQ在消息过滤上是比较强大的,虽然我们可能不会经常用到它。Message除了可以Tag来加以区分外,我们还可以为它添加额外的属性。如下:
在发送消息时:
// 消息添加额外属性
msg.putUserProperty("a", "3");
在消费消息时:
// 只有消费具有属性a,a>= 0且a <= 3的消息
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
还有非常多的过滤方式如:in
and
or
not
is null
=
<
>
等等
消息重复
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。