Storm supervisor startup source code analysis - supervisor.clj

The supervisor is an important component of a Storm cluster; its main responsibility is managing the "worker nodes". The supervisor communicates with ZooKeeper and, through ZooKeeper's watch mechanism, can detect whether new tasks need to be claimed or existing tasks have been reassigned. We can start a supervisor by running bin/storm supervisor >/dev/null 2>&1 &. bin/storm is a Python script, and it defines a supervisor function:

The supervisor function

 

def supervisor(klass="backtype.storm.daemon.supervisor"):
    """Syntax: [storm supervisor]

    Launches the supervisor daemon. This command should be run
    under supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
        "-Dlogfile.name=supervisor.log",
        "-Dlog4j.configuration=storm.log.properties",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        extrajars=cppaths,
        jvmopts=jvmopts)

The klass parameter defaults to backtype.storm.daemon.supervisor, which identifies a Java class. STORM_DIR identifies Storm's installation directory; the cppaths list holds the path of the log4j configuration file and the path of the Storm configuration file storm.yaml; jvmopts holds the arguments passed to the JVM, including the log4j configuration file path, the storm.yaml path, the log4j log file name, and the log4j configuration file name. The logic of the exec_storm_class function is fairly simple; its implementation is as follows:

The exec_storm_class function

 

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns

get_config_opts() obtains the default JVM configuration options, confvalue("java.library.path", extrajars) obtains the load path of JZMQ, the native library Storm uses, and get_classpath(extrajars) obtains the full paths of all dependency jars. These pieces are assembled into a java -cp command that runs the main method of klass. Since klass defaults to backtype.storm.daemon.supervisor, exec_storm_class ultimately invokes the main method of the backtype.storm.daemon.supervisor class.
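The command assembly above can be illustrated with a small, self-contained sketch. The STORM_DIR, CONFFILE, and classpath values below are made-up placeholders (the real script computes them via confvalue and get_classpath), but the list-concatenation logic mirrors exec_storm_class:

```python
# Minimal sketch of how exec_storm_class assembles the final java command.
# STORM_DIR, CONFFILE, and the classpath are illustrative placeholders.
STORM_DIR = "/opt/storm"
CONFFILE = ""

def build_command(klass, jvmtype="-server", jvmopts=None, args=None):
    # System properties and classpath come first, then JVM options,
    # then the main class, then any program arguments.
    all_args = [
        "java", jvmtype,
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", STORM_DIR + "/lib/*:" + STORM_DIR + "/conf",
    ] + (jvmopts or []) + [klass] + list(args or [])
    return all_args

cmd = build_command(
    "backtype.storm.daemon.supervisor",
    jvmopts=["-Dlogfile.name=supervisor.log"])
```

The main class always ends up last (no program arguments are passed for the supervisor), which is exactly why `java -cp ... backtype.storm.daemon.supervisor` runs that class's main method.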
The backtype.storm.daemon.supervisor class is defined in the supervisor.clj file, as follows:

The backtype.storm.daemon.supervisor class

 

(ns backtype.storm.daemon.supervisor
  (:import [backtype.storm.scheduler ISupervisor])
  (:use [backtype.storm bootstrap])
  (:use [backtype.storm.daemon common])
  (:require [backtype.storm.daemon [worker :as worker]])
  (:gen-class
    :methods [^{:static true} [launch [backtype.storm.scheduler.ISupervisor] void]]))

(bootstrap)
;; ... ...
;; other functions
;; ... ...
(defn -main []
  (-launch (standalone-supervisor)))

:gen-class instructs Clojure to generate the Java class backtype.storm.daemon.supervisor and to declare a static method launch, which takes an instance implementing the backtype.storm.scheduler.ISupervisor interface as its argument. The argument passed to launch is produced by the standalone-supervisor function, which returns an instance implementing the ISupervisor interface and is defined as follows:

The standalone-supervisor function

 

;; This function mainly returns an instance implementing the ISupervisor interface.
(defn standalone-supervisor []
  (let [conf-atom (atom nil)
        id-atom (atom nil)]
    (reify ISupervisor
      ;; the main job of prepare is to create a LocalState object: a disk-backed database of K/V pairs; see the LocalState class definition
      (prepare [this conf local-dir]
        ;; conf-atom is an atom bound to the Storm cluster configuration
        (reset! conf-atom conf)
        ;; state is bound to a LocalState object; local-dir identifies the database's root directory on disk. The database is effectively a HashMap serialized into the local-dir directory
        (let [state (LocalState. local-dir)
              ;; LS-ID is the string "supervisor-id", defined in common.clj. If state already holds this supervisor's id, curr-id is bound to it; otherwise curr-id is bound to a freshly generated 32-character UUID
              curr-id (if-let [id (.get state LS-ID)]
                        id
                        ;; call the uuid function to generate a 32-character id
                        (generate-supervisor-id))]
          ;; call state's put to store this supervisor's id
          (.put state LS-ID curr-id)
          ;; id-atom is an atom bound to this supervisor's id
          (reset! id-atom curr-id))
        )
      ;; always returns true
      (confirmAssigned [this port]
        true)
      ;; read all of the supervisor's ports from the Storm configuration; because Clojure's map returns a lazy sequence, doall is needed to realize it fully
      (getMetadata [this]
        (doall (map int (get @conf-atom SUPERVISOR-SLOTS-PORTS))))
      ;; return the supervisor's id
      (getSupervisorId [this]
        @id-atom)
      ;; return the supervisor's assignment id, which equals its id
      (getAssignmentId [this]
        @id-atom)
      ;; killedWorker is a no-op
      (killedWorker [this port]
        )
      ;; assigned is a no-op
      (assigned [this ports]
        ))))

LocalState is a Java class, defined in LocalState.java; it holds a VersionedStore object (see VersionedStore.java). Since both classes are implemented in Java and are fairly simple, they are not analyzed in detail here.
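To make the "disk-backed map of K/V pairs" idea concrete, here is a toy Python sketch with LocalState-like get/put semantics: every put rewrites the whole serialized map on disk, so the id survives a restart. This is an illustration of the concept only, not Storm's LocalState/VersionedStore code (which also keeps versioned files and a token file):

```python
import os
import pickle
import tempfile

class ToyLocalState:
    """Toy disk-backed K/V store: the whole map is (de)serialized per operation."""
    def __init__(self, root):
        self.path = os.path.join(root, "state.bin")

    def _load(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path, "rb") as f:
            return pickle.load(f)

    def get(self, key):
        return self._load().get(key)

    def put(self, key, value):
        snapshot = self._load()
        snapshot[key] = value
        with open(self.path, "wb") as f:
            pickle.dump(snapshot, f)

# Mirrors prepare in standalone-supervisor: reuse a stored id or create one.
root = tempfile.mkdtemp()
state = ToyLocalState(root)
if state.get("supervisor-id") is None:
    state.put("supervisor-id", "some-uuid")
```

A second ToyLocalState opened on the same root directory sees the same id, which is exactly why a restarted supervisor keeps its identity.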

The mk-supervisor function is defined as follows:

The mk-supervisor function

 

;; conf is bound to the Storm cluster configuration; isupervisor is bound to the ISupervisor instance returned by standalone-supervisor
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
  ;; log a startup message
  (log-message "Starting Supervisor with conf " conf)
  ;; supervisor-isupervisor-dir calls supervisor-local-dir, which reads Storm's local directory from the configuration, creates the directory {storm.local.dir}/supervisor on the supervisor, and returns it
  ;; supervisor-isupervisor-dir returns the string "{storm.local.dir}/supervisor/isupervisor" as the root directory of a LocalState object used only to store the supervisor's id
  ;; call isupervisor's prepare method: create a LocalState object, generate this supervisor's id, and store it in the LocalState object
  (.prepare isupervisor conf (supervisor-isupervisor-dir conf))
  ;; supervisor-tmp-dir creates the {storm.local.dir}/supervisor/tmp directory on the supervisor; FileUtils' cleanDirectory method empties it
  (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))
  ;; supervisor is bound to the supervisor metadata; see the definition of supervisor-data below
  (let [supervisor (supervisor-data conf shared-context isupervisor)
        ;; event-manager and processes-event-manager are each bound to an EventManager instance; managers is bound to a collection containing both. For event-manager, see the article "storm EventManager source code analysis - event.clj"
        [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
        ;; partial defines a "partial function": some arguments of a given function are pre-bound, yielding a new function. sync-processes is bound to that new function; see the definition of sync-processes below
        sync-processes (partial sync-processes supervisor)
        ;; synchronize-supervisor is bound to a function whose main job is: when assignments change, sync topology code from nimbus to the local machine, and check worker states so that every assigned worker is valid
        synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
        ;; heartbeat-fn is bound to an anonymous function that calls supervisor-heartbeat! on the StormClusterState instance to write this supervisor's heartbeat (a SupervisorInfo instance) to the "/supervisors/{supervisor-id}" node in ZooKeeper
        heartbeat-fn (fn [] (.supervisor-heartbeat!
                             ;; the StormClusterState instance
                             (:storm-cluster-state supervisor)
                             ;; the supervisor-id
                             (:supervisor-id supervisor)
                             ;; build the SupervisorInfo instance, i.e. this supervisor's heartbeat
                             (SupervisorInfo. (current-time-secs)
                                              ;; hostname
                                              (:my-hostname supervisor)
                                              ;; assignment-id, equal to the supervisor-id
                                              (:assignment-id supervisor)
                                              ;; all ports currently in use across this supervisor's assignments
                                              (keys @(:curr-assignment supervisor))
                                              ;; used ports
                                              ;; all available ports on this supervisor, i.e. the ports listed in the Storm configuration file
                                              (.getMetadata isupervisor)
                                              (conf SUPERVISOR-SCHEDULER-META)
                                              ;; supervisor uptime
                                              ((:uptime supervisor)))))]
    ;; invoke the function bound to heartbeat-fn once to write the supervisor heartbeat to ZooKeeper
    (heartbeat-fn)
    ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
    ;; call schedule-recurring in timer.clj to add heartbeat-fn to this supervisor's timer as a recurring task that reports the supervisor's heartbeat to ZooKeeper; for details on the Storm timer see "storm timer source code analysis - timer.clj"
    (schedule-recurring (:timer supervisor)
                        0
                        ;; how often to report the heartbeat
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn)
    ;; If supervisor.enable is true (it defaults to true and never changes, so this always runs), add the function bound to synchronize-supervisor (returned by mk-synchronize-supervisor) to the event-manager every 10 seconds,
    ;; so that even if ZooKeeper's watcher mechanism misbehaves, the supervisor still actively picks up assignment changes. Likewise, add the function bound to sync-processes to processes-event-manager every SUPERVISOR-MONITOR-FREQUENCY-SECS seconds,
    ;; so that even if the watcher mechanism misbehaves, the supervisor can still manage its workers
    (when (conf SUPERVISOR-ENABLE)
      ;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
      ;; to date even if callbacks don't all work exactly right
      (schedule-recurring (:timer supervisor) 0 10 (fn [] (.add event-manager synchronize-supervisor)))
      (schedule-recurring (:timer supervisor)
                          0
                          (conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
                          (fn [] (.add processes-event-manager sync-processes))))
    (log-message "Starting supervisor with id " (:supervisor-id supervisor) " at host " (:my-hostname supervisor))
    ;; return an instance implementing the Shutdownable interface and the SupervisorDaemon and DaemonCommon protocols
    (reify
     Shutdownable
     ;; shutting down the supervisor means releasing the resources it owns
     (shutdown [this]
               (log-message "Shutting down supervisor " (:supervisor-id supervisor))
               (reset! (:active supervisor) false)
               (cancel-timer (:timer supervisor))
               (.shutdown event-manager)
               (.shutdown processes-event-manager)
               (.disconnect (:storm-cluster-state supervisor)))
     SupervisorDaemon
     ;; return the cluster configuration
     (get-conf [this]
       conf)
     ;; return the supervisor-id
     (get-id [this]
       (:supervisor-id supervisor))
     ;; as the name suggests, shut down all workers
     (shutdown-all-workers [this]
       (let [ids (my-worker-ids conf)]
         (doseq [id ids]
           (shutdown-worker supervisor id)
           )))
     DaemonCommon
     (waiting? [this]
       (or (not @(:active supervisor))
           (and
            ;; is the timer thread sleeping?
            (timer-waiting? (:timer supervisor))
            ;; call the event managers' waiting? function to check whether the event threads of event-manager and processes-event-manager are sleeping; the memfn macro generates code that lets a Java method be used as a Clojure function
            (every? (memfn waiting?) managers)))
           ))))
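The heartbeat scheduling above boils down to "fire at delay 0, then every SUPERVISOR-HEARTBEAT-FREQUENCY-SECS seconds". A deterministic sketch of that cadence (the numbers below are illustrative, not Storm defaults; Storm's timer.clj actually uses a priority queue on a single timer thread rather than computing fire times up front):

```python
def recurring_fire_times(start, delay, period, until):
    """Times at which a schedule-recurring-style task fires in [start, until)."""
    times = []
    t = start + delay
    while t < until:
        times.append(t)
        t += period
    return times

# A heartbeat scheduled with delay 0 and a 10-second period fires
# immediately and then every 10 seconds.
fires = recurring_fire_times(start=0, delay=0, period=10, until=35)
```

The delay of 0 matters: the supervisor reports a heartbeat immediately on startup (via the explicit `(heartbeat-fn)` call and again as the timer task's first run), rather than waiting one full period.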

The supervisor-data function is defined as follows.

The supervisor-data function returns a map containing the supervisor's metadata.

The supervisor-data function

 

(defn supervisor-data [conf shared-context ^ISupervisor isupervisor]
  ;; the cluster configuration
  {:conf conf
   ;; shared-context is nil when the supervisor is started
   :shared-context shared-context
   ;; the ISupervisor instance
   :isupervisor isupervisor
   ;; whether the supervisor is active (it is by default)
   :active (atom true)
   ;; the supervisor's start time
   :uptime (uptime-computer)
   ;; worker thread ids
   :worker-thread-pids-atom (atom {})
   ;; the StormClusterState object
   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
   ;; the supervisor's LocalState object, rooted at "{storm.local.dir}/supervisor/localstate"
   :local-state (supervisor-state conf)
   ;; the supervisor's id
   :supervisor-id (.getSupervisorId isupervisor)
   ;; the supervisor's assignment id, identical to the supervisor id
   :assignment-id (.getAssignmentId isupervisor)
   ;; the supervisor's hostname: if the configuration map conf contains "storm.local.hostname", use that value; otherwise obtain the hostname via InetAddress.getLocalHost().getCanonicalHostName()
   :my-hostname (if (contains? conf STORM-LOCAL-HOSTNAME)
                  (conf STORM-LOCAL-HOSTNAME)
                  (local-hostname))
   ;; all current cluster assignments, reported with each heartbeat
   :curr-assignment (atom nil) ;; used for reporting used ports when heartbeating
   ;; a Storm timer; kill-fn is invoked when the timer thread hits an exception
   :timer (mk-timer :kill-fn (fn [t]
                               (log-error t "Error when processing event")
                               (exit-process! 20 "Error when processing an event")
                               ))
   ;; a map for versioned assignment information
   :assignment-versions (atom {})
   })

The sync-processes function is defined as follows:

The sync-processes function

 

;; sync-processes manages workers: it handles abnormal or dead workers and launches new ones
;; supervisor identifies the supervisor metadata
(defn sync-processes [supervisor]
  ;; conf is bound to the Storm configuration map
  (let [conf (:conf supervisor)
        ;; local-state is bound to the supervisor's LocalState instance
        ^LocalState local-state (:local-state supervisor)
        ;; read the local assignments from the LocalState instance: a map of port -> LocalAssignment, where a LocalAssignment wraps a storm-id and the executors assigned to it
        assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
        ;; now is bound to the current time
        now (current-time-secs)
        ;; allocated is bound to a map of worker-id -> [state heartbeat]; see the definition of read-allocated-workers below
        allocated (read-allocated-workers supervisor assigned-executors now)
        ;; keepers is allocated with every entry whose state is not :valid filtered out
        keepers (filter-val
                (fn [[state _]] (= state :valid))
                allocated)
        ;; keep-ports is the set of ports appearing in the keepers' heartbeats
        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
        ;; reassign-executors is the sub-map of assigned-executors whose ports are NOT in keep-ports, i.e. assignments whose worker process has died and must be relaunched
        reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
        ;; new-worker-ids is a map of port -> worker-id holding the worker-ids whose processes need to be relaunched
        new-worker-ids (into
                        {}
                        (for [port (keys reassign-executors)]
                          [port (uuid)]))
        ]
    ;; 1. to kill are those in allocated that are dead or disallowed
    ;; 2. kill the ones that should be dead
    ;;     - read pids, kill -9 and individually remove file
    ;;     - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
    ;; 3. of the rest, figure out what assignments aren't yet satisfied
    ;; 4. generate new worker ids, write new "approved workers" to LS
    ;; 5. create local dir for worker id
    ;; 5. launch new workers (give worker-id, port, and supervisor-id)
    ;; 6. wait for workers launch

    (log-debug "Syncing processes")
    (log-debug "Assigned executors: " assigned-executors)
    (log-debug "Allocated: " allocated)
    ;; allocated maps worker-id -> [state heartbeat]; id is bound to the worker-id, state to the worker state, heartbeat to the worker's heartbeat
    (doseq [[id [state heartbeat]] allocated]
      ;; if the worker's state is not :valid, shut the worker down
      (when (not= :valid state)
        (log-message
         "Shutting down and clearing state for id " id
         ". Current supervisor time: " now
         ". State: " state
         ", Heartbeat: " (pr-str heartbeat))
        ;; shutdown-worker kills the worker process; see its definition below
        (shutdown-worker supervisor id)
        ))
    ;; new-worker-ids holds the worker-ids to relaunch; for each of them, create the local directory "{storm.local.dir}/workers/{worker-id}"
    (doseq [id (vals new-worker-ids)]
      (local-mkdirs (worker-pids-root conf id)))
    ;; store the merged map back into local-state under LS-APPROVED-WORKERS
    (.put local-state LS-APPROVED-WORKERS
          ;; invert new-worker-ids from port -> worker-id to worker-id -> port and merge it with local-state's LS-APPROVED-WORKERS
          (merge
           ;; select-keys picks from local-state's LS-APPROVED-WORKERS the entries whose keys appear in keepers, returning a map
           (select-keys (.get local-state LS-APPROVED-WORKERS)
                        (keys keepers))
           ;; zipmap builds the worker-id -> port map from new-worker-ids
           (zipmap (vals new-worker-ids) (keys new-worker-ids))
           ))
    ;; wait-for-workers-launch waits until all workers have started; see its definition below
    (wait-for-workers-launch
     conf
     ;; assignment is bound to the executor information running on this port
     (dofor [[port assignment] reassign-executors]
       ;; id is the worker-id corresponding to this port
       (let [id (new-worker-ids port)]
         (log-message "Launching worker with assignment "
                      (pr-str assignment)
                      " for this supervisor "
                      (:supervisor-id supervisor)
                      " on port "
                      port
                      " with id "
                      id
                      )
         ;; launch-worker starts the worker; worker startup will be analyzed in detail in a later article
         (launch-worker supervisor
                        (:storm-id assignment)
                        port
                        id)
         id)))
    ))
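The bookkeeping at the top of sync-processes can be sketched deterministically in Python: from the local assignments (port -> assignment) and the observed worker states, keep the :valid workers and mark every other assigned port for relaunch with a fresh worker id. All the data below is invented for illustration:

```python
import uuid

# port -> assignment (stand-in for port -> LocalAssignment)
assigned = {6700: {"storm-id": "topo-1"}, 6701: {"storm-id": "topo-2"}}
# worker-id -> (state, heartbeat), as read-allocated-workers would return
allocated = {
    "worker-a": ("valid", {"port": 6700}),
    "worker-b": ("timed-out", {"port": 6701}),
}

# keepers: only the workers whose state is valid
keepers = {wid: v for wid, (st, hb) in allocated.items()
           for v in [(st, hb)] if st == "valid"}
# ports covered by healthy workers
keep_ports = {hb["port"] for _, hb in keepers.values()}
# assignments whose port has no healthy worker must be relaunched
reassign = {p: a for p, a in assigned.items() if p not in keep_ports}
# a fresh worker id per relaunched port
new_worker_ids = {p: uuid.uuid4().hex for p in reassign}
```

The inversion that sync-processes then writes to LS-APPROVED-WORKERS is simply `{wid: p for p, wid in new_worker_ids.items()}` merged with the kept workers' existing entries.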

The read-allocated-workers function is defined as follows:

The read-allocated-workers function

 

;; returns a map of worker-id -> [state heartbeat]; if the heartbeat is nil, the worker is "dead"
(defn read-allocated-workers
  "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
  ;; supervisor is the supervisor metadata, assigned-executors is the supervisor's assignment map of port -> LocalAssignment, now is the current time
  [supervisor assigned-executors now]
  ;; the cluster configuration
  (let [conf (:conf supervisor)
        ;; the supervisor's LocalState instance
        ^LocalState local-state (:local-state supervisor)
        ;; id->heartbeat maps the worker-ids of the processes on this supervisor to their heartbeats
        id->heartbeat (read-worker-heartbeats conf)
        ;; approved-ids is the set of worker-ids stored in the supervisor's LocalState instance
        approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))]
    ;; build the worker-id -> [state hb] map
    (into
     {}
     (dofor [[id hb] id->heartbeat]
            ;; cond behaves like nested if...else
            (let [state (cond
                         ;; if the heartbeat is nil, the state is the keyword :not-started
                         (not hb)
                           :not-started
                         ;; if approved-ids does not contain the id, or matches-an-assignment? returns false, the state is the keyword :disallowed
                         (or (not (contains? approved-ids id))
                             ;; matches-an-assignment? decides whether the worker is assigned by comparing the storm-id and the executor-id set in the heartbeat with those in the assignment
                             (not (matches-an-assignment? hb assigned-executors)))
                           :disallowed
                         ;; if current time - last heartbeat time > heartbeat timeout, the state is the keyword :timed-out
                         (> (- now (:time-secs hb))
                            (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                           :timed-out
                         ;; when none of the above applies, the state is the keyword :valid
                         true
                           :valid)]
              (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
              [id [state hb]]
              ))
     )))
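The cond above is a simple precedence chain. A Python sketch of the same decision (with the approval and assignment checks collapsed into booleans for clarity; all names here are illustrative):

```python
def classify(hb, approved, matches, now, timeout):
    """Worker state with the same precedence as read-allocated-workers:
    no heartbeat -> not-started; unapproved or unmatched assignment ->
    disallowed; stale heartbeat -> timed-out; otherwise valid."""
    if hb is None:
        return "not-started"
    if not approved or not matches:
        return "disallowed"
    if now - hb["time-secs"] > timeout:
        return "timed-out"
    return "valid"
```

Note that order matters: a worker that is both unapproved and stale is reported as disallowed, not timed-out, because the disallowed branch is evaluated first.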

The read-worker-heartbeats function is defined as follows:

The read-worker-heartbeats function

 

;; get the worker-id -> heartbeat map for the processes running on this supervisor
(defn read-worker-heartbeats
  "Returns map from worker id to heartbeat"
  [conf]
  ;; ids is bound to the worker-ids of this supervisor's processes
  (let [ids (my-worker-ids conf)]
    ;; build the worker-id -> heartbeat map
    (into {}
      (dofor [id ids]
        ;; read-worker-heartbeat reads the heartbeat for the given worker-id from "{storm.local.dir}/workers/{worker-id}/heartbeats" on the supervisor
        [id (read-worker-heartbeat conf id)]))
    ))

The my-worker-ids function is defined as follows:

The my-worker-ids function

 

;; get the worker-ids of the processes running on this supervisor
(defn my-worker-ids [conf]
  ;; worker-root returns the supervisor-local directory "{storm.local.dir}/workers"; read-dir-contents returns the names of all files under that directory (i.e. the worker-ids of all processes currently running on this supervisor)
  (read-dir-contents (worker-root conf)))

The matches-an-assignment? function is defined as follows:

The matches-an-assignment? function

 

;; worker-heartbeat identifies the heartbeat; assigned-executors identifies the supervisor's assignment map of port -> LocalAssignment
(defn matches-an-assignment? [worker-heartbeat assigned-executors]
  ;; read the port occupied by the process from worker-heartbeat, then look up the corresponding LocalAssignment instance in assigned-executors
  (let [local-assignment (assigned-executors (:port worker-heartbeat))]
    ;; return true iff local-assignment is non-nil, the storm-id in the heartbeat equals the storm-id in the assignment, and the executor-id set in the heartbeat equals the executor-id set in the assignment; otherwise return false
    (and local-assignment
         (= (:storm-id worker-heartbeat) (:storm-id local-assignment))
         ;; Constants/SYSTEM_EXECUTOR_ID identifies the executor of the "system bolt"; besides the spouts and bolts we define, a topology also contains some "system bolts"
         (= (disj (set (:executors worker-heartbeat)) Constants/SYSTEM_EXECUTOR_ID)
            (set (:executors local-assignment))))))
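In other words: same port, same storm-id, and the heartbeat's executor set minus the system executor equals the assigned executor set. A Python sketch (the system executor id value and the sample data are illustrative, not Storm's actual constant):

```python
# Illustrative stand-in for Constants/SYSTEM_EXECUTOR_ID.
SYSTEM_EXECUTOR_ID = (-1, -1)

def matches_an_assignment(heartbeat, assigned_executors):
    """True iff the heartbeat corresponds to an assignment on its port:
    same storm-id, and the heartbeat's executors (minus the system
    executor) equal the assigned executors."""
    local = assigned_executors.get(heartbeat["port"])
    return (local is not None
            and heartbeat["storm-id"] == local["storm-id"]
            and set(heartbeat["executors"]) - {SYSTEM_EXECUTOR_ID}
                == set(local["executors"]))

hb = {"port": 6700, "storm-id": "t1",
      "executors": [(1, 1), (2, 2), SYSTEM_EXECUTOR_ID]}
assigned = {6700: {"storm-id": "t1", "executors": [(1, 1), (2, 2)]}}
```

The system executor must be excluded because the worker reports it in its heartbeat even though the scheduler never assigns it explicitly.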

The shutdown-worker function is defined as follows:

The shutdown-worker function

 

;; kill a worker process; supervisor identifies the supervisor metadata, id identifies the worker-id
(defn shutdown-worker [supervisor id]
  (log-message "Shutting down " (:supervisor-id supervisor) ":" id)
  ;; conf is bound to the cluster configuration
  (let [conf (:conf supervisor)
        ;; note: when the Storm cluster runs in "distributed mode", the supervisor path "{storm.local.dir}/workers/{worker-id}/pids" stores the id of the JVM process actually backing the worker
        ;; read the process ids from "{storm.local.dir}/workers/{worker-id}/pids"; worker-id is the id we assigned, while the pids directory stores the id of the worker's actual JVM process
        pids (read-dir-contents (worker-pids-root conf id))
        ;; note: when the Storm cluster runs in "local mode", the map under the :worker-thread-pids-atom key of the supervisor metadata stores worker-id -> thread-id-set entries
        ;; look up the worker-id -> JVM-process-id map in the supervisor metadata; thread-pid is effectively bound to the worker's JVM process id
        thread-pid (@(:worker-thread-pids-atom supervisor) id)]
    ;; when thread-pid is non-nil, kill that process
    (when thread-pid
      ;; kill the process via the kill-process function in backtype.storm.process-simulator
      (psim/kill-process thread-pid))
    ;; iterate over the pids collection and kill each process
    (doseq [pid pids]
      ;; kill-process-with-sig-term in backtype.storm.util calls send-signal-to-process, which simply runs the system command "kill -15 pid" to kill the process
      ;; note: when a worker process is created, a shutdown hook is registered for it; running "kill -15 pid" triggers that hook, which is added in mk-worker in worker.clj
      (kill-process-with-sig-term pid))
    ;; if pids is non-empty, sleep one second so the "cleanup" shutdown hooks can finish
    (if-not (empty? pids) (sleep-secs 1)) ;; allow 1 second for execution of cleanup threads on worker.
    ;; any process that "kill -15 pid" failed to stop is closed by force-kill-process, which simply runs "kill -9 pid"
    (doseq [pid pids]
      (force-kill-process pid)
      (try
        ;; delete "{storm.local.dir}/workers/{worker-id}/pids/{pid}"
        (rmpath (worker-pid-path conf id pid))
        (catch Exception e))) ;; on windows, the supervisor may still holds the lock on the worker directory
    ;; try-cleanup-worker cleans up the local directories; see its definition below
    (try-cleanup-worker conf id))
  (log-message "Shut down " (:supervisor-id supervisor) ":" id))
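The escalation pattern here (polite SIGTERM, a grace period, then SIGKILL) can be sketched with injected signal senders so the control flow is visible without touching real processes; the function and parameter names are illustrative:

```python
def shutdown_pids(pids, sig_term, sig_kill, grace=None):
    """SIGTERM every pid (triggering shutdown hooks), optionally wait a
    grace period, then SIGKILL every pid in case some are still alive."""
    for pid in pids:
        sig_term(pid)   # polite: kill -15, runs the worker's cleanup hook
    if pids and grace:
        grace()         # the real code sleeps ~1 second here
    for pid in pids:
        sig_kill(pid)   # forceful: kill -9 for anything that ignored -15

sent = []
shutdown_pids([101, 102],
              sig_term=lambda p: sent.append(("TERM", p)),
              sig_kill=lambda p: sent.append(("KILL", p)))
```

Sending SIGKILL unconditionally is safe: signalling an already-exited pid just fails, which the real code swallows, and it guarantees no worker survives a shutdown request.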

The try-cleanup-worker function is defined as follows:

The try-cleanup-worker function

 

;; clean up the worker's local directories
(defn try-cleanup-worker [conf id]
  (try
    ;; delete the "{storm.local.dir}/workers/{worker-id}/heartbeats" directory
    (rmr (worker-heartbeats-root conf id))
    ;; this avoids a race condition with worker or subprocess writing pid around same time
    ;; delete the "{storm.local.dir}/workers/{worker-id}/pids" directory
    (rmpath (worker-pids-root conf id))
    ;; delete the "{storm.local.dir}/workers/{worker-id}" directory
    (rmpath (worker-root conf id))
  (catch RuntimeException e
    (log-warn-error e "Failed to cleanup worker " id ". Will retry later")
    )
  (catch java.io.FileNotFoundException e (log-message (.getMessage e)))
  (catch java.io.IOException e (log-message (.getMessage e)))
    ))

The wait-for-workers-launch function is defined as follows:

The wait-for-workers-launch function

 

;; wait-for-workers-launch waits until all workers have started
(defn- wait-for-workers-launch [conf ids]
  (let [start-time (current-time-secs)]
    (doseq [id ids]
      ;; delegate to wait-for-worker-launch
      (wait-for-worker-launch conf id start-time))
    ))

The wait-for-worker-launch function is defined as follows:

The wait-for-worker-launch function

 

;; wait-for-worker-launch waits for a single worker to start; a worker counts as started once it has written at least one heartbeat within the start timeout
(defn- wait-for-worker-launch [conf id start-time]
  (let [state (worker-state conf id)]
    (loop []
      (let [hb (.get state LS-WORKER-HEARTBEAT)]
        (when (and
               (not hb)
               (<
                (- (current-time-secs) start-time)
                (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
                ))
          (log-message id " still hasn't started")
          (Time/sleep 500)
          (recur)
          )))
    (when-not (.get state LS-WORKER-HEARTBEAT)
      (log-message "Worker " id " failed to start")
      )))
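The loop is a classic poll-until-condition-or-timeout. A deterministic Python sketch with the heartbeat source and the clock injected (so it can run without real workers; every name below is illustrative):

```python
def wait_for_launch(read_hb, clock, start_time, timeout_secs):
    """Poll read_hb() until it returns a heartbeat or the start timeout
    elapses; returns True iff a heartbeat eventually appeared."""
    while read_hb() is None and clock() - start_time < timeout_secs:
        pass  # the real loop sleeps 500 ms between polls
    return read_hb() is not None

# Simulated worker: no heartbeat on the first two polls, then one appears.
hbs = iter([None, None, {"time-secs": 3}])
latest = {"hb": None}
def read_hb():
    latest["hb"] = next(hbs, latest["hb"])
    return latest["hb"]

ticks = iter(range(100))          # a fake monotonically increasing clock
ok = wait_for_launch(read_hb, lambda: next(ticks), 0, 30)
```

Note that, as in the Clojure original, a timeout only logs a failure; the supervisor does not retry here, because the next sync-processes cycle will observe the missing heartbeat and relaunch the worker anyway.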

The mk-synchronize-supervisor function is defined as follows:

The mk-synchronize-supervisor function

 

;; mk-synchronize-supervisor returns a function named "this"
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
  (fn this []
    ;; conf is bound to the cluster configuration
    (let [conf (:conf supervisor)
          ;; storm-cluster-state is bound to the StormClusterState object
          storm-cluster-state (:storm-cluster-state supervisor)
          ;; isupervisor is bound to the instance implementing the ISupervisor interface
          ^ISupervisor isupervisor (:isupervisor supervisor)
          ;; local-state is bound to the LocalState instance
          ^LocalState local-state (:local-state supervisor)
          ;; sync-callback is bound to an anonymous function whose main job is to add the "this" function defined above to event-manager, so that "this" runs on a separate thread
          ;; on every run, sync-callback must be re-registered with ZooKeeper as a callback so it can be triggered again; it fires whenever the children of the "/assignments" node change
          sync-callback (fn [& ignored] (.add event-manager this))
          ;; assignment-versions is bound to the locally cached versioned assignments: a map of topology-id -> assignment info
          assignment-versions @(:assignment-versions supervisor)
          ;; assignments-snapshot is bound to a map of topology-id -> AssignmentInfo object, versions to the versioned assignments; the assignments-snapshot function reads the assignments from the children of the "/assignments" node in ZooKeeper (a snapshot of the cluster's current assignments) and attaches the callback to "/assignments"; see its definition below
          {assignments-snapshot :assignments versions :versions}  (assignments-snapshot
                                                                   storm-cluster-state sync-callback
                                                                   assignment-versions)
          ;; read-storm-code-locations builds a map of topology-id -> that topology's code directory on nimbus
          storm-code-map (read-storm-code-locations assignments-snapshot)
          ;; read-downloaded-storm-ids reads from the local "{storm.local.dir}/supervisor/stormdist" directory the topology-ids whose code jars have already been downloaded
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          ;; all-assignment is bound to all assignments for this supervisor, i.e. a map of port -> LocalAssignment object
          all-assignment (read-assignments
                          assignments-snapshot
                          (:assignment-id supervisor))
          ;; validate the keys (ports) of all-assignment via the isupervisor object's confirmAssigned, keeping those that pass in new-assignment. The isupervisor object was created in standalone-supervisor; looking at that function, its confirmAssigned simply returns true, so new-assignment = all-assignment
          new-assignment (->> all-assignment
                              (filter-key #(.confirmAssigned isupervisor %)))
          ;; assigned-storm-ids is bound to the set of topology-ids assigned to this supervisor
          assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
          ;; existing-assignment is bound to the assignments already present on this supervisor
          existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)]
      (log-debug "Synchronizing supervisor")
      (log-debug "Storm code map: " storm-code-map)
      (log-debug "Downloaded storm ids: " downloaded-storm-ids)
      (log-debug "All assignment: " all-assignment)
      (log-debug "New assignment: " new-assignment)

      ;; download code first
      ;; This might take awhile
      ;;   - should this be done separately from usual monitoring?
      ;; should we only download when topology is assigned to this supervisor?
      ;; storm-code-map maps every assigned topology-id in the cluster to its code jar directory on nimbus
      (doseq [[storm-id master-code-dir] storm-code-map]
        ;; if downloaded-storm-ids does not contain the storm-id but assigned-storm-ids does (the topology must run on this supervisor, but its code jar has not yet been downloaded from the nimbus server), call download-storm-code to download it
        (when (and (not (downloaded-storm-ids storm-id))
                   (assigned-storm-ids storm-id))
          (log-message "Downloading code for storm id "
             storm-id
             " from "
             master-code-dir)
          ;; download from the nimbus server the topology's code jar, the serialized topology object, and the runtime configuration, saving them under "{storm.local.dir}/supervisor/stormdist/{storm-id}/"
          (download-storm-code conf storm-id master-code-dir)
          (log-message "Finished downloading code for storm id "
             storm-id
             " from "
             master-code-dir)
          ))

      (log-debug "Writing new assignment "
                 (pr-str new-assignment))
      ;; the difference existing-assignment minus new-assignment is the set of assignments that should no longer run on this supervisor, so the corresponding workers must be shut down
      (doseq [p (set/difference (set (keys existing-assignment))
                                (set (keys new-assignment)))]
        ;; in the current Storm version, 0.9.2, killedWorker is a no-op, so nothing happens here
        (.killedWorker isupervisor (int p)))
      ;; the assigned function is a no-op as well
      (.assigned isupervisor (keys new-assignment))
      ;; store the latest assignments new-assignment into the local-state database
      (.put local-state
            LS-LOCAL-ASSIGNMENTS
            new-assignment)
      ;; store the versioned assignments versions into the supervisor's :assignment-versions cache
      (swap! (:assignment-versions supervisor) versions)
      ;; reset the supervisor's cached :curr-assignment to new-assignment, i.e. the cluster's latest assignments
      (reset! (:curr-assignment supervisor) new-assignment)
      ;; remove any downloaded code that's no longer assigned or active
      ;; important that this happens after setting the local assignment so that
      ;; synchronize-supervisor doesn't try to launch workers for which the
      ;; resources don't exist
      ;; if the supervisor host's operating system is "Windows_NT", call shutdown-disallowed-workers to shut down the workers in the :disallowed state
      (if on-windows? (shutdown-disallowed-workers supervisor))
      ;; iterate over downloaded-storm-ids, the set of topology-ids whose jars etc. have already been downloaded
      (doseq [storm-id downloaded-storm-ids]
        ;; if the storm-id is not in assigned-storm-ids (the topologies that must currently run on this supervisor), recursively delete the "{storm.local.dir}/supervisor/stormdist/{storm-id}" directory
        (when-not (assigned-storm-ids storm-id)
          (log-message "Removing code for storm id "
                       storm-id)
          (try
            (rmr (supervisor-stormdist-root conf storm-id))
            (catch Exception e (log-message (.getMessage e))))
          ))
      ;; add the sync-processes function to the processes-event-manager event manager so it runs on a separate thread; sync-processes is fairly time-consuming, hence the dedicated thread
      (.add processes-event-manager sync-processes)
      )))

The assignments-snapshot function is defined as follows:

The assignments-snapshot function

 

;; assignments-snapshot reads the assignment info from the children of the
;; "/assignments" znode in ZooKeeper and attaches the callback to that node;
;; assignment-versions is this supervisor's locally cached, versioned
;; assignment info
(defn- assignments-snapshot [storm-cluster-state callback assignment-versions]
  ;; storm-ids is bound to the set of assigned topology ids, read from the
  ;; children of /assignments; if callback is non-nil it is stored as
  ;; assignments-callback and a node watch is set on /assignments, so the
  ;; supervisor notices when an assignment is added or removed
  (let [storm-ids (.assignments storm-cluster-state callback)]
    ;; new-assignments is bound to the latest assignment info
    (let [new-assignments
          (->>
           ;; sid is bound to a topology id
           (dofor [sid storm-ids]
                  ;; recorded-version is the version of the assignment info
                  ;; this supervisor has cached for sid
                  (let [recorded-version (:version (get assignment-versions sid))]
                    ;; assignment-version is the data version of the
                    ;; "/assignments/{sid}" znode, read with the callback
                    ;; registered
                    (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)]
                      ;; if the cached version equals the version read from
                      ;; ZooKeeper, map sid to the cached assignment info;
                      ;; otherwise re-read the versioned assignment info from
                      ;; "/assignments/{sid}" (again registering the callback),
                      ;; so the supervisor notices when an existing assignment
                      ;; is reassigned
                      (if (= assignment-version recorded-version)
                        {sid (get assignment-versions sid)}
                        {sid (.assignment-info-with-version storm-cluster-state sid callback)})
                      ;; if reading the assignment info from ZooKeeper fails,
                      ;; the value is {sid nil}
                      {sid nil})))
           ;; merge the dofor results into one map of the form:
           ;; {sid_1 {:data data_1 :version version_1}, ..., sid_n {:data data_n :version version_n}}
           (apply merge)
           ;; keep only the entries whose value is non-nil
           (filter-val not-nil?))]
      ;; the returned map has the form:
      ;; {:assignments {sid_1 data_1, ..., sid_n data_n},
      ;;  :versions    {sid_1 {:data data_1 :version version_1}, ...}}
      ;; each data_x is an AssignmentInfo object, which records the code
      ;; directory on nimbus, the start time of every task, and the mapping
      ;; from each task to its host and port
      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
       :versions new-assignments})))
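The version-caching pattern above can be summarized in plain terms: read only the cheap version number first, and re-fetch the full assignment data only when it differs from the cached version. Here is a minimal Python sketch of that pattern (an illustration, not Storm code; `fetch_version` and `fetch_info` are hypothetical stand-ins for the `assignment-version` and `assignment-info-with-version` calls):

```python
# Sketch of assignments-snapshot's caching logic: reuse the locally cached
# entry when ZooKeeper's version matches, re-fetch otherwise, and drop ids
# whose version read failed (the filter-val not-nil? step).

def snapshot(storm_ids, cached, fetch_version, fetch_info):
    """cached: {sid: {"data": ..., "version": n}}
    fetch_version(sid) -> version number or None on failure
    fetch_info(sid)    -> {"data": ..., "version": n}"""
    new_assignments = {}
    for sid in storm_ids:
        zk_version = fetch_version(sid)   # cheap version-only read
        if zk_version is None:
            continue                      # failed read -> entry filtered out
        entry = cached.get(sid)
        if entry is not None and entry["version"] == zk_version:
            new_assignments[sid] = entry  # cache hit: reuse local copy
        else:
            new_assignments[sid] = fetch_info(sid)  # re-fetch data + version
    # split into the {:assignments ...} / {:versions ...} shape
    return {"assignments": {sid: v["data"] for sid, v in new_assignments.items()},
            "versions": new_assignments}
```

Only the stale or unknown ids pay the cost of a full data read; everything else is served from the supervisor's local cache.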

The read-assignments function is defined as follows:

read-assignments function

 

(defn- read-assignments
  "Returns map from port to struct containing :storm-id and :executors"
  ;; assignments-snapshot is a map from topology id to its AssignmentInfo
  ;; object; assignment-id is this supervisor's id
  [assignments-snapshot assignment-id]
  ;; merge the results of read-my-executors and check that no port is
  ;; assigned to more than one topology. The check is elegant: merge-with
  ;; only invokes its merge function (fn [& ignored] ...) when two maps share
  ;; the same port, and that function immediately throws
  (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
       (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
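The merge-with trick can be restated without Clojure: merge the per-topology port maps one by one, and treat any key collision as a fatal error. A small Python analogue (an illustration, not Storm code):

```python
# Sketch of read-assignments' duplicate-port check: merging per-topology
# {port: assignment} maps, raising as soon as two topologies claim one port.

def merge_assignments(per_topology_maps):
    merged = {}
    for m in per_topology_maps:
        for port, assignment in m.items():
            # merge-with's merge fn fires only on a key collision;
            # here that collision is an error
            if port in merged:
                raise RuntimeError(
                    "Should not have multiple topologies assigned to one port")
            merged[port] = assignment
    return merged
```

On a healthy cluster the merge function is never called, so the happy path is a plain map merge.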

The read-my-executors function is defined as follows:

read-my-executors function

 

;; assignments-snapshot is a map from topology id to its AssignmentInfo
;; object; assignment-id is this supervisor's id; storm-id is a topology id
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
  (let [assignment (get assignments-snapshot storm-id)
        ;; my-executors is the executor info assigned to this supervisor,
        ;; i.e. the executor->node+port entries whose node matches
        my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
                             (:executor->node+port assignment))
        ;; port-executors maps each port to its collection of executor ids;
        ;; merge-with concat concatenates the values of duplicate keys
        port-executors (apply merge-with
                              concat
                              (for [[executor [_ port]] my-executors]
                                {port [executor]}))]
    ;; return a map from port to LocalAssignment; a LocalAssignment holds
    ;; two attributes: a topology id and a collection of executor ids
    (into {} (for [[port executors] port-executors]
               ;; need to cast to int b/c it might be a long (due to how yaml parses things)
               ;; doall is to avoid serialization/deserialization problems with lazy seqs
               [(Integer. port) (LocalAssignment. storm-id (doall executors))]))))
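The core of read-my-executors is a filter-then-group-by: keep the executors whose node is this supervisor, then group executor ids by port (the `merge-with concat` step), coercing ports to int. A Python sketch of just those two steps (illustrative, not Storm's code):

```python
# Sketch of read-my-executors: filter executor->(node, port) entries down to
# this node, then group the executor ids by (int-coerced) port.

def my_executors_by_port(executor_to_node_port, node_id):
    port_executors = {}
    for executor, (node, port) in executor_to_node_port.items():
        if node != node_id:
            continue                  # assigned to some other supervisor
        # int() mirrors the (Integer. port) cast: ports may arrive as
        # long/string depending on how the config was parsed
        port_executors.setdefault(int(port), []).append(executor)
    return port_executors
```

The real function then wraps each `(port, executors)` pair in a `LocalAssignment` carrying the topology id.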

The download-storm-code function is defined as follows:

download-storm-code function

 

(defmulti download-storm-code cluster-mode)
download-storm-code is a multimethod: the return value of the cluster-mode function selects which implementation runs. cluster-mode returns the keyword :distributed or :local; when it returns :distributed, the implementation below is invoked.
(defmethod download-storm-code
    ;; master-code-dir is the path on the nimbus server holding this
    ;; storm-id's code jar
    :distributed [conf storm-id master-code-dir]
    ;; Downloading to permanent location is atomic
    ;; tmproot is the supervisor-local path
    ;; "{storm.local.dir}/supervisor/tmp/{uuid}", a temporary home for the
    ;; jar downloaded from nimbus
    (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
          ;; stormroot is the jar's final path on the supervisor:
          ;; "{storm.local.dir}/supervisor/stormdist/{storm-id}"
          stormroot (supervisor-stormdist-root conf storm-id)]
      ;; create the temporary directory tmproot
      (FileUtils/forceMkdir (File. tmproot))
      ;; download "{storm.local.dir}/nimbus/stormdist/{storm-id}/stormjar.jar"
      ;; from the nimbus server into tmproot; stormjar.jar contains all of
      ;; the topology's code
      (Utils/downloadFromMaster conf (master-stormjar-path master-code-dir) (supervisor-stormjar-path tmproot))
      ;; download "{storm.local.dir}/nimbus/stormdist/{storm-id}/stormcode.ser"
      ;; into tmproot; stormcode.ser is the serialized topology object
      (Utils/downloadFromMaster conf (master-stormcode-path master-code-dir) (supervisor-stormcode-path tmproot))
      ;; download "{storm.local.dir}/nimbus/stormdist/{storm-id}/stormconf.ser"
      ;; into tmproot; stormconf.ser holds the configuration for running the
      ;; topology
      (Utils/downloadFromMaster conf (master-stormconf-path master-code-dir) (supervisor-stormconf-path tmproot))
      ;; RESOURCES-SUBDIR is the string "resources"; extract-dir-from-jar
      ;; unpacks the jar entries whose paths start with "resources" into
      ;; "{tmproot}/resources/..."
      (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
      ;; move tmproot into stormroot, so that
      ;; "{storm.local.dir}/supervisor/stormdist/{storm-id}/" ends up holding
      ;; the resources directory plus stormjar.jar, stormcode.ser and
      ;; stormconf.ser
      (FileUtils/moveDirectory (File. tmproot) (File. stormroot))))
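The "Downloading to permanent location is atomic" comment refers to a common pattern: write everything into a uniquely named temporary directory, then move it to the permanent path in one step, so readers never observe a half-downloaded code directory. A hedged Python sketch of the pattern (paths mirror the ones above; `download` is a placeholder, not Storm's real API):

```python
import os
import shutil
import uuid

# Sketch of download-storm-code's atomicity trick: stage files under
# ".../supervisor/tmp/{uuid}", then move the whole directory to
# ".../supervisor/stormdist/{storm_id}" once it is complete.

def download_atomically(download, local_dir, storm_id, files):
    tmproot = os.path.join(local_dir, "supervisor", "tmp", str(uuid.uuid4()))
    stormroot = os.path.join(local_dir, "supervisor", "stormdist", storm_id)
    os.makedirs(tmproot)
    for name in files:                 # e.g. stormjar.jar, stormconf.ser
        download(name, os.path.join(tmproot, name))
    os.makedirs(os.path.dirname(stormroot), exist_ok=True)
    shutil.move(tmproot, stormroot)    # readers never see a partial directory
    return stormroot
```

If any download fails, the permanent directory is never created at all; only the uniquely named temp directory is left behind.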

The extract-dir-from-jar function is defined as follows:

extract-dir-from-jar function

 

;; jarpath is the jar's path, dir is "resources", and destdir is the
;; "{tmproot}" path
(defn extract-dir-from-jar [jarpath dir destdir]
  (try-cause
    ;; open the jar with ZipFile; jarpath is rebound to the ZipFile object
    (with-open [jarpath (ZipFile. jarpath)]
      ;; .entries returns an Enumeration; enumeration-seq turns it into a
      ;; seq of the archive's ZIP entries
      (let [entries (enumeration-seq (.entries jarpath))]
        ;; iterate over the entries whose paths start with "resources"
        (doseq [file (filter (fn [entry] (and (not (.isDirectory entry)) (.startsWith (.getName entry) dir))) entries)]
          ;; create the entry's full parent path under destdir
          (.mkdirs (.getParentFile (File. destdir (.getName file))))
          ;; copy the entry to "{tmproot}/{its path inside the archive}"
          (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
            (io/copy (.getInputStream jarpath file) out)))))
    (catch IOException e
      (log-message "Could not extract " dir " from " jarpath))))
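The same extract-entries-by-prefix logic is easy to see in Python's standard `zipfile` module. This is an analogue for illustration (an assumption, not Storm's code): skip directory entries, keep only names starting with the prefix, recreate parent directories, and copy each entry out.

```python
import os
import zipfile

# Python analogue of extract-dir-from-jar: extract only the zip entries
# whose path starts with `prefix` (e.g. "resources") into destdir,
# preserving their in-archive paths.

def extract_dir_from_jar(jarpath, prefix, destdir):
    with zipfile.ZipFile(jarpath) as zf:
        for entry in zf.infolist():
            if entry.is_dir() or not entry.filename.startswith(prefix):
                continue
            target = os.path.join(destdir, entry.filename)
            os.makedirs(os.path.dirname(target), exist_ok=True)  # mkdirs
            with zf.open(entry) as src, open(target, "wb") as out:
                out.write(src.read())                            # io/copy
```

As in the Clojure version, entries outside the prefix (the topology's class files, for example) are left untouched inside the jar.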

 

That is the complete flow of how Storm starts a supervisor. Most of the startup work happens in the mk-supervisor function, so when reading this part of the source, start from that function and then work through each function it calls, following the control flow.
