前面兩節中介紹了Spark RPC的基本知識,以及深刻剖析了Spark RPC中一些源碼的實現流程。html
具體能夠看這裏:java
這一節咱們來看看一個Spark RPC中的運用實例--Spark的心跳機制。固然此次主要仍是從代碼的角度來看。程序員
咱們首先要知道Spark的心跳有什麼用。心跳是分佈式技術的基礎,咱們知道在Spark中,是有一個Master和衆多的Worker,那麼Master怎麼知道每一個Worker的狀況呢,這就須要藉助心跳機制了。心跳除了傳輸信息,另外一個主要的做用就是Worker告訴Master它還活着,小心跳中止時,方便Master進行一些容錯操做,好比數據轉移備份等等。算法
與以前講Spark RPC同樣,咱們一樣分紅兩部分來分析Spark的心跳機制,分爲服務端(Spark Context)和客戶端(Executor)。併發
咱們能夠發現,SparkContext中有關於心跳的類以及RpcEndpoint註冊代碼。框架
class SparkContext(config: SparkConf) extends Logging { ...... private var _heartbeatReceiver: RpcEndpointRef = _ ...... //向 RpcEnv 註冊 Endpoint。 _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) ...... val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) ...... }
這裏rpcEnv已經在上下文中建立好,經過setupEndpoint向rpcEnv註冊一個心跳的Endpoint。還記得上一節中HelloworldServer的例子嗎,在setupEndpoint方法中,會去調用Dispatcher建立這個Endpoint(這裏就是HeartbeatReceiver)對應的Inbox和EndpointRef,而後在Inbox監聽是否有新消息,有新消息則處理它。註冊完會返回一個EndpointRef(注意這裏有Refer,便是客戶端,用來發送消息的)。dom
因此這一句分佈式
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
就已經完成了心跳服務端監聽的功能。
那麼這條代碼的做用呢?ide
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
這裏咱們要看上面那句val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode),它會根據master url建立SchedulerBackend和TaskScheduler。這兩個類都是和資源調度有關的,因此須要藉助心跳機制來傳送消息。其中TaskScheduler負責任務調度資源分配,SchedulerBackend負責與Master、Worker通訊收集Worker上分配給該應用使用的資源狀況。oop
這裏主要是告訴HeartbeatReceiver(心跳)的監聽端,告訴它TaskScheduler這個東西已經設置好啦。HeartbeatReceiver就會迴應你說好的,我知道的,並持有這個TaskScheduler。
到這裏服務端heartbeatReceiver就差很少完了,咱們能夠發現,HeartbeatReceiver除了向RpcEnv註冊並監聽消息以外,還會去持有一些資源調度相關的類,好比TaskSchedulerIsSet。
發送心跳發送在Worker,每一個Worker都會有一個Executor,因此咱們能夠發如今Executor中發送心跳的代碼。
private[spark] class Executor( executorId: String, executorHostname: String, env: SparkEnv, userClassPath: Seq[URL] = Nil, isLocal: Boolean = false) extends Logging { ...... // must be initialized before running startDriverHeartbeat() //建立心跳的 EndpointRef private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv) ...... startDriverHeartbeater() ...... /** * Schedules a task to report heartbeat and partial metrics for active tasks to driver. * 用一個 task 來報告活躍任務的信息以及發送心跳。 */ private def startDriverHeartbeater(): Unit = { val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s") // Wait a random interval so the heartbeats don't end up in sync val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int] val heartbeatTask = new Runnable() { override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat()) } //heartbeater是一個單線程線程池,scheduleAtFixedRate 是定時執行任務用的,和 schedule 相似,只是一些策略不一樣。 heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS) } ...... }
能夠看到,在Executor中會建立心跳的EndpointRef,變量名爲heartbeatReceiverRef。
而後咱們主要看startDriverHeartbeater()這個方法,它是關鍵。
咱們能夠看到最後部分代碼
val heartbeatTask = new Runnable() { override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat()) } heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
heartbeatTask是一個Runaable,即一個線程任務。scheduleAtFixedRate則是java concurrent包中用來執行定時任務的一個類,這裏的意思是每隔10s跑一次heartbeatTask中的線程任務,超時時間30s。
爲何到這裏仍是沒看到heartbeatReceiverRef呢,說好的發送心跳呢?別急,其實在heartbeatTask線程任務中又調用了另外一個方法,咱們到裏面去一探究竟。
private[spark] class Executor( executorId: String, executorHostname: String, env: SparkEnv, userClassPath: Seq[URL] = Nil, isLocal: Boolean = false) extends Logging { ...... private def reportHeartBeat(): Unit = { // list of (task id, accumUpdates) to send back to the driver val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]() val curGCTime = computeTotalGcTime() for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) } } val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) try { //終於看到 heartbeatReceiverRef 的身影了 val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() } heartbeatFailures = 0 } catch { case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e) heartbeatFailures += 1 if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) { logError(s"Exit as unable to send heartbeats to driver " + s"more than $HEARTBEAT_MAX_FAILURES times") System.exit(ExecutorExitCode.HEARTBEAT_FAILURE) } } } ...... }
能夠看到,這裏heartbeatReceiverRef和咱們上一節的例子,HelloworldClient相似,核心也是調用了askWithRetry()方法,這個方法是經過同步的方式發送Rpc消息。而這個方法裏其餘代碼其實就是獲取task的信息啊,或者是一些容錯處理。核心就是調用askWithRetry()方法來發送消息。
看到這你就明白了吧。Executor初始化便會用一個定時任務不斷髮送心跳,同時當有task的時候,會獲取task的信息一併發送。這就是心跳的大概內容了。
總的來講Spark心跳的代碼也是比較雜的,不過這些也都是爲了讓設計更加高耦合,低內聚,讓這些代碼更加方便得複用。不過經過層層剖析,咱們仍是發現其實它底層就是咱們以前說到的Spark RPC框架的內容!!
OK,Spark RPC三部曲完畢。若是你能看到這裏那不容易呀,給本身點個贊吧!!
推薦閱讀 :
從分治算法到 MapReduce
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什麼纔是好的程序員