【Spark2.0源碼學習】-9.Job提交與Task的拆分

      在前面的章節Client的加載中,Spark的DriverRunner已開始執行用戶任務類(好比:org.apache.spark.examples.SparkPi),下面咱們開始針對於用戶任務類(或者任務代碼)進行分析
 
1、總體預覽
          基於上篇圖作了擴展,增長任務執行的相關交互
     
  • Code:指的用戶編寫的代碼
  • RDD:彈性分佈式數據集,用戶編碼根據SparkContext與RDD的api可以很好的將Code轉化爲RDD數據結構(下文將作轉化細節介紹)
  • DAGScheduler:有向無環圖調度器,將RDD封裝爲JobSubmitted對象存入EventLoop(實現類DAGSchedulerEventProcessLoop)隊列中
  • EventLoop: 定時掃描未處理JobSubmitted對象,將JobSubmitted對象提交給DAGScheduler
  • DAGScheduler:針對於JobSubmitted進行處理,最終將RDD轉化爲執行TaskSet,並將TaskSet提交至TaskScheduler
  • TaskScheduler: 根據TaskSet建立TaskSetManager對象存入SchedulableBuilder的數據池(Pool)中,並調用DriverEndpoint喚起消費(ReviveOffers)操做
  • DriverEndpoint:接受ReviveOffers指令後將TaskSet中的Tasks根據相關規則均勻分配給Executor
  • Executor:啓動一個TaskRunner執行一個Task
 
2、Code轉化爲初始RDDs
          咱們的用戶代碼經過調用Spark的Api(好比:SparkSession.builder.appName("Spark Pi").getOrCreate()),該Api會建立Spark的上下文(SparkContext),當咱們調用transform類方法 (如:parallelize(),map())都會建立(或者裝飾已有的) Spark數據結構(RDD), 若是是action類操做(如:reduce()),那麼將最後封裝的RDD做爲一次Job提交,存入待調度隊列中(DAGSchedulerEventProcessLoop )待後續異步處理。
          若是屢次調用action類操做,那麼封裝的多個RDD做爲多個Job提交。
     流程以下:
     
  • ExecuteEnv(執行環境 )
    • 這裏能夠是經過spark-submit提交的MainClass,也能夠是spark-shell腳本
    • MainClass : 代碼中一定會建立或者獲取一個SparkContext
    • spark-shell:默認會建立一個SparkContext
  • RDD(彈性分佈式數據集)
    • create:能夠直接建立(如:sc.parallelize(1 until n, slices) ),也能夠在其餘地方讀取(如:sc.textFile("README.md"))等
    • transformation:rdd提供了一組api能夠進行對已有RDD進行反覆封裝成爲新的RDD,這裏採用的是裝飾者設計模式,下面爲部分裝飾器類圖
    • action:當調用RDD的action類操做方法時(collect、reduce、lookup、save ),這觸發DAGScheduler的Job提交
  • DAGScheduler:建立一個名爲JobSubmitted的消息至DAGSchedulerEventProcessLoop阻塞消息隊列(LinkedBlockingDeque)中
  • DAGSchedulerEventProcessLoop:啓動名爲【dag-scheduler-event-loop】的線程實時消費消息隊列
  • 【dag-scheduler-event-loop】處理完成後回調JobWaiter
  • DAGScheduler:打印Job執行結果
  • JobSubmitted:相關代碼以下(其中jobId爲DAGScheduler全局遞增Id):
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
  • 最終示例:

     

     最終轉化的RDD分爲四層,每層都依賴於上層RDD,將ShffleRDD封裝爲一個Job存入DAGSchedulerEventProcessLoop待處理,若是咱們的代碼中存在幾段上面示例代碼,那麼就會建立對應對的幾個ShffleRDD分別存入DAGSchedulerEventProcessLoop
 
