Spark RPC框架源碼分析(三)Spark心跳機制分析

一.Spark心跳概述

前面兩節中介紹了Spark RPC的基本知識,以及深刻剖析了Spark RPC中一些源碼的實現流程。html

具體能夠看這裏:java

這一節咱們來看看一個Spark RPC中的運用實例--Spark的心跳機制。固然此次主要仍是從代碼的角度來看。程序員

Spark心跳

咱們首先要知道Spark的心跳有什麼用。心跳是分佈式技術的基礎,咱們知道在Spark中,是有一個Master和衆多的Worker,那麼Master怎麼知道每一個Worker的狀況呢,這就須要藉助心跳機制了。心跳除了傳輸信息,另外一個主要的做用就是Worker告訴Master它還活着,小心跳中止時,方便Master進行一些容錯操做,好比數據轉移備份等等。算法

與以前講Spark RPC同樣,咱們一樣分紅兩部分來分析Spark的心跳機制,分爲服務端(Spark Context)和客戶端(Executor)。併發

二. Spark心跳服務端heartbeatReceiver解析

咱們能夠發現,SparkContext中有關於心跳的類以及RpcEndpoint註冊代碼。框架

class SparkContext(config: SparkConf) extends Logging {
    ......
    private var _heartbeatReceiver: RpcEndpointRef = _
    ......
    //向 RpcEnv 註冊 Endpoint。
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    ......
      val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    ......
}

這裏rpcEnv已經在上下文中建立好,經過setupEndpoint向rpcEnv註冊一個心跳的Endpoint。還記得上一節中HelloworldServer的例子嗎,在setupEndpoint方法中,會去調用Dispatcher建立這個Endpoint(這裏就是HeartbeatReceiver)對應的Inbox和EndpointRef,而後在Inbox監聽是否有新消息,有新消息則處理它。註冊完會返回一個EndpointRef(注意這裏有Refer,便是客戶端,用來發送消息的)。dom

因此這一句分佈式

_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

就已經完成了心跳服務端監聽的功能。
那麼這條代碼的做用呢?ide

_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

這裏咱們要看上面那句val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode),它會根據master url建立SchedulerBackend和TaskScheduler。這兩個類都是和資源調度有關的,因此須要藉助心跳機制來傳送消息。其中TaskScheduler負責任務調度資源分配,SchedulerBackend負責與Master、Worker通訊收集Worker上分配給該應用使用的資源狀況。oop

這裏主要是告訴HeartbeatReceiver(心跳)的監聽端,告訴它TaskScheduler這個東西已經設置好啦。HeartbeatReceiver就會迴應你說好的,我知道的,並持有這個TaskScheduler。

到這裏服務端heartbeatReceiver就差很少完了,咱們能夠發現,HeartbeatReceiver除了向RpcEnv註冊並監聽消息以外,還會去持有一些資源調度相關的類,好比TaskSchedulerIsSet。

三. Spark心跳客戶端發送心跳解析

發送心跳發送在Worker,每一個Worker都會有一個Executor,因此咱們能夠發如今Executor中發送心跳的代碼。

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  ......
  // must be initialized before running startDriverHeartbeat()
  //建立心跳的 EndpointRef
  private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
  ......
  startDriverHeartbeater()
  ......
    /**
   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
   * 用一個 task 來報告活躍任務的信息以及發送心跳。
   */
  private def startDriverHeartbeater(): Unit = {
    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

    // Wait a random interval so the heartbeats don't end up in sync
    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

    val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    //heartbeater是一個單線程線程池,scheduleAtFixedRate 是定時執行任務用的,和 schedule 相似,只是一些策略不一樣。
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }
  ......
}

能夠看到,在Executor中會建立心跳的EndpointRef,變量名爲heartbeatReceiverRef。

而後咱們主要看startDriverHeartbeater()這個方法,它是關鍵。
咱們能夠看到最後部分代碼

val heartbeatTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
    }
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)

heartbeatTask是一個Runaable,即一個線程任務。scheduleAtFixedRate則是java concurrent包中用來執行定時任務的一個類,這裏的意思是每隔10s跑一次heartbeatTask中的線程任務,超時時間30s。

爲何到這裏仍是沒看到heartbeatReceiverRef呢,說好的發送心跳呢?別急,其實在heartbeatTask線程任務中又調用了另外一個方法,咱們到裏面去一探究竟。

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  ......
  private def reportHeartBeat(): Unit = {
    // list of (task id, accumUpdates) to send back to the driver
    val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
    val curGCTime = computeTotalGcTime()

    for (taskRunner <- runningTasks.values().asScala) {
      if (taskRunner.task != null) {
        taskRunner.task.metrics.mergeShuffleReadMetrics()
        taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
        accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
      }
    }

    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
    try {
      //終於看到 heartbeatReceiverRef 的身影了
      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
          message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
      if (response.reregisterBlockManager) {
        logInfo("Told to re-register on heartbeat")
        env.blockManager.reregister()
      }
      heartbeatFailures = 0
    } catch {
      case NonFatal(e) =>
        logWarning("Issue communicating with driver in heartbeater", e)
        heartbeatFailures += 1
        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
          logError(s"Exit as unable to send heartbeats to driver " +
            s"more than $HEARTBEAT_MAX_FAILURES times")
          System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
        }
    }
  }
  ......
  
}

能夠看到,這裏heartbeatReceiverRef和咱們上一節的例子,HelloworldClient相似,核心也是調用了askWithRetry()方法,這個方法是經過同步的方式發送Rpc消息。而這個方法裏其餘代碼其實就是獲取task的信息啊,或者是一些容錯處理。核心就是調用askWithRetry()方法來發送消息。

看到這你就明白了吧。Executor初始化便會用一個定時任務不斷髮送心跳,同時當有task的時候,會獲取task的信息一併發送。這就是心跳的大概內容了。

總的來講Spark心跳的代碼也是比較雜的,不過這些也都是爲了讓設計更加高耦合,低內聚,讓這些代碼更加方便得複用。不過經過層層剖析,咱們仍是發現其實它底層就是咱們以前說到的Spark RPC框架的內容!!

OK,Spark RPC三部曲完畢。若是你能看到這裏那不容易呀,給本身點個贊吧!!


推薦閱讀 :
從分治算法到 MapReduce
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什麼纔是好的程序員

相關文章
相關標籤/搜索