本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法
以下圖所示事件日誌監聽器EventLoggingListener,實現了SparkListenerInterface接口,重寫了全部的事件處理函數,包括Stage提交,stage完成等。緩存
經過在SparkContext初始化時把日誌監聽器EventLoggingListener註冊到LiveListenerBus事件總線上,並啓動LiveListenerBus內部的Thread線程,監聽提交到總線上的事件,調用SparkListenerBus的eventQueue.poll -->postToAll(event) --> doPostEvent方法,並進行事件匹配後處理,如:EventLoggingListener執行StageSubmited提交。架構
Spark UI的可視化展現,是有不一樣的監聽器實現的,他們都分別註冊在LiveListenerBus上,以下面SparkContext的初始化片斷:app
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
複製代碼
下面片斷展現的是SparkUI.createLiveUI方法,能夠看到監聽器的註冊,經過事件的投遞(如:DAGScheduler ,DriverEndpoint等),從而實現UI的數據展現:框架
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener)
複製代碼
秦凱新 於深圳 2018-10-28函數