Spark任務延遲調度及調度池Pool架構剖析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法

Spark商業環境實戰及調優進階系列

1. 任務延遲調度原理

Spark數據本地化即移動計算而不是移動數據,爲了讓task能儘量的以最優本地化級別(Locality Levels)來啓動,Spark的延遲調度應運而生,資源不夠可在該Locality Levels對應的限制時間內重試,超過限制時間後還沒法啓動則下降Locality Levels再嘗試啓動。緩存

2. TaskSetManger重要成員

  • taskSet:TaskSet由當前TaskSetManager管理,一個TaskSet對應一個Stage,一個TaskSet歸屬於一個TaskSetManager,而TaskSetManager又歸屬於TaskSetManager Pool來管理。

3. 任務的啓動過程

  • 注意 CoarseGrainedSchedulerBackend.makeOffers在任意Executor上變更時,開始調用,makeOffers屬於公共方法。架構

    StatusUpdate
       RegisterExecutor
    複製代碼
  • 首先選定一個Executor,選中在指定executor上的任務,以最大優先級分配。框架

    CoarseGrainedSchedulerBackend.makeOffers(公共方法,任務分配觸發點,過濾活的Executor())
                            ||
                            ||
                           \||/
       val tasks = TaskSchedulerImpl.resourceOffer(workOffers) (分配開始)
                            ||
                            ||
                           \||/
      (遍歷全部TaskSet內部的Task的優先級,以最大本地性開始分配任務)
      for (currentMaxLocality <- taskSet.myLocalityLevels) 
                            ||
                            ||
                           \||/
          (遍歷全部可用的Executor,以指定Executor開始分配)
          for (i <- 0 until shuffledOffers.size)  
                            ||
                            ||
                           \||/
            (選定一個Executor,經過TaskSetManager進行專項任務分配)
             for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {  
                  tasks(i) += task
                  val tid = task.taskId
                  taskIdToTaskSetManager(tid) = taskSet
                  taskIdToExecutorId(tid) = execId
                  executorIdToRunningTaskIds(execId).add(tid)
                  availableCpus(i) -= CPUS_PER_TASK
                  assert(availableCpus(i) >= 0)
                  launchedTask = true
               }
                            ||
                            ||
                           \||/
           (TaskSchedulerImpl引用結束後,返回Tasks,後執行)
           CoarseGrainedSchedulerBackend.launchTasks(taskS)  
    複製代碼
  • 經過在CoarseGrainedSchedulerBackend 中的makeOffers方法,經過scheduler的引用,執行TaskSchedulerImpl.resourceOffers 方法,返回taskDescs(包含了全部Task的位置信息和task的算子等),後執行launchTasks,向 executor 發送消息executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))oop

    private def makeOffers() {
     
     val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
     
     val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
     val workOffers = activeExecutors.map {
       case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
     }.toIndexedSeq
     
      scheduler.resourceOffers(workOffers)
    複製代碼

    } if (!taskDescs.isEmpty) { launchTasks(taskDescs) } }post

  • TaskSetManager中resourceOffer內部的是如何分配任務的呢?學習

  • -> allowedLocality = getAllowedLocalityLevel(curTime) (延遲調度)
    
      -> dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality,
         speculative)  (返回TaskDescription序列,方便後續發送到Executor)
    複製代碼

4 總結

本篇內容還須要完善,並作進一步剖析。this

秦凱新 於深圳spa

相關文章
相關標籤/搜索