storm定時器與java.util.Timer定時器比較類似。java.util.Timer定時器其實是個線程,定時調度所擁有的TimerTasks;storm定時器也有一個線程負責調度所擁有的"定時任務"。storm定時器的"定時任務"是一個vector類型的數據[time, callback, uuid],內有會有三個值,分別是時間、函數、和uuid,很好理解,時間表示該定時任務何時執行,函數表示要執行的函數,uuid用於標識該"定時任務"。"定時任務"被存放到定時器的PriorityQueue隊列中(和PriorityBlockingQueue區別,在於沒有阻塞機制,不是線程安全的)。優先級隊列是堆數據結構的典型應用,若是不提供Comparator的話,優先隊列中元素默認按天然順序排列,也就是數字默認是小的在隊列頭,字符串則按字典序排列(參閱 Comparable),也能夠根據 Comparator 來指定,這取決於使用哪一種構造方法。優先級隊列不容許null元素。依靠天然排序的優先級隊列還不容許插入不可比較的對象(這樣作可能致使 ClassCastException)。固然也能夠本身從新實現Comparator接口, 好比storm定時器就用reify從新實現了Comparator接口。storm定時器的執行過程比較簡單,經過timer-thread,不斷檢查PriorityQueue裏面時間最小的"定時任務"是否已經能夠觸發了, 若是能夠(當前時間>=執行時間),就poll出來,調用callback,並sleep。storm定時器相關的函數均定義在timer.clj文件中,storm定時器是由mk-timer函數建立的,mk-timer函數定義以下:
mk-timer函數java
;; kill-fn函數會在timer-thread發生exception的時候被調用,timer-name標識定時器的名稱 (defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil] ;; queue綁定PriorityQueue隊列,建立PriorityQueue隊列時指定隊列初始容量爲10,並指定一個Comparator比 ;;較器,Comparator比較器比較"定時任務"執行時間的大小,這樣每次poll出執行時間最小的"定時任務", ;; PriorityQueue隊列是一個依賴執行時間的小頂堆 (let [queue (PriorityQueue. 10 (reify Comparator (compare [this o1 o2] (- (first o1) (first o2))) (equals [this obj] true))) ;; active標識timer-thread是"active"的 active (atom true) ;; 建立一個鎖,由於PriorityQueue並非線程安全的,因此經過這個鎖,可使多線程互斥訪問PriorityQueue lock (Object.) ;; notifier是一個java信號量,初始值爲0,notifier信號量的主要功能就是當咱們調用cancel-timer函數中斷 ;; 一個timer-thread時,等待timer-thread結束,當timer-thread結束前會release notifier信號量 notifier (Semaphore. 0) ;; thread-name綁定timer-thread線程名,沒有指定時默認爲"timer" thread-name (if timer-name timer-name "timer") ;; timer-thread線程 timer-thread (Thread. (fn [] ;; 當timer-thread爲"active"即active=true時,進入while循環 (while @active (try ;; peek函數從PriorityQueue獲取執行時間最小的"定時任務",但並不出隊列。time-mil ;; lis綁定執行時間,elem綁定"定時任務"數據 (let [[time-millis _ _ :as elem] (locking lock (.peek queue))] ;; 若是elem不爲nil且當前時間>=執行時間,那麼先加鎖,而後poll出該"定時任務", ;; 並將"定時任務"的callback函數綁定到afn,最後調用該函數;不然判斷time-millis ;; 是否爲nil。 ;; 咱們能夠發現該定時器是軟時間執行"定時任務"的,也就是說"定時任務"有可能被延 ;; 遲執行,同時若是afn函數執行時間比較長,那麼會影響下一個"定時任務"的執行 (if (and elem (>= (current-time-millis) time-millis)) ;; It is imperative to not run the function ;; inside the timer lock. Otherwise, it is ;; possible to deadlock if the fn deals with ;; other locks, like the submit lock. (let [afn (locking lock (second (.poll queue)))] ;; 執行"定時任務"的callback函數 (afn)) ;; 該if語句是上面if語句的else分支,判斷time-millis是否爲nil,若是time-mill ;; is不爲nil,則timer-thread線程sleep(執行時間-當前時間);不然sleep(1000) ;; 代表PriorityQueue中沒有"定時任務" (if time-millis ;; If any events are scheduled, sleep until ;; event generation. If any recurring events ;; are scheduled then we will always go ;; through this branch, sleeping only the ;; exact necessary amount of time. (Time/sleep (- time-millis (current-time-millis))) ;; Otherwise poll to see if any new event ;; was scheduled. This is, in essence, the ;; response time for detecting any new event ;; schedulings when there are no scheduled ;; events. (Time/sleep 1000)))) (catch Throwable t ;; Because the interrupted exception can be ;; wrapped in a RuntimeException. ;; 檢查是不是InterruptedException,若是是InterruptedException,說明線程是由 ;; 於接收interrupt信號而中斷的,不作異常處理,不然調用kill-fn函數、修改線程 ;; 狀態並拋出該異常 (when-not (exception-cause? InterruptedException t) (kill-fn t) (reset! active false) (throw t))))) ;; release notifier信號量,標識timer—thread運行結束 (.release notifier)) thread-name)] ;; 設置timer-thread爲守護線程 (.setDaemon timer-thread true) ;; 設置timer-thread爲最高優先級 (.setPriority timer-thread Thread/MAX_PRIORITY) ;; 啓動timer-thread線程 (.start timer-thread) ;; 返回該定時器的"屬性" {:timer-thread timer-thread :queue queue :active active :lock lock :cancel-notifier notifier}))
咱們能夠經過調用cancel-timer函數中斷一個timer-thread線程,cancel-timer函數定義以下:
cancel-timer函數web
(defn cancel-timer [timer] ;; 檢查timer狀態是不是"active",若是不是則拋出異常 (check-active! timer) ;; 加鎖 (locking (:lock timer) ;; 將timer的狀態active設置成false,即"dead" (reset! (:active timer) false) ;; 調用interrupt方法,中斷線程,經過mk-timer函數咱們能夠知道在線程的run方法內調用了sleep方法, ;; 當接收到中斷新號後會拋出InterruptedException異常使線程退出 (.interrupt (:timer-thread timer))) ;; acquire timer中的notifier信號量,由於只有當線程結束前纔會release notifier信號量,因此此處是等待線程;;; 結束 (.acquire (:cancel-notifier timer)))
check-active!函數定義以下:
check-active!函數shell
(defn- check-active! [timer] (when-not @(:active timer) (throw (IllegalStateException. "Timer is not active"))))
經過調用schedule函數和schedule-recurring函數咱們能夠向storm定時器中添加"定時任務"。schedule函數定義以下:
schedule函數安全
(defnk schedule ;; timer綁定定時器,delay-secs綁定"定時任務"相對當前時間的延遲時間,afn綁定callback函數,check-active是;; 否須要檢查定時器 [timer delay-secs afn :check-active true] ;; 檢查定時器狀態 (when check-active (check-active! timer)) (let [id (uuid) ^PriorityQueue queue (:queue timer)] ;; 加鎖,執行時間=當前時間+延遲時間,將"定時任務"的vector類型數據添加到PriorityQueue隊列中 (locking (:lock timer) (.add queue [(+ (current-time-millis) (secs-to-millis-long delay-secs)) afn id]))))
schedule-recurring函數定義以下:
chedule-recurring函數也很簡單,與schedule函數的區別就是在"定時任務"的callback函數中又添加了一個相同的"定時任務"。schedule函數的語義能夠理解成向定時器添加
一個"一次性任務",schedule-recurring函數的語義能夠理解成向定時器添加"一個週期執行的定時任務"(開始執行時間=當前時間+延遲時間,而後每隔recur-secs執行一次)
schedule-recurring函數數據結構
(defn schedule-recurring [timer delay-secs recur-secs afn] (schedule timer delay-secs (fn this [] (afn) ; This avoids a race condition with cancel-timer. (schedule timer recur-secs this :check-active false))))
nimbus檢查心跳和重分配任務的實現就是經過schedule-recurring函數向storm定時器添加了一個"週期任務"實現的。多線程
(schedule-recurring (:timer nimbus) 0 (conf NIMBUS-MONITOR-FREQ-SECS) (fn [] (when (conf NIMBUS-REASSIGN) (locking (:submit-lock nimbus) (mk-assignments nimbus))) (do-cleanup nimbus) ))