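Fault-Tolerance Schemes
Fault tolerance is usually an important part of planning the main request path. As one of the main members of the group's middleware family, the message queue RocketMQ takes part in Double 11 every year and is put through all kinds of tests.
Fault-tolerance schemes: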
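- The usual approach is an automatic degradation and circuit-breaking policy that switches on by itself once performance reaches a threshold. A manual degradation switch is also provided so that an operator can start the degradation flow by hand.
- Tiered rate limiting can also be used: different limits are set according to the system's load capacity, cluster size, and so on, and the limit can be adjusted dynamically according to the application's load level. For example, when the interface response time is 100 ms the limit can be set to 100,000 requests per second, and when the response time is 200 ms it can be set to 50,000 requests per second.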
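To make the two schemes concrete, here is a minimal Java sketch of response-time-driven tiered limiting together with an automatic and a manual degradation switch. Only the two tiers (100 ms to 100,000 requests per second, 200 ms to 50,000) come from the example above. The class name `TieredRateLimitSketch`, the fallback limit, the degradation threshold, and the `manualDegrade` field are assumptions made for illustration and are not RocketMQ APIs.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not a RocketMQ API.
final class TieredRateLimitSketch {
    // Response-time upper bound (ms) -> allowed requests per second.
    private static final TreeMap<Long, Integer> TIERS = new TreeMap<>();
    static {
        TIERS.put(100L, 100_000); // tier taken from the example in the text
        TIERS.put(200L, 50_000);  // tier taken from the example in the text
    }

    // Assumed limit once the response time exceeds every tier.
    static final int FALLBACK_QPS = 10_000;
    // Assumed threshold above which degradation switches on automatically.
    static final long DEGRADE_THRESHOLD_MS = 500L;
    // Manual switch that an operator can flip to force degradation.
    static volatile boolean manualDegrade = false;

    // Returns the limit of the first tier whose bound is >= the response time.
    static int limitFor(long responseTimeMs) {
        Map.Entry<Long, Integer> tier = TIERS.ceilingEntry(responseTimeMs);
        return tier == null ? FALLBACK_QPS : tier.getValue();
    }

    // Degrade when the manual switch is on or the threshold is exceeded.
    static boolean shouldDegrade(long responseTimeMs) {
        return manualDegrade || responseTimeMs > DEGRADE_THRESHOLD_MS;
    }

    public static void main(String[] args) {
        for (long rt : new long[] {80, 100, 150, 200, 350, 600}) {
            System.out.println(rt + " ms -> " + limitFor(rt)
                    + " qps, degrade=" + shouldDegrade(rt));
        }
    }
}
```

A sorted map keeps the tiers in one place, so adding a tier for a larger cluster is a one-line change rather than another `if` branch.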
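Class hierarchy of the fault-tolerance strategy
- LatencyFaultTolerance: the latency fault-tolerance interface
- LatencyFaultToleranceImpl: the implementation class of latency fault tolerance, which contains the actual fault-tolerance logic
- MQFaultStrategy: the fault-tolerance strategy provided by RocketMQ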
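The relationship between the interface and its implementation can be sketched as follows. This is a simplified, hypothetical model and not the RocketMQ source: the four method names match the calls that MQFaultStrategy makes on its `latencyFaultTolerance` field, while the names `LatencyFaultSketch`, `FaultTolerance`, `SimpleFaultTolerance`, and `FaultItem`, and the "lowest recorded latency" rule in `pickOneAtLeast`, are assumptions chosen to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical sketch; the real classes live in the RocketMQ client.
final class LatencyFaultSketch {

    // The four operations that MQFaultStrategy calls on its fault-tolerance field.
    interface FaultTolerance<T> {
        void updateFaultItem(T name, long currentLatency, long notAvailableDuration);
        boolean isAvailable(T name);
        void remove(T name);
        T pickOneAtLeast();
    }

    static final class SimpleFaultTolerance implements FaultTolerance<String> {
        // Per-broker record: last observed latency and when the broker may be used again.
        private static final class FaultItem {
            final long latency;
            final long availableAt;

            FaultItem(long latency, long availableAt) {
                this.latency = latency;
                this.availableAt = availableAt;
            }
        }

        private final Map<String, FaultItem> items = new ConcurrentHashMap<>();

        @Override
        public void updateFaultItem(String name, long currentLatency, long notAvailableDuration) {
            long availableAt = System.currentTimeMillis() + notAvailableDuration;
            items.put(name, new FaultItem(currentLatency, availableAt));
        }

        // A broker with no record, or whose penalty has expired, is available.
        @Override
        public boolean isAvailable(String name) {
            FaultItem item = items.get(name);
            return item == null || System.currentTimeMillis() >= item.availableAt;
        }

        @Override
        public void remove(String name) {
            items.remove(name);
        }

        // Assumption: return the recorded broker with the lowest latency, or null if none.
        @Override
        public String pickOneAtLeast() {
            String best = null;
            long bestLatency = Long.MAX_VALUE;
            for (Map.Entry<String, FaultItem> e : items.entrySet()) {
                if (e.getValue().latency < bestLatency) {
                    bestLatency = e.getValue().latency;
                    best = e.getKey();
                }
            }
            return best;
        }
    }

    public static void main(String[] args) {
        SimpleFaultTolerance ft = new SimpleFaultTolerance();
        ft.updateFaultItem("broker-a", 600L, 30000L);
        ft.updateFaultItem("broker-b", 3500L, 180000L);
        System.out.println("broker-a available: " + ft.isAvailable("broker-a"));
        System.out.println("least bad broker: " + ft.pickOneAtLeast());
    }
}
```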
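Source Code Analysis
MQFaultStrategy
MQFaultStrategy mainly maintains the following attributes:
- The message send latency of each Broker
- The switch for send-latency fault tolerance
- The mapping between latency levels and unavailable durations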
public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();

    /**
     * Tracks the message send latency of each Broker.
     * key: brokerName
     */
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    /**
     * Switch for send-latency fault tolerance
     */
    private boolean sendLatencyFaultEnable = false;

    /**
     * Latency levels, in milliseconds
     */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

    /**
     * Unavailable durations, in milliseconds, index-aligned with latencyMax
     */
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }
    /**
     * Selects a message queue based on the TopicPublishInfo
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Check whether the fault-tolerance switch is on; it defaults to false
        if (this.sendLatencyFaultEnable) {
            try {
                // Walk the queues round-robin and return the first one whose Broker is currently available
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // Nothing was selected in the previous step: pick a relatively good Broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // Neither step above produced a queue: fall back to the default load-balancing strategy
            return tpInfo.selectOneMessageQueue();
        }
        // Switch is off: default selection, which takes lastBrokerName into account
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
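    /**
     * Updates the latency fault-tolerance information
     *
     * currentLatency: the observed send latency
     * isolation: whether to isolate the Broker
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // When isolation is requested, a default latency of 30000 ms is used,
            // which maps to the longest unavailable duration (600000 ms)
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // Update the Broker's latency record
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }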
    /**
     * Computes the unavailable duration for a given latency using a table lookup
     */
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
}
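The table lookup in computeNotAvailableDuration is easy to try in isolation. The sketch below copies the two arrays and the loop from the source above into a standalone class; the class name `NotAvailableDurationSketch` and the sample latencies are assumptions made for illustration.

```java
// Standalone sketch of the lookup; the arrays and the loop mirror the source above.
final class NotAvailableDurationSketch {
    // Latency levels (ms) and the unavailable duration (ms) at the same index.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scans from the highest level down and returns the duration of the
    // first level that the latency reaches; below every level the result is 0.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // Hypothetical sample latencies, including the 30000 ms used for isolation.
        for (long latency : new long[] {30, 80, 549, 600, 1200, 2500, 30000}) {
            System.out.println(latency + " ms -> unavailable for "
                    + computeNotAvailableDuration(latency) + " ms");
        }
    }
}
```

With these arrays, a send that takes 600 ms marks the Broker unavailable for 30000 ms, anything under 550 ms carries no penalty, and the 30000 ms used for isolation lands on the last level, 600000 ms.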
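As the source shows, with the fault-tolerance strategy enabled, selectOneMessageQueue selects an MQ in the following steps:
- Walk the queue list round-robin and return the first queue whose Broker is currently available (in the code quoted above, lastBrokerName is only consulted when the switch is off)
- If none is available, pick a second-best Broker
- If that also fails, return a queue chosen by the default load-balancing strategy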
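The three steps can be traced with a small self-contained model. This is a hedged sketch and not the RocketMQ implementation: `QueueSelectionSketch`, its nested `Queue` type, and the parameters `unavailableBrokers` and `leastBadBroker` are hypothetical stand-ins for TopicPublishInfo, MessageQueue, and the state kept by LatencyFaultTolerance, and step 2 is simplified to "first queue on the least-bad Broker" instead of the modulo over write queues used in the source.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the three-step fallback; not the RocketMQ classes.
final class QueueSelectionSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    // Shared round-robin counter, playing the role of getSendWhichQueue().
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    Queue select(List<Queue> queues, Set<String> unavailableBrokers, String leastBadBroker) {
        // Step 1: round-robin, return the first queue on an available broker.
        int index = sendWhichQueue.incrementAndGet();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (!unavailableBrokers.contains(q.brokerName)) {
                return q;
            }
        }
        // Step 2 (simplified): use a queue on the least-bad broker, if one is known.
        if (leastBadBroker != null) {
            for (Queue q : queues) {
                if (q.brokerName.equals(leastBadBroker)) {
                    return q;
                }
            }
        }
        // Step 3: plain round-robin over all queues.
        return queues.get(Math.floorMod(sendWhichQueue.incrementAndGet(), queues.size()));
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
                new Queue("broker-a", 0), new Queue("broker-a", 1),
                new Queue("broker-b", 0), new Queue("broker-b", 1));
        QueueSelectionSketch selector = new QueueSelectionSketch();
        Set<String> both = new HashSet<>(Arrays.asList("broker-a", "broker-b"));

        System.out.println("step 1: " + selector.select(queues, Collections.singleton("broker-a"), null));
        System.out.println("step 2: " + selector.select(queues, both, "broker-a"));
        System.out.println("step 3: " + selector.select(queues, both, null));
    }
}
```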
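updateFaultItem updates the latency recorded for a Broker. If the Producer takes too long to send a message, the Broker is treated as unavailable for a period N. The relationship between N and the time the Producer spent sending the message is shown in the table below (it is simply the latencyMax and notAvailableDuration arrays from the source above):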