Spark Job的提交與task本地化分析(源碼閱讀八)

  咱們又都知道,Spark中任務的處理也要考慮數據的本地性(locality),Spark目前支持PROCESS_LOCAL(本地進程)NODE_LOCAL(本地節點)NODE_PREF、RACK_LOCAL(本地機架)ANY(任何)幾種。其餘都很好理解,NODE_LOCAL會在spark日誌中執行拉取數據所執行的task時,打印出來,由於Spark是移動計算,而不是移動數據的嘛。node

  那麼什麼是NODE_PREF數組

  當Driver應用程序剛剛啓動,Driver分配得到的Executor極可能尚未初始化,因此有一部分任務的本地化級別被設置爲NO_PREF.若是是ShuffleRDD,其本地性始終爲NO_PREF。這兩種本地化級別是NO_PREF的狀況,在任務分配時會被優先分配到非本地節點執行,達到必定的優化效果。緩存

  那麼下來咱們從job的任務提交開始玩起~dom

   

  getMissingParentStages方法用來找到Stage的全部不可用的父Stage.從代碼能夠到這裏是個遞歸的調用,submitWaitingStages實際上循環waitingStages中的stage並調用submitStaghe:函數

  

  那麼下來開始提交task,提交task的入口是submitMissingTasks,此函數在Stage沒有可用的父stage時,被調用處理當前Stage未提交的任務。源碼分析

  一、那麼在沒有父stage時,會首先調用paendingPartitions.clear 用於清空pendingTasks.因爲當前Stage的任務剛開始提交,因此須要清空,便於記錄須要計算的任務。性能

  二、將當前Stage加入運行中的Stage集合,是用HashSet進行構造的。優化

  三、找出位計算的partition,若是Stage是map任務,那麼outputLocs中partition對應的List爲Nil,說明此partition還未計算。若是Stage不是map任務,那麼須要獲取stage的finalJob,調用finished方法判斷每一個partition的任務是否完成。spa

  

  四、而後經過stage.makeNewStageAttemp,使用StageInfo.fromStage方法建立當前Stage的_latestInfo:3d

  

  五、若是是Stage Map任務,那麼序列化Stage的RDD及ShuffleDependency,若是Stage不是map任務,那麼序列化Stage的RDD及resultOfJob的處理函數。最終這些序列化獲得的字節數組須要用sc.broadcast進行廣播。

  

  六、最後,建立全部Task、當前stage的id、jobId等信息建立TaskSet,並調用taskScheduler的submitTasks,批量提交Stage及其全部Task.

  

  有可能同時有多個任務提交,因此就有了調度策略FIFO,那麼下來調用LocalBackendreviveOffers方法,向local-Actor發送ReviveOffers消息。localActor對ReviveOffers消息的匹配執行reviveOffers方法。調用TaskSchedulerImpl的resourceOffers方法分配資源,最後調用Executor的launchTask方法運行任務。

  

  同時你會發現,這裏有段代碼,shuffleOffers = Random.shuffle(offers),是爲了計算資源的分配與計算,對全部WorkerOffer隨機洗牌,避免將任務老是分配給一樣的WorkerOffer。

  好了,知道了整個流程,下來咱們來看一下本地化問題:

  

  myLocalityLevles:當前TaskSetManager容許使用的本地化級別。那麼這裏的computeValidLocalityLevels方法是用於計算有效的本地化緩存級別。若是存在Executor中的有待執行的任務,且PROCESS_LOCAL本地化的等待時間不爲0,且存在Executor已被激活,那麼容許的本地化級別裏包括PROCESS_LOCAL.

  

  這裏又發現新大陸,獲取各個本地化級別的等待時間。

  spark.locality.wait 本地化級別的默認等待時間 

  spark.locality.wait.process 本地進程的等待時間

  spark.locality.wait.node 本地節點的等待時間

  spark.locality.wait.rack 本地機架的等待時間

  這些參數呢,在任務的運行很長且數量不少的狀況下,適當調高這些參數能夠顯著提升性能,然而當這些參數值都已經超過任務的運行時長時,則須要調小這些參數。任何任務都但願被分配到能夠從本地讀取數據的節點上以獲得最大的性能提高,然而每一個任務的運行時長時不可預計的。當一個任務在分配時,若是沒有知足最佳本地化(PROCESS_LOCAL)的資源時,若是執拗的期盼獲得最佳的資源,極可能被已經佔用最佳資源可是運行時間很長的任務耽誤,因此這些代碼實現了當沒有最佳本地化時,選擇稍差點的資源。

 

參考文獻:《深刻理解Spark:核心思想與源碼分析》

相關文章
相關標籤/搜索