歡迎轉載,轉載請註明出處,徽滬一郎.java
WEB UI和Metrics子系統爲外部觀察監測Spark內部運行狀況提供了必要的窗口,本文將簡略的過一下其內部代碼實現。web
先上圖感覺一下spark webui 假設當前已經在本機運行standalone cluster模式,輸入http://127.0.0.1:8080將會看到以下頁面
json
driver application默認會打開4040端口進行http監聽,能夠看到application相關的詳細信息設計模式
顯示每一個stage的詳細信息tomcat
本節要討論的重點是http server是如何啓動的,頁面中的數據是從哪裏獲取到的?Spark中用到的http server是jetty, jetty採用java編寫,是很是輕巧的servlet engine和http server。可以嵌入到用戶程序中執行,不用像tomcat或jboss那樣須要本身獨立的jvm進程。app
SparkUI在SparkContext初始化的時候建立 jvm
// Initialize the Spark UI , registering all associated listeners private [spark] val ui = new SparkUI (this) ui.bind ()
initialize的主要工做是註冊頁面處理句柄,WebUI的子類須要實現本身的initialize函數ide
bind將真正啓動jetty server.函數
def bind () { assert (! serverInfo .isDefined , " Attempted to bind % s more than once!". format ( className )) try { // 啓 動 JettyServer serverInfo = Some( startJettyServer (" 0.0.0.0 ", port , handlers , conf)) logInfo (" Started %s at http ://%s:%d". format ( className , publicHostName , boundPort )) } catch { case e: Exception => logError (" Failed to bind %s". format ( className ) , e) System .exit (1) } }
在startJettyServer函數中將JettyServer運行起來的關鍵處理函數是connectoop
def connect(currentPort: Int): (Server, Int) = { val server = new Server(new InetSocketAddress(hostName, currentPort)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) server.setHandler(collection) Try { server.start() } match { case s: Success[_] => (server, server.getConnectors.head.getLocalPort) case f: Failure[_] => val nextPort = (currentPort + 1) % 65536 server.stop() pool.stop() val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort." if (f.toString.contains("Address already in use")) { logWarning(s"$msg - $f") } else { logError(msg, f.exception) } connect(nextPort) } } val (server, boundPort) = connect(port) ServerInfo(server, boundPort, collection) }
頁面中的數據是如何獲取的呢,這就要歸功於SparkListener了,典型的觀察者設計模式。當有與stage及task相關的事件發生時,這些Listener都將收到通知,並進行數據更新。
須要指出的是,數據儘管得以自動更新,但頁面並無,仍是須要手工刷新才能獲得最新的數據。
上圖顯示的是SparkUI中註冊了哪些SparkListener子類。來看一看這些子類是在何時註冊進去的, 注意研究一下SparkUI.initialize函
def initialize() { listenerBus.addListener(storageStatusListener) val jobProgressTab = new JobProgressTab(this) attachTab(jobProgressTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/stages", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest)) if (live) { sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } }
舉一個實際例子來看看Notifier發送Event的時刻,好比有任務提交的時 resourceOffer->taskStarted->handleBeginEvent
private [ scheduler ] def handleBeginEvent (task: Task[_ ], taskInfo : TaskInfo ) { listenerBus .post( SparkListenerTaskStart (task. stageId , taskInfo )) submitWaitingStages () }
post實際上是向listenerBus的消息隊列中添加一個消息,真正將消息發送 出去的時另外一個處理線程listenerThread
override def run (): Unit = Utils. logUncaughtExceptions { while (true) { eventLock . acquire () // Atomically remove and process this event LiveListenerBus .this. synchronized { val event = eventQueue .poll if (event == SparkListenerShutdown ) { // Get out of the while loop and shutdown the daemon thread return } Option (event). foreach ( postToAll ) } } }
Option(event).foreach(postToAll)負責將事件通知給各個Observer.postToAll的函數實現以下
def postToAll(event: SparkListenerEvent) { event match { case stageSubmitted: SparkListenerStageSubmitted => foreachListener(_.onStageSubmitted(stageSubmitted)) case stageCompleted: SparkListenerStageCompleted => foreachListener(_.onStageCompleted(stageCompleted)) case jobStart: SparkListenerJobStart => foreachListener(_.onJobStart(jobStart)) case jobEnd: SparkListenerJobEnd => foreachListener(_.onJobEnd(jobEnd)) case taskStart: SparkListenerTaskStart => foreachListener(_.onTaskStart(taskStart)) case taskGettingResult: SparkListenerTaskGettingResult => foreachListener(_.onTaskGettingResult(taskGettingResult)) case taskEnd: SparkListenerTaskEnd => foreachListener(_.onTaskEnd(taskEnd)) case environmentUpdate: SparkListenerEnvironmentUpdate => foreachListener(_.onEnvironmentUpdate(environmentUpdate)) case blockManagerAdded: SparkListenerBlockManagerAdded => foreachListener(_.onBlockManagerAdded(blockManagerAdded)) case blockManagerRemoved: SparkListenerBlockManagerRemoved => foreachListener(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD => foreachListener(_.onUnpersistRDD(unpersistRDD)) case applicationStart: SparkListenerApplicationStart => foreachListener(_.onApplicationStart(applicationStart)) case applicationEnd: SparkListenerApplicationEnd => foreachListener(_.onApplicationEnd(applicationEnd)) case SparkListenerShutdown => } }
在系統設計中,測量模塊是不可或缺的組成部分。經過這些測量數據來感知系統的運行狀況。
在Spark中,測量模塊由MetricsSystem來擔任,MetricsSystem中有三個重要的概念,分述以下。
Spark目前支持將測量數據保存或發送到以下目的地
下面從MetricsSystem的建立,數據源的添加,數據更新與發送幾個方面來跟蹤一下源碼。
MetricsSystem依賴於由codahale提供的第三方庫Metrics,能夠在metrics.codahale.com找到更爲詳細的介紹。
以Driver Application爲例,driver application首先會初始化SparkContext,在SparkContext的初始化過程當中就會建立MetricsSystem,具體調用關係以下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem
註冊數據源,繼續以SparkContext爲例
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource) } initDriverMetrics()
數據讀取由Sink來完成,在Spark中建立的Sink子類以下圖所示
讀取最新的數據,以CsvSink爲例,最主要的就是建立CsvReporter,啓動以後會按期更新最近的數據到console。不一樣類型的Sink所使用的Reporter是不同的。
val reporter: CsvReporter = CsvReporter.forRegistry(registry) .formatFor(Locale.US) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build(new File(pollDir)) override def start() { reporter.start(pollPeriod, pollUnit) }
Spark中關於metrics子系統的配置文件詳見conf/metrics.properties. 默認的Sink是MetricsServlet,在任務提交執行以後,輸入http://127.0.0.1:4040/metrics/json會獲得以json格式保存的metrics信息。