Storm源碼分析--Nimbus-data

  1. nimbus-data
    storm-core/backtype/storm/nimbus.clj
(defn nimbus-data [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)]
    {:conf conf
     :inimbus inimbus                                   ; INimbus實現類, standalone-nimbus的返回值
     :submitted-count (atom 0)                              ; 已經提交的計算拓撲的數量, 初始值爲原子值0.當提交一個拓撲,這個數字+1
     :storm-cluster-state (cluster/mk-storm-cluster-state conf)         ; 建立storm集羣的狀態. 保存在ZooKeeper中!
     :submit-lock (Object.)                                 ; 對象鎖, 用於各個topology之間的互斥操做, 好比建目錄
     :heartbeats-cache (atom {})                            ; 心跳緩存, 記錄各個Topology的heartbeats的cache
     :downloaders (file-cache-map conf)                     ; 文件下載和上傳緩存, 是一個TimeCacheMap
     :uploaders (file-cache-map conf)
     :uptime (uptime-computer)                              ; 計算上傳的時間
     :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))     ; (map key)等價於(key map)
     :timer (mk-timer :kill-fn (fn [t]
                                 (log-error t "Error when processing event")
                                 (halt-process! 20 "Error when processing an event")
                                 ))
     :scheduler (mk-scheduler conf inimbus)
     }))

1) TimeCacheMaphtml

(defn file-cache-map [conf]
  (TimeCacheMap.
   (int (conf NIMBUS-FILE-COPY-EXPIRATION-SECS))
   (reify TimeCacheMap$ExpiredCallback
          (expire [this id stream]
                  (.close stream)
                  ))
   ))

上面調用的是TimeCacheMap的第二個構造函數:
storm-core/backtype/storm/utils/TimeCacheMap.javajava

public class TimeCacheMap<K, V> {
    //this default ensures things expire at most 50% past the expiration time
    private static final int DEFAULT_NUM_BUCKETS = 3;

    public static interface ExpiredCallback<K, V> {
        public void expire(K key, V val);
    }

    public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
    }
    public TimeCacheMap(int expirationSecs) {
        this(expirationSecs, DEFAULT_NUM_BUCKETS);
    }
    public TimeCacheMap(int expirationSecs, int numBuckets) {
        this(expirationSecs, numBuckets, null);
    }
}

Twitter Storm源碼分析之TimeCacheMap
TimeCacheMap是Twitter Storm裏面一個類, Storm使用它來保存那些最近活躍的對象,而且能夠自動刪除那些已通過期的對象。
TimeCacheMap裏面的數據是保存在內部變量 _bucket 裏面的:
private LinkedList<HashMap<K, V>> _buckets;
在這點上跟ConcurrentHashMap有點相似, ConcurrentHashMap是利用多個bucket來縮小鎖的粒度, 從而實現高併發的讀寫。
而TimeCacheMap則是利用多個bucket來使得數據清理線程佔用鎖的時間最小。node

首先來看看TimeCacheMap的構造函數, 它的構造函數首先是生成numBuckets個空的HashMap:緩存

_buckets =  new  LinkedList<HashMap<K, V>>();
for ( int  i=0; i<numBuckets; i++) {
    _buckets.add( new  HashMap<K, V>());
}

而後就是最關鍵的清理線程部分,TimeCacheMap使用一個單獨的線程來清理那些過時的數據:安全

private LinkedList<HashMap<K, V>> _buckets;
    private final Object _lock = new Object();
    private Thread _cleaner;
    private ExpiredCallback _callback;

    public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }
        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            dead = _buckets.removeLast();  //刪除鏈表中的最後一個bucket
                            _buckets.addFirst(new HashMap<K, V>());  //往鏈表頭部添加一個空的bucket
                        }
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {  //要刪除的最後一個bucket的每一個Map的條目
                                _callback.expire(entry.getKey(), entry.getValue());  //這個Map裏的每一個條目都要被過時!
                            }
                        }
                    }
                } catch (InterruptedException ex) {}
            }
        });
        _cleaner.setDaemon(true);
        _cleaner.start();
    }

這個線程每隔 expirationSecs / (numBuckets - 1) 秒鐘的時間去把最後一個bucket裏面的數據所有都刪除掉
— 這些被刪除掉的數據其實就是過時的數據。(爲何不是每隔expirationSecs就來刪除一次呢? 咱們下面會說)。
這裏值得注意的是: 正是由於這種分紅多個桶的機制, 清理線程對於 _lock 的佔用時間極短。
只要把最後一個bucket從_buckets解下,而且向頭上面添加一個新的bucket就行了:session

synchronized (_lock) {
    dead = _buckets.removeLast();
    _buckets.addFirst(new HashMap<K, V>());
}

若是不是這種機制的話, 那我能想到的最傻的辦法可能就是給每條數據一個過時時間字段,
而後清理線程就要遍歷每條數據來檢查數據是否過時了。那顯然要HOLD住這個鎖很長時間了。數據結構

