系列文章目录
一. 背景
Kafka内部涉及大量的"延时"操作,比如收到PRODUCE请求后可为副本等待一个timeout的时间后再响应客户端。
那我们讨论一个问题:Kafka为什么自己实现了一个延时任务组件,而不直接使用java的java.util.concurrent.DelayQueue?
可以按如下两点来分析这个事:
-
DelayQueue提供了哪些能力?
-
Kafka所面对的场景是怎样的,为什么DelayQueue不适用?
1.1 DelayQueue的能力
DelayQueue相关的接口/类如下图:
对应地,DelayQueue所提供的能力如下:
-
为任务指定延时时间
DelayQueue所维护的任务都要实现Delayed接口;其中的getDelay方法返回了距离该任务所设定执行时间有多远; -
任务存取的时间复杂度为O(log n)
DelayQueue内部使用"优先级队列"来保存任务,该数据结构存取的时间复杂度为对数复杂度;
1.2 Kafka的业务场景
Kafka的业务背景有如下特点:
-
延时任务不一定非得等到超时后才执行,有一些事件会触发其提前执行;
比如PRODUCE请求处理过程中,若副本的响应比较快,那收到了预期数量的副本响应后就可以开始给客户端发响应,不一定非得等满一个timeout的时间; -
延时任务的"入队"操作QPS很高;
Kafka就是为高QPS而生,内部会产生大量的延时任务;
对应地,Kafka对延时任务组件有如下两点要求:
-
要求有提前执行任务的能力;
-
要求尽可能降低延时任务入队操作的时间复杂度;
(借助一个名为"时间轮"的数据结构,Kafka将时间复杂度实际降到了O(1))
这两点要求都无法通过直接应用DelayQueue的方式得到满足。
二. 组件接口
我们来看看Kafka的延时任务组件对外提供的接口,进而搞清楚其提供的能力和使用方式。
如下图:
左边两个类定义了"延时操作":
-
TimerTask
描述了一个最基本的延时Task;定义了超时时间(delayMs属性)、提供了一个取消操作(cancel方法); -
DelayedOperation
描述了一个可被外部事件提前触发的TimerTask;其在TimerTask基础上提供了检查是否满足提前触发条件的方法(tryComplete)、并定义被外部事件提前触发后的行为(onComplete)和无事件触发直到超时后的行为(onExpiration);这几个方法都需要DelayedOperation的子类进行实现;
右边的DelayedOperationPurgatory类定义了一个维护DelayOperaton的容器,其核心操作如下:
-
tryCompleteElseWatch
添加延时任务;该方法有两个入参:operation和watchkeys;前者是要添加的延时任务,后者是该任务要监听的事件类型(可以同时监听多个事件类型);可以在kafka.server.DelayedOperationKey所在scala文件中查看目前可选的watchkeys类型;一个watchkey对应一个延时任务队列; -
checkAndComplete
检查各任务是否以满足提前触发的条件;该方法在某个事件发生之后,逻辑是遍历watchkey对应的任务队列,逐个判断是否满足了触发条件; -
cancelForKey
取消watchkey下的所有延时任务;
三. 实现
下文主要关注"延时"的实现方式。
3.1 业务模型
时间轮延时组件的思路如下:
-
仍然使用java的DelayQueue存储延时任务;
但为了降低延时任务入队的事件复杂度,Kafka并不直接将单个延时任务直接存储于DelayQueue,而是先将延时任务列表(TimerTaskList)加入DelayQueue,然后再向对应的列表中添加延时任务 (TimerTaskList也有超时时间的概念,等于其中最快超时的任务的超时时间)。这样任务的入队就由向优先级队列添加元素变为了向TimerTaskList中添加元素;前者时间复杂度为O(logn),后者时间复杂度为O(1)。 -
如何判断一个新的延时任务应该加到哪个TimerTaskList中?时间轮(TimingWheel)!
delayQueue中存储了多个TimerTaskList,当拿到一个新的延时任务时,Kafka会使用TimingWheel来计算该延时任务应插入到哪个TimerTaskList。其实,TimingWheel的本质就是TimerTask和TimerTaskList之间的映射函数。
接下来通过一个具体的例子来说明这种映射逻辑:
先关注上图中①号时间轮。圆环中每一个单元格表示一个TimerTaskList。单元格有其关联的时间跨度;下方的"1s x 12"表示时间轮上共12个单元格,每个单元格的时间跨度为1秒。有一个指针指向了"当前时间"所对应的单元格。顺时针方向为时间流动方向。
当收到一个延迟时间在0-1s的TimerTask时,会将其追加到①号时间轮的橙色单元格中。当收到一个延时时间在3-4s的TimerTask时,会将其追加到①号时间轮的黄色单元格中。以此类推。
现在有个问题:①号时间轮能表示的最大延迟时间是12秒,那如果收到了延迟13秒的任务该怎么办?这时该用到②号时间轮了,我们称②号为①号的"溢出时间轮"。②号时间轮的特点如下:
- 一个单元格所标识的时间跨度为①号的总长度,即12秒;
- 单元格数量与①号相同;
如此,延迟时间在12-24s的TimerTask会被追加到②号的紫色单元格,延迟时间在36-48s的TimerTask会被追加到②号的绿色单元格中。③号时间轮同理。
刚刚是按①->②->③的顺序来分析时间轮的逻辑,反过来也可以得到有用的结论:想象手里有一个"放大镜",其实③号时间轮的蓝色单元格"放大"后是②号时间轮;②号时间轮的蓝色单元格"放大"后是①号时间轮;蓝色单元格并不实际存储TimerTask。
3.2 数据结构
DelayedOperationPurgatory有个Timer类型的timeoutTimer属性,用于维护延时任务。实际使用的是Timer的实现类:SystemTimer。该类用于维护延时任务的核心属性有两个:delayQueue和timingWheel。TimingWheel表示单个时间轮,接下来我们来看看其类图:
各属性含义如下:
-
buckets:TimerTaskList数组;其中一个元素对应一个时间轮单元格;
-
tickMs:本时间轮一个单元格所表示的时间跨度(毫秒);
-
intervel:本时间轮所能表示的时间总长度,其值为 b u c k e t s . l e n g t h ∗ t i c k M s buckets.length*tickMs buckets.length∗tickMs ;
-
currentTime:当前时间;其是tickMs的整数倍,这样可以通过currentTime的值很方便地计算出"当前"对应的时间轮单元格;其初始值计算公式如下:
currentTime = startMs - (startMs % tickMs );(其中startMs为时间轮创建时间) -
overflowWheel:本时间轮的"溢出时间轮";
3.3 算法
3.3.1 添加任务
添加任务的入口是 DelayedOperationPurgatory.tryCompleteElseWatch,其核心逻辑分如下两步:
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
// 1. 尝试立即指定指定的任务
...
// 2. 加入到Timer
timeoutTimer.add(operation)
...
}
SystemTimer.add直接调用了addTimerTaskEntry方法,后者逻辑如下:
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
// 尝试将任务加入时间轮
if (!timingWheel.add(timerTaskEntry)) {
// 若任务已过期且未被取消,直接提交执行
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
TimingWheel.add的逻辑也很清晰,分如下4种场景处理:
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
// 1.如果任务已取消
... ...
false
} else if (expiration < currentTime + tickMs) {
// 2.若任务已过期, 即任务过期时间在"当前单元格"内
... ...
false
} else if (expiration < currentTime + interval) {
// 3.虽然过期时间不在"当前单元格"内,但仍在当前时间轮之内
// 将任务正常加入到当前时间轮对应的单元格内
... ...
true
} else {
// 4.超出了当前时间轮所能表示的范围: 将任务加入到"溢出时间轮"
if (overflowWheel == null)
// 如果当前不存在上层时间轮, 那么生成一个
addOverflowWheel()
// 将任务加入到"溢出时间轮"
overflowWheel.add(timerTaskEntry)
}
}
3.3.2 尝试提前触发任务
入口是DelayedOperationPurgatory.checkAndComplete:
def checkAndComplete(key: Any): Int = {
// 1. 根据事件key查找对应的任务队列(watchers就是一个队列的封装);
val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
// 2. 遍历队列,尝试执行各任务;
if(watchers == null)
0
else
watchers.tryCompleteWatched()
}
接下来看Watchers.tryCompleteWatched方法的内容:
def tryCompleteWatched(): Int = {
var completed = 0
// 遍历队列
val iter = operations.iterator()
while (iter.hasNext) {
// 逐个处理各任务
val curr = iter.next()
if (curr.isCompleted) {
// 若任务已经处于完成态, 直接移除
iter.remove()
} else if (curr.maybeTryComplete()) {
// 若任务在调用maybeTryComplete后被成功触发, 则移除
iter.remove()
completed += 1
}
}
// 若队列已空, 移除当前队列
if (operations.isEmpty)
removeKeyIfEmpty(key, this)
// 返回本次成功提前触发的任务数量
completed
}
DelayedOperation.maybeTryComplete方法最终调用了DelayedOperation.tryComplete;
DelayedOperation的子类需要在后者中实现自己的"触发条件"检查逻辑;若满足了提前触发的条件,则调用forceComplete方法执行事件触发场景下的业务逻辑。
3.3.3 任务到期自动执行
DelayedOperationPurgatory中维护了一个expirationReaper线程,其职责就是循环调用kafka.utils.timer.SystemTimer#advanceClock来从事件轮中获取已超时的任务,并更新时间轮的"当前时间"指针。
四. 总结
才疏学浅,未能窥其十之一二,随时欢迎各位交流补充。若文章质量还算及格,可以点赞收藏加以鼓励,后续我继续更新。
另外也可以在目录中找到同系列的其他文章:
Kafka源码分析系列-目录(收藏关注不迷路)
感谢阅读。