在 RocketMQ 主要的组件如下。

NameServer
NameServer 集群,Topic 的路由注册中心,为客户端根据 Topic 提供路由服务,
从而引导客户端向 Broker 发送消息。NameServer 之间的节点不通信。
路由信息在 NameServer 集群中数据一致性采取的最终一致性。

Broker
消息存储服务器,分为两种角色:
Master 与 Slave,
在 RocketMQ 中,主服务承担读写操作,从服务器作为一个备份,当主服务器存在压力时,
从服务器可以承担读服务(消息消费)。所有 Broker,包含 Slave 服务器每隔 30s 会向 NameServer 发送心跳包,
心跳包中会包含存在在 Broker 上所有的 Topic 的路由信息。

Client
消息客户端,包括 Producer(消息发送者)和 Consumer(消费消费者)。
客户端在同一时间只会连接一台 NameServer,只有在连接出现异常时才会向尝试连接另外一台。
客户端每隔 30s 向 NameServer 发起 Topic 的路由信息查询。

温馨提示:NameServer 是在内存中存储 Topic 的路由信息,持久化 Topic 路由信息的地方是在 Broker 中,
即 ${ROCKETMQ_HOME}/store/config/topics.json。

在 RocketMQ 4.5.0 版本后引入了多副本机制,即一个复制组(m-s)可以演变为基于 Raft 协议的复制组,
复制组内部使用 Raft 协议保证 Broker 节点数据的强一致性,该部署架构在金融行业用的比较多。

消息订阅模型
在 RocketMQ 的消息消费模式采用的是发布与订阅模式。

Topic:一类消息的集合,消息发送者将一类消息发送到一个主题中,
例如订单模块将订单发送到 order_topic 中,而用户登录时,将登录事件发送到 user_login_topic 中。
ConsumerGroup:消息消费组,一个消费单位的“群体”,
消费组首先在启动时需要订阅需要消费的 Topic。一个 Topic 可以被多个消费组订阅,
同样一个消费组也可以订阅多个主题。一个消费组拥有多个消费者。

消费模式
在 RocketMQ 中支持广播模式与集群模式。
广播模式:一个消费组内的所有消费者每一个都会处理 Topic 中的每一条消息,通常用于刷新内存缓存。
集群模式:一个消费组内的所有消费者共同消费一个 Topic 中的消息,即分工协作,一个消费者消费一部分数据,启动负载均衡。
集群模式是非常普遍的模式,符合分布式架构的基本理念,即横向扩容,当前消费者如果无法快速及时处理消息时,
可以通过增加消费者的个数横向扩容,快速提高消费能力,及时处理挤压的消息。

在 MQ 领域有一个不成文的约定:同一个消费者同一时间可以分配多个队列,但一个队列同一时间只会分配给一个消费者。

RocketMQ 提供了众多的队列负载算法,其中最常用的两种平均分配算法。
AllocateMessageQueueAveragely:平均分配
AllocateMessageQueueAveragelyByCircle:轮流平均分配

温馨提示:如果 Topic 的队列个数小于消费者的个数,那有些消费者无法分配到消息。
在 RocketMQ 中一个 Topic 的队列数直接决定了最大消费者的个数,但 Topic 队列个数的增加对 RocketMQ 的性能不会产生影响。


Linux 常用的端口命令:
修改主机名:
vim /etc/hostname
修改IP地址:
cd /etc/sysconfig/network-scripts/
vim ifcfg-ens33 修改IP

# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
# 开放name server默认端口
firewall-cmd --remove-port=9876/tcp --permanent
# 开放master默认端口
firewall-cmd --remove-port=10911/tcp --permanent
# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --remove-port=11011/tcp --permanent
# 重启防火墙
firewall-cmd --reload

在官网下载rocketmq 的二进制包,并上传至虚拟机
https://rocketmq.apache.org/release_notes/release-notes-4.9.3

RocketMQ服务搭建  
下载RocketMQ源码 : Apache Download Mirrors
解压 :rocketmq-all-4.9.2-bin-release.zip
unzip rocketmq-all-4.9.2-bin-release.zip
如果没有 unzip 命令 安装 yum install unzip
进入机器a的目录
cd /usr/local/bin/rocketmq-all-4.9.3-bin-release/conf/2m-2s-sync
在机器a上配置broker-a的master节点
vim broker-a.properties
内容:
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/bin/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/bin/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-a/consumerqueue
#消息索引存储路径
storePathIndex=/usr/local/bin/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/bin/rocketmq/store/broker-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/bin/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
brokerIP1=192.168.178.128