同時對於每條過時的數據TimeCacheMap會執行咱們的callback函數:併發

if (_callback!= null ) {
      for (Entry<K, V> entry: dead.entrySet()) {
          _callback.expire(entry.getKey(), entry.getValue());
     }
}

大體機制就是這樣,那麼咱們如今回過頭來看看前面的那個問題: 爲何這個清理線程是每隔 expirationSecs / (numBuckets - 1) 秒的時間來檢查,
這樣對嗎?TimeCacheMap的內部有多個桶, 當你向這個TimeCacheMap裏面添加數據的時候,數據老是添加到第一個桶裏面去的。app

public  void  put(K key, V value) {
     synchronized (_lock) {
        Iterator<HashMap<K, V>> it = _buckets.iterator();
        HashMap<K, V> bucket = it.next();
        bucket.put(key, value);     // 在第一個bucket中
        while (it.hasNext()) {
            bucket = it.next();
            bucket.remove(key); // 若是key有在其餘的buckets中, 則要從那些bucket中刪除該key. 由於map要保證同一個key只出現一次!
        }
    }
}

咱們看個例子就明白了,假設 numBuckets = 3, expirationSecs = 2 。
咱們先往裏面填一條數據 {1: 1}, 這條數據被加到第一個桶裏面去, 如今TimeCacheMap的狀態是:
    [{1:1}, {}, {}]
1> 過了sleepTime = expirationSecs / (numBuckets - 1) = 2 / (3 - 1) = 1秒鐘以後。
清理線程幹掉最後一個HashMap, 而且在頭上添加一個新的空HashMap, 如今TimeCacheMap的狀態是:
    [{}, {1:1}, {}]
2> 再過了一秒鐘, 同上, TimeCacheMap的狀態會變成:
    [{}, {}, {1:1}]
3> 再過一秒鐘, 如今{1:1}是最後一個TimeCacheMap了,就被幹掉了。因而TimeCacheMap的狀態變成:
    [{}, {}, {}]dom

因此從 {1:1} 被加入到這個TimeCacheMap到被幹掉一共用了3秒,其實這個3秒就等於
    3 = expirationSecs * ( 1 + 1 / (numBuckets -­ 1)) = 2 * (1 + 1/(3-1)) = 2*(1 + 0.5) = 3
它的註釋裏面也提到了這一點
Expires keys that have not been updated in the configured number of seconds. 使尚未更新的key在配置的時間內失效
The algorithm used will take between expirationSecs and expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
get, put, remove, containsKey, and size take O(numBuckets) time to run.
The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for gets/puts.

來看TimeCacheMap的containsKey, get, remove方法. 其實按照一般的Map是隻有一個桶的. 分紅多個桶的目的是減小鎖的佔用.
按照Map的語義, 一個Map裏只能有相同的key, 若是有多個key,value,則會發生覆蓋. 因此在添加元素的時候, 將新元素加到第一個桶.
並判斷剩餘的桶中是否有這個key, 若是有, 就要從中刪除! 接下來在獲取Map元素的時候, 只要在某個桶中找到key對應的元素, 則能夠當即返回!

public boolean containsKey(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return true;
                }
            }
            return false;
        }
    }
    public V get(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.get(key);
                }
            }
            return null;
        }
    }
    public Object remove(K key) {
        synchronized(_lock) {
            for(HashMap<K, V> bucket: _buckets) {
                if(bucket.containsKey(key)) {
                    return bucket.remove(key);
                }
            }
            return null;
        }
    }
    public int size() {
        synchronized(_lock) {
            int size = 0;
            for(HashMap<K, V> bucket: _buckets) {
                size+=bucket.size();
            }
            return size;
        }
    }

ExpiredCallback是在失效的時候作的回調函數. 好比清理資源. nimbus-data中的file-cache-map在expire的時候關閉了文件流.

新版本的storm把TimeCacheMap做爲廢棄的, 轉而使用非線程安全版本的RotatingMap: (爲啥不用線程安全的版本??)
RotatingMap和TimeCacheMap不一樣的是再也不使用對象鎖和單獨的清理線程.
storm-core/backtype/storm/utils/RatatingMap.java

public class RotatingMap<K, V> {
    private LinkedList<HashMap<K, V>> _buckets;
    private ExpiredCallback _callback;

    public Map<K, V> rotate() {
        Map<K, V> dead = _buckets.removeLast();
        _buckets.addFirst(new HashMap<K, V>());
        if(_callback!=null) {
            for(Entry<K, V> entry: dead.entrySet()) {
                _callback.expire(entry.getKey(), entry.getValue());
            }
        }
        return dead;
    }
}

因爲不使用單獨的線程和鎖, 把TimeCacheMap清理線程的部分移到了單獨的方法rotate中.

2) Scheduler

