Spark程序在運行的過程當中,Driver端的不少功能都依賴於事件的傳遞和處理,而事件總線在這中間發揮着相當重要的紐帶做用。事件總線經過異步線程,提升了Driver執行的效率。html
Spark定義了一個特質[1]ListenerBus,能夠接收事件而且將事件提交到對應事件的監聽器。爲了對ListenerBus有個直觀的理解,咱們先來看看它的代碼實現,見代碼清單1。java
代碼清單1 ListenerBus的定義數組
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { private[spark] val listeners = new CopyOnWriteArrayList[L] final def addListener(listener: L): Unit = { listeners.add(listener) } final def removeListener(listener: L): Unit = { listeners.remove(listener) } final def postToAll(event: E): Unit = { val iter = listeners.iterator while (iter.hasNext) { val listener = iter.next() try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } } protected def doPostEvent(listener: L, event: E): Unit private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq } }
代碼清單1中展現了ListenerBus是個泛型特質,其泛型參數爲 [L <: AnyRef, E],其中L是表明監聽器的泛型參數,能夠看到ListenerBus支持任何類型的監聽器,E是表明事件的泛型參數。ListenerBus中各個成員的做用以下:安全
下面將分別對如下內容進行介紹:數據結構
[1] 特質是Scala語言中提供真正的多重繼承的語法特性,相似於Java的Interface,可是又能夠實現方法。有關Scala特質的更多介紹請訪問Scala官網http://www.scala-lang.org。架構
理解了ListenerBus的定義後,本小節一塊兒來看看有哪些類繼承了它。ListenerBus的類繼承體系如圖1所示。app
圖1 ListenerBus的類繼承體系異步
從圖1中能夠看到有三種ListenerBus的具體實現,分別爲:ide
SparkListenerBus也有兩種實現:工具
有了對事件總線的這些介紹,讀者已經在宏觀上對其有所認識。可是若是沒有具體的實現,ListenerBus自己也沒法發揮做用。下一小節咱們將選擇對SparkListenerBus從更加微觀的角度說明如何使用事件總線。
有了上一節對ListenerBus類繼承體系的介紹,本小節將詳細介紹SparkListenerBus的實現,見代碼清單2。
代碼清單2 SparkListenerBus的實現
private[spark] trait SparkListenerBus extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { protected override def doPostEvent( listener: SparkListenerInterface, event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => listener.onStageSubmitted(stageSubmitted) case stageCompleted: SparkListenerStageCompleted => listener.onStageCompleted(stageCompleted) case jobStart: SparkListenerJobStart => listener.onJobStart(jobStart) case jobEnd: SparkListenerJobEnd => listener.onJobEnd(jobEnd) case taskStart: SparkListenerTaskStart => listener.onTaskStart(taskStart) case taskGettingResult: SparkListenerTaskGettingResult => listener.onTaskGettingResult(taskGettingResult) case taskEnd: SparkListenerTaskEnd => listener.onTaskEnd(taskEnd) case environmentUpdate: SparkListenerEnvironmentUpdate => listener.onEnvironmentUpdate(environmentUpdate) case blockManagerAdded: SparkListenerBlockManagerAdded => listener.onBlockManagerAdded(blockManagerAdded) case blockManagerRemoved: SparkListenerBlockManagerRemoved => listener.onBlockManagerRemoved(blockManagerRemoved) case unpersistRDD: SparkListenerUnpersistRDD => listener.onUnpersistRDD(unpersistRDD) case applicationStart: SparkListenerApplicationStart => listener.onApplicationStart(applicationStart) case applicationEnd: SparkListenerApplicationEnd => listener.onApplicationEnd(applicationEnd) case metricsUpdate: SparkListenerExecutorMetricsUpdate => listener.onExecutorMetricsUpdate(metricsUpdate) case executorAdded: SparkListenerExecutorAdded => listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) case blockUpdated: SparkListenerBlockUpdated => listener.onBlockUpdated(blockUpdated) case logStart: SparkListenerLogStart => // ignore event log metadata case _ => listener.onOtherEvent(event) } } }
咱們看到SparkListenerBus已經實現了ListenerBus的doPostEvent方法,經過對SparkListenerEvent事件的匹配,執行SparkListenerInterface監聽器的相應方法。
這裏的SparkListenerEvent實際上是個特質,代碼清單2中列出的SparkListenerStageSubmitted、SparkListenerStageCompleted等都是繼承了SparkListenerEvent特質的樣例類[2]。爲說明問題,這裏僅僅摘選SparkListenerEvent及部分SparkListenerEvent子類的實現以下:
@DeveloperApi @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event") trait SparkListenerEvent { protected[spark] def logEvent: Boolean = true } @DeveloperApi case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null) extends SparkListenerEvent @DeveloperApi case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent @DeveloperApi case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo) extends SparkListenerEvent // 省略其餘SparkListenerEvent的實現 private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
SparkListenerInterface也是一個特質,其中定義了全部SparkListener應當遵照的接口規範。因爲SparkListenerInterface中定義了不少接口,爲說明問題只摘抄SparkListenerInterface中的部分接口定義,代碼以下:
private[spark] trait SparkListenerInterface { def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit // 省略其餘接口方法 def onOtherEvent(event: SparkListenerEvent): Unit }
結合代碼清單2,咱們知道以上代碼片斷中的onStageCompleted和onStageSubmitted將在SparkListenerBus的doPostEvent方法中分別匹配到SparkListenerStageCompleted和SparkListenerStageSubmitted事件時執行,而對於doPostEvent中沒法匹配的事件,都將執行onOtherEvent方法。
在詳細介紹了ListenerBus及SparkListenerBus後,咱們知道當有事件須要通知監聽器的時候,能夠調用ListenerBus的postToAll方法,postToAll方法遍歷全部監聽器並調用SparkListenerBus實現的doPostEvent方法,doPostEvent方法對事件類型進行匹配後調用監聽器的不一樣方法。整個投遞事件的過程是經過方法調用實現的,因此這是一個同步調用。在監聽器比較多的時候這個過程會相對比較耗時(好比用於寫日誌的EventLoggingListener在調度頻繁的時候,有可能致使寫入延遲,這將致使部分事件的丟失。此問題已在spark2.3.0版本中獲得改進。),在Spark UI(在《Spark內核設計的藝術 架構設計與實現》一書的第4章中詳細介紹)中爲了達到頁面的即時刷新 ,實現了SparkListenerBus的子類LiveListenerBus。下一小節將圍繞LiveListenerBus來詳細說明異步投遞消息的實現細節。
[2] 樣例類是Scala語言的語法特性。樣例類是一種特殊的類型,經常使用做事件、參數、模式匹配等。有關樣例類的更多介紹,請讀者閱讀Scala語言的相關資料。
LiveListenerBus繼承了SparkListenerBus,並實現了將事件異步投遞給監聽器,達到實時刷新UI界面數據的效果。LiveListenerBus主要由如下部分組成:
listenerThread用於異步處理eventQueue中的事件,爲了便於說明,這裏將展現listenerThread及LiveListenerBus中的主要代碼片斷,見代碼清單3。
代碼清單3 LiveListenerBus主要邏輯的代碼片斷
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private def validateAndGetQueueSize(): Int = { val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) if (queueSize <= 0) { throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") } queueSize } private val started = new AtomicBoolean(false) private val stopped = new AtomicBoolean(false) private val droppedEventsCounter = new AtomicLong(0L) @volatile private var lastReportTimestamp = 0L private var processingEvent = false private val logDroppedEvent = new AtomicBoolean(false) private val eventLock = new Semaphore(0) private val listenerThread = new Thread(name) { setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { eventLock.acquire() // 獲取信號量 self.synchronized { processingEvent = true } try { val event = eventQueue.poll //從eventQueue中獲取事件 if (event == null) { // Get out of the while loop and shutdown the daemon thread if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } postToAll(event) // 事件處理 } finally { self.synchronized { processingEvent = false } } } } } }
經過分析代碼清單3,listenerThread的工做步驟爲:
值得一提的是,listenerThread的run方法中調用了Utils的tryOrStopSparkContext,tryOrStopSparkContext方法能夠保證當listenerThread的內部循環拋出異常後啓動一個新的線程中止SparkContext(SparkContext的內容將在第4章詳細介紹,tryOrStopSparkContext方法的具體實現請閱讀Utils工具類的實現)。
在解釋了異步線程listenerThread的工做內容後,還有一個要點沒有解釋:eventQueue中的事件是如何放進去的呢?因爲eventQueue定義在LiveListenerBus中,所以ListenerBus和SparkListenerBus中並無操縱eventQueue的方法,要將事件放入eventQueue只能依靠LiveListenerBus本身了,其post方法就是爲此目的而生的,見代碼清單4。
代碼清單4 向LiveListenerBus投遞SparkListenerEvent事件
def post(event: SparkListenerEvent): Unit = { if (stopped.get) { logError(s"$name has already stopped! Dropping event $event") return } val eventAdded = eventQueue.offer(event) // 向eventQueue中添加事件 if (eventAdded) { eventLock.release() } else { onDropEvent(event) droppedEventsCounter.incrementAndGet() } // 打印刪除事件數的日誌 val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }
從代碼清單4看到post方法的處理步驟以下:
與LiveListenerBus配合使用的監聽器,並不是是父類SparkListenerBus的類型參數SparkListenerInterface,而是繼承自SparkListenerInterface的SparkListener及其子類。圖2列出了Spark中監聽器SparkListener以及它的6種最經常使用的實現[3]。
圖2 SparkListener的類繼承體系
SparkListener雖然實現了SparkListenerInterface中的每一個方法,可是其實都是空實現,具體的實現須要交給子類去完成。
本文首先對事件總線的接口定義進行了一些介紹,以後選擇ListenerBus的子類SparkListenerBus與LiveListenerBus做爲具體的實現例子進行分析,最後本文選擇LiveListenerBus做爲具體的實現例子進行分析,這裏將經過圖3更加直觀的展現ListenerBus、SparkListenerBus及LiveListenerBus的工做原理。
圖3 LiveListenerBus的工做流程圖
最後對於圖3做一些補充說明:圖中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件來源,它們都是經過調用LiveListenerBus的post方法將消息交給異步線程listenerThread處理的。
[3] 除了本節列出的的六種SparkListener的子類外,還有不少其餘的子類,這裏就不一一列出了,感興趣的讀者能夠查閱Spark相關文檔或閱讀源碼知曉。
通過近一年的準備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
紙質版售賣連接以下: