Spark事件監聽總線流程分析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法

Spark商業環境實戰及調優進階系列

1. Spark事件監聽總線流程分析

1.1 Spark事件監聽總線流程分析

以下圖所示事件日誌監聽器EventLoggingListener,實現了SparkListenerInterface接口,重寫了全部的事件處理函數,包括Stage提交,stage完成等。緩存

經過在SparkContext初始化時把日誌監聽器EventLoggingListener註冊到LiveListenerBus事件總線上,並啓動LiveListenerBus內部的Thread線程,監聽提交到總線上的事件,調用SparkListenerBus的eventQueue.poll -->postToAll(event) --> doPostEvent方法,並進行事件匹配後處理,如:EventLoggingListener執行StageSubmited提交。架構

1.2 Spark UI 事件監聽總線流程分析

Spark UI的可視化展現,是有不一樣的監聽器實現的,他們都分別註冊在LiveListenerBus上,以下面SparkContext的初始化片斷:app

if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
      _env.securityManager, appName, startTime = startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
複製代碼

下面片斷展現的是SparkUI.createLiveUI方法,能夠看到監聽器的註冊,經過事件的投遞(如:DAGScheduler ,DriverEndpoint等),從而實現UI的數據展現:框架

val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
  val listener = new JobProgressListener(conf)
  listenerBus.addListener(listener)
  listener
}

val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)

listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
複製代碼

3 結語

秦凱新 於深圳 2018-10-28函數

相關文章
相關標籤/搜索