Storm Source Code Analysis - Topology Submit - Nimbus

Nimbus Server

The Nimbus server: start with the launch command. As with the client, the storm command is used: "storm nimbus".
Looking at the source, this differs from the client case above in that jvmtype="-server"; it ultimately invokes the main of "backtype.storm.daemon.nimbus".
Nimbus is implemented in Clojure, but Clojure runs on the JVM, so the final build produces nimbus.class. Users can therefore be entirely unaware of Clojure; everything looks like Java.
Clojure is used only to improve development efficiency.

def nimbus():
    """Syntax: [storm nimbus]

Launches the nimbus daemon. This command should be run under
supervision with a tool like daemontools or monit.

See Setting up a Storm cluster for more information.
(https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
"""
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    childopts = confvalue("nimbus.childopts", cppaths) + " -Dlogfile.name=nimbus.log -Dlog4j.configuration=storm.log.properties"
    exec_storm_class(
        "backtype.storm.daemon.nimbus",
        jvmtype="-server",
        extrajars=cppaths,
        childopts=childopts)

launch-server!

Look at nimbus's main, which eventually calls launch-server!. The conf argument holds the configuration read by read-storm-config,
and nimbus is an implementation of the INimbus interface (backtype.storm.scheduler.INimbus); see standalone-nimbus.
(defn -main []
  (-launch (standalone-nimbus)))
(defn -launch [nimbus]
  (launch-server! (read-storm-config) nimbus))

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler))
                    )
        server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
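The hook registered just before (.serve server) is the standard JVM shutdown-hook pattern: stop the handler and the Thrift server before the process exits. A minimal Java sketch of that pattern (the class name and the stopped flag are illustrative stand-ins for (.shutdown service-handler) and (.stop server)):

```java
public class ShutdownHookSketch {
    // stand-in for the cleanup done by (.shutdown service-handler) / (.stop server)
    static volatile boolean stopped = false;

    // register a hook thread that the JVM runs on normal exit
    static Thread register() {
        Thread hook = new Thread(() -> { stopped = true; });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```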

1. service-handler

First, service-handler is defined; its definition is as follows:
(defserverfn service-handler [conf inimbus]
    (reify Nimbus$Iface
      ...)
)
This uses a macro, defserverfn, defined as follows:
(defmacro defserverfn [name & body]
  `(let [exec-fn# (fn ~@body)]
    (defn ~name [& args#]
      (try-cause
        (apply exec-fn# args#)
      (catch InterruptedException e#
        (throw e#))
      (catch Throwable t#
        (log-error t# "Error on initialization of server " ~(str name))
        (halt-process! 13 "Error on initialization")
        )))))
This macro takes two arguments: in this example, name = service-handler and body is everything that follows, including the argument vector and the function body.
It defines an anonymous function fn [conf inimbus] (...),
then defines (defn service-handler [& args#]) that simply applies the anonymous fn to the args. Using this macro is almost the same as directly writing (defn service-handler [conf inimbus] ...).
One may wonder why this seemingly pointless macro exists; presumably it is just to centralize the exception handling that follows.
The main job of the service-handler function is to implement the Nimbus$Iface interface (backtype.storm.generated.Nimbus$Iface; the $ is how it appears in the class file, per Java's naming convention for inner classes).
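Functionally, defserverfn just wraps the server-init body in a guard. A minimal Java analogue of that wrapping (names are illustrative; the real macro rethrows InterruptedException and calls halt-process! 13 on other Throwables, which is replaced by a rethrow here so the sketch stays runnable):

```java
import java.util.function.Supplier;

public class DefServerFnSketch {
    // wrap a server-init body: log and rethrow on failure, mirroring the
    // macro's catch clause; the original calls (halt-process! 13) instead
    static <T> T guarded(Supplier<T> body) {
        try {
            return body.get();
        } catch (RuntimeException e) {
            System.err.println("Error on initialization of server: " + e);
            throw e; // halt-process! 13 in the original
        }
    }
}
```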

2. server

Build the server options: a TNonblockingServerSocket, the configured number of worker threads, the protocol factory, and the processor.
The processor is the server's main request handler; it uses the service-handler passed in to do the actual work.
Finally, the Nimbus server is started.
 

Nimbus$Iface

The Nimbus server is now running; what remains is handling the RPC calls coming from clients, and the key is the implementation of Nimbus$Iface.

The implementations below keep using the nimbus variable, nimbus-data, which holds nimbus-related configuration and global state.

let [nimbus (nimbus-data conf inimbus)]
(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus
     :submitted-count (atom 0) ;counts how many topologies have been submitted
     :storm-cluster-state (cluster/mk-storm-cluster-state conf) ;abstraction of the Zookeeper interface (Zookeeper stores the cluster state)
     :submit-lock (Object.) ;lock object for mutual exclusion between topologies, e.g. when creating directories
     :heartbeats-cache (atom {}) ;cache of each topology's heartbeats
     :downloaders (file-cache-map conf)
     :uploaders (file-cache-map conf)
     :uptime (uptime-computer)
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (halt-process! 20 "Error when processing an event")
                                 ))
     :scheduler (mk-scheduler conf inimbus)
     }))
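In Java terms, nimbus-data is just a bag of shared state: the (atom 0) corresponds to an atomic counter and (Object.) to a plain monitor object. A hypothetical sketch (class and member names are mine, not Storm's API):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NimbusData {
    // (atom 0) -> thread-safe counter of submitted topologies
    final AtomicInteger submittedCount = new AtomicInteger(0);
    // (Object.) -> plain monitor used with synchronized, like (locking ...)
    final Object submitLock = new Object();

    // (swap! (:submitted-count nimbus) inc) is an atomic increment
    int recordSubmission() {
        return submittedCount.incrementAndGet();
    }
}
```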


Next, focus on submitTopology.
It takes 4 arguments:
^String storm-name: the topology's name
^String uploadedJarLocation: the location of the uploaded jar
^String serializedConf: the serialized conf
^StormTopology topology: the topology object (a Thrift object), produced by TopologyBuilder

(^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (try
          (validate-topology-name! storm-name) ;;is the name valid
          (check-storm-active! nimbus storm-name false) ;;check whether it is already active
          (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) ;;call the user-defined validator.validate
                     storm-name
                     (from-json serializedConf)
                     topology)
          (swap! (:submitted-count nimbus) inc) ;;increment submitted-count, the number of topologies submitted to this nimbus
          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs)) ;;generate the storm-id
                storm-conf (normalize-conf  ;;parse the json, add kvs, and produce the final storm-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                                (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology) ;;normalized topology object
                topology (if (total-storm-conf TOPOLOGY-OPTIMIZE)
                           (optimize-topology topology)
                           topology)
                storm-cluster-state (:storm-cluster-state nimbus)] ;;interface for operating on zk
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology, 1. System-topology!
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology) ;;2. set up the topology's local directory
              (.setup-heartbeats! storm-cluster-state storm-id) ;;3. set up Zookeeper heartbeats
              (start-storm nimbus storm-name storm-id)  ;;4. start-storm
              (mk-assignments nimbus))) ;;5. mk-assignments

          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))
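The storm-id generated above is simply the name, the submission counter, and the current epoch seconds joined by dashes. A sketch of that scheme (the method name is illustrative):

```java
public class StormIdSketch {
    // (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
    static String makeStormId(String stormName, int submittedCount, long nowSecs) {
        return stormName + "-" + submittedCount + "-" + nowSecs;
    }
}
```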

1. System-topology!

Validate the topology, e.g. whether the component ids and stream ids used are legal.
Add the components the system needs, such as the acker. The return value is not used here, though, so it is not clear why system-topology! is called at this point.

(system-topology! total-storm-conf topology) ;; this validates the structure of the topology
(defn system-topology! [storm-conf ^StormTopology topology]
  (validate-basic! topology)
  (let [ret (.deepCopy topology)]
    (add-acker! storm-conf ret)
    (add-metric-components! storm-conf ret)    
    (add-system-components! storm-conf ret)
    (add-metric-streams! ret)
    (add-system-streams! ret)
    (validate-structure! ret)
    ret
    ))

2. Set up the topology's local directory (from this step on, the submit lock is needed for mutual exclusion)

Jars and configs are kept on local filesystem because they're too big for Zookeeper. The jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}

(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
Borrowing this diagram, which makes it clear: first create the directory and move the jar into it,
then serialize both the topology object and the conf object into the same directory.
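The filesystem effect can be sketched with java.nio (the layout follows the {nimbus local dir}/stormdist/{topology id} convention quoted above; writing the serialized topology and conf is elided, and the jar file name follows Storm's stormjar.jar convention):

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class SetupStormCodeSketch {
    // create {nimbus local dir}/stormdist/{storm id} and drop the jar there;
    // stormcode.ser / stormconf.ser would be written alongside it
    static Path setup(Path nimbusLocalDir, String stormId, byte[] jarBytes) throws Exception {
        Path stormRoot = nimbusLocalDir.resolve("stormdist").resolve(stormId);
        Files.createDirectories(stormRoot);
        Files.write(stormRoot.resolve("stormjar.jar"), jarBytes);
        return stormRoot;
    }
}
```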

[figure: jar, serialized topology, and serialized conf under {nimbus local dir}/stormdist/{topology id}]

3. Set up Zookeeper heartbeats

This simply creates the topology's heartbeat directory in Zookeeper, as illustrated below.

(.setup-heartbeats! storm-cluster-state storm-id)
 
(setup-heartbeats! [this storm-id]
  (mkdirs cluster-state (workerbeat-storm-root storm-id)))

(defn mkdirs [^CuratorFramework zk ^String path]
  (let [path (normalize-path path)]
    (when-not (or (= path "/") (exists-node? zk path false))
      (mkdirs zk (parent-path path))
      (try-cause
        (create-node zk path (barr 7) :persistent)
        (catch KeeperException$NodeExistsException e
          ;; this can happen when multiple clients doing mkdir at same time
          ))
      )))
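The recursion above creates the parent path first, then the node itself, treating NodeExistsException as benign. The same logic, sketched with a Set&lt;String&gt; standing in for the Zookeeper node store:

```java
import java.util.Set;

public class MkdirsSketch {
    // create path and all missing parents, parent first, like the
    // recursive mkdirs above; re-adding an existing node is harmless,
    // mirroring the ignored KeeperException$NodeExistsException
    static void mkdirs(Set<String> nodes, String path) {
        if (path.equals("/") || nodes.contains(path)) return;
        mkdirs(nodes, parentPath(path));
        nodes.add(path);
    }

    static String parentPath(String path) {
        int i = path.lastIndexOf('/');
        return i == 0 ? "/" : path.substring(0, i);
    }
}
```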
[figure: topology heartbeat directory created in Zookeeper]

4. start-storm: generate the StormBase

Although it is called start-storm, all it really does is serialize a StormBase structure and put it on Zookeeper.
How does this StormBase differ from the topology object?
The topology object is the topology's static information, containing the component details and the topology between them; it is relatively large, so it is stored on disk as stormcode.ser.
StormBase is the topology's dynamic information, recording only the launch time, status, worker count, and per-component executor counts; it is small, so it is kept in zk.

(defn- start-storm [nimbus storm-name storm-id]
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        storm-conf (read-storm-conf conf storm-id)
        topology (system-topology! storm-conf (read-storm-topology conf storm-id))
        num-executors (->> (all-components topology) (map-val num-start-executors))]
    (log-message "Activating " storm-name ": " storm-id)
    (.activate-storm! storm-cluster-state
                      storm-id
                      (StormBase. storm-name
                                  (current-time-secs)
                                  {:type :active}
                                  (storm-conf TOPOLOGY-WORKERS)
                                  num-executors))))

;; component->executors is a map from spout/bolt id to number of executors for that component
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])
 
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; //key is stream id
  3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
  4: optional string json_conf;
}

From the above, StormBase is a record containing the storm name, the current timestamp, the topology's initial status (active or inactive), the worker count, and the executor counts.
num-executors is computed with ->>, which is equivalent to (map-val num-start-executors (all-components topology)). map-val applies num-start-executors to the value of each (k, v) pair, and that function just reads parallelism_hint from ComponentCommon; so num-executors describes how many executors (threads) each component needs.
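A sketch of that computation, with the map-val helper made explicit (data and the assumption that an unset parallelism_hint defaults to 1 are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class NumExecutorsSketch {
    // map-val: apply f to every value of the map, keeping the keys
    static Map<String, Integer> mapVal(Map<String, Integer> m, UnaryOperator<Integer> f) {
        Map<String, Integer> out = new HashMap<>();
        m.forEach((k, v) -> out.put(k, f.apply(v)));
        return out;
    }

    // num-start-executors: read the component's parallelism_hint
    // (assumed here to default to 1 when unset)
    static Integer numStartExecutors(Integer parallelismHint) {
        return parallelismHint == null ? 1 : parallelismHint;
    }
}
```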

(activate-storm! [this storm-id storm-base]
  (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))
  )
(defn storm-path [id]
  (str STORMS-SUBTREE "/" id)) ;/storms/id
 
(defn set-data [^CuratorFramework zk ^String path ^bytes data]
  (.. zk (setData) (forPath (normalize-path path) data)))

Finally, activate-storm! is called to store the serialized storm-base in Zookeeper under "/storms/id".

 

5. mk-assignments

Storm Source Code Analysis - Topology Submit - Nimbus - mk-assignments
