在前面的章節Client的加載中,Spark的DriverRunner已開始執行用戶任務類(好比:org.apache.spark.examples.SparkPi),下面咱們開始針對於用戶任務類(或者任務代碼)進行分析
1、總體預覽
基於上篇圖作了擴展,增長任務執行的相關交互
- Code:指的用戶編寫的代碼
- RDD:彈性分佈式數據集,用戶編碼根據SparkContext與RDD的api可以很好的將Code轉化爲RDD數據結構(下文將作轉化細節介紹)
- DAGScheduler:有向無環圖調度器,將RDD封裝爲JobSubmitted對象存入EventLoop(實現類DAGSchedulerEventProcessLoop)隊列中
- EventLoop: 定時掃描未處理JobSubmitted對象,將JobSubmitted對象提交給DAGScheduler
- DAGScheduler:針對於JobSubmitted進行處理,最終將RDD轉化爲執行TaskSet,並將TaskSet提交至TaskScheduler
- TaskScheduler: 根據TaskSet建立TaskSetManager對象存入SchedulableBuilder的數據池(Pool)中,並調用DriverEndpoint喚起消費(ReviveOffers)操做
- DriverEndpoint:接受ReviveOffers指令後將TaskSet中的Tasks根據相關規則均勻分配給Executor
- Executor:啓動一個TaskRunner執行一個Task
2、Code轉化爲初始RDDs
咱們的用戶代碼經過調用Spark的Api(好比:SparkSession.builder.appName("Spark Pi").getOrCreate()),該Api會建立Spark的上下文(SparkContext),當咱們調用transform類方法 (如:parallelize(),map())都會建立(或者裝飾已有的) Spark數據結構(RDD), 若是是action類操做(如:reduce()),那麼將最後封裝的RDD做爲一次Job提交,存入待調度隊列中(DAGSchedulerEventProcessLoop )待後續異步處理。
若是屢次調用action類操做,那麼封裝的多個RDD做爲多個Job提交。
流程以下:
- ExecuteEnv(執行環境 )
-
- 這裏能夠是經過spark-submit提交的MainClass,也能夠是spark-shell腳本
- MainClass : 代碼中一定會建立或者獲取一個SparkContext
- spark-shell:默認會建立一個SparkContext
- RDD(彈性分佈式數據集)
-
- create:能夠直接建立(如:sc.parallelize(1 until n, slices) ),也能夠在其餘地方讀取(如:sc.textFile("README.md"))等
- transformation:rdd提供了一組api能夠進行對已有RDD進行反覆封裝成爲新的RDD,這裏採用的是裝飾者設計模式,下面爲部分裝飾器類圖


- action:當調用RDD的action類操做方法時(collect、reduce、lookup、save ),這觸發DAGScheduler的Job提交
- DAGScheduler:建立一個名爲JobSubmitted的消息至DAGSchedulerEventProcessLoop阻塞消息隊列(LinkedBlockingDeque)中
- DAGSchedulerEventProcessLoop:啓動名爲【dag-scheduler-event-loop】的線程實時消費消息隊列
- 【dag-scheduler-event-loop】處理完成後回調JobWaiter
- DAGScheduler:打印Job執行結果
- JobSubmitted:相關代碼以下(其中jobId爲DAGScheduler全局遞增Id):
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
- 最終示例:
最終轉化的RDD分爲四層,每層都依賴於上層RDD,將ShffleRDD封裝爲一個Job存入DAGSchedulerEventProcessLoop待處理,若是咱們的代碼中存在幾段上面示例代碼,那麼就會建立對應對的幾個ShffleRDD分別存入DAGSchedulerEventProcessLoop
3、RDD分解爲待執行任務集合(TaskSet
)
Job提交後,DAGScheduler根據RDD層次關係解析爲對應的Stages,同時維護Job與Stage的關係。
將最上層的Stage根據併發關係(findMissingPartitions )分解爲多個Task,將這個多個Task封裝爲TaskSet提交給TaskScheduler。非最上層的Stage的存入處理的列表中(waitingStages += stage)
流程以下:
- DAGSchedulerEventProcessLoop中,線程【dag-scheduler-event-loop】處理到JobSubmitted
- 調用DAGScheduler進行handleJobSubmitted
-
- 首先根據RDD依賴關係依次建立Stage族,Stage分爲ShuffleMapStage,ResultStage兩類


- 更新jobId與StageId關係Map
- 建立ActiveJob,調用LiveListenerBug,發送SparkListenerJobStart指令
- 找到最上層Stage進行提交,下層Stage存入waitingStage中待後續處理
-
- 調用OutputCommitCoordinator進行stageStart()處理
- 調用LiveListenerBug, 發送 SparkListenerStageSubmitted指令
- 調用SparkContext的broadcast方法獲取Broadcast對象
- 根據Stage類型建立對應多個Task,一個Stage根據findMissingPartitions分爲多個對應的Task,Task分爲ShuffleMapTask,ResultTask


- 將Task封裝爲TaskSet,調用TaskScheduler.submitTasks(taskSet)進行Task調度,關鍵代碼以下:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
4、TaskSet封裝爲TaskSetManager並提交至Driver
TaskScheduler將TaskSet封裝爲TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待處理任務池(Pool)中,發送DriverEndpoint喚起消費(ReviveOffers)指令
- DAGSheduler將TaskSet提交給TaskScheduler的實現類,這裏是TaskChedulerImpl
- TaskSchedulerImpl建立一個TaskSetManager管理TaskSet,關鍵代碼以下:
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
- 同時將TaskSetManager添加SchedduableBuilder的任務池Poll中
- 調用SchedulerBackend的實現類進行reviveOffers,這裏是standlone模式的實現類StandaloneSchedulerBackend
- SchedulerBackend發送ReviveOffers指令至DriverEndpoint
5、Driver將TaskSetManager分解爲TaskDescriptions併發布任務到Executor
Driver接受喚起消費指令後,將全部待處理的TaskSetManager與Driver中註冊的Executor資源進行匹配,最終一個TaskSetManager獲得多個TaskDescription對象,按照TaskDescription想對應的Executor發送LaunchTask指令
當Driver獲取到ReviveOffers(請求消費)指令時
- 首先根據executorDataMap緩存信息獲得可用的Executor資源信息(WorkerOffer),關鍵代碼以下
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
- 接着調用TaskScheduler進行資源匹配,方法定義以下:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {..}
-
- 將WorkerOffer資源打亂(val shuffledOffers = Random.shuffle(offers))
- 將Poo中待處理的TaskSetManager取出(val sortedTaskSets = rootPool.getSortedTaskSetQueue),
- 並循環處理sortedTaskSets並與shuffledOffers循環匹配,若是shuffledOffers(i)有足夠的Cpu資源( if (availableCpus(i) >= CPUS_PER_TASK) ),調用TaskSetManager建立TaskDescription對象(taskSet.resourceOffer(execId, host, maxLocality)),最終建立了多個TaskDescription,TaskDescription定義以下:
new TaskDescription(
taskId,
attemptNum,
execId,
taskName,
index,
sched.sc.addedFiles,
sched.sc.addedJars,
task.localProperties,
serializedTask)
- 若是TaskDescriptions不爲空,循環TaskDescriptions,序列化TaskDescription對象,並向ExecutorEndpoint發送LaunchTask指令,關鍵代碼以下:
for (task <- taskDescriptions.flatten) {
val serializedTask = TaskDescription.encode(task)
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}