在机器a上配置broker-b的slave节点
vim broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/bin/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/bin/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-b/consumerqueue
#消息索引存储路径
storePathIndex=/usr/local/bin/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/bin/rocketmq/store/broker-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/bin/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
brokerIP1=192.168.178.128

在机器b上配置broker-b 的master 节点
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/bin/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/bin/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-b/consumerqueue
#消息索引存储路径
storePathIndex=/usr/local/bin/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/bin/rocketmq/store/broker-b/checkpoint
#abort 文件存储路径
abortFile=/usr/local/bin/rocketmq/store/broker-b/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
brokerIP1=192.168.178.129

在机器b上配置broker-a 的slave节点
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/bin/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/bin/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/bin/rocketmq/store/broker-a/consumerqueue
#消息索引存储路径
storePathIndex=/usr/local/bin/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/bin/rocketmq/store/broker-a/checkpoint
#abort 文件存储路径
abortFile=/usr/local/bin/rocketmq/store/broker-a/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
brokerIP1=192.168.178.129

分别在a和b两台机器上修改jvm启动参数
修改 runbroker.sh 中的jvm初始堆内存,最大堆内存,年轻代的大小
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改 runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

开始启动nameServer
分别在a ,b两台机器运行
cd /usr/local/bin/rocketmq-all-4.5.0-bin-release/bin
nohup sh mqnamesrv &

在机器a上启动broker-a的master节点
cd /usr/local/bin/rocketmq-all-4.5.0-bin-release/bin

nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-a.properties &
在机器a上启动broker-b的slave节点 


nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
13.在机器b上启动broker-b的master节点


nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-b.properties &
在机器b上启动broker-a的slave节点

nohup sh mqbroker -c /usr/local/bin/rocketmq-all-4.5.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &

查看是否启动成功了:
jps

到此 2主两从的节点就启动完成了

rocketmq-console-ng仪表台
下载安装
https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0
修改配置文件:
server.address=0.0.0.0
# =============修改端口为19876=============
server.port=19876

### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey

#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.level.root=INFO
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
# =============修改namesrv地址,如果是多个请以分号;分隔(此处修改成你自己安装的rocket namesrv地址ip即可)=============
rocketmq.config.namesrvAddr=10.211.55.11:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket

#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
# =============开启控制台账户密码登录(为true表示开启,为false表示关闭)=============
rocketmq.config.loginRequired=true

#set the accessKey and secretKey if you used acl
#rocketmq.config.accessKey=
#rocketmq.config.secretKey=

修改 users.properties 内容如下(每次修改完记得重新编译打包):
# This file supports hot change, any change will be auto-reloaded without Console restarting.
# Format: a user per line, username=password[,N] #N is optional, 0 (Normal User); 1 (Admin)

# Define Admin
# =============用户名和密码规则「用户名=密码,权限」,这里的权限为1表示管理员,为0表示普通用户=============
# 例如:admin=admin123,1
这是用户名=这是密码,1


# Define Users
# =============屏蔽下边两个账户=============
#user1=user1
#user2=user2


放开console控制台的监控参数配置
默认的rocketmq-console将此功能注释掉了,修改文件:
~/rocketmq-console/src/resources/static/view/pages/consumer.html,将如下图所示的代码放开即可。
开启定时任务监控,扫描实时数据,做阈值判断,告警提示
默认情况下,rocketmq-console只定义了定时任务入口,具体的策略没有任何处理,我们需要根据自己的需求加入自身的告警方式,比如:邮箱,钉钉,短信,微信等等。
其预留的定时任务实现类为:
org.apache.rocketmq.console.task.MonitorTask
定时任务的扫描频率可根据自身系统要求考量设置。

简单的介绍面板
https://wenku.baidu.com/view/bada131113661ed9ad51f01dc281e53a5902514a.html


参考文档:
ttps://learn.lianglianglee.com/专栏/RocketMQ%20实战与进阶(完)/03%20消息发送%20API%20详解与版本变迁说明.md
ttps://blog.csdn.net/luanlouis/article/details/88078657
https://rocketmq.apache.org/docs/quick-start/
https://www.sunjs.com/article/detail/a44ed76899194d1fa1cc883c70b99c1f.html

07-03 10:16