storm定時器timer源碼分析-timer.clj

storm定時器與java.util.Timer定時器比較類似。java.util.Timer定時器其實是個線程,定時調度所擁有的TimerTasks;storm定時器也有一個線程負責調度所擁有的"定時任務"。storm定時器的"定時任務"是一個vector類型的數據[time, callback, uuid],內有會有三個值,分別是時間、函數、和uuid,很好理解,時間表示該定時任務何時執行,函數表示要執行的函數,uuid用於標識該"定時任務"。"定時任務"被存放到定時器的PriorityQueue隊列中(和PriorityBlockingQueue區別,在於沒有阻塞機制,不是線程安全的)。優先級隊列是堆數據結構的典型應用,若是不提供Comparator的話,優先隊列中元素默認按天然順序排列,也就是數字默認是小的在隊列頭,字符串則按字典序排列(參閱 Comparable),也能夠根據 Comparator 來指定,這取決於使用哪一種構造方法。優先級隊列不容許null元素。依靠天然排序的優先級隊列還不容許插入不可比較的對象(這樣作可能致使 ClassCastException)。固然也能夠本身從新實現Comparator接口, 好比storm定時器就用reify從新實現了Comparator接口。storm定時器的執行過程比較簡單,經過timer-thread,不斷檢查PriorityQueue裏面時間最小的"定時任務"是否已經能夠觸發了, 若是能夠(當前時間>=執行時間),就poll出來,調用callback,並sleep。storm定時器相關的函數均定義在timer.clj文件中,storm定時器是由mk-timer函數建立的,mk-timer函數定義以下:
mk-timer函數java

;; kill-fn函數會在timer-thread發生exception的時候被調用,timer-name標識定時器的名稱

(defnk mk-timer [:kill-fn (fn [& _] ) :timer-name nil]

    ;; queue綁定PriorityQueue隊列,建立PriorityQueue隊列時指定隊列初始容量爲10,並指定一個Comparator比     ;;較器,Comparator比較器比較"定時任務"執行時間的大小,這樣每次poll出執行時間最小的"定時任務",

    ;; PriorityQueue隊列是一個依賴執行時間的小頂堆

    (let [queue (PriorityQueue. 10 (reify Comparator

                               (compare

                                 [this o1 o2]

                                 (- (first o1) (first o2)))

                               (equals

                                 [this obj]

                                 true)))

    ;; active標識timer-thread是"active"的

    active (atom true)

    ;; 建立一個鎖,由於PriorityQueue並非線程安全的,因此經過這個鎖,可使多線程互斥訪問PriorityQueue

    lock (Object.)

    ;; notifier是一個java信號量,初始值爲0,notifier信號量的主要功能就是當咱們調用cancel-timer函數中斷     ;; 一個timer-thread時,等待timer-thread結束,當timer-thread結束前會release notifier信號量

    notifier (Semaphore. 0)

    ;; thread-name綁定timer-thread線程名,沒有指定時默認爲"timer"

    thread-name (if timer-name timer-name "timer")

    ;; timer-thread線程

    timer-thread (Thread.

                   (fn []

                     ;; 當timer-thread爲"active"即active=true時,進入while循環

                     (while @active

                       (try

                         ;; peek函數從PriorityQueue獲取執行時間最小的"定時任務",但並不出隊列。time-mil                         ;; lis綁定執行時間,elem綁定"定時任務"數據

                         (let [[time-millis _ _ :as elem] (locking lock (.peek queue))]

                         ;; 若是elem不爲nil且當前時間>=執行時間,那麼先加鎖,而後poll出該"定時任務",                           ;; 並將"定時任務"的callback函數綁定到afn,最後調用該函數;不然判斷time-millis                          ;; 是否爲nil。

                         ;; 咱們能夠發現該定時器是軟時間執行"定時任務"的,也就是說"定時任務"有可能被延                          ;; 遲執行,同時若是afn函數執行時間比較長,那麼會影響下一個"定時任務"的執行

                           (if (and elem (>= (current-time-millis) time-millis))

                             ;; It is imperative to not run the function

                             ;; inside the timer lock. Otherwise, it is

                             ;; possible to deadlock if the fn deals with

                             ;; other locks, like the submit lock.

                             (let [afn (locking lock (second (.poll queue)))]

                               ;; 執行"定時任務"的callback函數

                               (afn))

                             ;; 該if語句是上面if語句的else分支,判斷time-millis是否爲nil,若是time-mill                             ;; is不爲nil,則timer-thread線程sleep(執行時間-當前時間);不然sleep(1000)                              ;; 代表PriorityQueue中沒有"定時任務"

                             (if time-millis

                               ;; If any events are scheduled, sleep until

                               ;; event generation. If any recurring events

                               ;; are scheduled then we will always go

                               ;; through this branch, sleeping only the

                               ;; exact necessary amount of time.

                               (Time/sleep (- time-millis (current-time-millis)))

                               ;; Otherwise poll to see if any new event

                               ;; was scheduled. This is, in essence, the

                               ;; response time for detecting any new event

                               ;; schedulings when there are no scheduled

                               ;; events.

                               (Time/sleep 1000))))

                         (catch Throwable t

                           ;; Because the interrupted exception can be

                           ;; wrapped in a RuntimeException.

                           ;; 檢查是不是InterruptedException,若是是InterruptedException,說明線程是由                            ;; 於接收interrupt信號而中斷的,不作異常處理,不然調用kill-fn函數、修改線程                            ;; 狀態並拋出該異常

                           (when-not (exception-cause? InterruptedException t)

                             (kill-fn t)

                             (reset! active false)

                             (throw t)))))

                     ;; release notifier信號量,標識timer—thread運行結束

                     (.release notifier)) thread-name)]

;; 設置timer-thread爲守護線程

(.setDaemon timer-thread true)

;; 設置timer-thread爲最高優先級

(.setPriority timer-thread Thread/MAX_PRIORITY)

;; 啓動timer-thread線程

(.start timer-thread)

;; 返回該定時器的"屬性"

{:timer-thread timer-thread

 :queue queue

 :active active

 :lock lock
 

 :cancel-notifier notifier}))


