Channel相关的代码主要位于nsqd/channel.gonsqd/nsqd.go中。

Channel与Topic的关系

Channel是消费者订阅特定Topic的一种抽象。对于发往Topic的消息,nsqd向该Topic下的所有Channel投递消息,而同一个Channel只投递一次,Channel下如果存在多个消费者,则随机选择一个消费者做投递。这种投递方式可以被用作消费者负载均衡。

Channel从属于特定Topic,可以认为是Topic的下一级。在同一个Topic之下可以有零个或多个Channel。 
和Topic一样,Channel同样有永久和临时之分,永久的Channel只能通过显式删除销毁,临时的Channel在最后一个消费者断开连接的时候被销毁。

与服务于生产者的Topic不同,Channel直接面向消费者。

在代码上Channel和Topic有许多相似之处,对于和Topic相同或者相似的部分,以下不再赘述,可以参考Topic相关博文。

Channel的创建

Channel和Topic在创建的时候都会初始化结构,初始化backend,创建消息循环,不同的是Channel在创建时多了给e2eProcessingLatencyStream赋值的以及initPQ部分。

其中e2eProcessingLatencyStream主要用于统计消息投递的延迟等,将在以后的博文中叙述。

initPQ函数创建了两个字典inFlightMessagesdeferredMessages和两个队列inFlightPQdeferredPQ。在nsq中inFlight指的是正在投递但还没确认投递成功的消息,defferred指的是投递失败,等待重新投递的消息。initPQ创建的字典和队列主要用于索引和存放这两类消息。其中两个字典使用消息ID作索引。

inFlightPQ使用newInFlightPqueue初始化,InFlightPqueue位于nsqd\in_flight_pqueue.gonsqd\in_flight_pqueue.go是nsq实现的一个优先级队列,提供了常用的队列操作,值得学习。

deferredPQ使用pqueue.New初始化,pqueue位于nsqd\pqueue.go,也是一个优先级队列。

待投递消息进入Channel

在分析Topic时提到,消息进入Topic的消息循环后会被投递到该Topic下所有的Channel,由Channel的PutMessage函数进行处理。

PutMessage判断当前Channel是否已经被销毁,若未销毁,则调用put函数进行处理,最后,自增消息计数器。

Channel的put函数与Topic的同名函数相似,可以参考Topic。

Channel对消息的处理

进入Channel的消息在messagePump函数中处理,该函数也与Topic的同名函数相似:消息都从memory和backend两个来源接收,然后解码消息后处理。与Topic不同的是,channel在投递消息前,会自增msg.Attempts,该变量用于保存投递尝试的次数。

在消息投递前会将bufferedCount置为1,在投递后置为0。该变量在Depth函数中被调用。

Deepth函数返回内存,磁盘以及正在投递的消息数量之和,也就是尚未投递成功的消息数。

messagePump函数在投递消息时将消息送入clientMsgChan,随后被nsqd\protocol_v2.gomessagePump函数处理。

在protocolV2的messagePump函数中,消息被通过投送到相应消费者。投递时首先调用Channel的StartInFlightTimeout函数

该函数填充消息的消费者ID、投送时间、优先级,然后调用pushInFlightMessage函数将消息放入inFlightMessages字典中。最后调用addToInFlightPQ将消息放入inFlightPQ队列中。

至此,消息投递流程完成,接下来需要等待消费者对投送结果的反馈。消费者通过发送FINREQTOUCH来回复对消息的处理结果。

关于TCP protocol相关的内容,在后续博文分析。以下只分析与Channel相关的部分。

消息投送结果处理

消息投送成功的处理

消费者发送FIN,表明消息已经被接收并正确处理。

FIN消息在与Channel相关的部分交由FinishMessage处理。最后调用addToInFlightPQ将消息放入inFlightPQ队列中。FinishMessage分别调用popInFlightMessageremoveFromInFlightPQ将消息从inFlightMessagesinFlightPQ中删除。最后,统计该消息的投递情况。

消息投送失败的处理

客户端发送REQ,表明消息投递失败,需要再次被投递。

Channel在RequeueMessage函数对消息投递失败进行处理。该函数将消息从inFlightMessagesinFlightPQ中删除,随后进行重新投递。

发送REQ时有一个附加参数timeout,该值为0时表示立即重新投递,大于0时表示等待timeout时间之后投递。

立即投递使用doRequeue函数,该函数简单地调用put函数重新进行消息的投递,并自增requeueCount,该变量在统计消息投递情况时用到。

