本文从外部消息在worker进程内部的转化,传递及处理过程入手,一步步分析在worker-data中的数据项存在的原因和意义。试图从代码实现的角度来回答,如果是从头开始实现worker的话,该如何来定义消息接口,如何实现各自接口上的消息处理。
Topology到Worker的映射关系
Topology由Spout,Bolt组成,其逻辑关系大体如下图所示。
无论是Spout或Bolt的处理逻辑都需要在进程或线程内执行,那么它们与进程及线程间的映射关系又是如何呢。有关这个问题,Understanding the Parallelism of a Storm Topology 一文作了很好的总结,现重复一下其要点。
- worker是进程,executor对应于线程,spout或bolt是一个个的task
- 同一个worker只会执行同一个topology相关的task
- 在同一个executor中可以执行多个同类型的task, 即在同一个executor中,要么全部是bolt类的task,要么全部是spout类的task
- 运行的时候,spout和bolt需要被包装成一个又一个task
worker,executor, task三者之间的关系可以用下图表示
小结一下,Worker=Process, Executor=Thread, Task=Spout or Bolt.
每一个executor使用的是actor pattern,high level的处理逻辑如下图所示
外部消息的接收和处理
在源码走读之四一文中总结了worker进程内的各种类型的thread,也即executor,这个等同于定义了进程内部和进程间的接口类型。那么这些接口上的消息在具体流传和处理过程中需要定义哪些数据结构,针对这些数据结构,又要做哪些必要的处理呢?
换句话说,就是为什么在worker.clj中有哪些数据和函数存在,不这样做,可以不?
先图示一下,外部消息处理的大概流程。
注:圈起来的数字表示消息转换和处理的序列。
步骤一
监听端口准备就绪,接收线程在收到外部的消息后,面临的问题就是如何确定由哪个task来处理该消息。接收到的tuple中含有task-id,根 据task-id可以知道运行该task的executor,executor中有receive-message-queue即(incoming queue)来存放外部的tuple. 定义的数据结构需要反映这个转换过程task-id->executor->receive-queue-map.
那么在worker-data中哪些数据项与这个过程相关呢
- :port
- :executor-receive-queue-map
- :short-executor-receive-queue-map
- :task->short-executor
- :transfer-local-fn
transfer-local-fn将数据从接收线程发送到spout或bolt所在的executor线程。
步骤二
接下来数据会被传递到executor,于是又牵涉到executor的数据结构问题。executor-data由函数mk-executor-data创建,其内容与worker-data比较起来相对较少。
executor收到tuple之后,第一步需要进行反序列化,storm中使用kyro来进行序列化和反序列化,这也是为什么在executor中有该数据项的原因。
executor中与步骤2相关的数据项
- :type executor-type
- :receive-queue
- :deserializer (executor-data中的数据项)
步骤三:
步骤2处理结束,会产生相应的tuple发送到外部。这个过程需要多解释一下,首先tuple不是直接发送给worker的transfer- thread(负责向其它进程发送消息),而是发送给send-handler线程,每一个executor在创建的时候最起码会有两个线程被创建,一个 用于运行bolt或spout的处理逻辑,另一个用以负责缓存bolt或spout产生的对外发送的tuple。
一旦snd-hander中的tuple数量达到阀值,这些被缓存的tuple会一次性发送给worker级别的transfer-thread.
executor中与步骤3相关的数据项
- :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
- :batch-transfer-queue
在步骤3中生成outgoing的tuple,tuple生成的时候需要回答两个基本问题
- tuple中含有哪些字段 -- 该问题的解答由spout或bolt中的declareOutFields来解决
- 由哪个node+port来接收该tuple -- 由grouping来解决,这个时候就可以看出为什么需要task这一层的逻辑抽象了,有关grouping的详细解释,请参考fxjwind撰写的Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)
步骤四:
处理逻辑很简单,先将数据缓存,然后在达到阀值之后,一起传送给transfer-thread.
start-batch-transfer->worker-handler
(defn start-batch-transfer->worker-handler! [worker executor-data] (let [worker-transfer-fn (:transfer-fn worker) cached-emit (MutableObject. (ArrayList.)) storm-conf (:storm-conf executor-data) serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data)) ] (disruptor/consume-loop* (:batch-transfer-queue executor-data) (disruptor/handler [o seq-id batch-end?] (let [^ArrayList alist (.getObject cached-emit)] (.add alist o) (when batch-end? (worker-transfer-fn serializer alist) (.setObject cached-emit (ArrayList.)) ))) :kill-fn (:report-error-and-die executor-data))))
worker-transfer-fn是worker中的transfer-fn,由mk-transfer-fn生成。
(defn mk-transfer-fn [worker] (let [local-tasks (-> worker :task-ids set) local-transfer (:transfer-local-fn worker) ^DisruptorQueue transfer-queue (:transfer-queue worker)] (fn [^KryoTupleSerializer serializer tuple-batch] (let [local (ArrayList.) remote (ArrayList.)] (fast-list-iter [[task tuple :as pair] tuple-batch] (if (local-tasks task) (.add local pair) (.add remote pair) )) (local-transfer local) ;; not using map because the lazy seq shows up in perf profiles (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])] (disruptor/publish transfer-queue serialized-pairs) )))))
步骤五:
处理函数mk-transfer-tuples-handler,主要进行序列化,将序列化后的数据发送给目的地址。
(defn mk-transfer-tuples-handler [worker] (let [^DisruptorQueue transfer-queue (:transfer-queue worker) drainer (ArrayList.) node+port->socket (:cached-node+port->socket worker) task->node+port (:cached-task->node+port worker) endpoint-socket-lock (:endpoint-socket-lock worker) ] (disruptor/clojure-handler (fn [packets _ batch-end?] (.addAll drainer packets) (when batch-end? (read-locked endpoint-socket-lock (let [node+port->socket @node+port->socket task->node+port @task->node+port] ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead) ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering) (fast-list-iter [[task ser-tuple] drainer] ;; TODO: consider write a batch of tuples here to every target worker ;; group by node+port, do multipart send (let [node-port (get task->node+port task)] (when node-port (.send ^IConnection (get node+port->socket node-port) task ser-tuple)) )))) (.clear drainer))))))
tuple发送的时候需要用到connection,但目前只知道task-id,所以在worker中需要保存task-id到node+port的映射,node+port与outgoing connections之间的映射。
worker中与步骤5相关的数据项:
- :cached-node+port->socket
- :cached-task->node+port
- :component->stream->fields
- :component->sorted-tasks
- :endpoint-socket-lock
- :transfer-queue (线程内部的消息队列)
- :task->component
其它的数据项
上述五个步骤并没有涵盖worker-data所有的数据项,那么其它的数据项归一归类,大体如下
timer相关,timer相关的数据项包括timer及其对应的处理句柄
- :heartbeat-timer
- :refresh-connection-timer
- :refresh-active-timer
- :executor-heartbeat-timer
- :user-timer
zk相关
- :storm-cluster-state
- :storm-active-atom
- :cluster-state
配置相关
- :conf
- :mq-context 在transport layer是使用zmq还是netty
Assignment相关
- :storm-id
- :assigment-id
- :worker-id
- :executors
- :task-ids
- :storm-conf
- :topology
- :system-topology
进程关闭相关
- :suicide-fn
其它的其它
- :uptime 运行时间,统计用
- :default-shared-resources 线程池
- :user-shared-resources 未启用
小结
设计的时候,一定是先画出一个大概的蓝图,然后逐步的细化并加以实现。具体来说,步骤如下
- manifest 定义主要的功能
- draw skeleton 画出实现草图,定义主要的接口
- discussion 与团队讨论
- data structures 数据结构
- function 函数实现
- testing 测试