Spark源碼剖析(八):stage劃分原理與源碼剖析

引言

對於Spark開發人員來講,瞭解stage的劃分算法能夠讓你知道本身編寫的spark application被劃分爲幾個job,每一個job被劃分爲幾個stage,每一個stage包括了你的哪些代碼,只有知道了這些以後,碰到某個stage執行特別慢或者報錯,你才能快速定位到對應的代碼,對其進行性能優化和排錯算法

 

stage劃分原理與源碼

接着上期內核源碼(五)的最後,每一個action操做最終會調用SparkContext初始化時建立的DAGSchedule的runJob方法建立一個job:性能優化

Alt text

那麼這一篇就咱們來探究一下每一個job中stage究竟是如何劃分的app

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)oop

val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)post

eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))性能

new DAGSchedulerEventProcessLoop(this)優化

dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)this

 

跳轉了這麼多,咱們終於找到了DAGScheduler的job調度核心入口handleJobSubmitted方法,該方法總共分爲五步完成stage的劃分和提交。spa

 

finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)使用觸發job的最後一個rdd建立finalStage3d

val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)用finalStage建立一個job

submitStage(finalStage) stage劃分算法重點!遞歸尋找父Stage!

val missing = getMissingParentStages(stage).sortBy(_.id)獲取當前stage的父stage

 

submitMissingTasks(stage, jobId.get)提交某一個stage

 

val locs = getPreferredLocs(stage.rdd, id)給每一個partition建立一個ShuffleMapTask或ResultTask(最後一個stage),並計算其運行的最佳位置

 

stage劃分算法總結

1. 從finalStage倒推

2. 經過寬依賴,來進行新stage的劃分

3. 使用遞歸,優先提交父stage

 

重要知識點

對於每一種有shuffle的操做,例如:groupByKey、reduceByKey、countByKey等,底層都對應了三個RDD

  • MapPartitionsRDD:對應父stage的最後一個RDD
  • ShuffleRDD:對應子stage的第一個RDD
  • MapPartitionsRDD:對應子stage的第二個RDD
相關文章
相關標籤/搜索