spark internal - 做業調度

Spark中做業調度的相關類最重要的就是DAGSchedulerDAGScheduler顧名思義就是基於DAG圖的Schedulernode

DAG全稱 Directed Acyclic Graph,有向無環圖。簡單的來講,就是一個由頂點和有方向性的邊構成的圖中,從任意一個頂點出發,沒有任何一條路徑會將其帶回到出發的頂點。
異步

在做業調度系統中,調度的基礎就在於判斷多個做業任務的依賴關係,這些任務之間可能存在多重的依賴關係,也就是說有些任務必須先得到執行,而後另外的相關依賴任務才能執行,可是任務之間顯然不該該出現任何直接或間接的循環依賴關係,因此本質上這種關係適合用DAG有向無環圖來表示。
函數

歸納地描述DAGSchedulerTaskScheduler(關於TaskScheduler的相關細節,在我以前的關於Spark運行模式的文章中有)的功能劃分就是:TaskScheduler負責實際每一個具體任務的物理調度,DAGScheduler負責將做業拆分紅不一樣階段的具備依賴關係的多批任務,能夠理解爲DAGScheduler負責任務的邏輯調度。
性能

基本概念
ui

Task任務 :單個分區數據集上的最小處理流程單元spa

TaskSet任務集:一組關聯的,可是互相之間沒有Shuffle依賴關係的任務所組成的任務集.net

Stage調度階段:一個任務集所對應的調度階段scala

Job做業:一次RDD Action生成的一個或多個Stage所組成的一次計算做業
代理

運行方式
code

DAGSchedulerSparkContext初始化過程當中實例化,一個SparkContext對應一個DAGSchedulerDAGScheduler的事件循環邏輯基於Akka  Actor的消息傳遞機制來構建,在DAGSchedulerStart函數中建立了一個eventProcessActor用來處理各類DAGSchedulerEvent,這些事件包括做業的提交,任務狀態的變化,監控等等

private[scheduler]case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: String,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent
 
private[scheduler]case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
private[scheduler]case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
private[scheduler]case object AllJobsCancelled extends DAGSchedulerEvent
private[scheduler]
case classBeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
 
private[scheduler]
case classGettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
 
private[scheduler]case class CompletionEvent(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics)
  extends DAGSchedulerEvent
 
private[scheduler]case class ExecutorAdded(execId: String, host: String) extendsDAGSchedulerEvent
private[scheduler]case class ExecutorLost(execId: String) extends DAGSchedulerEvent
private[scheduler]  caseclass TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
private[scheduler]case object ResubmitFailedStages extends DAGSchedulerEvent
private[scheduler]case object StopDAGScheduler extends DAGSchedulerEvent
 

不管是Client仍是TaskSchedulerDAGScheduler的交互方式基本上都是經過DAGScheduler暴露的函數接口間接的給eventProcessActor發送相關消息

如前面所說,DAGScheduler最重要的任務之一就是計算做業和任務的依賴關係,制定調度邏輯

DAGScheduler做業調度的兩個主要入口是submitJob runJob,二者的區別在於前者返回一個Jobwaiter對象,能夠用在異步調用中,用來判斷做業完成或者取消做業,runJob在內部調用submitJob,阻塞等待直到做業完成(或失敗)

具體往DAGScheduler提交做業的操做,基本都是封裝在RDD的相關Action操做裏面,不須要用戶顯式的提交做業

用戶代碼都是基於RDD的一系列計算操做,實際運行時,這些計算操做是Lazy執行的,並非全部的RDD操做都會觸發SparkCluster上提交實際做業,基本上只有一些須要返回數據或者向外部輸出的操做纔會觸發實際計算工做,其它的變換操做基本上只是生成對應的RDD記錄依賴關係。

DAGScheduler內部維護了各類 task / stage / job之間的映射關係表

 

工做流程

 

提交併運行一個Job的基本流程,包括如下步驟

劃分Stage

當某個操做觸發計算,向DAGScheduler提交做業時,DAGScheduler須要從RDD依賴鏈最末端的RDD出發,遍歷整個RDD依賴鏈,劃分Stage任務階段,並決定各個Stage之間的依賴關係。Stage的劃分是以ShuffleDependency爲依據的,也就是說當某個RDD的運算須要將數據進行Shuffle時,這個包含了Shuffle依賴關係的RDD將被用來做爲輸入信息,構建一個新的Stage,由此爲依據劃分Stage,能夠確保有依賴關係的數據可以按照正確的順序獲得處理和運算。

GroupByKey操做爲例,該操做返回的結果其實是一個ShuffleRDD,當DAGScheduler遍歷到這個ShuffleRDD的時候,由於其Dependency是一個ShuffleDependency,因而這個ShuffleRDD的父RDD以及shuffleDependency等對象就被用來構建一個新的Stage,這個Stage的輸出結果的分區方式,則由ShuffleDependency中的Partitioner對象來決定。

