与Channel相关的代码主要位于nsqd/channel.go
, nsqd/nsqd.go
中。
Channel与Topic的关系
Channel是消费者订阅特定Topic的一种抽象。对于发往Topic的消息,nsqd向该Topic下的所有Channel投递消息,而同一个Channel只投递一次,Channel下如果存在多个消费者,则随机选择一个消费者做投递。这种投递方式可以被用作消费者负载均衡。
Channel从属于特定Topic,可以认为是Topic的下一级。在同一个Topic之下可以有零个或多个Channel。
和Topic一样,Channel同样有永久和临时之分,永久的Channel只能通过显式删除销毁,临时的Channel在最后一个消费者断开连接的时候被销毁。
与服务于生产者的Topic不同,Channel直接面向消费者。
在代码上Channel和Topic有许多相似之处,对于和Topic相同或者相似的部分,以下不再赘述,可以参考Topic相关博文。
Channel的创建
Channel和Topic在创建的时候都会初始化结构,初始化backend,创建消息循环,不同的是Channel在创建时多了给e2eProcessingLatencyStream
赋值的以及initPQ
部分。
其中e2eProcessingLatencyStream
主要用于统计消息投递的延迟等,将在以后的博文中叙述。
initPQ
函数创建了两个字典inFlightMessages
、deferredMessages
和两个队列inFlightPQ
、deferredPQ
。在nsq中inFlight指的是正在投递但还没确认投递成功的消息,defferred指的是投递失败,等待重新投递的消息。initPQ
创建的字典和队列主要用于索引和存放这两类消息。其中两个字典使用消息ID作索引。
inFlightPQ
使用newInFlightPqueue
初始化,InFlightPqueue
位于nsqd\in_flight_pqueue.go
。nsqd\in_flight_pqueue.go
是nsq实现的一个优先级队列,提供了常用的队列操作,值得学习。
deferredPQ
使用pqueue.New
初始化,pqueue
位于nsqd\pqueue.go
,也是一个优先级队列。
待投递消息进入Channel
在分析Topic时提到,消息进入Topic的消息循环后会被投递到该Topic下所有的Channel,由Channel的PutMessage
函数进行处理。
PutMessage
判断当前Channel是否已经被销毁,若未销毁,则调用put
函数进行处理,最后,自增消息计数器。
Channel的put
函数与Topic的同名函数相似,可以参考Topic。
Channel对消息的处理
进入Channel的消息在messagePump
函数中处理,该函数也与Topic的同名函数相似:消息都从memory和backend两个来源接收,然后解码消息后处理。与Topic不同的是,channel在投递消息前,会自增msg.Attempts
,该变量用于保存投递尝试的次数。
在消息投递前会将bufferedCount
置为1,在投递后置为0。该变量在Depth
函数中被调用。
Deepth
函数返回内存,磁盘以及正在投递的消息数量之和,也就是尚未投递成功的消息数。
messagePump
函数在投递消息时将消息送入clientMsgChan
,随后被nsqd\protocol_v2.go
的messagePump
函数处理。
在protocolV2的messagePump
函数中,消息被通过投送到相应消费者。投递时首先调用Channel的StartInFlightTimeout
函数
该函数填充消息的消费者ID、投送时间、优先级,然后调用pushInFlightMessage
函数将消息放入inFlightMessages
字典中。最后调用addToInFlightPQ
将消息放入inFlightPQ
队列中。
至此,消息投递流程完成,接下来需要等待消费者对投送结果的反馈。消费者通过发送FIN
、REQ
、TOUCH
来回复对消息的处理结果。
关于TCP protocol相关的内容,在后续博文分析。以下只分析与Channel相关的部分。
消息投送结果处理消息投送成功的处理
消费者发送FIN
,表明消息已经被接收并正确处理。
FIN消息在与Channel相关的部分交由FinishMessage
处理。最后调用addToInFlightPQ
将消息放入inFlightPQ
队列中。FinishMessage
分别调用popInFlightMessage
和removeFromInFlightPQ
将消息从inFlightMessages
和inFlightPQ
中删除。最后,统计该消息的投递情况。
消息投送失败的处理
客户端发送REQ
,表明消息投递失败,需要再次被投递。
Channel在RequeueMessage
函数对消息投递失败进行处理。该函数将消息从inFlightMessages
和inFlightPQ
中删除,随后进行重新投递。
发送REQ
时有一个附加参数timeout,该值为0时表示立即重新投递,大于0时表示等待timeout时间之后投递。
立即投递使用doRequeue
函数,该函数简单地调用put
函数重新进行消息的投递,并自增requeueCount
,该变量在统计消息投递情况时用到。
如果timeout大于0,则调用StartDeferredTimeout
进行延迟投递。首先计算延迟投递的时间点,然后调用pushDeferredMessage
将消息加入deferredMessage
字典,最后将消息放入deferredPQ
队列。延迟投递的消息会被专门的worker扫描并在延迟投递的时间点后进行投递。
需要注意的是,立即重新投递的消息不会进入deferredPQ
队列。
消息的超时值的重置
消费者发送TOUCH
,表明该消息的超时值需要被重置。
这个过程比较简单,从inFlightPQ
中取出消息,设置新的超时值后重新放入队列,新的超时值由当前时间、客户端通过IDENTIFY
设置的超时值、配置中允许的最大超时值MaxMsgTimeout
共同决定。
消息的超时和延迟投递
消息超时和延迟投递的处理流程层次比较多:
首先是在nsqd\nsqd.go
中启动的用于定时扫描的goroutine。该goroutine执行queueScanLoop
函数
该函数使用若干个worker来扫描并处理当前在投递中以及等待重新投递的消息。worker的个数由配置和当前Channel数量共同决定。
首先,初始化3个gochannel:workCh、responseCh、closeCh,分别控制worker的输入、输出和销毁。
然后获取当前的Channel集合,并且调用resizePool
函数来启动指定数量的worker。
最后进入扫描的循环。在循环中,等待两个定时器,workTicker
和refreshTicker
,定时时间分别由由配置中的QueueScanInterval
和QueueScanRefreshInterval
决定。这种由等待定时器触发的循环避免了函数持续的执行影响性能,而Golang的特性使得这种机制在写法上非常简洁。
workTicker
定时器触发扫描流程。
nsqd采用了Redis的probabilistic expiration算法来进行扫描。首先从所有Channel中随机选取部分Channel,然后遍历被选取的Channel,投到workerChan
中,并且等待反馈结果,结果有两种,dirty和非dirty,如果dirty的比例超过配置中设定的QueueScanDirtyPercent
,那么不进入休眠,继续扫描,如果比例较低,则重新等待定时器触发下一轮扫描。这种机制可以在保证处理延时较低的情况下减少对CPU资源的浪费。refreshTicker
定时器触发更新Channel列表流程。
这个流程比较简单,先获取一次Channel列表,
再调用resizePool
重新分配worker。
接下来再看看resizePool
的实现。
这个部分比较简单。注意一点,当需要的worker数量超过之前分配的数量时,通过向closeCh
投递消息使多余的worker销毁,如果需要的数量比之前的多,则通过queueScanWorker
创建新的worker。
queueScanWorker
接收workCh
发来的消息,处理,并且通过responseCh
反馈消息。收到closeCh
时则关闭。由于所有worker都监听相同的closeCh
,所以当向closeCh
发送消息时,随机关闭一个worker。且由于workCh
和closeCh
的监听是串行的,所以不存在任务处理到一半时被关闭的可能。这也是nsq中优雅关闭gochannel的的一个例子。
worker处理两件事:
一是处理inFlight消息
processInFlightQueue
取出inFlightPQ
顶部的消息,如果当前消息已经超时,则将消息从队列中移除,并返回消息。由于队列是优先级队列,所以如果processInFlightQueue
取出的消息为空,则不需要再往后取了,直接返回false表示当前非dirty状态。如果取到了消息,则说明该消息投递超时,需要把消息传入doRequeue
立即重新投递。
二是处理deferred消息
该处理流程与处理inFlight基本相同,不再详述。
其他操作
Channel中还有些其他函数如Exiting
、Delete
、Close
、exit
、Empty
、flush
、Pause
、UnPause
、doPause
等与Topic中很接近,不再详述。
AddClient
和RemoveClient
将在分析Client时讨论。
总结
Topic/Channel是发布/订阅模型的一种实现。Topic对应于发布,Channel对应于订阅。消费者通过在Topic下生成不同的Channel来接收来自该Topic的消息。通过生成相同的Channel来实现消费者负载均衡。
Channel本身在投递消息给消费者时维护两个队列,一个是inFlight队列,该队列存储正在投递,但还没被标记为投递成功的消息。另一个是deferred队列,用来存储需要被延时投递的消息。
inFlight队列中消息可能因为投递超时而失败,deferred队列中的消息需要在到达指定时间后进行重新投递。如果为两个队列中的每个消息都分别指定定时器,无疑是非常消耗资源的。因此nsq采用定时扫描队列的做法。
在扫描时采用多个worker分别处理。这种类似多线程的处理方式提高了处理效率。nsq在扫描策略上使用了Redis的probabilistic expiration算法,同时动态调整worker的数量,这些优化平衡了效率和资源占用。