You'll notice that Storm currently contains two metrics systems: the metrics framework and the stats framework.
Both are registered everywhere in parallel, which suggests metrics is meant to eventually replace stats, but in the current version the UI still reads from stats.
How the data collected by this module is used, and its data structures
1. In the worker, do-executor-heartbeats is called periodically to sync heartbeats to ZooKeeper.
As you can see, stats are synced to ZooKeeper as part of the heartbeat.
(defnk do-executor-heartbeats [worker :executors nil]
  ;; stats is how we know what executors are assigned to this worker
  (let [stats (if-not executors
                (into {} (map (fn [e] {e nil}) (:executors worker)))
                (->> executors
                     (map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
                     (apply merge)))
        zk-hb {:storm-id (:storm-id worker)
               :executor-stats stats
               :uptime ((:uptime worker))
               :time-secs (current-time-secs)
               }]
    ;; do the zookeeper heartbeat
    (.worker-heartbeat! (:storm-cluster-state worker)
                        (:storm-id worker)
                        (:assignment-id worker)
                        (:port worker)
                        zk-hb)
    ))
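For reference, the heartbeat pushed to ZooKeeper is just a small map. A Python sketch of its shape; only the field names come from the code above, every value here is hypothetical:

```python
# Shape of the zk-hb map written to ZooKeeper (field names from the code
# above; all values below are hypothetical illustrations)
zk_hb = {
    "storm-id": "wordcount-1-1400000000",  # hypothetical topology id
    "executor-stats": {(1, 1): None},      # executor-id -> rendered stats (nil until rendered)
    "uptime": 120,                         # worker uptime in seconds
    "time-secs": 1400000120,               # heartbeat timestamp
}
```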
2. Now anyone can obtain this information through Nimbus's thrift interface.
The relevant function:
(^TopologyInfo getTopologyInfo [this ^String storm-id]
  ;; excerpt: heartbeats are read back from zk, and stats pulled out of each one
  beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
  stats (:stats heartbeat))
3. The most direct consumer is the Storm UI: when building the topology page, it calls getTopologyInfo to fetch the data.
(defn topology-page [id window include-sys?]
  (with-nimbus nimbus
    (let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
      ...)))
This module is used by spouts and bolts to keep sampled statistics. The concrete metrics to track are as follows:
(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])

(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])
The sampling rate is configured in storm-conf via TOPOLOGY_STATS_SAMPLE_RATE.
Why does each update add `rate` instead of 1?
Because the statistics are sampled: if the sampling rate is 10%, then each observed event stands for 1/(10%) = 10 real events, so we add 10.
(defn sampling-rate [conf]
  (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
       (/ 1)
       int))
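In other words, the configured sample rate is inverted to get the weight each sampled event carries. A minimal Python sketch of this arithmetic (the conf key mirrors Storm's `topology.stats.sample.rate`):

```python
def sampling_rate(conf):
    # mirrors (->> (conf TOPOLOGY-STATS-SAMPLE-RATE) (/ 1) int):
    # invert the configured sample rate and truncate to an int
    return int(1 / conf["topology.stats.sample.rate"])

# with a 5% sample rate, every sampled tuple is counted as 20 tuples
rate = sampling_rate({"topology.stats.sample.rate": 0.05})
```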
The statistics are then kept over time windows; below are the default bucket and window definitions:
(def NUM-STAT-BUCKETS 20) ;; number of buckets per window

;; 10 minutes, 3 hours, 1 day -- the three time windows
(def STAT-BUCKETS [30 540 4320]) ;; bucket sizes of 30, 540, 4320 seconds
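The bucket sizes and the bucket count combine to give the three window lengths; a quick check of the arithmetic:

```python
NUM_STAT_BUCKETS = 20
STAT_BUCKETS = [30, 540, 4320]  # bucket sizes in seconds

# window length = bucket size * number of buckets
windows = [b * NUM_STAT_BUCKETS for b in STAT_BUCKETS]
# 600 s = 10 minutes, 10800 s = 3 hours, 86400 s = 1 day
```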
The core data structure is RollingWindowSet, which contains:
the functions needed to maintain the statistics, updater and extractor; they are kept here as well because the all-time aggregate also needs them
a set of rolling windows, by default three: 10 minutes, 3 hours, 1 day
all-time, the aggregate over the entire time span
(defrecord RollingWindowSet [updater extractor windows all-time])

(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes]
  (RollingWindowSet. updater
                     extractor
                     (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets))
                     nil))
Next, look at the definition of a rolling window:
the core data, buckets: a hashmap whose bucket values hold per-stream data ({streamid data}), initialized to {}
the functions needed to maintain the data: updater, merger, extractor
the window parameters: bucket size and number of buckets
(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])

(defn rolling-window [updater merger extractor bucket-size-secs num-buckets]
  (RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))
The stats are created when mk-executor-data builds each executor's data:
mk-executor-stats <> (sampling-rate storm-conf)
;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
(stats/mk-spout-stats rate))
(defmethod mk-executor-stats :bolt [_ rate]
(stats/mk-bolt-stats rate))
The first argument is ignored; this simply dispatches to stats/mk-spout-stats or stats/mk-bolt-stats. As you can see, a rolling-window-set is created for each metric to be tracked:
(defn- mk-common-stats [rate]
  (CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                rate))

(defn mk-bolt-stats [rate]
  (BoltExecutorStats. (mk-common-stats rate)
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                      (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))

(defn mk-spout-stats [rate]
  (SpoutExecutorStats. (mk-common-stats rate)
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                       (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
                       (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))
(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms]
  (update-executor-stat! stats :acked stream (stats-rate stats))
  (update-executor-stat! stats :complete-latencies stream latency-ms))

(defmacro update-executor-stat! [stats path & args]
  (let [path (collectify path)]
    `(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))
Take update-executor-stat! stats :acked stream (stats-rate stats) as an example of how this works:
from SpoutExecutorStats, take the rolling-window-set that records spout acks
then swap! that atom with update-rolling-window-set
Now, how is the rolling-window-set that records acks defined?
keyed-counter-rolling-window-set predefines the updater, merger, and extractor:
updater: incr-val [amap key amt], adds the given amt onto the value stored under key in amap
merger: (partial merge-with +), merges maps using + as the combining logic, so values under the same key are summed
extractor: counter-extract, (if v v {}): return v if present, otherwise {}
windows: the list of rolling windows
all-time: initialized to nil
(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes]
  (apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))
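The three functions are small; here is a Python sketch of their behavior, with names transliterated from the Clojure above:

```python
def incr_val(amap, key, amt):
    # updater: add amt onto amap[key]; amap may be None (the initial all-time)
    amap = dict(amap or {})
    amap[key] = amap.get(key, 0) + amt
    return amap

def merge_with_plus(*maps):
    # merger: (partial merge-with +) -- sum values that share a key
    out = {}
    for m in maps:
        for k, v in (m or {}).items():
            out[k] = out.get(k, 0) + v
    return out

def counter_extract(v):
    # extractor: (if v v {})
    return v if v is not None else {}
```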
Now, when spout-acked-tuple! updates :acked, how does the update proceed?
First each rolling window is updated, and the updated windows are stored back into :windows
Then :all-time is updated via (apply (:updater rws) (:all-time rws) args)
updater: incr-val [amap key amt]
args: streamid, rate
all-time records, over the entire time span, the per-stream totals
(defn update-rolling-window-set
  ([^RollingWindowSet rws & args]
   (let [now (current-time-secs)
         new-windows (dofor [w (:windows rws)]
                       (apply update-rolling-window w now args))]
     (assoc rws
       :windows new-windows
       :all-time (apply (:updater rws) (:all-time rws) args)))))
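Put together, updating the set touches every window plus the all-time map. A minimal Python sketch of that flow, assuming the keyed-counter updater and representing records as plain dicts:

```python
def incr_val(amap, key, amt):
    # the keyed-counter updater: add amt onto amap[key] (amap may be None)
    amap = dict(amap or {})
    amap[key] = amap.get(key, 0) + amt
    return amap

def update_rolling_window(rw, now, key, amt):
    # bucket the update by time, then apply the updater inside that bucket
    bucket = rw["bucket_size_secs"] * (now // rw["bucket_size_secs"])
    buckets = dict(rw["buckets"])
    buckets[bucket] = incr_val(buckets.get(bucket), key, amt)
    return {**rw, "buckets": buckets}

def update_rolling_window_set(rws, now, key, amt):
    # update every window, then fold the same update into all-time
    return {
        **rws,
        "windows": [update_rolling_window(w, now, key, amt) for w in rws["windows"]],
        "all_time": incr_val(rws["all_time"], key, amt),
    }
```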
Finally, how a single rolling window is updated:
use now to compute which bucket the update belongs to (time-bucket)
fetch the buckets map and use :updater to update the corresponding bucket; the operation here is, again, adding rate onto the value for streamid
(defn update-rolling-window
([^RollingWindow rw time-secs & args]
;; this is 2.5x faster than using update-in...
(let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))
buckets (:buckets rw)
curr (get buckets time-bucket)
curr (apply (:updater rw) curr args)
]
(assoc rw :buckets (assoc buckets time-bucket curr))
)))
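curr-time-bucket itself is not shown above; presumably it floors the timestamp to the start of its bucket. A sketch of that assumption:

```python
def curr_time_bucket(time_secs, bucket_size_secs):
    # floor the timestamp to the start of its bucket
    # (assumed behavior of curr-time-bucket, which is not shown above)
    return bucket_size_secs * (time_secs // bucket_size_secs)

# with 30-second buckets, t=65 lands in the bucket starting at 60
```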