Source code analysis of how the supervisor launches a worker - worker.clj

The supervisor launches workers by calling the sync-processes function; a detailed analysis of sync-processes can be found in "storm啓動supervisor源碼分析-supervisor.clj". The relevant fragment of sync-processes is shown below:

sync-processes function code fragment

;; sync-processes函數用於管理workers, 好比處理不正常的worker或dead worker, 並建立新的workers
;; supervisor標識supervisor的元數據
(defn sync-processes [supervisor]
              ;; ... 忽略了部分代碼 ...
       (wait-for-workers-launch
            conf
            (dofor [[port assignment] reassign-executors]
              (let [id (new-worker-ids port)]
                (log-message "Launching worker with assignment "
                             (pr-str assignment)
                             " for this supervisor "
                             (:supervisor-id supervisor)
                             " on port "
                             port
                             " with id "
                             id
                             )
                ;; launch-worker函數負責啓動worker              
                (launch-worker supervisor
                               (:storm-id assignment)
                               port
                               id)
                id)))
  ))

sync-processes calls the launch-worker function to start a worker. launch-worker is a multimethod, defined as follows:
The defmulti and defmethod macros are used together to define a multimethod. defmulti takes a method name and a dispatch function; the value returned by the dispatch function selects which implementation is invoked. defmethod takes the method name, a dispatch value, a parameter list and a body. The special dispatch value :default marks the fallback implementation, called when no other dispatch value matches. The implementations of one multimethod normally share the same parameter count, and the arguments passed to the multimethod are also the arguments passed to the dispatch function. The overall effect is similar to method overloading in Java.
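To make the dispatch mechanism concrete, here is a minimal, hypothetical sketch of the same pattern (illustrative only, not Storm code): the dispatch function inspects its first argument, and the keyword it returns selects the implementation, just as cluster-mode selects :distributed or :local below.

;; hypothetical example of defmulti/defmethod dispatch
(defmulti start-worker (fn [node & _] (:mode node)))

(defmethod start-worker :distributed [node port]
  (str "fork a JVM process on port " port))

(defmethod start-worker :local [node port]
  (str "start an in-process worker on port " port))

(defmethod start-worker :default [node port]
  (throw (IllegalArgumentException. (str "unknown mode: " (:mode node)))))

;; (start-worker {:mode :local} 6700) => "start an in-process worker on port 6700"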

The launch-worker function

( defmulti launch-worker ( fn [ supervisor & _ ] ( cluster-mode ( :conf supervisor))))

 

;; 若是dispatch函數的返回值爲關鍵字:distributed,即storm集羣運行在分佈式模式下,則執行該方法
( defmethod launch-worker
    ;; supervisor標識supervisor的元數據,storm-id標識該worker所屬的topology,port標識該worker佔用的端口號,worker-id是一個32位的uuid,用於標識worker
    :distributed [ supervisor storm-id port worker-id ]
    ;; conf綁定集羣配置信息
   ( let [ conf ( :conf supervisor)
          ;; storm-home綁定storm本地安裝路徑
          storm-home ( System/getProperty "storm.home")
          ;; storm-log-dir綁定日誌路徑
          storm-log-dir ( or ( System/getProperty "storm.log.dir") ( str storm-home "/logs"))
          ;; stormroot綁定supervisor本地路徑"{storm.local.dir}/supervisor/stormdist/{storm-id}"
          stormroot ( supervisor-stormdist-root conf storm-id)
          ;; jlp綁定運行時所依賴的本地庫的路徑,jlp函數生成本地庫路徑,參見jlp函數定義部分
          jlp ( jlp stormroot conf)
          ;; stormjar綁定stormjar.jar文件的路徑"{storm.local.dir}/supervisor/stormdist/{storm-id}/stormjar.jar"
          stormjar ( supervisor-stormjar-path stormroot)
          ;; storm-conf綁定集羣配置信息和storm-id配置信息的並集
          storm-conf ( read-supervisor-storm-conf conf storm-id)
          ;; topo-classpath綁定storm-id的classpath集合
          topo-classpath ( if-let [ cp ( storm-conf TOPOLOGY-CLASSPATH )]
                          [ cp ]
                          [])
          ;; 將stormjar和topo-classpath所標識的路徑添加到Java的classpath中                
          classpath ( -> ( current-classpath)
                       ( add-to-classpath [ stormjar ])
                       ( add-to-classpath topo-classpath))
          ;; 從集羣配置信息中獲取默認狀況下supervisor啓動worker的jvm參數              
          worker-childopts ( when-let [s ( conf WORKER-CHILDOPTS )]
                            ( substitute-childopts s worker-id storm-id port))
          ;; 從topology的配置信息中獲取爲該topology的worker指定的jvm參數                
          topo-worker-childopts ( when-let [s ( storm-conf TOPOLOGY-WORKER-CHILDOPTS )]
                                 ( substitute-childopts s worker-id storm-id port))
          ;; 將該topology特有的依賴庫路徑合併到jlp中,這樣topology-worker-environment綁定的map中就包含了啓動該topology的worker所需的全部的依賴庫                                  
          topology-worker-environment ( if-let [ env ( storm-conf TOPOLOGY-ENVIRONMENT )]
                                       ( merge env { "LD_LIBRARY_PATH" jlp })
                                        { "LD_LIBRARY_PATH" jlp })
          ;; 生成該worker的日誌文件worker-{port}.log                              
          logfilename ( str "worker-" port ".log")
          ;; command綁定一個Java -server xxxxxx -cp classpath classname arg_0 arg_1 ... arg_n命令,xxxxxx表示傳遞給java命令的jvm參數
          command ( concat
                    [( java-cmd) "-server" ]
                    worker-childopts
                    topo-worker-childopts
                    [( str "-Djava.library.path=" jlp)
                    ( str "-Dlogfile.name=" logfilename)
                    ( str "-Dstorm.home=" storm-home)
                    ( str "-Dstorm.log.dir=" storm-log-dir)
                    ( str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
                    ( str "-Dstorm.id=" storm-id)
                    ( str "-Dworker.id=" worker-id)
                    ( str "-Dworker.port=" port)
                    "-cp" classpath
                    "backtype.storm.daemon.worker"
                    storm-id
                    ( :assignment-id supervisor)
                    port
                    worker-id ])
          ;; 去掉command命令數組中的空值
          command ( ->> command ( map str) ( filter ( complement empty?)))
          ;; 獲取command命令數組的字符串形式
          shell-cmd ( ->> command
                        ( map #( str \' ( clojure.string/escape % { \' "\\'" }) \'))
                        ( clojure.string/join " " ))]
     ( log-message "Launching worker with command: " shell-cmd)
      ;; 經過ProcessBuilder類來執行command命令,即執行java命令運行backtype.storm.daemon.worker類的main方法建立一個新的進程,傳遞給main方法的參數爲storm-id,supervisor-id,port和worker-id
      ;; 關於backtype.storm.daemon.worker類的main方法請參見其定義部分
     ( launch-process command :environment topology-worker-environment)
     ))

;; 若是dispatch函數的返回值爲關鍵字:local,即storm集羣運行在本地模式下,則執行該方法      
( defmethod launch-worker
    :local [ supervisor storm-id port worker-id ]
   ( let [ conf ( :conf supervisor)
          pid ( uuid)
          worker ( worker/mk-worker conf
                                  ( :shared-context supervisor)
                                  storm-id
                                  ( :assignment-id supervisor)
                                  port
                                  worker-id )]
     ( psim/register-process pid worker)
     ( swap! ( :worker-thread-pids-atom supervisor) assoc worker-id pid)
     ))
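In the :distributed branch the assembled command vector and the topology-worker-environment map are handed to launch-process. As a rough illustration of what that amounts to (a simplified sketch under assumptions, not the actual storm.util/launch-process implementation), a child process with an extra environment entry can be started through java.lang.ProcessBuilder:

;; simplified sketch only; not the real launch-process
(defn launch-process-sketch [command env-map]
  (let [builder (java.lang.ProcessBuilder. (java.util.ArrayList. command))]
    (.putAll (.environment builder) env-map)   ; e.g. {"LD_LIBRARY_PATH" jlp-string}
    (.start builder)))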

The jlp function is defined as follows:

;; stormroot綁定supervisor本地路徑"{storm.local.dir}/supervisor/stormdist/{storm-id}",conf綁定集羣配置
( defn jlp [ stormroot conf ]
  ;; resource-root綁定supervisor本地路徑"{storm.local.dir}/supervisor/stormdist/{storm-id}/resources"
 ( let [ resource-root ( str stormroot File/separator RESOURCES-SUBDIR)
          ;; os綁定supervisor服務器的操做系統名
        os ( clojure.string/replace ( System/getProperty "os.name") # "\s+" "_")
        ;; arch綁定操做系統的架構,如"x86"和"i386"
        arch ( System/getProperty "os.arch")
        ;; arch-resource-root綁定路徑"{storm.local.dir}/supervisor/stormdist/{storm-id}/resources/{os}-{arch}"
        arch-resource-root ( str resource-root File/separator os "-" arch )]
    ;; 返回"{storm.local.dir}/supervisor/stormdist/{storm-id}/resources/{os}-{arch}:{storm.local.dir}/supervisor/stormdist/{storm-id}/resources:{java.library.path}"
   ( str arch-resource-root File/pathSeparator resource-root File/pathSeparator ( conf JAVA-LIBRARY-PATH))))
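With some hypothetical values plugged in, the string returned by jlp looks like this (assumed values, Unix path separator):

;; assumed: storm.local.dir=/data/storm, storm-id=demo-1-1, os.name=Linux, os.arch=amd64,
;; and the cluster conf's JAVA-LIBRARY-PATH set to "/usr/local/lib"
;; (jlp stormroot conf) =>
;; "/data/storm/supervisor/stormdist/demo-1-1/resources/Linux-amd64:/data/storm/supervisor/stormdist/demo-1-1/resources:/usr/local/lib"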

The read-supervisor-storm-conf function is defined as follows:

;; 從supervisor本地路徑"{storm.local.dir}/supervisor/stormdist/stormconf.ser"讀取topology運行配置信息
( defn read-supervisor-storm-conf
  [ conf storm-id ]
  ;; stormroot綁定目錄路徑"{storm.local.dir}/supervisor/stormdist"
 ( let [ stormroot ( supervisor-stormdist-root conf storm-id)
        ;; conf-path綁定文件路徑"{storm.local.dir}/supervisor/stormdist/stormconf.ser"
        conf-path ( supervisor-stormconf-path stormroot)
        ;; topology-path綁定文件路徑"{storm.local.dir}/supervisor/stormdist/stormcode.ser"
        topology-path ( supervisor-stormcode-path stormroot )]
    ;; 返回集羣配置信息和topology配置信息合併後的配置信息map
   ( merge conf ( Utils/deserialize ( FileUtils/readFileToByteArray ( File. conf-path))))
   ))
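The merge gives the per-topology configuration priority over the cluster-wide defaults, because for duplicate keys clojure.core/merge keeps the value from the right-most map. A tiny illustration with made-up keys and values:

(merge {"topology.workers" 1, "topology.debug" false}   ; cluster conf (assumed defaults)
       {"topology.workers" 4})                           ; deserialized per-topology stormconf.ser
;; => {"topology.workers" 4, "topology.debug" false}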

The backtype.storm.daemon.worker class is defined in worker.clj; :gen-class turns it into a Java class. Its main method is as follows:

( defn -main [ storm-id assignment-id port-str worker-id ]  
  ;; 讀取storm集羣配置信息
 ( let [ conf ( read-storm-config )]
    ;; 驗證配置信息
   ( validate-distributed-mode! conf)
    ;; 調用mk-worker函數,mk-worker函數請參見其定義部分
   ( mk-worker conf nil storm-id assignment-id ( Integer/parseInt port-str) worker-id)))

The mk-worker function:

;; conf綁定集羣配置信息,shared-mq-context綁定共享mq,storm-id標識topology-id,assignment-id標識supervisor-id
( defserverfn mk-worker [ conf shared-mq-context storm-id assignment-id port worker-id ]
 ( log-message "Launching worker for " storm-id " on " assignment-id ":" port " with id " worker-id
              " and conf " conf)
  ;; 若是storm不是"本地模式"運行(即"分佈式模式"運行),則將標準輸入輸出流重定向到slf4j
 ( if-not ( local-mode? conf)
   ( redirect-stdio-to-slf4j!))
  ;; because in local mode, its not a separate
  ;; process. supervisor will register it in this case
  ;; 若是storm是"分佈式模式"運行,則在supervisor服務器本地建立文件"{storm.local.dir}/workers/{worker-id}/pids/{process-pid}",process-pid函數主要功能就是獲取jvm進程的id
  ;; 須要特別注意的是worker-id是咱們人爲分配給該進程的一個標識,建立進程時,咱們沒法指定一個jvm進程的id,進程id是由操做系統分配的,因此咱們須要獲取該進程的實際id,並將咱們指定的worker-id與進程id進行關聯
 ( when ( = :distributed ( cluster-mode conf))
   ( touch ( worker-pid-path conf worker-id ( process-pid))))
  ;; worker綁定該進程的"元數據",worker-data函數的主要功能就是生成進程的"元數據",worker-data函數請參見其定義部分
 ( let [ worker ( worker-data conf shared-mq-context storm-id assignment-id port worker-id)
              ;; heartbeat-fn綁定一個匿名函數,該匿名函數的功能就是生成worker"本地心跳信息",這裏至關定義了heartbeat-fn函數,do-heartbeat函數請參見其定義部分
        heartbeat-fn #( do-heartbeat worker)

        ;; do this here so that the worker process dies if this fails
        ;; it's important that worker heartbeat to supervisor ASAP when launching so that the supervisor knows it's running (and can move on)
        ;; 調用heartbeat-fn函數將worker進程心跳信息保存到本地LocalState對象中
        _ ( heartbeat-fn)
        ;; 定義一個原子類型的引用executors
        executors ( atom nil)
        ;; launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        ;; to the supervisor
        ;; 將heartbeat-fn函數添加到定時器heartbeat-timer中,延遲執行時間爲0s,每隔WORKER-HEARTBEAT-FREQUENCY-SECS執行一次
        _ ( schedule-recurring ( :heartbeat-timer worker) 0 ( conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn)
        ;; 將#(do-executor-heartbeats worker :executors @executors)函數添加到定時器executor-heartbeat-timer中,延遲執行時間爲0s,每隔TASK-HEARTBEAT-FREQUENCY-SECS執行一次
        ;; 這樣就能夠將worker進程心跳信息同步到zookeeper中, 以便nimbus能夠馬上知道worker進程已經啓動,do-executor-heartbeats函數請參見其定義部分
        _ ( schedule-recurring ( :executor-heartbeat-timer worker) 0 ( conf TASK-HEARTBEAT-FREQUENCY-SECS) #( do-executor-heartbeats worker :executors @ executors))

        ;; 更新發送connections,mk-refresh-connections函數請參見其定義部分
        refresh-connections ( mk-refresh-connections worker)
          ;; 主動調用refresh-connections函數refresh該worker進程所擁有的connections,而且不向zookeeper註冊回調函數
        _ ( refresh-connections nil)
          ;; 調用refresh-storm-active函數refresh該worker進程緩存的所屬topology的活躍狀態,refresh-storm-active函數請其參見定義部分
        _ ( refresh-storm-active worker nil)
          ;; 調用mk-executor函數生成executor對象,保存到executors集合中。關於executor對象的建立將會在之後文章中具體分析
        _ ( reset! executors ( dofor [ e ( :executors worker )] ( executor/mk-executor worker e)))
        ;; 啓動worker進程專有的接收線程,將數據從worker進程的偵聽端口,不停的放到task對應的接收隊列,receive-thread-shutdown綁定該接收線程的關閉函數。launch-receive-thread函數請參見其定義部分
        receive-thread-shutdown ( launch-receive-thread worker)
       
        ;; 定義event handler來處理transfer queue裏面的數據。關於消息處理的流程會在之後文章中具體分析
        transfer-tuples ( mk-transfer-tuples-handler worker)
       
        ;; 建立transfer-thread。關於消息處理的流程會在之後文章中具體分析
        transfer-thread ( disruptor/consume-loop* ( :transfer-queue worker) transfer-tuples)
        ;; 定義worker進程關閉回調函數,當關閉worker進程時調用該函數釋放worker進程所佔有的資源
        shutdown* ( fn []
                   ( log-message "Shutting down worker " storm-id " " assignment-id " " port)
                    ;; 關閉該worker進程到其餘worker進程的鏈接
                   ( doseq [[ _ socket ] @( :cached-node+port->socket worker )]
                      ;; this will do best effort flushing since the linger period
                      ;; was set on creation
                     ( .close socket))
                   ( log-message "Shutting down receive thread")
                    ;; 調用receive-thread-shutdown函數關閉該worker進程的接收線程
                   ( receive-thread-shutdown)
                   ( log-message "Shut down receive thread")
                   ( log-message "Terminating messaging context")
                   ( log-message "Shutting down executors")
                    ;; 關閉該worker進程所擁有的executor
                   ( doseq [ executor @ executors ] ( .shutdown executor))
                   ( log-message "Shut down executors")
                                       
                    ;;this is fine because the only time this is shared is when it's a local context,
                    ;;in which case it's a noop
                    ;; 關閉該worker進程所擁有的backtype.storm.messaging.netty.Context實例
                   ( .term ^ IContext ( :mq-context worker))
                   ( log-message "Shutting down transfer thread")
                    ;; 關閉transfer-queue
                   ( disruptor/halt-with-interrupt! ( :transfer-queue worker))
                                        ;; 中斷transfer-thread
                   ( .interrupt transfer-thread)
                    ;; 等待transfer-thread結束
                   ( .join transfer-thread)
                   ( log-message "Shut down transfer thread")
                    ;; 調用cancel-timer函數中斷heartbeat-timer定時器線程
                   ( cancel-timer ( :heartbeat-timer worker))
                    ;; 調用cancel-timer函數中斷refresh-connections-timer定時器線程
                   ( cancel-timer ( :refresh-connections-timer worker))
                    ;; 調用cancel-timer函數中斷refresh-active-timer定時器線程
                   ( cancel-timer ( :refresh-active-timer worker))
                    ;; 調用cancel-timer函數中斷executor-heartbeat-timer定時器線程
                   ( cancel-timer ( :executor-heartbeat-timer worker))
                    ;; 調用cancel-timer函數中斷user-timer定時器線程
                   ( cancel-timer ( :user-timer worker))
                   
                    ;; 關閉該worker進程所擁有的線程池
                   ( close-resources worker)
                   
                    ;; TODO: here need to invoke the "shutdown" method of WorkerHook
                   
                    ;; 調用StormClusterState實例的remove-worker-heartbeat!函數從zookeeper上刪除worker心跳信息
                   ( .remove-worker-heartbeat! ( :storm-cluster-state worker) storm-id assignment-id port)
                   ( log-message "Disconnecting from storm cluster state context")
                    ;; 關閉zookeeper鏈接
                   ( .disconnect ( :storm-cluster-state worker))
                   ( .close ( :cluster-state worker))
                   ( log-message "Shut down worker " storm-id " " assignment-id " " port))
        ;; ret實現了Shutdownable和DaemonCommon協議
        ret ( reify
            Shutdownable
            ( shutdown
              [ this ]
             ( shutdown*))
            DaemonCommon
            ( waiting? [ this ]
              ( and
                ( timer-waiting? ( :heartbeat-timer worker))
                ( timer-waiting? ( :refresh-connections-timer worker))
                ( timer-waiting? ( :refresh-active-timer worker))
                ( timer-waiting? ( :executor-heartbeat-timer worker))
                ( timer-waiting? ( :user-timer worker))
                ))
            )]
   
    ;; 將refresh-connections函數添加到定時器refresh-connections-timer中,每隔TASK-REFRESH-POLL-SECS執行一次。refresh-connections函數的無參版本提供一個默認回調函數,調用其有參版本來更新所屬worker進程所擁有的connections,默認回調函數就是再次將refresh-connections函數無參版本添加到定時器refresh-connections-timer中
    ;; 這樣只要zookeeper上分配信息發生變化,refresh-connections函數的有參版本就會執行,這裏之因此週期執行refresh-connections函數是以防zookeeper的"watcher機制"失效
   ( schedule-recurring ( :refresh-connections-timer worker) 0 ( conf TASK-REFRESH-POLL-SECS) refresh-connections)
    ;; 將函數(partial refresh-storm-active worker)添加到定時器refresh-active-timer中,每隔TASK-REFRESH-POLL-SECS執行一次。refresh-storm-active函數的執行邏輯與refresh-connections函數徹底相同
   ( schedule-recurring ( :refresh-active-timer worker) 0 ( conf TASK-REFRESH-POLL-SECS) ( partial refresh-storm-active worker))

   ( log-message "Worker has topology config " ( :storm-conf worker))
   ( log-message "Worker " worker-id " for storm " storm-id " on " assignment-id ":" port " has finished loading")
    ;; 返回實現了Shutdownable協議和DaemonCommon協議的實例ret,經過ret咱們能夠關閉worker進程
    ret
   ))
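Stripped of the Storm specifics, the shape of mk-worker is: schedule recurring heartbeats on timers, build a shutdown closure that releases every resource, and reify the Shutdownable/DaemonCommon protocols around that closure. A bare-bones sketch of this pattern (hypothetical code, using a plain ScheduledExecutorService instead of Storm's timer):

(defprotocol SketchShutdownable
  (sketch-shutdown [this]))

(defn mk-heartbeating-component [heartbeat-fn period-secs]
  (let [timer (java.util.concurrent.Executors/newSingleThreadScheduledExecutor)]
    ;; run the heartbeat immediately, then every period-secs seconds
    (.scheduleAtFixedRate timer ^Runnable heartbeat-fn 0 (long period-secs)
                          java.util.concurrent.TimeUnit/SECONDS)
    (reify SketchShutdownable
      (sketch-shutdown [_] (.shutdownNow timer)))))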

The worker-data function:

;; worker-data函數生成進程的"元數據"
( defn worker-data [ conf mq-context storm-id assignment-id port worker-id ]
  ;; 爲該進程生成ClusterState實例
 ( let [ cluster-state ( cluster/mk-distributed-cluster-state conf)
        ;; 爲該進程生成StormClusterState實例,這樣進程就能夠經過StormClusterState與zookeeper進行交互了
        storm-cluster-state ( cluster/mk-storm-cluster-state cluster-state)
        ;; 調用read-supervisor-storm-conf函數讀取storm-id的配置信息,read-supervisor-storm-conf函數請參見其定義部分
        storm-conf ( read-supervisor-storm-conf conf storm-id)
        ;; executors綁定分配給該進程的executor的id集合,包含system executor的id
        executors ( set ( read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
        ;; 進程內executor間通訊是經過disruptor實現的,因此這裏爲該worker建立了一個名爲"worker-transfer-queue"的disruptor queue,關於disruptor的內容會在之後詳細介紹
        ;; 注意transfer-queue是worker相關的,與executor無關
        transfer-queue ( disruptor/disruptor-queue "worker-transfer-queue" ( storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                  :wait-strategy ( storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
        ;; mk-receive-queue-map函數爲每一個executor建立一個名爲"receive-queue{executor-id}"的disruptor queue,executor-receive-queue-map綁定executor-id->"disruptor接收queue"的map  
        ;; 注意executor-receive-queue-map是executor相關,與worker無關                                    
        executor-receive-queue-map ( mk-receive-queue-map storm-conf executors)
        ;; executor可能有多個tasks,相同executor的tasks共用一個"disruptor接收queue",將executor-id->"disruptor接收queue"的map轉化爲task-id->"disruptor接收queue"的map,
        ;; 如executor-receive-queue-map={[1 2] receive-queue[1 2], [3 4] receive-queue[3 4]},那麼receive-queue-map={1 receive-queue[1 2], 2 receive-queue[1 2], 3 receive-queue[3 4], 4 receive-queue[3 4]}
        receive-queue-map ( ->> executor-receive-queue-map
                              ( mapcat ( fn [[ e queue ]] ( for [ t ( executor-id->tasks e )] [ t queue ])))
                              ( into {}))
                ;; 調用read-supervisor-topology函數從supervisor本地路徑"{storm.local.dir}/supervisor/stormdist/stormcode.ser"讀取topology對象的序列化文件
        topology ( read-supervisor-topology conf storm-id )]
    ;; recursive-map宏會將下面value都執行一遍,用返回值和key生成新的map做爲worker的"元數據",recursive-map宏見其定義部分
   ( recursive-map
      ;; 保存集羣配置信息
      :conf conf
      ;; 保存一個傳輸層實例用於worker進程間消息傳遞,storm傳輸層被定義成了"可插拔式"插件,經過實現backtype.storm.messaging.IContext接口就能夠定義本身的消息傳輸層。storm 0.8.x默認傳輸層實例是backtype.storm.messaging.zmq,可是因爲
      ;; 1.ZeroMQ是一個本地化的消息庫,它過分依賴操做系統環境,並且ZeroMQ使用的是"堆外內存",沒法使用jvm相關的內存監控工具進行監控管理,存在"堆外內存"泄漏風險
      ;; 2.安裝起來比較麻煩
      ;; 3.ZeroMQ的穩定性在不一樣版本之間差別巨大,而且目前只有2.1.7版本的ZeroMQ能與Storm協調的工做。
      ;; 因此storm 0.9以後默認傳輸層實例爲backtype.storm.messaging.netty.Context,Netty有以下優勢:
      ;; 1.平臺隔離,Netty是一個純Java實現的消息隊列,能夠幫助Storm實現更好的跨平臺特性,同時基於JVM的實現可讓咱們對消息有更好的控制,由於Netty使用jvm的堆內存,而不是堆外內存
      ;; 2.高性能,Netty的性能要比ZeroMQ快兩倍左右
      ;; 3. 安全性認證,使得咱們未來要作的worker進程之間的認證受權機制成爲可能。
      :mq-context ( if mq-context
                      mq-context
                     ( TransportFactory/makeContext storm-conf))
      ;; 記錄所屬storm-id
      :storm-id storm-id
      ;; 記錄所屬supervisor-id
      :assignment-id assignment-id
      ;; 記錄端口
      :port port
      ;; 記錄咱們分配給該進程的worker-id
      :worker-id worker-id
      ;; 記錄ClusterState實例
      :cluster-state cluster-state
      ;; 記錄StormClusterState實例,以便worker進程與zookeeper進行交互
      :storm-cluster-state storm-cluster-state
      ;; 記錄topology的當前活躍狀態爲false
      :storm-active-atom ( atom false)
      ;; 記錄分佈在該worker進程上的executors的id
      :executors executors
      ;; 記錄排序後的分佈在該worker進程上的tasks的id
      :task-ids ( ->> receive-queue-map keys ( map int) sort)
      ;; 記錄該topology的配置信息
      :storm-conf storm-conf
      ;; 記錄topology實例
      :topology topology
      ;; 記錄添加了acker,system bolt,metric bolt後的topology實例
      :system-topology ( system-topology! storm-conf topology)
      ;; 記錄一個名爲"heartbeat-timer"的定時器
      :heartbeat-timer ( mk-halting-timer "heartbeat-timer")
      ;; 記錄一個名爲"refresh-connections-timer"的定時器
      :refresh-connections-timer ( mk-halting-timer "refresh-connections-timer")
      ;; 記錄一個名爲"refresh-active-timer"的定時器
      :refresh-active-timer ( mk-halting-timer "refresh-active-timer")
      ;; 記錄一個名爲"executor-heartbeat-timer"的定時器
      :executor-heartbeat-timer ( mk-halting-timer "executor-heartbeat-timer")
      ;; 記錄一個名爲"user-timer"的定時器
      :user-timer ( mk-halting-timer "user-timer")
      ;; 記錄任務id->組件名稱鍵值對的map,形如:{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"},storm-task-info函數請參見其定義部分
      :task->component ( HashMap. ( storm-task-info topology storm-conf)) ; for optimized access when used in tasks later on
      ;; 記錄"組件名稱"->"stream_id->輸出域Fields對象的map"的map,component->stream->fields函數請參見其定義部分
      :component->stream->fields ( component->stream->fields ( :system-topology <>))
      ;; 記錄"組件名稱"->排序後task-id集合的map,形如:{"boltA" [1 2 3 4], "boltB" [5 6]}
      :component->sorted-tasks ( ->> ( :task->component <>) reverse-map ( map-val sort))
      ;; 記錄一個ReentrantReadWriteLock對象
      :endpoint-socket-lock ( mk-rw-lock)
      ;; 記錄一個node+port->socket的原子類型的map
      :cached-node+port->socket ( atom {})
      ;; 記錄一個task->node+port的原子類型的map
      :cached-task->node+port ( atom {})
      ;; 記錄該worker進程的傳輸隊列transfer-queue
      :transfer-queue transfer-queue
      ;; 記錄executor接收隊列executor-receive-queue-map
      :executor-receive-queue-map executor-receive-queue-map
      ;; 記錄executor中"開始任務id"->executor接收queue的map,如executor-receive-queue-map={[1 2] receive-queue[1 2], [3 4] receive-queue[3 4]},那麼short-executor-receive-queue-map={1 receive-queue[1 2], 3 receive-queue[3 4]}
      :short-executor-receive-queue-map ( map-key first executor-receive-queue-map)
      ;; 記錄task_id->executor中"開始任務id"的map,如executors=#{[1 2] [3 4] [5 6]},task->short-executor={1 1, 2 1, 3 3, 4 3, 5 5, 6 5}
      :task->short-executor ( ->> executors
                                ( mapcat ( fn [ e ] ( for [ t ( executor-id->tasks e )] [ t ( first e )])))
                                ( into {})
                                ( HashMap.))
      ;; 記錄一個能夠終止該worker進程的"自殺函數"
      :suicide-fn ( mk-suicide-fn conf)
      ;; 記錄一個能夠計算該worker進程啓動了多長時間的函數
      :uptime ( uptime-computer)
      ;; 爲該worker進程生成一個線程池
      :default-shared-resources ( mk-default-resources <>)
      ;; mk-user-resources函數目前版本爲空實現
      :user-shared-resources ( mk-user-resources <>)
      ;; 記錄一個函數,該函數的主要功能就是接收messages並將message發送到task對應的接收隊列,mk-transfer-local-fn函數請參見其定義部分
      :transfer-local-fn ( mk-transfer-local-fn <>)
      ;; 記錄每一個worker進程特有的接收線程的個數
      :receiver-thread-count ( get storm-conf WORKER-RECEIVER-THREAD-COUNT)
      ;; 將executor處理過的message放到worker進程發送隊列transfer-queue中,mk-transfer-fn函數請參見其定義部分
      :transfer-fn ( mk-transfer-fn <>)
     )))
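recursive-map is a macro from Storm's util namespace; the property relied on here is that each value expression can refer, through the <> placeholder, to the map assembled from the keys above it. A rough hand-expansion of that idea (illustrative only, not the real macro output):

(let [<> {}
      <> (assoc <> :port 6700)
      <> (assoc <> :logfile-name (str "worker-" (:port <>) ".log"))]
  <>)
;; => {:port 6700, :logfile-name "worker-6700.log"}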

The read-worker-executors function:

;; read-worker-executors函數用於讀取分佈在該進程上的executor信息
( defn read-worker-executors [ storm-conf storm-cluster-state storm-id assignment-id port ]
  ;; assignment綁定executor->node+port的map,調用StormClusterState實例的assignment-info函數從zookeeper上讀取storm-id的分配信息AssignmentInfo實例
  ;; AssignmentInfo定義以下:(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
 ( let [ assignment ( :executor->node+port ( .assignment-info storm-cluster-state storm-id nil ))]
    ;; 返回分配給該進程的executor的id集合,包含system executor的id
   ( doall
    ;; 將system executor的id和topology executor的id合併
    ( concat
      ;; system executor的id,[-1 -1]    
      [ Constants/SYSTEM_EXECUTOR_ID ]
      ;; 從分配信息assignment中獲取分配給該進程的executor
     ( mapcat ( fn [[ executor loc ]]
               ( if ( = loc [ assignment-id port ])
                  [ executor ]
                 ))
              assignment)))))
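A small worked example of the filtering above, using a hypothetical executor->node+port assignment and a supervisor running on "node-a", port 6700:

(let [assignment {[1 2] ["node-a" 6700], [3 4] ["node-b" 6700], [5 6] ["node-a" 6700]}]
  (concat [[-1 -1]]                                   ; the system executor id
          (mapcat (fn [[executor loc]]
                    (if (= loc ["node-a" 6700]) [executor]))
                  assignment)))
;; => ([-1 -1] [1 2] [5 6])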

The mk-receive-queue-map function:

;; mk-receive-queue-map函數爲每一個executor建立一個名爲"receive-queue{executor-id}"的disruptor queue,如"receive-queue[1 3]",並返回executor-id->receive-queue的map
( defn- mk-receive-queue-map [ storm-conf executors ]
  ;; executors標識了executor-id集合
 ( ->> executors
      ;; TODO: this depends on the type of executor
      ;; 經過調用map函數爲每一個executor-id建立一個"disruptor接收queue"
      ( map ( fn [ e ] [ e ( disruptor/disruptor-queue ( str "receive-queue" e)
                                                 ( storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                  :wait-strategy ( storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY ))]))
      ;; 返回executor-id->receive-queue的map                                          
      ( into {})
      ))

The storm-task-info function:

( defn storm-task-info
  "Returns map from task -> component id"
  [ ^ StormTopology user-topology storm-conf ]
 ( ->> ( system-topology! storm-conf user-topology)
      ;; 獲取組件名稱->組件對象鍵值對的map
      all-components
      ;; 返回組件名稱->組件任務數鍵值對的map,如{"boltA" 4, "boltB" 2}
      ( map-val ( comp #( get % TOPOLOGY-TASKS) component-conf))
      ;; 按照組件名稱對map進行排序返回結果序列,如(["boltA" 4] ["boltB" 2])
      ( sort-by first)
      ;; mapcat函數等價於對(map (fn...))的返回結果執行concat函數,返回("boltA" "boltA" "boltA" "boltA" "boltB" "boltB")
      ( mapcat ( fn [[ c num-tasks ]] ( repeat num-tasks c)))
      ;; {1 "boltA", 2 "boltA",3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
      ( map ( fn [ id comp ] [ id comp ]) ( iterate ( comp int inc) ( int 1)))
      ( into {})
      ))
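Running the same pipeline by hand on a hypothetical topology with two bolts (4 tasks and 2 tasks respectively) makes the task-id numbering explicit:

(->> {"boltA" 4, "boltB" 2}                        ; component -> task count (assumed)
     (sort-by first)                               ; (["boltA" 4] ["boltB" 2])
     (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
     (map (fn [id comp] [id comp]) (iterate inc 1))
     (into {}))
;; => {1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}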

The component->stream->fields function:

( defn component->stream->fields [ ^ StormTopology topology ]
  ;; 調用ThriftTopologyUtils/getComponentIds方法獲取topology全部組件名稱集合,如#{"boltA", "boltB", "boltC"}
 ( ->> ( ThriftTopologyUtils/getComponentIds topology)
          ;; 獲取每一個組件的stream_id->StreamInfo對象的map,stream->fields函數請參見其定義部分
      ( map ( fn [ c ] [ c ( stream->fields topology c )]))
      ;; 生成"組件名稱"->"stream_id->輸出域Fields對象的map"的map
      ( into {})
      ;; 將其轉化成Java的HashMap
      ( HashMap.)))

The stream->fields function:

( defn- stream->fields [ ^ StormTopology topology component ]
  ;; 獲取指定組件名的ComponentCommon對象
 ( ->> ( ThriftTopologyUtils/getComponentCommon topology component)
        ;; 調用ComponentCommon對象的get_streams方法獲取stream_id->StreamInfo對象的map,一個組件能夠有多個輸出流
      .get_streams
      ;; s綁定stream_id,info綁定StremInfo對象,調用StreamInfo對象的get_output_fields獲取輸出域List<String>對象,再用輸出域List<String>對象生成Fields對象
      ( map ( fn [[s info ]] [s ( Fields. ( .get_output_fields info ))]))
      ;; 生成stream_id->Fields對象的map
      ( into {})
      ;; 將clojure結構的map轉換成java中的HashMap
      ( HashMap.)))

The mk-transfer-local-fn function:

;; mk-transfer-local-fn函數返回一個匿名函數,該匿名函數的主要功能就是接收messages並將message發送到task對應的接收隊列    
( defn mk-transfer-local-fn [ worker ]
  ;; short-executor-receive-queue-map綁定"開始任務id"->executor接收queue的map,如:{1 receive-queue[1 2], 3 receive-queue[3 4]}
 ( let [ short-executor-receive-queue-map ( :short-executor-receive-queue-map worker)
        ;; task->short-executor綁定task_id->executor中"開始任務id"的map,如:{1 1, 2 1, 3 3, 4 3}
        task->short-executor ( :task->short-executor worker)
        ;; task-getter綁定一個由comp生成的組合函數
        task-getter ( comp #( get task->short-executor %) fast-first )]
    ;; 返回一個匿名函數,tuple-batch是一個ArrayList對象,ArrayList的每一個元素都是一個長度爲2的數組[task_id, message],task_id表示該消息由哪一個task處理,message表示消息
   ( fn [ tuple-batch ]
      ;; 調用fast-group-by函數獲取"executor簡寫id"->須要該executor處理的消息List的map
     ( let [ grouped ( fast-group-by task-getter tuple-batch )]
        ;; fast-map-iters宏主要用於遍歷map,short-executor標識"executor簡寫id",pairs標識消息[task_id, message]
       ( fast-map-iter [[ short-executor pairs ] grouped ]
          ;; 獲取該executor的接收queue
         ( let [ q ( short-executor-receive-queue-map short-executor )]
            ;; 若是q不爲空,則調用disruptor的publish方法將消息放入disruptor中
           ( if q
             ( disruptor/publish q pairs)
             ( log-warn "Received invalid messages for unknown tasks. Dropping... ")
             )))))))

The fast-group-by function:

;; fast-group-by函數的主要功能就是生成"executor簡寫id"->須要該executor處理的消息List的map            
( defn fast-group-by
  ;; afn綁定mk-transfer-local-fn函數中定義的task-getter函數,alist綁定一個ArrayList對象,ArrayList的每一個元素都是一個長度爲2的數組[task_id, message],task_id表示該消息由哪一個task處理,message表示消息
  [ afn alist ]
  ;; 建立一個HashMap對象ret
 ( let [ ret ( HashMap. )]
    ;; fast-list-iter是一個宏,主要功能就是遍歷list
   ( fast-list-iter
      ;; e綁定每一個[task_id, message]數組對象
      [ e alist ]
      ;; 調用afn綁定的task-getter函數獲取該task_id所屬的"executor的簡寫id",因此key綁定"executor簡寫id"
     ( let [ key ( afn e)
              ;; 從ret中獲取key所對應的ArrayList對象,即須要該executor處理的消息列表
            ^ List curr ( get-with-default ret key ( ArrayList. ))]
        ;; [task_id, message]數組對象添加到list中
       ( .add curr e)))
    ;; 返回ret
    ret))
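fast-group-by is essentially a mutable-HashMap flavour of clojure.core/group-by, keyed by the executor that owns each task. The same grouping expressed with core functions on hypothetical data:

(let [task->short-executor {1 1, 2 1, 3 3, 4 3}
      tuple-batch [[1 "msg-a"] [2 "msg-b"] [3 "msg-c"]]]
  (group-by (comp task->short-executor first) tuple-batch))
;; => {1 [[1 "msg-a"] [2 "msg-b"]], 3 [[3 "msg-c"]]}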

The mk-transfer-fn function:

;; mk-transfer-fn函數主要功能就是將executor處理過的message放到worker進程發送隊列transfer-queue中
( defn mk-transfer-fn [ worker ]
  ;; local-tasks綁定分佈在該worker進程上的task的id集合
 ( let [ local-tasks ( -> worker :task-ids set)
        ;; local-transfer標識mk-transfer-local-fn返回的匿名函數
        local-transfer ( :transfer-local-fn worker)
        ;; transfer-queue綁定該worker進程的傳輸隊列transfer-queue
        ^ DisruptorQueue transfer-queue ( :transfer-queue worker)
        ;; task->node+port綁定task_id->node+port的map
        task->node+port ( :cached-task->node+port worker )]
    ;; 返回一個匿名函數,serializer標識一個Kryo序列化器,tuple-batch是一個ArrayList對象,ArrayList的每一個元素都是一個長度爲2的數組[task_id, message],task_id表示該消息由哪一個task處理,即message的目標task,message表示消息
   ( fn [ ^ KryoTupleSerializer serializer tuple-batch ]
      ;; local爲ArrayList
     ( let [ local ( ArrayList.)
            ;; remoteMap爲HashMap
            remoteMap ( HashMap. )]
        ;; 遍歷tuple-batch
       ( fast-list-iter [[ task tuple :as pair ] tuple-batch ]
          ;; 若是接收該消息的task爲本地task,即該task也分佈在該worker進程上,那麼將該消息添加到local中
         ( if ( local-tasks task)
           ( .add local pair)
           
            ;;Using java objects directly to avoid performance issues in java code
            ;; 不然說明接收該消息的task不是本地task,即該task分佈在其餘worker進程上;node+port標識了運行該task的worker進程所在的節點和端口
           ( let [ node+port ( get @ task->node+port task )]
              ;; 若是remoteMap不包含node+port,則添加
             ( when ( not ( .get remoteMap node+port))
               ( .put remoteMap node+port ( ArrayList.)))
             ( let [ remote ( .get remoteMap node+port )]
                ;; 首先用task_id和序列化後的tuple生成TaskMessage對象,而後將TaskMessage對象添加到ArrayList中
               ( .add remote ( TaskMessage. task ( .serialize serializer tuple)))
                ))))
        ;; 調用local-transfer函數發送須要本地task處理的消息
       ( local-transfer local)
        ;; 調用disruptor的publish方法將remoteMap放入worker進程的傳輸隊列transfer-queue中,remoteMap的key爲node+port,value爲ArrayList,ArrayList中每一個元素都是須要node+port所對應的worker進行處理
       ( disruptor/publish transfer-queue remoteMap)
         ))))
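The local/remote split performed by the returned function can be summarized with a hand-worked example (hypothetical tasks and endpoints; serialization into TaskMessage objects is omitted):

(let [local-tasks #{1 2}
      task->node+port {3 "node-b/6700", 4 "node-b/6700"}
      tuple-batch [[1 :tuple-1] [3 :tuple-3] [4 :tuple-4]]]
  {:local  (filterv (fn [[task _]] (local-tasks task)) tuple-batch)
   :remote (group-by (fn [[task _]] (task->node+port task))
                     (remove (fn [[task _]] (local-tasks task)) tuple-batch))})
;; => {:local [[1 :tuple-1]], :remote {"node-b/6700" [[3 :tuple-3] [4 :tuple-4]]}}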

The do-heartbeat function:

( defn do-heartbeat [ worker ]
  ;; 獲取集羣配置信息
 ( let [ conf ( :conf worker)
        ;; 建立WorkerHeartbeat對象
        hb ( WorkerHeartbeat.
            ;; 本次心跳時間
            ( current-time-secs)
            ;; 該worker進程所屬的topology-id
            ( :storm-id worker)
            ;; 分佈在該worker進程上的executor-id集合
            ( :executors worker)
            ;; 該worker進程所佔用的端口
            ( :port worker))
        ;; 建立一個基於目錄"{storm.local.dir}/workers/{worker-id}/heartbeats"的LocalState對象,用於存放worker進程的"本地心跳信息",經過LocalState對象咱們能夠訪問一個序列化到磁盤的map對象
        state ( worker-state conf ( :worker-id worker ))]
   ( log-debug "Doing heartbeat " ( pr-str hb))
    ;; do the local-file-system heartbeat.
    ;; 將worker進程心跳信息經過LocalState對象存入磁盤,map對象的key爲"worker-heartbeat"字符串,value爲worker心跳信息
   ( .put state
        LS-WORKER-HEARTBEAT
        hb
        false
       )
    ;; 調用LocalState對象的clearup方法,只保留最近60次心跳信息
   ( .cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
                        ; it shouldn't take supervisor 120 seconds between listing dir and reading it

   ))

The do-executor-heartbeats function:

;; do-executor-heartbeats函數主要功能就是經過worker-heartbeat!函數將worker進程心跳信息寫入zookeeper的workerbeats節點中
( defnk do-executor-heartbeats [ worker :executors nil ]
  ;; stats is how we know what executors are assigned to this worker
  ;; stats綁定executor對象->executor統計信息的map。當第一次調用do-executor-heartbeats函數時,即第一次心跳時,executors爲nil,map形如:{executor_1 nil, executor_2 nil, ... }
  ;; 當再次心跳時,將會調用executor對象的get-executor-id函數和render-stats函數,獲取executor_id->executor統計信息的map,因此stats綁定的map在第一次心跳時和再次心跳時是不一樣的,有關executor統計信息的計算會在之後文章中具體分析。
 ( let [ stats ( if-not executors
                 ( into {} ( map ( fn [ e ] { e nil }) ( :executors worker)))
                 ( ->> executors
                   ( map ( fn [ e ] {( executor/get-executor-id e) ( executor/render-stats e )}))
                   ( apply merge)))
        ;; 構建worker進程的心跳信息
        zk-hb { :storm-id ( :storm-id worker)
              ;; 記錄executor統計信息
              :executor-stats stats
              ;; 記錄worker進程運行了多長時間
              :uptime (( :uptime worker))
              ;; 記錄worker進程心跳時間
              :time-secs ( current-time-secs)
              }]
    ;; do the zookeeper heartbeat
    ;; 調用StormClusterState對象的worker-heartbeat!函數將worker進程心跳信息zk-hb同步到zookeeper的"/workerbeats/{topology-id}/{supervisorId-port}/"節點中
   ( .worker-heartbeat! ( :storm-cluster-state worker) ( :storm-id worker) ( :assignment-id worker) ( :port worker) zk-hb)    
   ))

The mk-refresh-connections function:

;; mk-refresh-connections函數返回一個名爲this的函數,在"storm啓動supervisor源碼分析-supervisor.clj"中,咱們在mk-synchronize-supervisor函數也見過這種定義函數的方式,是由於這個函數自己要在函數體內被使用。
;; 而且refresh-connections是須要反覆被執行的,即當每次assignment-info發生變化的時候,就須要refresh一次,這裏是經過zookeeper的"watcher機制"實現的
( defn mk-refresh-connections [ worker ]
  ;; outbound-tasks綁定用於接收該worker進程輸出消息的全部任務,worker-outbound-tasks函數請參見其定義部分
 ( let [ outbound-tasks ( worker-outbound-tasks worker)
        ;; conf綁定worker配置信息
        conf ( :conf worker)
        ;; storm-cluster-state綁定StormClusterState實例
        storm-cluster-state ( :storm-cluster-state worker)
        ;; storm-id標識該worker進程所屬的topology的id
        storm-id ( :storm-id worker )]
    ;; 返回名稱爲this的函數,每次assignment-info發生變化時,就執行一次來refresh該worker進程的connections
   ( fn this
      ;; 無參版本,提供一個"默認回調函數"調用有參版本,"默認回調函數"就是將this函數無參版本自己添加到worker進程的refresh-connections-timer定時器中,這樣當assignment-info發生變化時,zookeeper的"watcher機制"
      ;; 就會執行回調函數,refresh-connections-timer定時器線程將會執行this函數。這樣就能夠保證,每次assignment發生變化,定時器都會在後臺作refresh-connections的操做
      ([]
       ( this ( fn [ & ignored ] ( schedule ( :refresh-connections-timer worker) 0 this))))
      ;; 有參版本
      ([ callback ]
        ;; 調用StormClusterState實例的assignment-version函數獲取storm-id的當前分配信息版本,並將callback函數註冊到zookeeper
        ( let [ version ( .assignment-version storm-cluster-state storm-id callback)
              ;; 若是worker本地緩存的分配版本和zookeeper上獲取的分配版本相等,那麼說明storm-id的分配信息未發生變化,直接從worker本地獲取分配信息
              assignment ( if ( = version ( :version ( get @( :assignment-versions worker) storm-id)))
                           ( :data ( get @( :assignment-versions worker) storm-id))
                            ;; 不然調用assignment-info-with-version函數從zookeeper的"/assignments/{storm-id}"節點從新獲取帶有版本號的分配信息,並註冊回調函數,這樣worker就能感知某個已存在的assignment是否被從新分配
                           ( let [ new-assignment ( .assignment-info-with-version storm-cluster-state storm-id callback )]
                              ;; 將最近分配信息保存到worker本地緩存
                             ( swap! ( :assignment-versions worker) assoc storm-id new-assignment)
                             ( :data new-assignment)))
              ;; my-assignment標識"接收該worker進程輸出消息的任務"->[node port]的map
              my-assignment ( -> assignment
                                                  ;; 獲取executor_id->[node port]的map,如:{[1 1] [node1 port1], [4 4] [node1 port1], [2 2] [node2 port1], [5 5] [node2 port1], [3 3] [node3 port1], [6 6] [node3 port1]}
                                :executor->node+port
                                ;; 獲取task_id->[node port]的map,如:{[1 [node1 port1], 4 [node1 port1], 2 [node2 port1], 5 [node2 port1], 3 [node3 port1], 6 [node3 port1]}
                                to-task->node+port
                                ;; 選擇"鍵"包含在outbound-tasks集合的鍵值對,假設outbound-tasks=#{4 5 6},過濾後爲{4 [node1 port1], 5 [node2 port1], 6 [node3 port1]}
                               ( select-keys outbound-tasks)
                                ;; {4 "node1/port1", 5 "node2/port1", 6 "node3/port1"}
                               ( #( map-val endpoint->string %)))
              ;; we dont need a connection for the local tasks anymore
              ;; 過濾掉分佈在該worker進程上的task,由於分佈在通一個進程上不須要創建socket鏈接。假設該worker進程位於node1的port1上,則needed-assignment={5 "node2/port1", 6 "node3/port1"}
              needed-assignment ( ->> my-assignment
                                     ( filter-key ( complement ( -> worker :task-ids set))))
              ;; needed-connections綁定"須要的鏈接"的集合,needed-connections=#{"node2/port1", "node3/port1"}
              needed-connections ( -> needed-assignment vals set)
              ;; needed-tasks綁定須要創建鏈接的任務集合,needed-tasks=#{5, 6}
              needed-tasks ( -> needed-assignment keys)
             
              ;; current-connections綁定當前該worker進程"已創建的鏈接"的集合
              current-connections ( set ( keys @( :cached-node+port->socket worker)))
              ;; needed-connections和current-connections的差集表示須要"新建的鏈接"的集合,假設current-connections=#{},則new-connections=#{"node2/port1", "node3/port1"}
              new-connections ( set/difference needed-connections current-connections)
              ;; current-connections和needed-connections的差集表示須要"刪除的鏈接"的集合
              remove-connections ( set/difference current-connections needed-connections )]
              ;; 將新建的鏈接合併到cached-node+port->socket中
             ( swap! ( :cached-node+port->socket worker)
                    #( HashMap. ( merge ( into {} %1) %2))
                    ;; 建立endpoint-str->connection對象的map,即創建新的鏈接。如:{"node2/port1" connect1, "node3/port1" connect2}
                    ( into {}
                      ( dofor [ endpoint-str new-connections
                              :let [[ node port ] ( string->endpoint endpoint-str )]]
                        [ endpoint-str
                         ( .connect
                          ^ IContext ( :mq-context worker)
                          storm-id
                          (( :node->host assignment) node)
                          port)
                          ]
                        )))
              ;; 將my-assignment保存到worker進程本地緩存cached-task->node+port中
             ( write-locked ( :endpoint-socket-lock worker)
               ( reset! ( :cached-task->node+port worker)
                       ( HashMap. my-assignment)))
              ;; close須要"刪除的鏈接"
             ( doseq [ endpoint remove-connections ]
               ( .close ( get @( :cached-node+port->socket worker) endpoint)))
              ;; 將須要"刪除的鏈接"從worker進程本地緩存cached-node+port->socket中刪除,經過worker進程本地緩存cached-task->node+port和cached-node+port->socket,咱們就能夠或得task和socket的對應關係
             ( apply swap!
                    ( :cached-node+port->socket worker)
                    #( HashMap. ( apply dissoc ( into {} %1) % &))
                    remove-connections)
              ;; 查找出未創建鏈接的task
             ( let [ missing-tasks ( ->> needed-tasks
                                      ( filter ( complement my-assignment )))]
                ;; 若是存在未創建鏈接的task,則記錄日誌文件
               ( when-not ( empty? missing-tasks)
                 ( log-warn "Missing assignment for following tasks: " ( pr-str missing-tasks))
                 )))))))
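The connection bookkeeping above boils down to two set differences between the endpoints the worker needs and the endpoints it already has sockets for. A tiny illustration with hypothetical endpoints:

(require '[clojure.set :as set])

(let [needed-connections  #{"node2/6701" "node3/6702"}
      current-connections #{"node1/6700" "node2/6701"}]
  {:new-connections    (set/difference needed-connections current-connections)
   :remove-connections (set/difference current-connections needed-connections)})
;; => {:new-connections #{"node3/6702"}, :remove-connections #{"node1/6700"}}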

The worker-outbound-tasks function:

;; worker-outbound-tasks函數主要功能就是獲取接收來自該worker消息的組件的task-id集合                
( defn worker-outbound-tasks
  "Returns seq of task-ids that receive messages from this worker"
  [ worker ]
  ;; context綁定backtype.storm.task.WorkerTopologyContext對象,worker-context函數請參見其定義部分
 ( let [ context ( worker-context worker)
        ;; 對分佈在該worker進程上的每一個任務的task_id調用匿名函數(fn [task-id] ... ),並對返回結果進行concat操做,components綁定了接收組件id的集合
        components ( mapcat
                    ( fn [ task-id ]
                      ;; 調用context的getComponentId方法獲取該task-id所屬的組件(spout/bolt)的名稱
                      ( ->> ( .getComponentId context ( int task-id))
                            ;; 調用context的getTargets方法,獲取哪些組件接收了componentId輸出的消息
                           ( .getTargets context)
                            vals
                            ;; 獲取接收組件id的集合
                           ( map keys)
                           ( apply concat)))
                    ;; 獲取分佈在該worker進程上的task_id集合
                    ( :task-ids worker ))]
   ( -> worker
        ;; 獲取任務id->組件名稱鍵值對的map,形如:{1 "boltA", 2 "boltA", 3 "boltA", 4 "boltA", 5 "boltB", 6 "boltB"}
        :task->component
        ;; 結果形如:{"boltA" [1 2 3 4], "boltB" [5 6]}
        reverse-map
        ;; 過濾出"鍵"包含在components集合中的鍵值對
       ( select-keys components)
        vals
        flatten
        ;; 獲取接收組件全部任務的id的集合
        set )))

The worker-context function:

( defn worker-context [ worker ]
  ;; 返回backtype.storm.task.WorkerTopologyContext對象
 ( WorkerTopologyContext. ( :system-topology worker)
                         ( :storm-conf worker)
                         ( :task->component worker)
                         ( :component->sorted-tasks worker)
                         ( :component->stream->fields worker)
                         ( :storm-id worker)
                         ( supervisor-storm-resources-path
                           ( supervisor-stormdist-root ( :conf worker) ( :storm-id worker)))
                         ( worker-pids-root ( :conf worker) ( :worker-id worker))
                         ( :port worker)
                         ( :task-ids worker)
                         ( :default-shared-resources worker)
                         ( :user-shared-resources worker)
                         ))

The getTargets method:

;; WorkerTopologyContext類繼承GeneralTopologyContext類,getTargets方法是GeneralTopologyContext類實例方法,主要功能就是獲取哪些組件接收了componentId輸出的消息
;; 返回值爲一個stream_id->{receive_component_id->Grouping}的map,receive_component_id就是接收組件的id              
public Map<String, Map<String, Grouping>> getTargets( String componentId) {
        ;; 建立返回結果map,ret
        Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
        ;; 獲取該topology的全部組件ids,並遍歷
        for( String otherComponentId : getComponentIds()) {
            ;; 經過組件id獲取組件的ComponentCommon對象,而後再獲取其輸入信息inputs
            Map<GlobalStreamId, Grouping> inputs = getComponentCommon( otherComponentId) .get_inputs();
            ;; 遍歷輸入信息,GlobalStreamId對象有兩個成員屬性,一個是流id,一個是發送該流的組件id
            for( GlobalStreamId id : inputs.keySet()) {
                ;; 若是輸入流的組件id和componentId相等,那麼說明該組件接收來自componentId的輸出,則將其添加到ret中
                if( id.get_componentId() .equals( componentId)) {
                    Map<String, Grouping> curr = ret.get( id.get_streamId());
                    if( curr==null) curr = new HashMap<String, Grouping>();
                    curr.put( otherComponentId, inputs.get( id));
                    ret.put( id.get_streamId(), curr);
                }
            }
        }
        return ret;
    }

The refresh-storm-active function:

;; refresh-storm-active函數主要功能就是refresh指定worker進程緩存的所屬topology的活躍狀態                
( defn refresh-storm-active
  ;; "無回調函數"版本,使用默認回調函數調用"有回調函數"版本,默認回調函數將refresh-storm-active函數自己添加到refresh-active-timer定時器
  ([ worker ]
   ( refresh-storm-active worker ( fn [ & ignored ] ( schedule ( :refresh-active-timer worker) 0 ( partial refresh-storm-active worker)))))
  ;; "有回調函數"版本
  ([ worker callback ]
    ;; 調用StormClusterState實例的storm-base函數,從zookeeper的"/storms/{storm-id}"節點獲取該topology的StormBase數據,並將回調函數callback註冊到zookeeper的"/storms/{storm-id}"節點
    ;; 這樣當該節點數據發生變化時,callback函數將被執行,即將refresh-storm-active函數添加到refresh-active-timer定時器,refresh-active-timer定時器線程將會執行refresh-storm-active函數
   ( let [ base ( .storm-base ( :storm-cluster-state worker) ( :storm-id worker) callback )]
    ;; 更新worker進程緩存的topology的活躍狀態
    ( reset!
     ( :storm-active-atom worker)
     ( = :active ( -> base :status :type))
     ))
    ))

The launch-receive-thread function:

;; 爲worker進程啓動專有接收線程  
( defn launch-receive-thread [ worker ]
 ( log-message "Launching receive-thread for " ( :assignment-id worker) ":" ( :port worker))
  ;; launch-receive-thread!函數請參見其定義部分
 ( msg-loader/launch-receive-thread!
    ;; 鏈接實例,0.9版本開始默認使用netty,backtype.storm.messaging.netty.Context實例
   ( :mq-context worker)
   ( :storm-id worker)
    ;; 接收線程數
   ( :receiver-thread-count worker)
   ( :port worker)
    ;; 獲取本地消息傳輸函數transfer-local-fn,transfer-local-fn函數將消息發送給分佈在該worker進程上的task相應隊列
   ( :transfer-local-fn worker)
    ;; 獲取worker進程輸入隊列大小
   ( -> worker :storm-conf ( get TOPOLOGY-RECEIVER-BUFFER-SIZE))
    :kill-fn ( fn [ t ] ( exit-process! 11))))

The launch-receive-thread! function:

;; launch-receive-thread!函數定義在loader.clj文件中,用於啓動指定worker進程的接收線程  
( defnk launch-receive-thread!
  [ context storm-id receiver-thread-count port transfer-local-fn max-buffer-size
  :daemon true
  :kill-fn ( fn [ t ] ( System/exit 1))
  :priority Thread/NORM_PRIORITY ]
  ;; max-buffer-size綁定worker進程最大輸入隊列大小
 ( let [ max-buffer-size ( int max-buffer-size)
      ;; 調用backtype.storm.messaging.netty.Context的bind方法創建一個服務器端的鏈接,socket綁定backtype.storm.messaging.netty.Server實例
        socket ( .bind ^ IContext context storm-id port)
        ;; thread-count綁定接收線程數,默認值爲1
        thread-count ( if receiver-thread-count receiver-thread-count 1)
        ;; 調用mk-receive-threads函數建立接收線程,vthreads綁定接收線程所對應的SmartThread實例,經過該實例咱們能夠start、join、interrupt接收線程,mk-receive-threads函數請參見其定義部分
        vthreads ( mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count )]
    ;; 返回一個匿名函數,該匿名函數的主要功能就是經過向task_id=-1的任務發送一個空消息來關閉接收線程
   ( fn []
      ;; 向本地端口port建立鏈接
     ( let [ kill-socket ( .connect ^ IContext context storm-id "localhost" port )]
       ( log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
        ;; 向task_id=-1的任務發送一個空消息,接收線程在接收消息時,首先檢查是不是發送給task_id=-1消息,若是是則關閉接收線程
       ( .send ^ IConnection kill-socket
                  -1 ( byte-array []))
        ;; 關閉鏈接
       ( .close ^ IConnection kill-socket)
       
       ( log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
        ;; 等待全部接收線程結束
       ( for [ thread-id ( range thread-count )]
            ( .join ( vthreads thread-id)))
       
       ( log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
       ))))

The mk-receive-threads function:

;; mk-receive-threads函數循環調用mk-receive-thread函數建立接收線程,mk-receive-thread請參見其定義部分
( defn- mk-receive-threads [ context storm-id port transfer-local-fn   daemon kill-fn priority socket max-buffer-size thread-count ]
 ( into [] ( for [ thread-id ( range thread-count )]
            ( mk-receive-thread context storm-id port transfer-local-fn   daemon kill-fn priority socket max-buffer-size thread-id))))

The mk-receive-thread function:

( defn- mk-receive-thread [ context storm-id port transfer-local-fn   daemon kill-fn priority socket max-buffer-size thread-id ]
    ;; async-loop函數接收一個"函數"或"函數工廠"做爲參數生成一個java thread,這個java thread不斷循環執行這個"函數"或"函數工廠"生產的函數。async-loop函數返回實現SmartThread協議的實例,經過該實例咱們能夠start、join、interrupt接收線程
   ( async-loop
      ;; 這個參數就是一個"函數工廠","函數工廠"就是一個返回函數的函數
      ( fn []
        ( log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id   " ]")
        ;; 生成的java thread的run方法不斷循環執行該函數
        ( fn []
          ;; batched是一個ArrayList對象
          ( let [ batched ( ArrayList.)
                ;; backtype.storm.messaging.netty.Server的recv方法返回ArrayList<TaskMessage>的Iterator<TaskMessage>。關於消息的處理流程會在之後文章中具體分析
                ^ Iterator iter ( .recv ^ IConnection socket 0 thread-id)
                closed ( atom false )]
            ;; 當iter不爲nil,遍歷iter
            ( when iter
              ( while ( and ( not @ closed) ( .hasNext iter))
                  ;; packet綁定一個TaskMessage對象,TaskMessage有兩個成員屬性task和message,task表示處理該消息的任務id,message表示消息的byte數組
                 ( let [ packet ( .next iter)
                      ;; task綁定接收該消息的任務id
                        task ( if packet ( .task ^ TaskMessage packet))
                        ;; message綁定消息的byte數組
                        message ( if packet ( .message ^ TaskMessage packet ))]
                      ;; 若是task=-1,則關閉接收線程
                     ( if ( = task -1)
                        ( do ( log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                          ( .close socket)
                          ( reset! closed   true))
                        ;; 不然將數組[task message]添加到batched
                        ( when packet ( .add batched [ task message ]))))))
            ;; 若是接收線程關閉標識closed值爲false,則調用transfer-local-fn函數將接收到的一批消息發送給task對應的接收隊列
            ( when ( not @ closed)
              ( do
                ( if ( > ( .size batched) 0)
                  ( transfer-local-fn batched))
                ;; 0表示函數執行完一次不須要sleep,直接進行下一次執行
                0)))))
        ;; 表示參數是一個"函數工廠"
        :factory? true
        ;; daemon的值爲true,因此接收線程是一個守護線程
        :daemon daemon
        ;; 指定kill函數
        :kill-fn kill-fn
        ;; 指定java thread的優先級
        :priority priority
        ;; 指定接收線程的名稱爲"worker-receiver-thread-"+thread-id
        :thread-name ( str "worker-receiver-thread-" thread-id)))
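async-loop is defined in Storm's util namespace; the behaviour relied on here is that with :factory? true the supplied fn is invoked once per thread start to produce the per-iteration fn, and that fn's numeric return value is how many seconds to sleep before the next iteration (0 means run again immediately). A bare-bones sketch of that contract (an assumption-labelled simplification, not the real async-loop):

(defn async-loop-sketch [factory-fn]
  (let [body-fn (factory-fn)                       ; the factory runs once and returns the loop body
        thread  (Thread.
                 (fn []
                   (loop []
                     (let [sleep-secs (body-fn)]   ; body returns how long to sleep before looping again
                       (when sleep-secs            ; in this sketch a nil return stops the loop
                         (when (pos? sleep-secs)
                           (Thread/sleep (long (* 1000 sleep-secs))))
                         (recur))))))]
    (doto thread (.setDaemon true) (.start))))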

 

This concludes the source code analysis of how the supervisor launches a worker. The launch process touches on executor-related code, which is not analyzed in detail here and will be covered in a later article; the message-queue-related code will likewise be analyzed in detail later.
