上篇 spark 源碼分析之十九 -- DAG的生成和Stage的劃分 中,主要介紹了下圖中的前兩個階段DAG的構建和Stage的劃分。html
本篇文章主要剖析,Stage是如何提交的。apache
rdd的依賴關係構成了DAG,DAGScheduler根據shuffle依賴關係將DAG圖劃分爲一個一個小的stage。具體能夠看 spark 源碼分析之十九 -- DAG的生成和Stage的劃分 作進一步瞭解。緩存
上篇文章中,DAGScheduler的handleJobSubmitted方法咱們只剖析了stage的生成部分,下面咱們看一下stage的提交部分源碼。安全
首先構造ActiveJob對象,其次清除緩存的block location信息,而後記錄jobId和job對象的映射關係到jobIdToActiveJob map集合中,而且將該jobId記錄到活動的job集合中。app
獲取到Job全部的stage的惟一標識,而且根據惟一標識來獲取stage對象,而且調用其lastestInfo方法獲取其StageInfo對象。ide
而後進一步封裝成 SparkListenerJobStart 事件對象,並post到 listenerBus中,listenerBus 是一個 LiveListenerBus 對象,其內部封裝了四個消息隊列組成的集合,具體能夠看 spark 源碼分析之三 -- LiveListenerBus介紹 文章作進一步瞭解。函數
最後調用submitStage 方法執行Stage的提交。源碼分析
先來看一下ActiveJob的說明。post
A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a ResultStage to execute an action, or a map-stage job, which computes the map outputs for a ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive query planning, to look at map output statistics before submitting later stages. We distinguish between these two types of jobs using the finalStage field of this class. Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's submitJob or submitMapStage methods. However, either type of job may cause the execution of other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of these previous stages. These dependencies are managed inside DAGScheduler.ui
它表明了正運行在DAGScheduler中的一個job,job有兩種類型:result job,其經過計算一個ResultStage來執行一個action操做;map-stage job,它在下游的stage提交以前,爲ShuffleMapStage計算map的輸出。
finalStages是這個job的最後一個stage。
直接先來看submitStage方法,以下:
思路: 首先先獲取可能丟失的父stage信息,若是該stage的父stage被遺漏了,則遞歸調用查看其爺爺stage是否被遺漏。
getMissingParentStages方法以下:
思路:不斷建立父stage,能夠看上篇文章 spark 源碼分析之十九 -- DAG的生成和Stage的劃分 作進一步瞭解。
submitMissingTasks方法過於長,爲方便分析,按功能大體分爲以下部分:
org.apache.spark.scheduler.ResultStage#findMissingPartitions 方法以下:
org.apache.spark.scheduler.ShuffleMapStage#findMissingPartitions 方法以下:
org.apache.spark.MapOutputTrackerMaster#findMissingPartitions 方法以下:
OutputCommitCoordinator 的 stageStart實現以下:
本質上就是把它放入到一個map中了。
思路:根據stage的RDD和分區id獲取到其rdd中的分區的優先位置。
下面看一下 getPreferredLocs 方法:
註釋中說到,它是線程安全的,下面看一下,它是如何實現的,即 getPrefferredLocsInternal 方法。
這個方法中提到四種狀況:
1. 若是以前獲取到過,那麼直接返回Nil便可。
2. 若是以前已經緩存在內存中,直接從緩存的內存句柄中取出返回便可。
3. 若是RDD對應的是HDFS輸入的文件等,則使用RDD記錄的優先位置。
4. 若是上述三種狀況都不知足,且是narrowDependency,則調用該方法,獲取子RDDpartition對應的父RDD的partition的優先位置。
下面仔細說一下中間兩種狀況。
getCacheLocs 方法以下:
思路:先查看rdd的存儲級別,若是沒有存儲級別,則直接返回Nil,不然根據RDD和分區id組成BlockId集合,請求存儲系統中的BlockManager來獲取block的位置,而後轉換爲TaskLocation信息返回。
RDD的 preferredLocations 方法以下:
思路:先從checkpoint中找,若是checkpoint中沒有,則返回默認的爲Nil。
返回對象是TaskLocation對象,作一下簡單的說明。
類說明
A location where a task should run. This can either be a host or a (host, executorID) pair. In the latter case, we will prefer to launch the task on that executorID, but our next level of preference will be executors on the same host if this is not possible.
它有三個子類,以下:
這三個類定義以下:
很簡單,不作過多說明。
TaskLocation伴隨對象以下,如今用的方法是第二種 apply 方法:
對應方法以下:
org.apache.spark.scheduler.Stage#makeNewStageAttempt 方法以下:
很簡單,主要是調用了StageInfo的fromStage方法。
先來看Stage類。
StageInfo封裝了關於Stage的一些信息,用於調度和SparkListener傳遞stage信息。
其伴生對象以下:
對應源碼以下:
經過broadcast機制,將數據廣播到spark集羣中的driver和各個executor中。關於broadcast的實現細節,能夠查看 spark 源碼分析之十四 -- broadcast 是如何實現的?作進一步瞭解。
根據stage的類型生成不一樣的類型Task。關於過多Task 的內容,在階段四進行剖析。
對應代碼以下:
其中taskScheduler是 TaskSchedulerImpl,它是TaskScheduler的惟一子類實現。它負責task的調度。
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks方法實現以下:
其中 createTaskSetManager 方法以下:
SchedulableBuilder類是構建Schedulable樹的接口。
schedulableBuilder 定義以下:
其中schedulingMode 能夠經過參數 spark.scheduler.mode 來調整,默認爲FIFO。
schedulableBuilder 初始化以下:
schedulableBuilder的 addTaskSetManager (FIFO)方法以下:
即調用了內部Pool對象的addSchedulable 方法:
關於更多TaskSetManager的內容,將在階段四進行剖析。
backend是一個 SchedulerBackend 實例。在SparkContetx的初始化過程當中調用 createTaskScheduler 初始化 backend,具體能夠看 spark 源碼分析之四 -- TaskScheduler的建立和啓動過程 作深刻了解。
在yarn 模式下,它有兩個實現yarn-client 模式下的 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend實現 和 yarn-cluster 模式下的 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend 實現。
這兩個類在spark 項目的 resource-managers 目錄下的 yarn 目錄下定義實現,固然它也支持 kubernetes 和 mesos,不作過多說明。
這兩個類的繼承關係以下:
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers 實現以下:
發送ReviveOffers 請求給driver。
driver端的 CoarseGrainedSchedulerBackend 的 receive 方法有以下事件處理分支:
其內部通過一系列RPC過程,關於 RPC 能夠看 spark 源碼分析之十二--Spark RPC剖析之Spark RPC總結 作進一步瞭解。
即會調用driver端的makeOffsers方法,以下:
本篇文章剖析了從DAGScheduler生成的Stage是如何被提交給TaskScheduler,以及TaskScheduler是如何把TaskSet提交給ResourceManager的。
下面就是task的運行部分了,下篇文章對其作詳細介紹。跟task執行關係很密切的TaskSchedulerBackend、Task等內容,也將在下篇文章作更詳細的說明。