對於Spark開發人員來講,瞭解stage的劃分算法能夠讓你知道本身編寫的spark application被劃分爲幾個job,每一個job被劃分爲幾個stage,每一個stage包括了你的哪些代碼,只有知道了這些以後,碰到某個stage執行特別慢或者報錯,你才能快速定位到對應的代碼,對其進行性能優化和排錯。算法
接着上期內核源碼(五)的最後,每一個action操做最終會調用SparkContext初始化時建立的DAGSchedule的runJob方法建立一個job:性能優化
那麼這一篇就咱們來探究一下每一個job中stage究竟是如何劃分的app
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)oop
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)post
eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))性能
new DAGSchedulerEventProcessLoop(this)優化
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)this
跳轉了這麼多,咱們終於找到了DAGScheduler的job調度核心入口handleJobSubmitted
方法,該方法總共分爲五步完成stage的劃分和提交。spa
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
使用觸發job的最後一個rdd建立finalStage3d
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
用finalStage建立一個job
submitStage(finalStage)
stage劃分算法重點!遞歸尋找父Stage!
val missing = getMissingParentStages(stage).sortBy(_.id)
獲取當前stage的父stage
submitMissingTasks(stage, jobId.get)
提交某一個stage
val locs = getPreferredLocs(stage.rdd, id)
給每一個partition建立一個ShuffleMapTask或ResultTask(最後一個stage),並計算其運行的最佳位置
重要知識點
對於每一種有shuffle的操做,例如:groupByKey、reduceByKey、countByKey等,底層都對應了三個RDD: