《深刻理解Spark-核心思想與源碼分析》(五)第五章任務提交與執行

即欲捭之貴周,即欲闔之貴密。周密之貴,微而與道相隨。---《鬼谷子》緩存

解釋:譯文:若是要分析問題,關鍵在於周詳,若是要綜合概括問題,關鍵在於嚴密。周詳嚴密的關鍵在於精深而與道相隨。網絡

解詞:捭闔(bǎihé):開合。意爲運用某些計策和手段,使雙方聯合或分化。含此義的成語有「縱橫捭闔」。

  解析:《鬼谷子》是一部充滿着謀略和智慧的名著。「即欲捭之貴周,即欲闔之貴密」十分鮮明地體現了此書的特色。文中首先提出了「捭」和「闔」,這是兩種不一樣的克敵制勝的計策。而後,再提出運用這兩種計策必須具備的智慧,即「捭」要貴於周詳,「闔」要貴於嚴密。在此基礎上,又進一步指出,「周密之貴」在於「微而與道相隨」。「微」意在「周密」之上,再進一步精深嚴密。最後提升到「道」上,則是理性的昇華。

  讀此名句,在於有助於謀略的深化和智慧的提高。數據結構

5.1任務概述

1.完成RDD的轉換及DAG的構建閉包

2.完成finalStage的建立與Stage的劃分,作好Stage與Task的準備工做後,最後提交Stage與Task。函數

3.使用集羣管理器分配資源與任務調度,,對於失敗的有重試和容錯機制。oop

4.執行任務ui

5.2 廣播Hadoop的配置信息

SparkContext的BroadCast方法用於廣播Hadoop的配置信息。scala

  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

  上面的代碼經過使用BroadcastManager發送廣播,廣播結束將廣播對象註冊到ContextCleaner中,以便清理。設計

  代碼中BroadcastManager的newBroadcast方法實際上代理了broadcastFactory的newBroadcast。代理

5.3 RDD轉換及DAG構建

爲何須要RDD?

下面從四個方面解釋:

1.數據模型方面

RDD是一個容錯的、並行的數據結構,能夠控制將數據存儲到磁盤或者內存,可以獲取數據的分區。

2.依賴劃分原則

依賴主要分爲寬依賴和窄依賴,窄依賴劃分爲用一個Stage,管道方式迭代執行。

寬依賴的上游RDD不止一個,每每須要跨節傳輸數據。

3.數據處理效率

4.容錯處理

RDD實現分析

5.4 任務提交

任務提交準備

1.通過5.3節對RDD的層層轉換以及DAG的構建。

接下來調用RDD的collect方法轉成Seq,封裝爲Seq爲ArrayList

RDD的collect方法調用SparkContext的runJob

 

SparkContext的runJob從新調用runJob,點擊runJob進入源代碼

接着調用重載的runJob,最終調用的runJob方法又一次調用clean方法防止閉包的反序列化錯誤,而後運行dagScheduler的runJob

1.提交Job

submitJob方法將一個Job提交到Job scheduler,處理過程:
1)、調用RDD的paritition函數來獲取當前Job的最大分區數,即爲maxPartitions。根據maxPartitions,確認咱們沒有一個不存在的partition上運行的任務

2)、生成當前Job的JobId

3)、建立JobWaiter

4)、向eventProcessActor發送JobSubmitted

5)、返回JobWaiter

2.處理Job提交

DAGSchedulerEventProcessActor收到JobSubmit事件,會調用dagScheduler的handleJobSubmitted方法。

5.4.2 finalStage的建立與Stage的劃分

在Spark中,一個Job可能被劃分爲一個或者多個Stage,各個之間存在依賴關係,其中最下游的Stage也被成爲最終的Stage,用於處理Job最後階段的工做

1.newStage的實現分析

handleJobSubmitted方法使用newStage方法建立finalStage

2.獲取父Stage列表

Spark的Job會被劃分到一到多個Stage,這些Stage的劃分是從finalStage開始,從後往前邊劃分邊建立的。getParentStages方法用於獲取或者建立給定的RDD的全部父Stage

這些Stage將被分配給jobId對應的job

3.獲取map任務對應Stage

getShuffleMapStage方法用於建立或者獲取Stage並註冊到shuffleToMapStage

5.4.3 建立Job

5.4.4 提交Stage

5.4.5 提交Task

5.5 執行任務

5.5.1 狀態更新

調用execBackend的statusUpdate方法更新任務狀態。

5.5.2 任務還原

所謂任務還原就是將Driver提交的Task在Executor上經過反序列化、更新依賴達到Task的還原效果的過程。

5.5.3任務運行

TaskRunner最終調用Task的run方法運行任務。

5.6 任務執行後續處理

5.6.1 計量統計與執行結果序列化

5.6.2 內存回收

TaskRunner的run方法最後還會在finally中作一些清理工做。

5.6.3 執行結果處理

5.7 小結

  首先從Spark爲何設計RDD入手,依次講解RDD的實現分析、Stage的劃分、提交Stage、任務執行、執行結果處理等內容。

  在資源分配中涉及的本地化實現,本章作了較爲詳細的分析,Spark經過一種階梯式的本地化策略,

在有效利用資源、節省網絡I/O的同時提升系統執行的效率。

  容錯方面,Spark經過DAG構成的有向無環圖能夠在某些任務執行失敗的狀況下,經過從新提交任務達到容錯,而那些執行成功的任務因爲結果已經存在緩存中,因此不須要重複計算。

相關文章
相關標籤/搜索