3、RDD分解爲待執行任務集合(TaskSet
     Job提交後,DAGScheduler根據RDD層次關係解析爲對應的Stages,同時維護Job與Stage的關係。
     將最上層的Stage根據併發關係(findMissingPartitions )分解爲多個Task,將這個多個Task封裝爲TaskSet提交給TaskScheduler。非最上層的Stage的存入處理的列表中(waitingStages += stage)
 流程以下:
  
  • DAGSchedulerEventProcessLoop中,線程【dag-scheduler-event-loop】處理到JobSubmitted
  • 調用DAGScheduler進行handleJobSubmitted
    • 首先根據RDD依賴關係依次建立Stage族,Stage分爲ShuffleMapStage,ResultStage兩類

    • 更新jobId與StageId關係Map
    • 建立ActiveJob,調用LiveListenerBug,發送SparkListenerJobStart指令
    • 找到最上層Stage進行提交,下層Stage存入waitingStage中待後續處理
      • 調用OutputCommitCoordinator進行stageStart()處理
      • 調用LiveListenerBug, 發送 SparkListenerStageSubmitted指令
      • 調用SparkContext的broadcast方法獲取Broadcast對象
      • 根據Stage類型建立對應多個Task,一個Stage根據findMissingPartitions分爲多個對應的Task,Task分爲ShuffleMapTask,ResultTask

      • 將Task封裝爲TaskSet,調用TaskScheduler.submitTasks(taskSet)進行Task調度,關鍵代碼以下:
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
 
4、TaskSet封裝爲TaskSetManager並提交至Driver
     TaskScheduler將TaskSet封裝爲TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待處理任務池(Pool)中,發送DriverEndpoint喚起消費(ReviveOffers)指令
      
  • DAGSheduler將TaskSet提交給TaskScheduler的實現類,這裏是TaskChedulerImpl
  • TaskSchedulerImpl建立一個TaskSetManager管理TaskSet,關鍵代碼以下:
     new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
  • 同時將TaskSetManager添加SchedduableBuilder的任務池Poll中
  • 調用SchedulerBackend的實現類進行reviveOffers,這裏是standlone模式的實現類StandaloneSchedulerBackend
  • SchedulerBackend發送ReviveOffers指令至DriverEndpoint
 
5、Driver將TaskSetManager分解爲TaskDescriptions併發布任務到Executor
     Driver接受喚起消費指令後,將全部待處理的TaskSetManager與Driver中註冊的Executor資源進行匹配,最終一個TaskSetManager獲得多個TaskDescription對象,按照TaskDescription想對應的Executor發送LaunchTask指令
當Driver獲取到ReviveOffers(請求消費)指令時
  • 首先根據executorDataMap緩存信息獲得可用的Executor資源信息(WorkerOffer),關鍵代碼以下
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
  new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
  • 接着調用TaskScheduler進行資源匹配,方法定義以下:
     def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {..}
    • 將WorkerOffer資源打亂(val shuffledOffers = Random.shuffle(offers))
    • 將Poo中待處理的TaskSetManager取出(val sortedTaskSets = rootPool.getSortedTaskSetQueue),
    • 並循環處理sortedTaskSets並與shuffledOffers循環匹配,若是shuffledOffers(i)有足夠的Cpu資源( if (availableCpus(i) >= CPUS_PER_TASK) ),調用TaskSetManager建立TaskDescription對象(taskSet.resourceOffer(execId, host, maxLocality)),最終建立了多個TaskDescription,TaskDescription定義以下:
new TaskDescription(
  taskId,
  attemptNum,
  execId,
  taskName,
  index,
  sched.sc.addedFiles,
  sched.sc.addedJars,
  task.localProperties,
  serializedTask)
  • 若是TaskDescriptions不爲空,循環TaskDescriptions,序列化TaskDescription對象,並向ExecutorEndpoint發送LaunchTask指令,關鍵代碼以下:
for (task <- taskDescriptions.flatten) {
  val serializedTask = TaskDescription.encode(task)
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
相關文章
相關標籤/搜索