能夠看到,儘管劃分和構建Stage的依據是ShuffleDependency,對應的RDD也就是這裏的ShuffleRDD,可是這個Stage所處理的數據是從這個shuffleRDD的父RDD開始計算的,只是最終的輸出結果的位置信息參考了ShuffleRDD返回的ShuffleDependency裏所包含的內容。而shuffleRDD自己的運算操做(其實就是一個獲取shuffle結果的過程),是在下一個Stage裏進行的。

生成Job,提交Stage

上一個步驟獲得一個或多個有依賴關係的Stage,其中直接觸發JobRDD所關聯的Stage做爲FinalStage生成一個Job實例,這二者的關係進一步存儲在resultStageToJob映射表中,用於在該Stage所有完成時作一些後續處理,如報告狀態,清理Job相關數據等。

具體提交一個Stage時,首先判斷該Stage所依賴的父Stage的結果是否可用,若是全部父Stage的結果均可用,則提交該Stage,若是有任何一個父Stage的結果不可用,則迭代嘗試提交父Stage 全部迭代過程當中因爲所依賴Stage的結果不可用而沒有提交成功的Stage都被放到waitingStages列表中等待未來被提交

何時waitingStages中的Stage會被從新提交呢,當一個屬於中間過程Stage的任務(這種類型的任務所對應的類爲ShuffleMapTask)完成之後,DAGScheduler會檢查對應的Stage的全部任務是否都完成了,若是是都完成了,則DAGScheduler將從新掃描一次waitingStages中的全部Stage,檢查他們是否還有任何依賴的Stage沒有完成,若是沒有就能夠提交該Stage

此外每當完成一次DAGScheduler的事件循環之後,也會觸發一次從等待和失敗列表中掃描並提交就緒Stage的調用過程

任務集的提交

每一個Stage的提交,最終是轉換成一個TaskSet任務集的提交,DAGScheduler經過TaskScheduler接口提交TaskSet,這個TaskSet最終會觸發TaskScheduler構建一個TaskSetManager的實例來管理這個TaskSet的生命週期,對於DAGScheduler來講提交Stage的工做到此就完成了。而TaskScheduler的具體實現則會在獲得計算資源的時候,進一步經過TaskSetManager調度具體的Task到對應的Executor節點上進行運算

任務做業完成狀態的監控

要保證相互依賴的job/stage可以獲得順利的調度執行,DAGScheduler就必然須要監控當前Job  / Stage乃至Task的完成狀況。這是經過對外(主要是對TaskScheduler)暴露一系列的回調函數來實現的,對於TaskScheduler來講,這些回調函數主要包括任務的開始結束失敗,任務集的失敗,DAGScheduler根據這些Task的生命週期信息進一步維護JobStage的狀態信息。

此外TaskScheduler還能夠經過回調函數通知DAGScheduler具體的Executor的生命狀態,若是某一個Executor崩潰了,或者因爲任何緣由與Driver失去聯繫了,則對應的StageshuffleMapTask的輸出結果也將被標誌爲不可用,這也將致使對應Stage狀態的變動,進而影響相關Job的狀態,再進一步可能觸發對應Stage的從新提交來從新計算獲取相關的數據。

任務結果的獲取

一個具體的任務在Executor中執行完畢之後,其結果須要以某種形式返回給DAGScheduler,根據任務類型的不一樣,任務的結果的返回方式也不一樣

對於FinalStage所對應的任務(對應的類爲ResultTask)返回給DAGScheduler的是運算結果自己,而對於ShuffleMapTask,返回給DAGScheduler的是一個MapStatus對象,MapStatus對象管理了ShuffleMapTask的運算輸出結果在BlockManager裏的相關存儲信息,而非結果自己,這些存儲位置信息將做爲下一個Stage的任務的獲取輸入數據的依據

而根據任務結果的大小的不一樣,ResultTask返回的結果又分爲兩類,若是結果足夠小,則直接放在DirectTaskResult對象內,若是超過特定尺寸(默認約10MB)則在Executor端會將DirectTaskResult先序列化,再把序列化的結果做爲一個Block存放在BlockManager裏,然後將BlockManager返回的BlockID放在IndirectTaskResult對象中返回給TaskSchedulerTaskScheduler進而調用TaskResultGetterIndirectTaskResult中的BlockID取出並經過BlockManager最終取得對應的DirectTaskResult。固然從DAGScheduler的角度來講,這些過程對它來講是透明的,它所得到的都是任務的實際運算結果。

TaskSetManager

前面提到DAGScheduler負責將一組任務提交給TaskScheduler之後,這組任務的調度工做對它來講就算完成了,接下來這組任務內部的調度邏輯,則是由TaskSetManager來完成的。

TaskSetManager的主要接口包括:

ResourceOffer根據TaskScheduler所提供的單個Resource資源包括hostexecutorlocality的要求返回一個合適的TaskTaskSetManager內部會根據上一個任務成功提交的時間,自動調整自身的Locality匹配策略,若是上一次成功提交任務的時間間隔很長,則下降對Locality的要求(例如從最差要求Process  Local下降爲最差要求Node Local),反之則提升對Locality的要求。這一動態調整Locality策略基本能夠理解爲是爲了提升任務在最佳Locality的狀況下獲得運行的機會,由於Resource資源多是在短時間內分批提供給TaskSetManager的,動態調整Locality門檻有助於改善總體的Locality分佈狀況。

