storm事件管理器定義在event.clj中,主要功能就是經過獨立線程執行"事件處理函數"。咱們能夠將"事件處理函數"添加到EventManager的阻塞隊列中,EventManager的事件處理線程不斷地從阻塞隊列中獲取"事件處理函數"並執行。java
EventManager協議
協議就是一組函數定義的集合,協議中函數的第一個參數必須爲實現該協議的實例自己,相似於java中實例方法的第一個參數爲this;協議相似於java中的接口。函數
(
defprotocol
EventManager
(
add
[
this
event-fn
])
(
waiting?
[
this
])
(
shutdown
[
this
]))
event-manager函數
(
defn
event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
;; daemon?表示是否將事件處理線程設置成守護線程
[
daemon?
]
;; added表示已添加的"事件處理函數"的個數
(
let
[
added (
atom
0)
;; processed表示已處理的"事件處理函數"的個數
processed (
atom
0)
;; queue綁定事件管理器的阻塞隊列LinkedBlockingQueue
^
LinkedBlockingQueue
queue (
LinkedBlockingQueue.)
;; 設置事件管理器的狀態爲"running"
running (
atom
true)
;; 建立事件處理線程。Clojure函數實現了Runnable和Callable接口,因此能夠將Clojure函數做爲參數傳遞給java.lang.Thread類的構造函數
runner (
Thread.
;; 事件處理線程循環檢查事件處理器的狀態是不是"running",若是是,就從阻塞隊列中獲取"事件處理函數",並執行;而後將processed加1
(
fn
[]
(
try-cause
(
while
@
running
(
let
[
r (
.take
queue
)]
(
r)
(
swap!
processed
inc)))
(
catch
InterruptedException
t
(
log-message
"Event manager interrupted"))
(
catch
Throwable
t
(
log-error
t
"Error when processing event")
(
exit-process!
20
"Error when processing an event"
)))))]
(
.setDaemon
runner
daemon?)
;; 啓動事件處理線程
(
.start
runner)
;; 返回一個實現了EventManager協議的實例
(
reify
EventManager
;; add函數將"事件處理函數"添加到事件處理器的阻塞隊列中
(
add
[
this
event-fn
]
;; should keep track of total added and processed to know if this is finished yet
(
when-not
@
running
(
throw (
RuntimeException.
"Cannot add events to a shutdown event manager")))
(
swap!
added
inc)
(
.put
queue
event-fn))
;; waiting?判斷事件處理線程是否處於等待狀態
(
waiting?
[
this
]
(
or (
Time/isThreadWaiting
runner)
(
=
@
processed
@
added)))
;; 關閉事件管理器
(
shutdown
[
this
]
(
reset!
running
false)
(
.interrupt
runner)
(
.join
runner)))))