(defn mk-scheduler [conf inimbus]
  (let [forced-scheduler (.getForcedScheduler inimbus)          ; standalone-nimbus的INimbus匿名類getForcedScheduler的實現爲空
        scheduler (cond                                     ; 條件表達式 (cond cond1 exp1 cond2 exp2 :else default)
                    forced-scheduler                        ; INimbus指定了調度器的實現
                    (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                        forced-scheduler)                   ; do 表達式相似語句塊

                    (conf STORM-SCHEDULER)              ; 使用Storm的配置. defaults.yaml中並無storm.scheduler這個配置項, 要自定義的話寫在storm.yaml中
                    (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                        (-> (conf STORM-SCHEDULER) new-instance))       ; 實例化

                    :else
                    (do (log-message "Using default scheduler")
                        (DefaultScheduler.)))]                  ; 默認的調度器DefaultScheduler.clj
    (.prepare scheduler conf)
    scheduler
    ))

cond表達式有三種狀況. 1) standalone-nimbus中getForcedScheduler爲nil, 不知足. 2) defaults.yaml中沒有配置storm.scheduler配置項,
若是沒有在storm.yaml中定義調度器實現, 則也不知足 3) 默認的調度器是DefaultScheduler, 實現了IScheduler接口
storm-core/backtype/storm/scheduler/IScheduler.java

public interface IScheduler {    
    void prepare(Map conf);

    /** Set assignments for the topologies which needs scheduling. The new assignments is available  through <code>cluster.getAssignments()</code>
     * 爲集羣中的須要調度的計算拓撲分配任務. 新的任務能夠經過cluster.getAssignments()獲取
     *@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here 
     *       only contain static information about topologies. Information like assignments, slots are all in the <code>cluster</code>object.
     *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user
     *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current 
     *       assignments for all the topologies etc. User can set the new assignment for topologies using <code>cluster.setAssignmentById</code> */
    void schedule(Topologies topologies, Cluster cluster);
}

