Spark源碼剖析(九):TaskScheduler原理與源碼剖析

接着上期內核源碼(六)的最後,DAGSchedule會將每一個Job劃分一系列stage,而後爲每一個stage建立一批task(數量與partition數量相同),並計算其運行的最佳位置,最後針對這一批task建立一個TaskSet對象,調用submitTasks方法提交TaskSet到TaskSchedule。那麼這篇文章咱們來剖析TaskScheduler接收到TaskSet後會進行的一系列操做。算法

Alt text 
  
taskScheduler.submitTasks( 
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))spa

  1. 建立TaskSetManager負責某一個TaskSet任務執行狀況的監控和管理
  2. 調用SparkDeployScheduleBackend的reviveOffers方法。

Alt text 
  
val manager = createTaskSetManager(taskSet, maxTaskFailures)線程

Alt text 
  
backend.reviveOffers()code

Alt text 
  
果真在父類CoarseGrainedSchedulerBackend中找到了reviveOffers方法,發送reviveOffers消息給driverActor。對象

Alt text 
  
緊接着咱們來看看driverActor線程收到reviveOffers消息後如何處理進程

Alt text 
  
new DriverActor(properties)能夠看到DriverActor類是CoarseGrainedSchedulerBackend中的類部類ip

Alt text 
  
能夠看到driverActor線程收到reviveOffers消息後調用了makeOffers()方法:資源

  1. 調用TaskScheduleImpl的resourceOffers方法,執行任務分配算法,將各個task分配到Executor上去。
  2. 分配好task到executor以後,執行本身的launchTasks方法,將分配的task發送LaunchTask消息 
    到對應的Executor上去,由Executor啓動並執行task。

Alt text 
  
new WorkerOffer()表明每一個Executor上空閒的資源源碼

Alt text 
  
scheduler.resourceOffers() 任務分配算法入口it

Alt text

Alt text 
  
任務分配算法核心: 
雙重for循環,第一層遍歷全部taskset,第二層遍歷每一種本地化級別(從優到劣) 
本地化級別: 
* PROCESS_LOCAL:進程本地化,RDD的partition和task進入同一個Executor內,那麼速度固然快 
* NODE_LOCAL:RDD的partition和task在同一個worker節點上 
* NO_PREF:無,沒有所謂的本地化級別 
* RACK_LOCAL:機架本地化,RDD的partition和task在同一個機架上 
* ANY:任意的本地化級別

Alt text 
  
launchedTask = resourceOfferSingleTaskSet( 
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)

Alt text

Alt text 
  
taskSet.resourceOffer(execId, host, maxLocality)

Alt text 
  
Scheduler.resourceOffers方法最終會返回已經分配好Executor的任務列表tasks。 
launchTasks方法會接收tasks列表做爲參數,通知對應的Executor啓動相應的task

Alt text

Alt text

至此TaskSchedule處理TaskSet的流程已經所有結束,咱們稍做總結:

  1. 當TaskScheduler接收到從DAGScheduler提交過來的TaskSet時,首先給每一個TaskSet都建立一個TaskSetManager負責管理和監控該TaskSet
  2. 接着調用SparkDeployScheduleBackend的reviveOffers方法,通過一系列調用到makeOffers方法
  3. makeOffers方法中的Scheduler.resourceOffers方法會調用TaskScheduleImpl的resourceOffers方法,執行任務分配算法,將各個task分配到Executor上去
  4. makeOffers方法中的launchTasks方法接收已經分配完成的tasks列表,併爲每一個task發送LaunchTask消息到對應的Executor上去,由Executor啓動並執行task

  下一篇咱們將剖析Executor接收到LaunchTask消息後會如何一步步啓動Task。

相關文章
相關標籤/搜索