Apache Spark源碼走讀之21 -- WEB UI和Metrics初始化及數據更新過程分析

歡迎轉載,轉載請註明出處,徽滬一郎.java

概要

WEB UI和Metrics子系統爲外部觀察監測Spark內部運行狀況提供了必要的窗口,本文將簡略的過一下其內部代碼實現。web

WEB UI

先上圖感覺一下spark webui 假設當前已經在本機運行standalone cluster模式,輸入http://127.0.0.1:8080將會看到以下頁面

json

  driver application默認會打開4040端口進行http監聽,能夠看到application相關的詳細信息
設計模式

顯示每一個stage的詳細信息
tomcat

啓動過程

本節要討論的重點是http server是如何啓動的,頁面中的數據是從哪裏獲取到的?Spark中用到的http server是jetty, jetty採用java編寫,是很是輕巧的servlet engine和http server。可以嵌入到用戶程序中執行,不用像tomcat或jboss那樣須要本身獨立的jvm進程。app

SparkUI在SparkContext初始化的時候建立 jvm

// Initialize the Spark UI , registering all
associated listeners
private [spark] val ui = new SparkUI (this)
ui.bind ()

initialize的主要工做是註冊頁面處理句柄,WebUI的子類須要實現本身的initialize函數ide

bind將真正啓動jetty server.函數

def bind () {
assert (! serverInfo .isDefined , " Attempted to bind %
s more than once!". format ( className ))
try {
// 啓 動 JettyServer
serverInfo = Some( startJettyServer (" 0.0.0.0 ",
port , handlers , conf))
logInfo (" Started %s at http ://%s:%d". format (
className , publicHostName , boundPort ))
} catch {
case e: Exception =>
logError (" Failed to bind %s". format ( className )
, e)
System .exit (1)
}
}

在startJettyServer函數中將JettyServer運行起來的關鍵處理函數是connectoop

def connect(currentPort: Int): (Server, Int) = {
      val server = new Server(new InetSocketAddress(hostName, currentPort))
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(collection)

      Try {
        server.start()
      } match {
        case s: Success[_] =>
          (server, server.getConnectors.head.getLocalPort)
        case f: Failure[_] =>
          val nextPort = (currentPort + 1) % 65536
          server.stop()
          pool.stop()
          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
          if (f.toString.contains("Address already in use")) {
            logWarning(s"$msg - $f")
          } else {
            logError(msg, f.exception)
          }
          connect(nextPort)
      }
    }

    val (server, boundPort) = connect(port)
    ServerInfo(server, boundPort, collection)
  }

數據獲取

頁面中的數據是如何獲取的呢,這就要歸功於SparkListener了,典型的觀察者設計模式。當有與stage及task相關的事件發生時,這些Listener都將收到通知,並進行數據更新。

須要指出的是,數據儘管得以自動更新,但頁面並無,仍是須要手工刷新才能獲得最新的數據。

 

上圖顯示的是SparkUI中註冊了哪些SparkListener子類。來看一看這些子類是在何時註冊進去的, 注意研究一下SparkUI.initialize函

def initialize() {
    listenerBus.addListener(storageStatusListener)
    val jobProgressTab = new JobProgressTab(this)
    attachTab(jobProgressTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
    if (live) {
      sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
    }
  }

舉一個實際例子來看看Notifier發送Event的時刻,好比有任務提交的時 resourceOffer->taskStarted->handleBeginEvent

private [ scheduler ] def handleBeginEvent (task: Task[_
], taskInfo : TaskInfo ) {
listenerBus .post( SparkListenerTaskStart (task.
stageId , taskInfo ))
submitWaitingStages ()
}

post實際上是向listenerBus的消息隊列中添加一個消息,真正將消息發送 出去的時另外一個處理線程listenerThread

override def run (): Unit = Utils.
logUncaughtExceptions {
while (true) {
eventLock . acquire ()
// Atomically remove and process this event
LiveListenerBus .this. synchronized {
val event = eventQueue .poll
if (event == SparkListenerShutdown ) {
// Get out of the while loop and shutdown
the daemon thread
return
}
Option (event). foreach ( postToAll )
}
}
}

Option(event).foreach(postToAll)負責將事件通知給各個Observer.postToAll的函數實現以下

def postToAll(event: SparkListenerEvent) {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        foreachListener(_.onStageSubmitted(stageSubmitted))
      case stageCompleted: SparkListenerStageCompleted =>
        foreachListener(_.onStageCompleted(stageCompleted))
      case jobStart: SparkListenerJobStart =>
        foreachListener(_.onJobStart(jobStart))
      case jobEnd: SparkListenerJobEnd =>
        foreachListener(_.onJobEnd(jobEnd))
      case taskStart: SparkListenerTaskStart =>
        foreachListener(_.onTaskStart(taskStart))
      case taskGettingResult: SparkListenerTaskGettingResult =>
        foreachListener(_.onTaskGettingResult(taskGettingResult))
      case taskEnd: SparkListenerTaskEnd =>
        foreachListener(_.onTaskEnd(taskEnd))
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
      case unpersistRDD: SparkListenerUnpersistRDD =>
        foreachListener(_.onUnpersistRDD(unpersistRDD))
      case applicationStart: SparkListenerApplicationStart =>
        foreachListener(_.onApplicationStart(applicationStart))
      case applicationEnd: SparkListenerApplicationEnd =>
        foreachListener(_.onApplicationEnd(applicationEnd))
      case SparkListenerShutdown =>
    }
  }

Metrics

在系統設計中,測量模塊是不可或缺的組成部分。經過這些測量數據來感知系統的運行狀況。

在Spark中,測量模塊由MetricsSystem來擔任,MetricsSystem中有三個重要的概念,分述以下。

  • instance 表示誰在使用metrics system, 目前已知的有master, worker, executor和client driver會建立metrics system用以測量
  • source 表示數據源,從哪裏獲取數據
  • sinks 數據目的地,將從source獲取的數據發送到哪

Spark目前支持將測量數據保存或發送到以下目的地

  • ConsoleSink 輸出到console
  • CSVSink 按期保存成爲CSV文件
  • JmxSink 註冊到JMX,以經過JMXConsole來查看
  • MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task運行時的測量數據
  • GraphiteSink 發送給Graphite以對整個系統(不單單包括spark)進行監控

下面從MetricsSystem的建立,數據源的添加,數據更新與發送幾個方面來跟蹤一下源碼。

初始化過程

MetricsSystem依賴於由codahale提供的第三方庫Metrics,能夠在metrics.codahale.com找到更爲詳細的介紹。

以Driver Application爲例,driver application首先會初始化SparkContext,在SparkContext的初始化過程當中就會建立MetricsSystem,具體調用關係以下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem

註冊數據源,繼續以SparkContext爲例

private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)

  private def initDriverMetrics() {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  }
initDriverMetrics()

數據讀取

數據讀取由Sink來完成,在Spark中建立的Sink子類以下圖所示

讀取最新的數據,以CsvSink爲例,最主要的就是建立CsvReporter,啓動以後會按期更新最近的數據到console。不一樣類型的Sink所使用的Reporter是不同的。

val reporter: CsvReporter = CsvReporter.forRegistry(registry)
      .formatFor(Locale.US)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build(new File(pollDir))
      
  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

Spark中關於metrics子系統的配置文件詳見conf/metrics.properties. 默認的Sink是MetricsServlet,在任務提交執行以後,輸入http://127.0.0.1:4040/metrics/json會獲得以json格式保存的metrics信息。

相關文章
相關標籤/搜索