IScheduler的幾個實現類都在storm-core/backtype/storm/scheduler/*.clj中:
-----IScheduler.java
|--------------DefaultScheduler.clj
|--------------EvenScheduler.clj
|--------------IsolationScheduler.clj
調度器分析到這裏, 後面會專門針對調度器進行分析.
調度器是用來幹嗎的?? Twitter Storm的新利器Pluggable Scheduler Twitter Storm: How to develop a pluggable scheduler
storm scheduler源碼分析: http://www.cnblogs.com/fxjwind/archive/2013/06/14/3136008.html

3) ZooKeeper
咱們知道Twitter Storm的全部的狀態信息都是保存在Zookeeper裏面,nimbus經過在zookeeper上面寫狀態信息來分配任務,supervisor,task經過從zookeeper中讀狀態來領取任務,同時supervisor, task也會定義發送心跳信息到zookeeper, 使得nimbus能夠監控整個storm集羣的狀態, 從而能夠重啓一些掛掉的task。ZooKeeper 使得整個storm集羣十分的健壯 — 任何一臺工做機器掛掉都沒有關係,只要重啓而後從zookeeper上面從新獲取狀態信息就能夠了。本文主要介紹Twitter Storm在ZooKeeper中保存的數據目錄結構,源代碼主要是: backtype.storm.cluster, 廢話很少說,直接看下面的結構圖:
一個要注意的地方是,做者在代碼裏面不少地方用到的 storm-id , 其實就是 topology-id 的意思。(之前他把topology叫作storm, 代碼裏面尚未改過來)

/­{storm­-zk­-root}         storm 在zookeeper 上的根目錄
  |
  |­/assignments            topology 的任務分配信息
  |   |­/{topology-­id}         這個下面保存的是每一個topology 的assignments
  |                     信息包括:  對應的nimbus 上的代碼目錄, 全部task 的啓動時間,每一個task 與機器、端口的映射
  |
  |­/tasks                  全部的task
  |   |­/{topology­-id}         這個目錄下面id 爲{topology­id} 的topology所對應的全部的task­id
  |       |­/{task­-id}         這個文件裏面保存的是這個task 對應的component­id :多是spout­id 或者bolt­id
  |
  |­/storms                 這個目錄保存全部正在運行的topology 的id
  |   |­/{topology­-id}     保存這個topology的一些信息:topology的名字,topology開始運行的時間,topology的狀態(StormBase)
  |
  |­/supervisors            這個目錄保存全部的supervisor的心跳信息          
  |   |­/{supervisor­-id}   保存supervisor的心跳信息:心跳時間,主機名,這個supervisor 上worker的端口號運行時間(SupervisorInfo)
  |
  |­/taskbeats              全部task 的心跳
  |   |­/{topology­-id}     這個目錄保存這個topology 的全部的task 的心跳信息
  |       |­/{task-­id}     task 的心跳信息,包括心跳的時間,task 運行時間以及一些統計信息
  |
  |­/taskerrors             全部task 所產生的error 信息
      |­/{topology-­id}     這個目錄保存這個topology 下面每一個task 的出錯信息
          |­/{task-­id}     這個task 的出錯信息

在0.9.0.1+後的版本, zookeeper目錄發生了變化: 沒有了tasks. taskbeats改名爲workerbeats. taskerrors改名爲errors.

這個幾個目錄定義在cluster.clj. 在mk-storm-cluster-state時進行建立
storm-core/backtype/storm/cluster.clj

(def ASSIGNMENTS-ROOT "assignments")
(def CODE-ROOT "code")
(def STORMS-ROOT "storms")
(def SUPERVISORS-ROOT "supervisors")
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")

zookeeper.clj中的mk-client函數就是初始化Curator即創建與zookeeperserver的鏈接。
service-handler(nimbus)
|---------nimbus-data(nimbus)
|-------mk-storm-cluster-state(cluster.clj)
|--------------mk-distributed-cluster-state(cluster)
|----------------zk/mk-client(zookeeper.clj)
須要注意的是,nimbus中並無註冊任何回調函數來處理zookeeper送來的notification。???
nimbus是經過定時機制來按期輪詢zookeeper中的目錄進而感知到supervisor的加入和退出。
storm-core/backtype/storm/cluster.clj

;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
;; 應該使用Watches來優化. 當ZK從新鏈接時, 它們(這裏)並無保證被調用.
;; nimbus.clj的nimbus-data調用參數是conf, 是一個Map. 這裏定義了一個新的協議接口. satisfies?返回false
;; 何時返回true? 參數cluster-state-spec是ClusterState的子類時
(defn mk-storm-cluster-state [cluster-state-spec]
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
   … ...
(defn mk-distributed-cluster-state [conf]
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))      ; defaults.yaml--> storm.zookeeper.root: "/storm"
    (.close zk))                                        ; 建立完/storms目錄後, 關閉zk   
  … …

storm-core/backtype/storm/zookeeper.clj
(defnk mk-client [conf servers port :root "" :watcher default-watcher :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    (.. fk                                              ; 兩個.表示連續調用. fk = CuratorFramework
        (getCuratorListenable)                              ; fk.getCuratorListenable().addListener(new CuratorListener(){...})
        (addListener                                    ; 監聽器做爲匿名類
         (reify CuratorListener                             ; 經過監聽器的方式指定Watcher
           (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
             (when (= (.getType e) CuratorEventType/WATCHED)    ; 事件類型爲WATCHED
               (let [^WatchedEvent event (.getWatchedEvent e)]
               ;; watcher即方法中參數:watcher. 默認爲default-watcher函數, 接受三個參數[state, type, path]
                 (watcher (zk-keeper-states (.getState event))  ; zk-keeper-states是個map, event.getState()獲得的是肯定的key. (map key)
                          (zk-event-types (.getType event))     ; zk-event-types也是一個map
                          (.getPath event))))))))               ; watcher(Watcher.Event.KeeperState, Watcher.Event.EventType, path). watcher是一個function!
    (.start fk)                                         ; CuratorFramework.start()
    fk))                                                ; 返回值爲fk. 由於操做zk的接口使用的就是這個對象

storm-core/backtype/storm/utils/Utils.java

public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
        List<String> serverPorts = new ArrayList<String>();
        for(String zkServer: (List<String>) servers) {
            serverPorts.add(zkServer + ":" + Utils.getInt(port));   // storm.yaml中storm.zookeeper.servers能夠配置多個選項, port只有一個
        }
        String zkStr = StringUtils.join(serverPorts, ",") + root;       // root: 相對路徑. h1:2181,h2:2181/root
        try {
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(zkStr)                   // 鏈接多個zookeeper servers. 
                    .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
                    .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
                    .retryPolicy(new BoundedExponentialBackoffRetry(    // 嘗試策略. 設置時間間隔等參數  
                                Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
                                Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
                                Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING))));
            if(auth!=null && auth.scheme!=null)  builder = builder.authorization(auth.scheme, auth.payload);
            return builder.build();
        } catch (IOException e) {
           throw new RuntimeException(e);
        }
    }

zk的幾個常量都是Map的形式. 獲取map中key的方式都使用了這種形式: (map key)

(def zk-keeper-states
  {Watcher$Event$KeeperState/Disconnected :disconnected
   Watcher$Event$KeeperState/SyncConnected :connected
   Watcher$Event$KeeperState/AuthFailed :auth-failed
   Watcher$Event$KeeperState/Expired :expired
  })
(def zk-event-types
  {Watcher$Event$EventType/None :none
   Watcher$Event$EventType/NodeCreated :node-created
   Watcher$Event$EventType/NodeDeleted :node-deleted
   Watcher$Event$EventType/NodeDataChanged :node-data-changed
   Watcher$Event$EventType/NodeChildrenChanged :node-children-changed
  })
(def zk-create-modes
  {:ephemeral CreateMode/EPHEMERAL
   :persistent CreateMode/PERSISTENT
   :sequential CreateMode/PERSISTENT_SEQUENTIAL})

zookeeper.clj的mk-client的返回值是一個CuratorFramework對象. 這是操做zookeeper接口的對象. zk/mkdirs的第一個參數zk就是這個對象!

(defn mkdirs [^CuratorFramework zk ^String path]            ;; 第一個參數zk就是mk-client的返回值CuratorFramework對象
  (let [path (normalize-path path)]
    (when-not (or (= path "/") (exists-node? zk path false))    ; path不是/ 或者不存在, 才能夠新建
      (mkdirs zk (parent-path path))
      (try-cause
        (create-node zk path (barr 7) :persistent)          ; 最後一個參數是個keyword, 是參數mode, 由於mode的值是肯定的
        (catch KeeperException$NodeExistsException e))  ;; this can happen when multiple clients doing mkdir at same time
      )))
(defn create-node
  ([^CuratorFramework zk ^String path ^bytes data mode] ; 假設參數mode=:persistent, 則(zk-create-modes mode)爲CreateMode/PERSISTENT
    (try
      (.. zk (create) (withMode (zk-create-modes mode)) (withACL ZooDefs$Ids/OPEN_ACL_UNSAFE) (forPath (normalize-path path) data))
      (catch Exception e (throw (wrap-in-runtime e)))))
  ([^CuratorFramework zk ^String path ^bytes data]          ; 沒有mode的處理方式: 遞歸
    (create-node zk path data :persistent)))

curator使用java操做的示例: http://shift-alt-ctrl.iteye.com/blog/1983073
幾個處理zookeeper路徑的幫助函數在util.clj中
storm-core/backtype/storm/util.clj

;; 分隔路徑. zookeeper的節點都是以/開頭的絕對路徑. 返回一個向量, 包含了某個節點的全部路徑的部分
(defn tokenize-path [^String path]                      ;; path=/storm/storms/topo-id
  (let [toks (.split path "/")]
    (vec (filter (complement empty?) toks))                 ;; 返回值爲: #{storm storms topo-id}    
    ))
(defn parent-path [path]                                ;; path=/storm/storms/topo-id
  (let [toks (tokenize-path path)]
    (str "/" (str/join "/" (butlast toks)))                     ;;父路徑爲: /storm/storms   
    ))
;; 將toks從新組裝成路徑. 每一個元素以/拼接, 最開始也必須是/
(defn toks->path [toks]                             ;; toks=#{storm storms topo-id}
  (str "/" (str/join "/" toks))                             ;; 返回值爲: /storm/storms/topo-id
  )
(defn normalize-path [^String path]                     ;; 規範化路徑    
  (toks->path (tokenize-path path)))
(defn full-path [parent name]                           ;; parent=/storm/storms name=topo-id
  (let [toks (tokenize-path parent)]                        ; #{storm storms}
    (toks->path (conj toks name))                       ; conj的結果爲: #{storm storms topo-id}
    ))                                                  ; toks-path後爲: /storm/storms/topo-id

zookeeper.clj是操做zk的接口. cluster.clj的mk-distributed-cluster-state咱們只分析到第一個let表達式. 第二個let表達式返回ClusterState匿名對象:

(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  (set-data [this path data])  ;; if node does not exist, create persistent with this data 
  (get-data [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id])
  )

定義集羣的狀態爲一個協議類型. 集羣的狀態是記錄在zookeeper中的. 因此這個協議的方法都是一些zookeeper操做的API.

(defn mk-distributed-cluster-state [conf]
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))          ; defaults.yaml--> storm.zookeeper.root: "/storm"
    (.close zk))                                            ; 建立完/storms後, 關閉zk
  (let [callbacks (atom {})
        active (atom true)
        zk (zk/mk-client conf                           ; 由於mk-client是一個defnk函數. :key value提供的是額外的參數,會覆蓋方法中的參數
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)      ; 指定了相對路徑爲/storm, 則後面的mkdirs等方法均會加上這個相對路徑!
                         :watcher (fn [state type path]                 ; event.getState(), event.getType(), path. 和default-watcher參數同樣
                                     (when @active                  ; 原子的解引用
                                       (when-not (= :connected state)   ; state和type的取值見zookeeper.clj的zk-keeper-states和zk-event-types
                                         (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                       (when-not (= :none type)
                                         (doseq [callback (vals @callbacks)]
                                           (callback type path))))      ; 真正的回調函數只須要兩個參數[type path]. state用於條件判斷
                                       ))]
    (reify ClusterState
     (register [this callback]
               (let [id (uuid)]
                 (swap! callbacks assoc id callback)                    ; 給id註冊一個回調函數. 添加到callbacks中. 這樣前面聲明的@callbacks就有數據了!
                 id                                                 ; 返回值爲id, 這樣接下來的操做就可使用這個id. 也就能取到對應的callback
                 ))
     (unregister [this id] (swap! callbacks dissoc id))                         ; 不註冊. 參數id就是上面第一個函數register的返回值. 見怪不怪了吧.
     (set-ephemeral-node [this path data]                               ; 設置短暫節點
                         (zk/mkdirs zk (parent-path path))              ; zk是let中賦值的CuratorFramework操做客戶端. zk/mkdirs的第一個參數就是它
                         (if (zk/exists zk path false)                  ; 已經存在節點: 直接更新數據
                           (try-cause
                             (zk/set-data zk path data) ; should verify that it's ephemeral
                             (catch KeeperException$NoNodeException e
                               (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
                               (zk/create-node zk path data :ephemeral)
                               ))
                           (zk/create-node zk path data :ephemeral)     ; 不存在, 直接建立一個mode=:ephemeral的節點
                           ))
     (create-sequential [this path data] 
                           (zk/create-node zk path data :sequential))       ; 建立順序節點
     (set-data [this path data]                                     ;; note: this does not turn off any existing watches
               (if (zk/exists zk path false)                            ; if exists (set-data) (mkdirs and create-node-with-data)
                 (zk/set-data zk path data)
                 (do
                   (zk/mkdirs zk (parent-path path))                    ; 若是不存在, 先建立該節點的父節點
                   (zk/create-node zk path data :persistent)                ; 而後建立該節點(建立節點的時候指定數據)
                   )))
     (delete-node [this path] (zk/delete-recursive zk path))
     (get-data [this path watch?] (zk/get-data zk path watch?))             ; 獲取數據的方法都有一個watch?標誌位     
     (get-children [this path watch?] (zk/get-children zk path watch?))
     (mkdirs [this path] (zk/mkdirs zk path))
     (close [this]
            (reset! active false)                                   ; 重置active=false. 在mk-client前初始值爲true
            (.close zk))
     )))

:watcher與callback流程圖:
watcher and callback

4) mk-storm-cluster-state
如今一步步逆推, 回到mk-storm-cluster-state這條主線上來.
storm-core/backtype/storm/cluster.clj

;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called. 應該使用Watches來優化. 當ZK從新鏈接時, 它們(這裏)並無保證被調用.
;; nimbus.clj的nimbus-data調用參數是conf, 是一個Map. 這裏定義了一個新的協議接口. satisfies?返回false. 何時返回true? 參數cluster-state-spec是ClusterState的子類時
(defn mk-storm-cluster-state [cluster-state-spec]
  ; 由於mk-distributed-cluster-state返回的是reify ClusterState, 因此let綁定的cluster-state是一個ClusterState實例!
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        assignment-info-callback (atom {})
        supervisors-callback (atom nil)
        assignments-callback (atom nil)
        storm-base-callback (atom {})
        state-id (register                  ; 調用的ClusterState的register方法. register(this callback), 返回值是一個uuid, 取消註冊時根據uuid從map中刪除
                  cluster-state                             ; 第一個參數爲this, 第二個參數爲callback匿名函數. 
                  (fn [type path]                           ; callback方法的參數和mk-distributed-cluster-state的:watcher的函數的(callback type path)同樣.
                    (let [[subtree & args] (tokenize-path path)]    ; path爲/storms/topo-id, 返回#{storms topo-id}, 則subtree=storms
                      (condp = subtree
                          ASSIGNMENTS-ROOT (if (empty? args)
                             (issue-callback! assignments-callback)
                             (issue-map-callback! assignment-info-callback (first args)))
                          SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                          STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                          (halt-process! 30 "Unknown callback for subtree " subtree args) ;; this should never happen
                          )
                      )))]
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))                         ; 在zookeeper上建立各個目錄. ClusterState.mkdirs -> zk/mkdirs

    (reify StormClusterState                ; 又定義了一個協議. 不過這個協議都是經過調用ClusterState的方法來達到目的. 固然這兩個協議的方法是不同的
    …  …

在調用register註冊了一個回調函數以後, 循環建立[**-SUBTREE]這些子目錄. 注意到這些SUBTREE並不包括最開始的/storm這個路徑:

(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))

而zookeeper中建立assignments所在的完整路徑是/storm/assignments. 答案在於mk-distributed-cluster-state的第二個let表達式的:root選項
    :root (conf STORM-ZOOKEEPER-ROOT) ; 指定了相對路徑爲/storm, 則後面的mkdirs等方法均會加上這個相對路徑!

因此當調用ClusterState的mkdirs時, :root的值會自動做爲操做的節點的前綴. 由於zk操做都在:root這個相對路徑下進行!
這就不難推斷出下面幾個獲取路徑的方法的返回值也是不包含/storm的:

;; 下面的幾個路徑是否包含最開始的/storm?? NO!
;; /supervisors/supervisor-id
(defn supervisor-path [id] (str SUPERVISORS-SUBTREE "/" id))
;; /assignments/topology-id
(defn assignment-path [id] (str ASSIGNMENTS-SUBTREE "/" id))
;; /storms/topology-id
(defn storm-path [id] (str STORMS-SUBTREE "/" id))
;; /workerbeats/topology-id
(defn workerbeat-storm-root [storm-id] (str WORKERBEATS-SUBTREE "/" storm-id))
;; /workerbeats/topology-id/node-port
(defn workerbeat-path [storm-id node port] (str (workerbeat-storm-root storm-id) "/" node "-" port))
;; /errors/topology-id
(defn error-storm-root [storm-id] (str ERRORS-SUBTREE "/" storm-id))
(defn error-path [storm-id component-id] (str (error-storm-root storm-id) "/" (url-encode component-id)))

前面定義了一個ClusterState, 是操做ZooKeeper API的方法. 好比建立目錄, 建立節點, 設置數據等. 而Storm的集羣狀態則和Storm的內部數據結構相關.
由於Storm的集羣狀態最終也是寫到ZooKeeper中的. 因此這個接口的實現類, 內部會使用ClusterState來操做ZooKeeper API.
言歸正傳, Storm的集羣狀態包括了: 任務的分配assignments, supervisors, workers, hearbeat, errors等信息.

(defprotocol StormClusterState
  ; 任務(get)
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  ; worker(get)
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id])  ;; returns nil if doesn't exist
  ; heartbeat and error
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  ; update!(set and remove)
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id error])
  (errors [this storm-id task-id])
  (disconnect [this])
  )

看看mk-storm-cluster-state對應的reify StormClusterState的實現

(reify StormClusterState
      ; 獲取集羣的全部任務. /storm/assignments下的全部topology-id
     (assignments [this callback]
        (when callback (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) 

      ; 任務信息: 序列化某一個指定的storm-id(即topology-id).  /storm/assignments/storm-id
      (assignment-info [this storm-id callback]
        (when callback (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))

      ; 全部活動的計算拓撲. /storm/storms下的全部topology-id
      (active-storms [this] (get-children cluster-state STORMS-SUBTREE false))

      ; 全部產生心跳的計算拓撲. /storm/workerbeats下的全部topology-id
      (heartbeat-storms [this] (get-children cluster-state WORKERBEATS-SUBTREE false))

      ; 有錯誤的計算拓撲. /storm/errors下的全部topology-id
      (error-topologies [this] (get-children cluster-state ERRORS-SUBTREE false))

      ; 工做進程worker的心跳信息. 工做進程在supervisor(node)的指定端口(port)上運行
      (get-worker-heartbeat [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))

      ;; supervisor > worker > executor > component/task(spout|bolt)
      ;; 一個supervisor有多個worker
      ;; 在storm.yaml中配置的每一個node+port都組成一個worker
      ;; 一個worker能夠有多個executor
      ;; 一個executor也能夠有多個task
      ; 執行線程executor的心跳信息
      (executor-beats [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a 
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats with an assigned node+port,
        ;; and only reading executors from that heartbeat that are actually assigned, we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)                    ; 反轉map. supervisor node上配置的一個端口能夠運行多個executor
              all-heartbeats (for [[[node port] executors] node+port->executors]
                                (->> (get-worker-heartbeat this storm-id node port)         ; 獲取worker的心跳信息
                                     (convert-executor-beats executors)                 ; 上面函數的返回值做爲convert的最後一個參數
                                     ))]
          (apply merge all-heartbeats)))

      ; 全部的supervisors. /storm/supervisors下的全部supervisor-id
      (supervisors [this callback]
        (when callback (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))

      ; 序列化指定id的supervisor
      (supervisor-info [this supervisor-id] (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))

      ; 更新worker的心跳信息
      (worker-heartbeat! [this storm-id node port info] (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))

      (remove-worker-heartbeat! [this storm-id node port] (delete-node cluster-state (workerbeat-path storm-id node port)))

      ; 建立一個心跳  /storm/workerbeats/storm-id
      (setup-heartbeats! [this storm-id] (mkdirs cluster-state (workerbeat-storm-root storm-id)))

      (teardown-heartbeats! [this storm-id]
        (try-cause
         (delete-node cluster-state (workerbeat-storm-root storm-id))
         (catch KeeperException e (log-warn-error e "Could not teardown heartbeats for " storm-id))))

      (teardown-topology-errors! [this storm-id]
        (try-cause
         (delete-node cluster-state (error-storm-root storm-id))         
         (catch KeeperException e (log-warn-error e "Could not teardown errors for " storm-id))))

      (supervisor-heartbeat! [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))

      ; 激活一個指定id的計算拓撲
      (activate-storm! [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))

      ; 使用new-elems更新一個指定的計算拓撲. 最後會進行序列化
      (update-storm! [this storm-id new-elems]
        (let [base (storm-base this storm-id nil)
              executors (:component->executors base)  ; component即task, 包括spout或者bolt. ->表示component從屬於executor
              new-elems (update new-elems :component->executors (partial merge executors))] ; 偏函數, 由於executors只是其中的一部分
          (set-data cluster-state (storm-path storm-id)
                                  (-> base
                                      (merge new-elems)
                                      Utils/serialize))))

      ; 一個計算拓撲的基本信息
      (storm-base [this storm-id callback]
        (when callback (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))

      (remove-storm-base! [this storm-id] (delete-node cluster-state (storm-path storm-id)))

      (set-assignment! [this storm-id info] (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))

      (remove-storm! [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))

      (report-error [this storm-id component-id error]                
         (let [path (error-path storm-id component-id)                          ; component-id即builder.setSpout("spout",...)
               data {:time-secs (current-time-secs) :error (stringify-error error)}
               _ (mkdirs cluster-state path)                                    ; /storm/errors/topology-id/spout ...
               _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))   ; 建立以e開頭的順序節點(編號遞增)
               to-kill (->> (get-children cluster-state path false)
                            (sort-by parse-error-path)
                            reverse
                            (drop 10))]
           (doseq [k to-kill]
             (delete-node cluster-state (str path "/" k)))))

      (errors [this storm-id component-id]
         (let [path (error-path storm-id component-id)
               _ (mkdirs cluster-state path)
               children (get-children cluster-state path false)
               errors (dofor [c children]
                             (let [data (-> (get-data cluster-state (str path "/" c) false)
                                            maybe-deserialize)]
                               (when data
                                 (struct TaskError (:error data) (:time-secs data))
                                 )))
               ]
           (->> (filter not-nil? errors)
                (sort-by (comp - :time-secs)))))

      (disconnect [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state)))
      )

對比ClusterState和StormCluasterState. nimbus.clj訪問的是StormClusterState. 而StormClusterState在實現中訪問ClusterState.
storm cluster state

在前面咱們驗證了assignments, supervisors, workerbeats的三個截圖. 如今來看下errors的截圖.
zk nodes example

/storm/errors下是單詞計數的計算拓撲的topology-id. 拓撲下是component-id. 對應的代碼:

TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

注意到還有一個storm默認的component: __acket. 若是有錯誤信息, 則會在對應的component下生成以e開頭的順序節點.
這個節點的內容包含了:time-secs和:error的data信息. 這兩個信息包含在TaskError這個數據結構中:
(defstruct TaskError :error :time-secs)

是時候從nimbus-data > cluster/mk-storm-cluster-state回頭了. 接下來咱們從新回到nimbus.clj的service-handler這條重要的主線上去.
service-handler在nimbus-data以後有一個校驗過程和transition!過程. 這個先放一邊. 來看看和計算拓撲相關的代碼Nimbus$Iface.

在cluster.clj的最後有一段註釋: 其實是對mk-storm-cluster-state的StormClusterState的每一個方法的說明.
;; daemons have a single thread that will respond to events 進程(nimbus,supervisor..)有一個單獨的線程來響應事件
;; start with initialize event 進程啓動時有一個初始事件, 好比nimbus初始事件爲:startup
;; callbacks add events to the thread's queue 回調函數會添加事件到線程的隊列中

;; keeps in memory cache of the state, only for what client subscribes to. 將狀態保存在內存緩存中, 只針對訂閱的客戶端
;; Any subscription is automatically kept in sync, and when there are changes, client is notified. 全部的訂閱都是同步的, 當狀態編號, 會通知訂閱的客戶端
;; master gives orders through state, and client records status in state (ephemerally)

;; master tells nodes what workers to launch 主進程將任務信息寫到zk的節點上, 來通知工做進行去執行任務

;; master writes this. supervisors and workers subscribe to this to understand complete topology. 主進程寫, supervisors和worker是訂閱計算拓撲
;; each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified. 每一個計算拓撲的映射鏈路
;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up /assignments/{storm-id}

;; which tasks they talk to, etc. (immutable until shutdown)
;; everyone reads this in full to understand structure
;; /tasks/{storm id}/{task id} ; just contains bolt id

;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously
;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here

;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously
;; /taskbeats/{storm id}/{ephemeral task id}

;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown
;; master manipulates
;; /storms/{storm id}

;; Zookeeper flows:

;; Master:
;; job submit:
;; 1. read which nodes are available
;; 2. set up the worker/{storm}/{task} stuff (static)
;; 3. set assignments
;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off)

;; Monitoring (or by checking when nodes go down or heartbeats aren't received):
;; 1. read assignment
;; 2. see which tasks/nodes are up
;; 3. make new assignment to fix any problems
;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments)

;; masters only possible watches is on ephemeral nodes and tasks, and maybe not even

;; Supervisor:
;; 1. monitor /storms/* and assignments
;; 2. local state about which workers are local
;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments
;; 4. when storm is off, monitor tasks for workers - when they all die or don't hearbeat, kill the process and cleanup

;; Worker:
;; 1. On startup, start the tasks if the storm is on

;; Task:
;; 1. monitor assignments, reroute when assignments change
;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off

;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name)
;; supervisor periodically checks to make sure processes are alive
;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside

;; all tasks in a worker share the same cluster state ;; workers, supervisors, and tasks subscribes to storm to know when it's started or stopped ;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear) ;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9) ;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. when node disappears, they can die

相關文章
相關標籤/搜索