接着上期內核源碼(六)的最後,DAGSchedule會將每一個Job劃分一系列stage,而後爲每一個stage建立一批task(數量與partition數量相同),並計算其運行的最佳位置,最後針對這一批task建立一個TaskSet對象,調用submitTasks方法提交TaskSet到TaskSchedule
。那麼這篇文章咱們來剖析TaskScheduler接收到TaskSet後會進行的一系列操做。算法
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))spa
val manager = createTaskSetManager(taskSet, maxTaskFailures)線程
backend.reviveOffers()code
果真在父類CoarseGrainedSchedulerBackend中找到了reviveOffers方法,發送reviveOffers消息給driverActor。對象
緊接着咱們來看看driverActor線程收到reviveOffers消息後如何處理進程
new DriverActor(properties)能夠看到DriverActor類是CoarseGrainedSchedulerBackend中的類部類ip
能夠看到driverActor線程收到reviveOffers消息後調用了makeOffers()方法:資源
new WorkerOffer()表明每一個Executor上空閒的資源源碼
scheduler.resourceOffers() 任務分配算法入口it
任務分配算法核心:
雙重for循環,第一層遍歷全部taskset,第二層遍歷每一種本地化級別(從優到劣)
本地化級別:
* PROCESS_LOCAL:進程本地化,RDD的partition和task進入同一個Executor內,那麼速度固然快
* NODE_LOCAL:RDD的partition和task在同一個worker節點上
* NO_PREF:無,沒有所謂的本地化級別
* RACK_LOCAL:機架本地化,RDD的partition和task在同一個機架上
* ANY:任意的本地化級別
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
taskSet.resourceOffer(execId, host, maxLocality)
Scheduler.resourceOffers方法最終會返回已經分配好Executor的任務列表tasks。
launchTasks方法會接收tasks列表做爲參數,通知對應的Executor啓動相應的task
至此TaskSchedule處理TaskSet的流程已經所有結束,咱們稍做總結:
下一篇咱們將剖析Executor接收到LaunchTask消息後會如何一步步啓動Task。