Spark中大量採用事件監聽方式,實現driver端的組件之間的通訊。本文就來解釋一下Spark中事件監聽是如何實現的java
觀察者模式和監聽器在設計模式中有一個觀察者模式,該模式創建一種對象與對象之間的依賴關係,一個對象狀態發生改變時當即通知其餘對象,其餘對象就據此做出相應的反應。其中發生改變的對象稱之爲觀察目標(也有叫主題的),被通知的對象稱之爲觀察者,能夠有多個觀察者註冊到一個觀察目標中,這些觀察者之間沒有聯繫,其數量能夠根據須要增減。編程
事件驅動的異步化編程Spark-Core內部的事件框架實現了基於事件的異步化編程模式。它的最大好處是能夠提高應用程序對物理資源的充分利用,能最大限度的壓榨物理資源,提高應用程序的處理效率。缺點比較明顯,下降了應用程序的可讀性。Spark的基於事件的異步化編程框架由事件框架和異步執行線程池組成,應用程序產生的Event發送給ListenerBus,ListenerBus再把消息廣播給全部的Listener,每一個Listener收到Event判斷是否本身感興趣的Event,如果,會在Listener獨享的線程池中執行Event所對應的邏輯程序塊。下圖展現Event、ListenerBus、Listener、Executor的關係,從事件生成、事件傳播、事件解釋三個方面的視角來看。 設計模式
咱們從線程的視角來看,看異步化處理。異步化處理體如今事件傳播、事件解釋兩個階段,其中事件解釋的異步化實現了咱們的基於事件的異步化編程。app
Spark的實現Spark-Core、Spark-Streaming採用了分類的思路(分而治之)進行管理,每一大類事件都有獨自的Event、ListenerBus框架
Spark-Core的核心事件trait是SparkListenerEvent,Spark-Straming的核心事件trait是StreamingListenerEvent異步
下圖是各類事件實體類:ide
咱們在定義事件須要注意哪些方面呢?咱們以SparkListenerTaskStart爲例,分析一個事件擁有哪些特徵。post
Spark-Core的核心監聽triat是SparkListener,Spark-Streaming的核心監聽triat StreamingListener,二者都表明了一類監聽的抽象ui
下圖是一些監聽實體類:spa
監聽器總線對象,Spark程序在運行的過程當中,Driver端的不少功能都依賴於事件的傳遞和處理,而事件總線在這中間發揮着相當重要的紐帶做用。事件總線經過異步線程,提升了Driver執行的效率。Listener註冊到ListenerBus對象中,而後經過ListenerBus對象來實現事件監聽(相似於計算機與周邊設備之間的關係)
其start方法直接啓動一個dispatchThread,其核心邏輯就是不停地在一個事件隊列eventQueue裏取出事件,若是事件合法且LiverListenerBus沒有被關停,就將事件通知給全部註冊的listener中
其dispatch方法就是向事件隊列裏添加相應的事件。
ListenerBus用於管理全部的Listener,Spark-Core和Spark-Streaming公用相同的trait ListenerBus, 最終都是使用AsyncEventQueue類對Listener進行管理。
管理全部註冊的Listener,爲一類Listener建立一個惟一的AsyncEventQueue,廣播Event到全部的Listener。默承認提供四類AsyncEventQueue分別爲‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。目前Spark-Core並無放開類別設置,意謂着最多隻能有上述四類,從設計的嚴謹上來說分類並非越多越好,每多一個類別,就會多一個AsyncEventQueue實例,每一個實例中會包含一個事件傳播的線程,對系統的資源佔用仍是比較多的。
private val listenerThread = new Thread(name) { setDaemon(true) //線程自己設爲守護線程 override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { eventLock.acquire()//不斷獲取信號量,信號量減一,能獲取到說明還有事件未處理 self.synchronized { processingEvent = true } try { val event = eventQueue.poll //獲取事件, remove() 和 poll() 方法都是從隊列中刪除第一個元素(head)。 if (event == null) { // 此時說明沒有事件,但仍是拿到信號量了,這說明stop方法被調用了 // 跳出while循環,關閉守護進程線程 if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } // 調用ListenerBus的postxToAll(event: E)方法 postxToAll(event) } finally { self.synchronized { processingEvent = false } } } } } }
private val started = new AtomicBoolean(false) private val stopped = new AtomicBoolean(false) //存放事件 private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent] // 表示隊列中產生和使用的事件數量的計數器,這個信號量是爲了不消費者線程空跑 private val eventLock = new Semaphore(0)
LiveListenerBus在SparkContext的setupAndStartListenerBus中被初始化,並調用start方法啓動LiveListenerBus。
def start(): Unit = { if (started.compareAndSet(false, true)) { listenerThread.start() //啓動消費者線程 } else { throw new IllegalStateException(s"$name already started!") }
中止LiveListenerBus,它將等待隊列事件被處理,但在中止後丟掉全部新的事件。須要注意stop可能會致使長時間的阻塞,執行stop方法的線程會被掛起,直到全部的AsyncEventQueue(默認四個)中的dispatch線程都退出後執行stop主法的線程纔會被喚醒。
def stop(): Unit = { if (!started.get()) { throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") } if (stopped.compareAndSet(false, true)) { // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know // `stop` is called. // 釋放一個信號量,但此時是沒有事件的,從而listenerThread會拿到一個空事件,從而知道該中止了 eventLock.release() //而後等待消費者線程自動關閉 listenerThread.join() } else { // Keep quiet } }
採用廣播的方式事件傳播,這個過程很快,主線程只須要把事件傳播給AsyncEventQueue便可,最後由AsyncEventQueue再廣播給相應的Listener
def post(event: SparkListenerEvent): Unit = { if (stopped.get) { // Drop further events to make `listenerThread` exit ASAP logError(s"$name has already stopped! Dropping event $event") return } // 在事件隊列隊尾添加事件 // add()和offer()區別:二者都是往隊列尾部插入元素,不一樣的時候,當超出隊列界限的時候,add()方法是拋出異常讓你處理,而offer()方法是直接返回false val eventAdded = eventQueue.offer(event) if (eventAdded) { //若是成功加入隊列,則在信號量中加一 eventLock.release() } else { // 若是事件隊列超過其容量,則將刪除新的事件,這些子類將被通知到刪除事件。 onDropEvent(event) droppedEventsCounter.incrementAndGet() } val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { // Don't log too frequently 日誌不要太頻繁 // 若是上一次,隊列滿了EVENT_QUEUE_CAPACITY=1000設置的值,就丟掉,而後記錄一個時間,若是一直持續丟掉,那麼每過60秒記錄一第二天志,否則日誌會爆滿的 if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() // 記錄一個warn日誌,表示這個事件,被丟棄了 logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }
圖中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件來源,它們都是經過調用LiveListenerBus的post方法將消息提交給事件隊列,每post一個事件,信號量就加一。
listenerThread不停的獲取信號量,而後從事件隊列中取出事件,取到事件,則調用postForAll把事件分發給已註冊的監聽器,不然,就是取到空事件,它明白這是事件總線搞的鬼,它調用了stop可是每post事件,從而中止事件總線線程。