spark 源碼分析之三 -- LiveListenerBus介紹

LiveListenerBus 

官方說明以下:node

Asynchronously passes SparkListenerEvents to registered SparkListeners.apache

即它的功能是異步地將SparkListenerEvent傳遞給已經註冊的SparkListener,這種異步的機制是經過生產消費者模型來實現的。數組

首先,它定義了 4 個 消息堵塞隊列,隊列的名字分別爲shared、appStatus、executorManagement、eventLog。隊列的類型是 org.apache.spark.scheduler.AsyncEventQueue#AsyncEventQueue,保存在 queues 變量中。每個隊列上均可以註冊監聽器,若是隊列沒有監聽器,則會被移除。app

它有啓動和stop和start兩個標誌位來指示 監聽總線的的啓動中止狀態。 若是總線沒有啓動,有事件過來,先放到 一個待添加的可變數組中,不然直接將事件 post 到每個隊列中。異步

其直接依賴類是 AsyncEventQueue, 至關於 LiveListenerBus 的多事件隊列是對 AsyncEventQueue 進一步的封裝。ide

AsyncEventQueue

其繼承關係以下:post

  

它有啓動和stop和start兩個標誌位來指示 監聽總線的的啓動中止狀態。spa

其內部維護了listenersPlusTimers 主要就是用來保存註冊到這個總線上的監聽器對象的。code

post 操做將事件放入內部的 LinkedBlockingQueue中,默認大小是 10000。對象

有一個事件分發器,它不停地從 LinkedBlockingQueue 執行 take 操做,獲取事件,並將事件進一步分發給全部的監聽器,由org.apache.spark.scheduler.SparkListenerBus#doPostEvent 方法實現事件轉發,具體代碼以下:

 1 protected override def doPostEvent(
 2  listener: SparkListenerInterface, 3 event: SparkListenerEvent): Unit = { 4  event match { 5 case stageSubmitted: SparkListenerStageSubmitted => 6  listener.onStageSubmitted(stageSubmitted) 7 case stageCompleted: SparkListenerStageCompleted => 8  listener.onStageCompleted(stageCompleted) 9 case jobStart: SparkListenerJobStart => 10  listener.onJobStart(jobStart) 11 case jobEnd: SparkListenerJobEnd => 12  listener.onJobEnd(jobEnd) 13 case taskStart: SparkListenerTaskStart => 14  listener.onTaskStart(taskStart) 15 case taskGettingResult: SparkListenerTaskGettingResult => 16  listener.onTaskGettingResult(taskGettingResult) 17 case taskEnd: SparkListenerTaskEnd => 18  listener.onTaskEnd(taskEnd) 19 case environmentUpdate: SparkListenerEnvironmentUpdate => 20  listener.onEnvironmentUpdate(environmentUpdate) 21 case blockManagerAdded: SparkListenerBlockManagerAdded => 22  listener.onBlockManagerAdded(blockManagerAdded) 23 case blockManagerRemoved: SparkListenerBlockManagerRemoved => 24  listener.onBlockManagerRemoved(blockManagerRemoved) 25 case unpersistRDD: SparkListenerUnpersistRDD => 26  listener.onUnpersistRDD(unpersistRDD) 27 case applicationStart: SparkListenerApplicationStart => 28  listener.onApplicationStart(applicationStart) 29 case applicationEnd: SparkListenerApplicationEnd => 30  listener.onApplicationEnd(applicationEnd) 31 case metricsUpdate: SparkListenerExecutorMetricsUpdate => 32  listener.onExecutorMetricsUpdate(metricsUpdate) 33 case executorAdded: SparkListenerExecutorAdded => 34  listener.onExecutorAdded(executorAdded) 35 case executorRemoved: SparkListenerExecutorRemoved => 36  listener.onExecutorRemoved(executorRemoved) 37 case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage => 38  listener.onExecutorBlacklistedForStage(executorBlacklistedForStage) 39 case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage => 40  listener.onNodeBlacklistedForStage(nodeBlacklistedForStage) 41 case executorBlacklisted: SparkListenerExecutorBlacklisted => 42  listener.onExecutorBlacklisted(executorBlacklisted) 43 case executorUnblacklisted: SparkListenerExecutorUnblacklisted => 44  listener.onExecutorUnblacklisted(executorUnblacklisted) 45 case nodeBlacklisted: SparkListenerNodeBlacklisted => 46  listener.onNodeBlacklisted(nodeBlacklisted) 47 case nodeUnblacklisted: SparkListenerNodeUnblacklisted => 48  listener.onNodeUnblacklisted(nodeUnblacklisted) 49 case blockUpdated: SparkListenerBlockUpdated => 50  listener.onBlockUpdated(blockUpdated) 51 case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted => 52  listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted) 53 case _ => listener.onOtherEvent(event) 54  } 55 }

而後去調用 listener 的相對應的方法。

就這樣,事件總線上的消息事件被監聽器消費了。

相關文章
相關標籤/搜索