舉個例子,若是TaskSetManager內部有a/b兩個任務等待調度,a/b兩個任務Prefer的節點分別是Host  A Host B 這時候先有一個Host C的資源以最差匹配爲Rack Local的形式提供給TaskSetManager,若是沒有內部動態Locality調整機制,那麼好比a任務將被調度。接下來在很短的時間間隔內,一個Host  A的資源來到,一樣的b任務被調度。 而本來最佳的狀況應該是任務b調度給Host C 而任務a調度給Host A

固然動態Locality也會帶來必定的調度延遲,所以如何設置合適的調整策略也是須要針對實際狀況來肯定的。目前能夠設置參數包括

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

 

即各個Locality級別中TaskSetManager等待分配下一個任務的時間,若是距離上一次成功分配資源的時間間隔超過對應的參數值,則下降匹配要求(即process  -> node -> rack -> any) 而每當成功分配一個任務時,則重置時間間隔,並更新Locality級別爲當前成功分配的任務的Locality級別 

handleSuccessfulTask / handleFailedTask /handleTaskGettingResult :用於更新任務的運行狀態,Taskset Manager在這些函數中除了更新自身維護的任務狀態列表等信息,用於剩餘的任務的調度之外,也會進一步調用DAGScheduler的函數接口將結果通知給它。

此外,TaskSetManager在調度任務時還可能進一步考慮Speculation的狀況,亦即當某個任務的運行時間超過其它任務的運行完成時間的一個特定比例值時,該任務可能被重複調度。目的固然是爲了防止某個運行中的Task因爲某些特殊緣由(例如所在節點CPU負載太高,IO帶寬被佔等等)運行特別緩慢拖延了整個Stage的完成時間,Speculation一樣須要根據集羣和做業的實際狀況合理配置,不然可能反而下降集羣性能。 

Pool 調度池

前面咱們說了,DAGScheduler負責構建具備依賴關係的任務集,TaskSetManager負責在具體的任務集的內部調度任務,而TaskScheduler負責將資源提供給TaskSetManager供其做爲調度任務的依據。可是每一個SparkContext可能同時存在多個可運行的任務集(沒有依賴關係),這些任務集之間如何調度,則是由調度池(POOL)對象來決定的,Pool所管理的對象是下一級的Pool或者TaskSetManager對象

TaskSchedulerImpl在初始化過程當中會根據用戶設定的SchedulingMode(默認爲FIFO)建立一個rootPool根調度池,以後根據具體的調度模式再進一步建立SchedulableBuilder對象,具體的SchedulableBuilder對象的BuildPools方法將在rootPool的基礎上完成整個Pool的構建工做。

目前的實現有兩種調度模式,對應了兩種類型的Pool

FIFO:先進先出型,FIFO Pool直接管理的是TaskSetManager,每一個TaskSetManager建立時都存儲了其對應的StageIDFIFO  pool最終根據StageID的順序來調度TaskSetManager

FAIR:公平調度,FAIR Pool管理的對象是下一級的POOL,或者TaskSetManager,公平調度的基本原則是根據所管理的Pool/TaskSetManager中正在運行的任務的數量來判斷優先級,用戶能夠設置minShare最小任務數,weight任務權重來調整對應Pool裏的任務集的優先程度。當採用公平調度模式時,目前所構建的調度池是兩級的結構,即根調度池管理一組子調度池,子調度池進一步管理屬於該調度池的TaskSetManager

 公平調度模式的配置經過配置文件來管理,默認使用fairscheduler.xml文件,範例參見conf目錄下的模板:

<?xmlversion="1.0"?>
<allocations>
  <pool name="production">
   <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
   <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>


因爲這裏的調度池是在SparkContext內部的調度,所以其調度範疇是一個基於該SparkContextSpark應用程序,正常狀況下,多個Spark應用程序之間在調度池層面是沒有調度優先級關係的。那麼這種調度模式的應用場合是怎樣的呢? 舉一個例子就是SparkServer或者SharkServer,做爲一個長期運行的SparkContext,他們代理運行了其它連上ServerSpark應用的任務,這樣你能夠爲每一個連接按照用戶名指定一個Pool運行,從而實現用戶優先級和資源分配的合理調度等。

 

Spark應用之間的調度

 

前面提到調度池只是在SparkContxt內部調度資源,SparkContext之間的調度關係,按照Spark不一樣的運行模式,就不必定歸Spark所管理的了。

MesosYARN模式下,底層資源調度系統的調度策略由MesosYARN所決定,只有在Standalone模式下,Spark  Master按照當前cluster資源是否知足等待列表中的Spark應用 對內存和CPU資源的需求,而決定是否建立一個SparkContext對應的Driver,進而完成Spark應用的啓動過程,這能夠粗略近似的認爲是一種粗顆粒度的有條件的FIFO策略吧

轉自:http://blog.csdn.net/colorant/article/details/24010035

相關文章
相關標籤/搜索