如果timeout大于0,则调用StartDeferredTimeout进行延迟投递。首先计算延迟投递的时间点,然后调用pushDeferredMessage将消息加入deferredMessage字典,最后将消息放入deferredPQ队列。延迟投递的消息会被专门的worker扫描并在延迟投递的时间点后进行投递。 
需要注意的是,立即重新投递的消息不会进入deferredPQ队列。

消息的超时值的重置

消费者发送TOUCH,表明该消息的超时值需要被重置。

这个过程比较简单,从inFlightPQ中取出消息,设置新的超时值后重新放入队列,新的超时值由当前时间、客户端通过IDENTIFY设置的超时值、配置中允许的最大超时值MaxMsgTimeout共同决定。

消息的超时和延迟投递

消息超时和延迟投递的处理流程层次比较多:

首先是在nsqd\nsqd.go中启动的用于定时扫描的goroutine。该goroutine执行queueScanLoop函数

该函数使用若干个worker来扫描并处理当前在投递中以及等待重新投递的消息。worker的个数由配置和当前Channel数量共同决定。

首先,初始化3个gochannel:workCh、responseCh、closeCh,分别控制worker的输入、输出和销毁。

然后获取当前的Channel集合,并且调用resizePool函数来启动指定数量的worker。

最后进入扫描的循环。在循环中,等待两个定时器,workTickerrefreshTicker,定时时间分别由由配置中的QueueScanIntervalQueueScanRefreshInterval决定。这种由等待定时器触发的循环避免了函数持续的执行影响性能,而Golang的特性使得这种机制在写法上非常简洁。

  1. workTicker定时器触发扫描流程。 
    nsqd采用了Redis的probabilistic expiration算法来进行扫描。首先从所有Channel中随机选取部分Channel,然后遍历被选取的Channel,投到workerChan中,并且等待反馈结果,结果有两种,dirty和非dirty,如果dirty的比例超过配置中设定的QueueScanDirtyPercent,那么不进入休眠,继续扫描,如果比例较低,则重新等待定时器触发下一轮扫描。这种机制可以在保证处理延时较低的情况下减少对CPU资源的浪费。
  2. refreshTicker定时器触发更新Channel列表流程。 
    这个流程比较简单,先获取一次Channel列表, 
    再调用resizePool重新分配worker。

接下来再看看resizePool的实现。

这个部分比较简单。注意一点,当需要的worker数量超过之前分配的数量时,通过向closeCh投递消息使多余的worker销毁,如果需要的数量比之前的多,则通过queueScanWorker创建新的worker。

queueScanWorker接收workCh发来的消息,处理,并且通过responseCh反馈消息。收到closeCh时则关闭。由于所有worker都监听相同的closeCh,所以当向closeCh发送消息时,随机关闭一个worker。且由于workChcloseCh的监听是串行的,所以不存在任务处理到一半时被关闭的可能。这也是nsq中优雅关闭gochannel的的一个例子。

worker处理两件事:

一是处理inFlight消息

processInFlightQueue取出inFlightPQ顶部的消息,如果当前消息已经超时,则将消息从队列中移除,并返回消息。由于队列是优先级队列,所以如果processInFlightQueue取出的消息为空,则不需要再往后取了,直接返回false表示当前非dirty状态。如果取到了消息,则说明该消息投递超时,需要把消息传入doRequeue立即重新投递。

二是处理deferred消息

该处理流程与处理inFlight基本相同,不再详述。

其他操作

Channel中还有些其他函数如ExitingDeleteCloseexitEmptyflushPauseUnPausedoPause 
等与Topic中很接近,不再详述。

AddClientRemoveClient将在分析Client时讨论。

总结

Topic/Channel是发布/订阅模型的一种实现。Topic对应于发布,Channel对应于订阅。消费者通过在Topic下生成不同的Channel来接收来自该Topic的消息。通过生成相同的Channel来实现消费者负载均衡。

Channel本身在投递消息给消费者时维护两个队列,一个是inFlight队列,该队列存储正在投递,但还没被标记为投递成功的消息。另一个是deferred队列,用来存储需要被延时投递的消息。

inFlight队列中消息可能因为投递超时而失败,deferred队列中的消息需要在到达指定时间后进行重新投递。如果为两个队列中的每个消息都分别指定定时器,无疑是非常消耗资源的。因此nsq采用定时扫描队列的做法。 
在扫描时采用多个worker分别处理。这种类似多线程的处理方式提高了处理效率。nsq在扫描策略上使用了Redis的probabilistic expiration算法,同时动态调整worker的数量,这些优化平衡了效率和资源占用。

04-27 21:56