Storm Source Code Analysis - Stats (backtype.storm.stats)

You will notice that Storm currently has two metrics systems: the metrics framework and the stats framework.

Both are registered everywhere at the same time. It looks as if metrics is meant to replace stats eventually, but in the current version the UI still uses stats.

 

How the statistics collected by this module are used

1. In the worker, do-executor-heartbeats is called periodically to sync the heartbeat (hb) to ZooKeeper.
As you can see, the stats are also synced to ZooKeeper as part of the hb:

(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker 
  (let [stats (if-not executors
                  (into {} (map (fn [e] {e nil}) (:executors worker)))
                  (->> executors
                    (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                    (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)
               }]
    ;; do the zookeeper heartbeat
    (.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)    
    ))
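The following sketch mirrors how do-executor-heartbeats builds its stats map and the zk-hb map, to make the shape of the heartbeat concrete. It is an illustration, not Storm code. Executors are modeled as plain maps with hypothetical :id and :stats keys, which stand in for executor/get-executor-id and executor/render-stats. The uptime and timestamp are passed in as arguments rather than read from the worker.

```clojure
;; Hypothetical sketch: executors are plain maps with :id and :stats keys,
;; standing in for executor/get-executor-id and executor/render-stats.
(defn heartbeat-stats [executor-ids executors]
  (if-not executors
    ;; no executor objects given: report each assigned executor id with nil stats
    (into {} (map (fn [e] {e nil}) executor-ids))
    ;; otherwise map each executor id to its rendered stats
    (->> executors
         (map (fn [e] {(:id e) (:stats e)}))
         (apply merge))))

(defn mk-heartbeat [storm-id executor-ids executors uptime now]
  ;; same keys as zk-hb in do-executor-heartbeats
  {:storm-id storm-id
   :executor-stats (heartbeat-stats executor-ids executors)
   :uptime uptime
   :time-secs now})

(:executor-stats (mk-heartbeat "topo-1" [[1 1] [2 2]] nil 10 1000))
;; => {[1 1] nil, [2 2] nil}
```

The nil branch explains the original comment: even before stats exist, the heartbeat tells Nimbus which executors this worker is running.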

2. Anyone can now obtain this information through Nimbus's Thrift interface:

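;; excerpt from Nimbus's getTopologyInfo; unrelated code is omitted
(^TopologyInfo getTopologyInfo [this ^String storm-id]
   ;; fetch the heartbeats of all executors of this topology from ZooKeeper
   beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
   ;; the stats are taken from each executor's heartbeat
   stats (:stats heartbeat))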

3. The most direct consumer is the Storm UI. When it prepares the topology page, it calls getTopologyInfo to fetch the data:

(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ;; ... the rest of the page is rendered from summ (omitted)
      )))

 

Stats

Spouts and bolts use this module to collect sampled statistics. The specific metrics collected are:

(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])

 

The sampling rate is configured in storm-conf through TOPOLOGY_STATS_SAMPLE_RATE.

Why does each update add rate rather than 1?

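Because the statistics are sampled. If the sampling rate is 10%, each observed event stands for 1/(10%) = 10 events, so rate (10 in this case) is added instead of 1. sampling-rate converts the configured fraction into that multiplier: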

(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
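Here is a minimal sketch of why adding rate per sampled event gives the right total. The hypothetical sampled-count records only every rate-th event and adds rate each time it does. This deterministic sampler is an assumption made for illustration and is not the sampler Storm actually uses. In this sketch, sampling-rate takes the fraction directly instead of a conf map.

```clojure
;; Hypothetical sketch: a deterministic "every rate-th event" sampler stands in
;; for Storm's real sampler. sampling-rate takes the fraction directly here.
(defn sampling-rate [sample-fraction]
  (int (/ 1 sample-fraction)))

(defn sampled-count
  "Estimate the number of events in (range n) by recording only every
  rate-th event and adding rate for each recorded one."
  [n sample-fraction]
  (let [rate (sampling-rate sample-fraction)]
    (reduce (fn [total i]
              (if (zero? (mod i rate))
                (+ total rate)   ; a sampled event stands for `rate` events
                total))          ; unsampled events are not recorded
            0
            (range n))))

(sampling-rate 0.05)      ;; => 20
(sampled-count 1000 0.05) ;; => 1000
```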

 

The statistics are kept over time windows. Below are the default bucket count and time-window definitions:

(def NUM-STAT-BUCKETS 20) ;; number of buckets per window
;; 10 minutes, 3 hours, 1 day ;; the three time windows
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes of 30, 540 and 4320 seconds
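Here is a quick check of the arithmetic. Each window covers NUM-STAT-BUCKETS × bucket-size seconds, which is where 10 minutes, 3 hours and 1 day come from. window-length-secs is a helper introduced only for this illustration.

```clojure
(def NUM-STAT-BUCKETS 20)
(def STAT-BUCKETS [30 540 4320])

;; hypothetical helper: total span covered by one rolling window
(defn window-length-secs [bucket-size-secs]
  (* NUM-STAT-BUCKETS bucket-size-secs))

(mapv window-length-secs STAT-BUCKETS)
;; => [600 10800 86400], i.e. 10 minutes, 3 hours and 1 day
```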

 

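The core data structure is RollingWindowSet, which contains:
the functions needed to compute the statistics, updater and extractor; the set keeps its own copies (not only the windows) because it also has to compute all-time
a set of rolling windows, by default 3 time windows: 10 minutes, 3 hours and 1 day
all-time, the statistics over the entire time span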

(defrecord RollingWindowSet [updater extractor windows all-time])
(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater extractor (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets)) nil)
  )

 

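Next, the definition of a rolling window:
core data: buckets, a hashmap from time bucket to {streamid data}, initialized to {}
the functions needed to compute data: updater, merger, extractor
the time window: the bucket size (bucket-size-secs) and the number of buckets (num-buckets)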

(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
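Below is a simplified, runnable version of the two records and their constructors. It uses mapv in place of Storm's dofor helper and placeholder functions for updater, merger and extractor, so it is a sketch rather than the library code. It shows that a freshly built set holds one empty window per bucket size and a nil all-time.

```clojure
;; Simplified sketch: mapv replaces Storm's dofor helper.
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  ;; buckets starts empty and later holds {time-bucket {stream-id value}}
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  ;; one RollingWindow per bucket size; all-time starts as nil
  (RollingWindowSet. updater extractor
                     (mapv #(rolling-window updater merger extractor % num-buckets)
                           bucket-sizes)
                     nil))

;; placeholder functions: only the structure matters here
(def rws (rolling-window-set identity identity identity 20 30 540 4320))

(mapv :bucket-size-secs (:windows rws)) ;; => [30 540 4320]
```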

 

1. mk-stats

When mk-executor-data builds the executor data, it creates the stats:

(mk-executor-stats <> (sampling-rate storm-conf))

 

;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))
(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))

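The method bodies ignore the first argument; it only selects the :spout or :bolt method. Each method simply calls stats/mk-spout-stats or stats/mk-bolt-stats, so every metric to be tracked gets its own rolling-window-set wrapped in an atom.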

(defn- mk-common-stats [rate]
  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                rate
                ))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats. (mk-common-stats rate)
                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                  ))

(defn mk-spout-stats [rate]
  (SpoutExecutorStats. (mk-common-stats rate)
                   (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                   (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                   (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                   ))
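The shape that mk-spout-stats produces can be sketched with plain maps. stub-window-set is a hypothetical placeholder for keyed-counter-rolling-window-set; in Storm, the latency field would use keyed-avg-rolling-window-set instead. The sketch illustrates two points. First, apply spreads STAT-BUCKETS into the variadic bucket-sizes argument. Second, every metric lives in its own atom and can therefore be swapped independently.

```clojure
;; Hypothetical sketch: stub-window-set stands in for
;; keyed-counter-rolling-window-set / keyed-avg-rolling-window-set.
(def NUM-STAT-BUCKETS 20)
(def STAT-BUCKETS [30 540 4320])

(defn stub-window-set [num-buckets & bucket-sizes]
  {:num-buckets num-buckets :bucket-sizes (vec bucket-sizes) :all-time nil})

(defn- new-stat []
  ;; apply spreads STAT-BUCKETS into the variadic bucket-sizes argument,
  ;; i.e. (stub-window-set 20 30 540 4320); each call yields a fresh atom
  (atom (apply stub-window-set NUM-STAT-BUCKETS STAT-BUCKETS)))

(defn mk-spout-stats-sketch [rate]
  {:common {:emitted (new-stat) :transferred (new-stat) :rate rate}
   :acked (new-stat)
   :failed (new-stat)
   :complete-latencies (new-stat)})   ; an avg-based set in Storm
```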

 

2. Updating the data

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms)
  )
(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)
    ))
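The macro turns the path into a threading form that selects the right atom, then calls swap! on it. The sketch below uses hypothetical names throughout. add-to-stream stands in for update-rolling-window-set. collectify is reimplemented with assumed behavior: it keeps a sequential path as-is and wraps a single key in a vector. The stats are plain maps of atoms rather than records.

```clojure
;; Hypothetical sketch of the update-executor-stat! pattern.
(defn collectify [x]
  ;; assumed behavior: keep a sequential path as-is, wrap a single key in a vector
  (if (sequential? x) x [x]))

(defn add-to-stream [m stream amt]
  ;; stands in for update-rolling-window-set
  (update m stream (fnil + 0) amt))

(defmacro update-stat! [stats path & args]
  (let [path (collectify path)]
    ;; (-> stats :acked) is (:acked stats), i.e. the atom for that metric
    `(swap! (-> ~stats ~@path) add-to-stream ~@args)))

(def stats {:acked (atom {}) :common {:emitted (atom {})}})

(update-stat! stats :acked "default" 20)
(update-stat! stats [:common :emitted] "default" 20)
@(:acked stats) ;; => {"default" 20}
```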

Take (update-executor-stat! stats :acked stream (stats-rate stats)) as an example and see how it works:

It takes from SpoutExecutorStats the rolling-window-set (an atom) that records the spout's acked counts,
then swaps that atom using update-rolling-window-set.

How is the rolling-window-set that records acked defined?

keyed-counter-rolling-window-set predefines the updater, merger and extractor:
updater: incr-val [amap key amt], adds the given amt to the value stored under key in amap
merger: (partial merge-with +), merges maps with +, i.e. values for the same key are added together
extractor: counter-extract, (if v v {}), returns v if present, otherwise {}
windows: a list of rolling-windows
all-time: initialized to nil

(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
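Below is a sketch of the three helpers, using assumed implementations that match the behavior described above; they are not copied from Storm. incr-val treats a missing key, or a nil map, as 0. The nil case matters because all-time starts out as nil.

```clojure
;; Sketch with assumed implementations of the keyed-counter helpers.
(defn incr-val
  "Add amt to the value stored under key; a missing key (or a nil map) counts as 0."
  [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; merger: values for the same stream id are summed
(def merge-counters (partial merge-with +))

;; extractor: an empty map instead of nil when nothing has been recorded
(defn counter-extract [v] (if v v {}))

(-> nil (incr-val "default" 20) (incr-val "default" 20))
;; => {"default" 40}
(merge-counters {"default" 40} {"default" 20 "s2" 10})
;; => {"default" 60, "s2" 10}
(counter-extract nil) ;; => {}
```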

 

Now let's see how :acked is updated when spout-acked-tuple! is called.

First, every rolling-window is updated, and the updated windows are stored back under :windows.
Then :all-time is updated with (apply (:updater rws) (:all-time rws) args), where:
updater is incr-val [amap key amt]
args are streamid and rate
all-time records each stream's statistics over the entire time span

(defn update-rolling-window-set
  ([^RollingWindowSet rws & args]
     (let [now (current-time-secs)
           new-windows (dofor [w (:windows rws)]
                         (apply update-rolling-window w now args))]
       (assoc rws :windows new-windows :all-time (apply (:updater rws) (:all-time rws) args))
       )))
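The next sketch is a simplified version of the same flow, under several assumptions. The windows are plain maps and incr-val is the updater. now is passed in rather than obtained from current-time-secs, so the result is deterministic. Bucket selection assumes the bucket start is now rounded down to a multiple of the bucket size.

```clojure
;; Simplified sketch of update-rolling-window-set with plain-map windows.
(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

(defn update-window [w now & args]
  ;; assumed bucket rule: now rounded down to a multiple of the bucket size
  (let [size (:bucket-size-secs w)
        tb (* size (quot now size))
        buckets (:buckets w)]
    (assoc w :buckets (assoc buckets tb (apply incr-val (get buckets tb) args)))))

(defn update-window-set [rws now & args]
  (assoc rws
         ;; every window receives the same update...
         :windows (mapv #(apply update-window % now args) (:windows rws))
         ;; ...and all-time accumulates it too, starting from nil
         :all-time (apply incr-val (:all-time rws) args)))

(def rws0 {:windows [{:bucket-size-secs 30 :buckets {}}
                     {:bucket-size-secs 540 :buckets {}}]
           :all-time nil})

(def rws2 (-> rws0
              (update-window-set 100 "default" 20)
              (update-window-set 130 "default" 20)))

(:all-time rws2) ;; => {"default" 40}
```

The two updates land in different 30-second buckets (90 and 120) but in the same 540-second bucket (0), while all-time simply keeps adding.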

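Now let's look at how a single rolling-window is updated:
compute from now which bucket the current time belongs to (time-bucket)
take buckets and apply :updater to that bucket; once again this adds rate to the value for streamid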

(defn update-rolling-window
  ([^RollingWindow rw time-secs & args]
     ;; this is 2.5x faster than using update-in...
     (let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
           buckets (:buckets rw)
           curr (get buckets time-bucket)           
           curr (apply (:updater rw) curr args)
           ]
       (assoc rw :buckets (assoc buckets time-bucket curr))
       )))
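The excerpt does not show curr-time-bucket. The sketch below assumes that it rounds the timestamp down to the start of its bucket. The sketch also checks that the get/assoc style used in update-rolling-window produces the same result as an update-in version, which is the alternative the code comment compares it against. All function names here are illustrative.

```clojure
;; Sketch under an assumption: curr-time-bucket rounds the timestamp down
;; to the start of its bucket.
(defn curr-time-bucket [time-secs bucket-size-secs]
  (* bucket-size-secs (quot time-secs bucket-size-secs)))

(defn incr-val [amap key amt]
  (assoc amap key (+ (get amap key 0) amt)))

;; get + assoc, the style the original comment calls faster than update-in
(defn update-buckets [buckets bucket-size time-secs & args]
  (let [tb (curr-time-bucket time-secs bucket-size)
        curr (apply incr-val (get buckets tb) args)]
    (assoc buckets tb curr)))

;; update-in version with the same result
(defn update-buckets-in [buckets bucket-size time-secs & args]
  (update-in buckets [(curr-time-bucket time-secs bucket-size)]
             #(apply incr-val % args)))

(-> {}
    (update-buckets 30 75 "s" 20)   ; 75s falls into the bucket starting at 60
    (update-buckets 30 95 "s" 20))  ; 95s falls into the bucket starting at 90
;; => {60 {"s" 20}, 90 {"s" 20}}
```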