storm事件管理器EventManager源碼分析-event.clj

storm事件管理器定義在event.clj中,主要功能就是經過獨立線程執行"事件處理函數"。咱們能夠將"事件處理函數"添加到EventManager的阻塞隊列中,EventManager的事件處理線程不斷地從阻塞隊列中獲取"事件處理函數"並執行。java

EventManager協議

協議就是一組函數定義的集合,協議中函數的第一個參數必須爲實現該協議的實例自己,相似於java中實例方法的第一個參數爲this;協議相似於java中的接口。函數

 

( defprotocol EventManager
 ( add [ this event-fn ])
 ( waiting? [ this ])
 ( shutdown [ this ]))
 
event-manager函數

( defn event-manager
  "Creates a thread to respond to events. Any error will cause process to halt"
  ;; daemon?表示是否將事件處理線程設置成守護線程
  [ daemon? ]
  ;; added表示已添加的"事件處理函數"的個數
 ( let [ added ( atom 0)
      ;; processed表示已處理的"事件處理函數"的個數
        processed ( atom 0)
      ;; queue綁定事件管理器的阻塞隊列LinkedBlockingQueue
        ^ LinkedBlockingQueue queue ( LinkedBlockingQueue.)
      ;; 設置事件管理器的狀態爲"running"
        running ( atom true)
      ;; 建立事件處理線程。Clojure函數實現了Runnable和Callable接口,因此能夠將Clojure函數做爲參數傳遞給java.lang.Thread類的構造函數
        runner ( Thread.
               ;; 事件處理線程循環檢查事件處理器的狀態是不是"running",若是是,就從阻塞隊列中獲取"事件處理函數",並執行;而後將processed加1
                ( fn []
                  ( try-cause
                    ( while @ running
                      ( let [ r ( .take queue )]
                        ( r)
                        ( swap! processed inc)))
                    ( catch InterruptedException t
                      ( log-message "Event manager interrupted"))
                    ( catch Throwable t
                      ( log-error t "Error when processing event")
                      ( exit-process! 20 "Error when processing an event" )))))]
   ( .setDaemon runner daemon?)
    ;; 啓動事件處理線程
   ( .start runner)
    ;; 返回一個實現了EventManager協議的實例
   ( reify
      EventManager
      ;; add函數將"事件處理函數"添加到事件處理器的阻塞隊列中
     ( add
        [ this event-fn ]
        ;; should keep track of total added and processed to know if this is finished yet
       ( when-not @ running
         ( throw ( RuntimeException. "Cannot add events to a shutdown event manager")))
       ( swap! added inc)
       ( .put queue event-fn))
      ;; waiting?判斷事件處理線程是否處於等待狀態
     ( waiting?
        [ this ]
       ( or ( Time/isThreadWaiting runner)
           ( = @ processed @ added)))
      ;; 關閉事件管理器
     ( shutdown
        [ this ]
       ( reset! running false)
       ( .interrupt runner)
       ( .join runner)))))
相關文章
相關標籤/搜索