咱們能夠經過調用cancel-timer函數中斷一個timer-thread線程,cancel-timer函數定義以下:
cancel-timer函數web

(defn cancel-timer

[timer]

;; 檢查timer狀態是不是"active",若是不是則拋出異常

(check-active! timer)

    ;; 加鎖

    (locking (:lock timer)

    ;; 將timer的狀態active設置成false,即"dead"

    (reset! (:active timer) false)

    ;; 調用interrupt方法,中斷線程,經過mk-timer函數咱們能夠知道在線程的run方法內調用了sleep方法,         ;; 當接收到中斷新號後會拋出InterruptedException異常使線程退出

    (.interrupt (:timer-thread timer)))

;; acquire timer中的notifier信號量,由於只有當線程結束前纔會release notifier信號量,因此此處是等待線程;;; 結束

(.acquire (:cancel-notifier timer)))

check-active!函數定義以下:
check-active!函數
shell

(defn- check-active!

[timer]

(when-not @(:active timer)

    (throw (IllegalStateException. "Timer is not active"))))

經過調用schedule函數和schedule-recurring函數咱們能夠向storm定時器中添加"定時任務"。schedule函數定義以下:
schedule函數安全

(defnk schedule

;; timer綁定定時器,delay-secs綁定"定時任務"相對當前時間的延遲時間,afn綁定callback函數,check-active是;; 否須要檢查定時器

[timer delay-secs afn :check-active true]

;; 檢查定時器狀態

(when check-active (check-active! timer))

(let [id (uuid)

    ^PriorityQueue queue (:queue timer)]

  ;; 加鎖,執行時間=當前時間+延遲時間,將"定時任務"的vector類型數據添加到PriorityQueue隊列中

  (locking (:lock timer)

    (.add queue [(+ (current-time-millis) (secs-to-millis-long delay-secs)) afn id]))))

schedule-recurring函數定義以下:
chedule-recurring函數也很簡單,與schedule函數的區別就是在"定時任務"的callback函數中又添加了一個相同的"定時任務"。schedule函數的語義能夠理解成向定時器添加
一個"一次性任務",schedule-recurring函數的語義能夠理解成向定時器添加"一個週期執行的定時任務"(開始執行時間=當前時間+延遲時間,而後每隔recur-secs執行一次)
schedule-recurring函數數據結構

(defn schedule-recurring

[timer delay-secs recur-secs afn]

(schedule timer

        delay-secs

        (fn this []

          (afn)

          ; This avoids a race condition with cancel-timer.

          (schedule timer recur-secs this :check-active false))))

nimbus檢查心跳和重分配任務的實現就是經過schedule-recurring函數向storm定時器添加了一個"週期任務"實現的。多線程

(schedule-recurring (:timer nimbus)

    0

    (conf NIMBUS-MONITOR-FREQ-SECS)

    (fn []

      (when (conf NIMBUS-REASSIGN)

        (locking (:submit-lock nimbus)

          (mk-assignments nimbus)))

      (do-cleanup nimbus)

      ))
相關文章
相關標籤/搜索