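1. Storm uses multimethods to distinguish between local and distributed mode
When launch-worker is called, Clojure first runs the dispatch function given to defmulti and then uses its return value (here the cluster mode, :local or :distributed) to decide which launch-worker implementation to invoke.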
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))

(defmethod launch-worker
    :distributed [supervisor storm-id port worker-id]
  (let [conf (:conf supervisor)
        storm-home (System/getProperty "storm.home")
        stormroot (supervisor-stormdist-root conf storm-id)
        stormjar (supervisor-stormjar-path stormroot)
        storm-conf (read-supervisor-storm-conf conf storm-id)
        classpath (add-to-classpath (current-classpath) [stormjar])
        childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
                               "%ID%"
                               (str port))
        logfilename (str "worker-" port ".log")
        command (str "java -server " childopts
                     " -Djava.library.path=" (conf JAVA-LIBRARY-PATH)
                     " -Dlogfile.name=" logfilename
                     " -Dstorm.home=" storm-home
                     " -Dlogback.configurationFile=" storm-home "/logback/cluster.xml"
                     " -cp " classpath " backtype.storm.daemon.worker "
                     (java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor)
                     " " port " " worker-id)]
    (log-message "Launching worker with command: " command)
    (launch-process command :environment {"LD_LIBRARY_PATH" (conf JAVA-LIBRARY-PATH)})))

(defmethod launch-worker
    :local [supervisor storm-id port worker-id]
  (let [conf (:conf supervisor)
        pid (uuid)
        worker (worker/mk-worker conf
                                 (:shared-context supervisor)
                                 storm-id
                                 (:assignment-id supervisor)
                                 port
                                 worker-id)]
    (psim/register-process pid worker)
    (swap! (:worker-thread-pids-atom supervisor) assoc worker-id pid)))
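The dispatch mechanism itself can be tried in isolation at a REPL. The following is only a minimal sketch of the pattern, not Storm code: demo-cluster-mode, demo-launch-worker, and the "mode" config key are hypothetical stand-ins, and the method bodies just return strings instead of starting anything.

```clojure
;; Hypothetical stand-in for Storm's cluster-mode:
;; turns a config string into a keyword such as :local.
(defn demo-cluster-mode [conf]
  (keyword (get conf "mode")))

;; The dispatch fn receives the same arguments as the call itself.
;; It looks only at the first one and ignores the rest via `& _`.
(defmulti demo-launch-worker
  (fn [supervisor & _] (demo-cluster-mode (:conf supervisor))))

;; Chosen when the dispatch fn returns :distributed.
(defmethod demo-launch-worker :distributed
  [supervisor storm-id port worker-id]
  (str "spawn a JVM for " storm-id " on port " port))

;; Chosen when the dispatch fn returns :local.
(defmethod demo-launch-worker :local
  [supervisor storm-id port worker-id]
  (str "run " storm-id " in-process as " worker-id))

;; Same call shape, different implementation depending on the config.
(demo-launch-worker {:conf {"mode" "local"}} "topo-1" 6700 "w-1")
(demo-launch-worker {:conf {"mode" "distributed"}} "topo-1" 6700 "w-1")
```

The caller never names the mode. It is derived from the supervisor's configuration on every call, which is why the same supervisor code can run unchanged in both modes.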
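2. Storm uses multimethods to distinguish between bolts and spouts
In executor.clj, multimethods keep the creation logic for bolts separate from that for spouts. All three multimethods below share the same dispatch function, executor-selector.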
(defmulti mk-threads executor-selector)
(defmulti mk-executor-stats executor-selector)
(defmulti close-component executor-selector)
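The defmethod side of these multimethods is not shown above. As a rough sketch of how several multimethods can share one dispatch function, assume the selector simply reads a type keyword (:spout or :bolt) from its first argument. The names demo-selector, demo-mk-threads, demo-close-component and the :type and :id keys are hypothetical and are not taken from executor.clj.

```clojure
;; Assumed shape of a shared dispatch fn: pick the component type
;; from the first argument and ignore any remaining arguments.
(defn demo-selector [executor-data & _]
  (:type executor-data))

;; Two multimethods reusing the same selector, mirroring the
;; mk-threads / close-component pair in spirit only.
(defmulti demo-mk-threads demo-selector)
(defmulti demo-close-component demo-selector)

(defmethod demo-mk-threads :spout [executor-data]
  [:spout-loop (:id executor-data)])

(defmethod demo-mk-threads :bolt [executor-data]
  [:bolt-loop (:id executor-data)])

(defmethod demo-close-component :spout [executor-data component]
  (str "close spout " component))

(defmethod demo-close-component :bolt [executor-data component]
  (str "cleanup bolt " component))

;; The executor's type decides which implementation runs.
(demo-mk-threads {:type :bolt :id 3})
(demo-close-component {:type :spout :id 1} "word-reader")
```

Because every multimethod dispatches on the same value, supporting a further component type would mean adding one defmethod per multimethod rather than editing a conditional in each function.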