Spark's Secondary Scheduling System: Stage Division Algorithm and Task Scheduling Details - Spark in Commercial Production

This blog series distills and shares cases drawn from real commercial production environments, together with Spark source-code analysis and practical guidance. Please keep following the series. Copyright notice: this Spark source-code analysis and commercial-practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

Spark Commercial Environment Practice and Advanced Tuning Series

1. Component Relationships in the Spark Scheduling System

1.1 Components of the Spark Scheduling System (Standalone mode)

  • First-level scheduling: the Cluster Manager (the ResourceManager in YARN mode, the Master in Standalone mode) is responsible for allocating resources to each Application, such as CPU cores, memory, and disk space.

  • Second-level scheduling: the Application further allocates its resources to its individual Tasks, covering DAG task scheduling, delay scheduling, and so on.

  • Task: Tasks come in two kinds, ResultTask and ShuffleMapTask. Each Stage creates zero or more Tasks according to how many Partitions are still unfinished; DAGScheduler finally submits the Tasks of each Stage as a TaskSet to the TaskScheduler for further processing.

  • TaskSchedulerImpl: holds the two key communication endpoints, StandaloneSchedulerBackend (StandaloneAppClient) and CoarseGrainedSchedulerBackend (DriverEndpoint), and through them drives resource reporting and resource scheduling across the whole cluster.

  • CoarseGrainedSchedulerBackend: maintains an executorDataMap, used to obtain in real time the latest set of live executors for resource allocation.

  • StandaloneSchedulerBackend: holds a reference to TaskSchedulerImpl. As far as I can see, its main job is to initialize and start the StandaloneAppClient and DriverEndpoint endpoints; once messages are received, the real work is still done by TaskSchedulerImpl.

  • StandaloneAppClient: in Standalone mode, StandaloneSchedulerBackend constructs an AppClient instance when it starts, and when that instance's start is called it launches the ClientEndpoint message loop. When ClientEndpoint starts, it registers the current application with the Master. (Interface allowing applications to speak with a Spark standalone cluster manager.)

    (1) The origin of StandaloneSchedulerBackend and CoarseGrainedSchedulerBackend: TaskSchedulerImpl and StandaloneSchedulerBackend reference and start each other:
    SparkContext -> createTaskScheduler -> new TaskSchedulerImpl(sc) ->
      new StandaloneSchedulerBackend(scheduler, sc, masterUrls)-> scheduler.initialize(backend)
      _taskScheduler.start()-> backend.start()
    (2) The present life of StandaloneAppClient and DriverEndpoint:
    StandaloneSchedulerBackend->start()
      
      -> super.start() [CoarseGrainedSchedulerBackend]-> 
      createDriverEndpointRef(properties)->  createDriverEndpoint(properties) ->
      
      -> new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf).start()
    (3) StandaloneAppClient takes office: registering with the Master
    onStart -> registerWithMaster -> tryRegisterAllMasters ->  rpcEnv.setupEndpointRef(masterAddress,
      Master.ENDPOINT_NAME) -> masterRef.send(RegisterApplication(appDescription, self))
    (4) The messages StandaloneAppClient is responsible for handling are as follows:
    RegisteredApplication (registration with the Master at startup)
          ApplicationRemoved
          ExecutorAdded
          ExecutorUpdated
          WorkerRemoved
          MasterChanged
  • DriverEndpoint: when StandaloneSchedulerBackend's parent class CoarseGrainedSchedulerBackend starts, it instantiates a message loop of type DriverEndpoint (this is the classic Driver object of a running program). StandaloneSchedulerBackend is dedicated to collecting resource information from the Workers; when an ExecutorBackend on a Worker starts, it sends a RegisterExecutor message to register with DriverEndpoint.

    At that point StandaloneSchedulerBackend knows the compute resources the current application owns, and TaskSchedulerImpl runs Tasks concretely through the resources held by StandaloneSchedulerBackend. The messages DriverEndpoint is responsible for are listed below (a minimal sketch of this dispatch pattern follows the list):
    StatusUpdate
      ReviveOffers
      KillTask
      KillExecutorsOnHost
      RemoveExecutor
      RegisterExecutor()
      StopDriver
      StopExecutors
      RemoveWorker
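To make the dispatch concrete, here is a minimal, self-contained sketch (not Spark source) of the receive-style pattern DriverEndpoint uses: each message is a case class or case object and the endpoint pattern-matches on it. The names `DriverMessage`, `ToyDriverEndpoint`, and the message payloads are illustrative only.

    // Toy model of DriverEndpoint's message dispatch; not Spark's actual classes.
    sealed trait DriverMessage
    case object ReviveOffers extends DriverMessage
    case class StatusUpdate(executorId: String, taskId: Long, state: String) extends DriverMessage
    case class RemoveExecutor(executorId: String, reason: String) extends DriverMessage

    object ToyDriverEndpoint {
      def receive(msg: DriverMessage): Unit = msg match {
        case ReviveOffers =>
          println("makeOffers(): offer free executor cores to the task scheduler")
        case StatusUpdate(execId, taskId, state) =>
          println(s"task $taskId on $execId is now $state; free its core and re-offer")
        case RemoveExecutor(execId, reason) =>
          println(s"drop $execId from the executor map: $reason")
      }

      def main(args: Array[String]): Unit = {
        receive(ReviveOffers)
        receive(StatusUpdate("exec-1", 42L, "FINISHED"))
      }
    }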

Overall pattern of the Spark scheduling system: in Standalone mode, StandaloneSchedulerBackend constructs an AppClient instance when it starts, and when that instance starts it launches the ClientEndpoint message loop; ClientEndpoint registers the current application with the Master at startup. Meanwhile, StandaloneSchedulerBackend's parent class CoarseGrainedSchedulerBackend instantiates a message loop of type DriverEndpoint (the classic Driver object of a running program) when it starts. StandaloneSchedulerBackend is dedicated to collecting resource information from the Workers; when an ExecutorBackend starts, it sends a RegisterExecutor message to register with DriverEndpoint. At that point StandaloneSchedulerBackend knows the compute resources the current application owns, and TaskScheduler runs Tasks concretely through the resources held by StandaloneSchedulerBackend.

2. DAGScheduler: the Core of the Scheduling System

2.1 Core members of DAGScheduler

  • TaskSchedulerImpl

  • LiveListenerBus

  • MapOutputTrackerMaster

  • BlockManagerMaster

  • SparkEnv

  • cacheLocs: caches the location information of every partition of every RDD, ultimately building a mapping from partition index to a sequence of locations. Why a sequence of locations? This is worth emphasizing: each partition may be replicated, so the Block of each RDD partition may live on the BlockManagers of several nodes, hence a sequence per partition (see the sketch after the code below).

    new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
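A small illustrative sketch of this data shape (ToyTaskLocation is a stand-in for Spark's TaskLocation, and the RDD id and hosts are made up): each partition index maps to the list of replicas holding that partition's block.

    import scala.collection.mutable

    // Stand-in for Spark's TaskLocation: a (host, executorId) pair.
    case class ToyTaskLocation(host: String, executorId: String)

    object CacheLocsSketch {
      // partition index -> locations of that partition's cached block replicas
      val cacheLocs = new mutable.HashMap[Int, IndexedSeq[Seq[ToyTaskLocation]]]

      def main(args: Array[String]): Unit = {
        // Hypothetical RDD id 7 with 2 partitions; partition 0 is replicated on two executors.
        cacheLocs(7) = IndexedSeq(
          Seq(ToyTaskLocation("host-a", "exec-1"), ToyTaskLocation("host-b", "exec-2")),
          Seq(ToyTaskLocation("host-c", "exec-3"))
        )
        println(cacheLocs(7)(0)) // preferred locations (replicas) of partition 0
      }
    }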
  • messageScheduler: responsible for retrying failed Stages, as in the executor-thread code fragment below (a standalone sketch of the same delayed-resubmit pattern follows the fragment):

    private val messageScheduler =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

    // on FetchFailed (inside handleTaskCompletion):
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
    }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
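The same delayed-resubmit pattern can be reproduced with plain java.util.concurrent, as a hedged, self-contained sketch: a single-threaded scheduler posts a "resubmit" event after a fixed delay. The 200 ms delay and the String event are illustrative, not Spark's constants.

    import java.util.concurrent.{Executors, TimeUnit}

    object ResubmitSketch {
      // Single daemon-like scheduler thread, analogous in spirit to messageScheduler.
      private val messageScheduler = Executors.newSingleThreadScheduledExecutor()

      // After a fetch failure, post a resubmit event with a delay instead of immediately.
      def onFetchFailed(post: String => Unit): Unit = {
        messageScheduler.schedule(new Runnable {
          override def run(): Unit = post("ResubmitFailedStages")
        }, 200, TimeUnit.MILLISECONDS) // illustrative delay
      }

      def main(args: Array[String]): Unit = {
        onFetchFailed(event => println(s"eventProcessLoop received: $event"))
        Thread.sleep(500)
        messageScheduler.shutdown()
      }
    }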
  • getPreferredLocs: a heavyweight method that computes the preferred locations of a partition. It ultimately stores the partition's preferred-location sequence in cacheLocs; if nothing is cached there, it falls back to rdd.preferredLocations.

    getPreferredLocs 
      
      -> getPreferredLocsInternal 
      
      -> getCacheLocs(rdd)(partition) ---> cacheLocs(rdd.id) = locs -> return cached
                 (if nothing is cached, the locations are computed and stored in memory first, then the preferred-location sequence is returned)
      
      -> val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

    getCacheLocs method code:

    def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
      if (!cacheLocs.contains(rdd.id)) {
        val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
          IndexedSeq.fill(rdd.partitions.length)(Nil)
        } else {
          val blockIds =
            rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
          blockManagerMaster.getLocations(blockIds).map { bms =>
            bms.map(bm => TaskLocation(bm.host, bm.executorId))
          }
        }
        cacheLocs(rdd.id) = locs
      }
      cacheLocs(rdd.id)
    }
  • eventProcessLoop: the famous DAGSchedulerEventProcessLoop, an event-processing thread responsible for handling all kinds of events, such as the following (a self-contained sketch of this event-loop pattern follows the block):

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

      case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

      case StageCancelled(stageId) =>
        dagScheduler.handleStageCancellation(stageId)

      case JobCancelled(jobId) =>
        dagScheduler.handleJobCancellation(jobId)

      case JobGroupCancelled(groupId) =>
        dagScheduler.handleJobGroupCancelled(groupId)

      case AllJobsCancelled =>
        dagScheduler.doCancelAllJobs()

      case ExecutorAdded(execId, host) =>
        dagScheduler.handleExecutorAdded(execId, host)

      case ExecutorLost(execId) =>
        dagScheduler.handleExecutorLost(execId, fetchFailed = false)

      case BeginEvent(task, taskInfo) =>
        dagScheduler.handleBeginEvent(task, taskInfo)

      case GettingResultEvent(taskInfo) =>
        dagScheduler.handleGetTaskResult(taskInfo)

      case completion: CompletionEvent =>
        dagScheduler.handleTaskCompletion(completion)

      case TaskSetFailed(taskSet, reason, exception) =>
        dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

      case ResubmitFailedStages =>
        dagScheduler.resubmitFailedStages()
    }
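The mechanism behind this is a queue plus a single dispatch thread. Below is a minimal, runnable sketch of that pattern (the classes ToyEventLoop, SchedulerEvent, and the event names are illustrative, not Spark's DAGSchedulerEventProcessLoop): callers post events, one daemon thread drains the queue and dispatches by pattern matching.

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait SchedulerEvent
    case class JobSubmitted(jobId: Int) extends SchedulerEvent
    case object ResubmitFailedStages extends SchedulerEvent
    case object Stop extends SchedulerEvent

    class ToyEventLoop {
      private val queue = new LinkedBlockingQueue[SchedulerEvent]()

      // Single dispatch thread: take an event, pattern-match, handle, repeat.
      private val thread = new Thread("toy-dag-event-loop") {
        override def run(): Unit = {
          var running = true
          while (running) {
            queue.take() match {
              case JobSubmitted(id)     => println(s"handleJobSubmitted($id)")
              case ResubmitFailedStages => println("resubmitFailedStages()")
              case Stop                 => running = false
            }
          }
        }
      }
      thread.setDaemon(true)

      def start(): Unit = thread.start()
      def post(e: SchedulerEvent): Unit = queue.put(e)
    }

    object ToyEventLoopDemo {
      def main(args: Array[String]): Unit = {
        val loop = new ToyEventLoop
        loop.start()
        loop.post(JobSubmitted(0))
        loop.post(ResubmitFailedStages)
        loop.post(Stop)
        Thread.sleep(200) // give the daemon thread time to drain the queue
      }
    }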

2.2 How DAGScheduler and a Job come together: the life of a Job

(1) How the DAG is cut into stages and how a job is submitted for task scheduling (this is not a code dump; read the core logical flow from top to bottom; a minimal runnable driver program that triggers this flow is sketched after the trace):
-> DAGScheduler.runJob->submitJob(rdd, func, partitions, callSite, resultHandler,
       properties)
    
    -> DAGScheduler.eventProcessLoop.post(JobSubmitted(
               jobId, rdd, func2, partitions.toArray, callSite, waiter,
               SerializationUtils.clone(properties))) 
    
    -> DAGSchedulerEventProcessLoop -> case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) ->
    
    -> DAGScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    
    -> DAGScheduler.createResultStage(finalRDD, func, partitions, jobId, callSite) (builds the acyclic stage graph working backwards from the final RDD)
    
    -> DAGScheduler.submitStage(finalStage)
    
    -> DAGScheduler.submitMissingTasks(stage, jobId.get) (submits the frontmost Stage 0 first)
    
    -> taskScheduler.submitTasks(new TaskSet(
       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    
    -> createTaskSetManager(taskSet, maxTaskFailures) (inside TaskSchedulerImpl.submitTasks)
    
    -> taskScheduler -> initialize -> FIFOSchedulableBuilder -> buildPools (built when TaskSchedulerImpl is initialized)
    
    -> schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
       (inside TaskSchedulerImpl.submitTasks; the TaskSetManager is added to the pool, following FIFO)
    
    -> backend.reviveOffers()(CoarseGrainedSchedulerBackend)
    
    -> driverEndpoint.send(ReviveOffers)
    
    -> DriverEndpoint.receive -> case ReviveOffers => makeOffers() (makeOffers wraps the common task-scheduling logic)
    
    -> makeOffers (collects the live Executors with their executorHost and freeCores) -> workOffers (inside CoarseGrainedSchedulerBackend)
    
    -> TaskSchedulerImpl.resourceOffers(workOffers) (CoarseGrainedSchedulerBackend holds a reference to TaskSchedulerImpl)
    
    -> val sortedTaskSets = TaskSchedulerImpl.rootPool.getSortedTaskSetQueue (TaskSetManager takes the stage)
    
    -> TaskSchedulerImpl.resourceOfferSingleTaskSet(
                  taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
    
    -> TaskSetManager.resourceOffer(execId, host, maxLocality))
    
    -> TaskSetManager.getAllowedLocalityLevel(curTime)  (delay scheduling; look forward to the detailed discussion in my next post)
    
    -> TaskSetManager.dequeueTask(execId, host, allowedLocality)
    
    -> sched.dagScheduler.taskStarted(task, info)
    
    -> new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative)
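As a hedged, minimal driver program (assuming a Spark 2.x dependency is on the classpath, with made-up app name and input data), the single action below should trigger exactly this path: collect() calls SparkContext.runJob, which calls DAGScheduler.runJob, posts JobSubmitted, builds a ShuffleMapStage plus a ResultStage for the reduceByKey shuffle, and finally hands TaskSets to the TaskScheduler.

    import org.apache.spark.{SparkConf, SparkContext}

    object TwoStageJob {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("two-stage-job")
        val sc = new SparkContext(conf)

        val counts = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
          .map(w => (w, 1))
          .reduceByKey(_ + _)   // shuffle dependency => one ShuffleMapStage
          .collect()            // action => one ResultStage, job submitted here

        println(counts.mkString(", "))
        sc.stop()
      }
    }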

2.3 Jobs and Stages: backward-driven Stage construction and forward submission

Backward drive: like waves on the Yangtze pushing each other forward. Curiously, I found that the backward-driven stage construction algorithm in Spark 2.0 differs from the one in Spark 2.3; the discussion here follows Spark 2.3:

  • Once DAGScheduler has called submitStage its mission on this path is complete; in the end submitStage just submits the task sets in forward order:

    ->  DAGScheduler. handleJobSubmitted 
     ->  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        (wraps finalRDD and func into the final Stage)
     ->  submitStage(finalStage) -> submitMissingTasks
  • But this is where the backward-driven algorithm of createResultStage gets interesting:

    -> createResultStage 
    -> getOrCreateParentStages -> getShuffleDependencies  (obtain the parent shuffle dependencies)
    -> getOrCreateShuffleMapStage -> getShuffleDependencies  (loops repeatedly, forming a recursion)
    -> createShuffleMapStage 
  • The code below obtains the parent stages: if they already exist they are returned directly, otherwise new Stages are created from the final RDD's parent shuffle dependencies. Note that getOrCreateParentStages keeps being called recursively, eventually building every Stage, which is why the whole stage DAG ends up being constructed backwards from the final RDD.

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

  • getOrCreateShuffleMapStage: creates the Stage for a dependency's dependencies; if it already exists it is fetched directly, otherwise the parent Stages are created first and the child Stage afterwards.

    private def getOrCreateShuffleMapStage(
        shuffleDep: ShuffleDependency[_, _, _],
        firstJobId: Int): ShuffleMapStage = {
      shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
        case Some(stage) =>
          stage

        case None =>
          // Create stages for all missing ancestor shuffle dependencies.
          getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
            // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
            // that were not already in shuffleIdToMapStage, it's possible that by the time we
            // get to a particular dependency in the foreach loop, it's been added to
            // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
            // SPARK-13902 for more information.
            if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
              createShuffleMapStage(dep, firstJobId)
            }
          }
          // Finally, create a stage for the given shuffle dependency.
          createShuffleMapStage(shuffleDep, firstJobId)
      }
    }

  • createShuffleMapStage: the method where everything finally lands, returning the required stage. Note that the blocking point is getOrCreateParentStages, which is why the recursion climbs all the way to the topmost ancestor.

    def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
      val rdd = shuffleDep.rdd
      val numTasks = rdd.partitions.length
      val parents = getOrCreateParentStages(rdd, jobId)
      val id = nextStageId.getAndIncrement()
      val stage = new ShuffleMapStage(
        id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
      stageIdToStage(id) = stage
      shuffleIdToMapStage(shuffleDep.shuffleId) = stage
      updateJobIdStageIdMaps(jobId, stage)
      if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
        mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
      }
      stage
    }

2.4 Original summary by Qin Kaixin:

The overall flow: the topmost Stage is created first, and the child stages are then created step by step as the recursion unwinds, much like a stack model.

  • Why does the overall flow look like this? Take the DAG G -> F A -> H -> M L -> N. Because createShuffleMapStage drives the construction backwards and its blocking point is the call to getOrCreateParentStages inside the method, the creation calls for the final Stage's parent stages (F, A) are pushed onto the bottom of the stack first, and the creation calls for the stages above F and A keep being pushed on top, without any Stage actually being created yet. Only when the creation call for the topmost Stage is on the stack, that is, when the RDD's top-level ancestor is reached, does the topmost creation call execute: createShuffleMapStage then builds the dependency relations recursively from the blocking point, registers the shuffle, runs the rest of its method body, and the recursion unwinds downwards level by level.

  • Note the caching mechanism of getOrCreateShuffleMapStage, namely shuffleIdToMapStage, which lets an existing stage be fetched directly instead of being rebuilt, for example when F and A both look up their shared parent Stage (a runnable toy model of this ordering follows the trace below).

    G -> F A -> H -> M L -> N
    
      1 The recursion first starts building stages from the parent dependencies of F and A.
      
      2 It then builds the stages above H together with their dependency relations.
      
      3 Afterwards the relation between the F, A stages and the H stage is established; note that the H Stage
        is fetched from shuffleIdToMapStage. Finally the F and A stages are returned, and the dependency
        between the final RDD (G) and F, A is established.
      
      4 Finally submitStage(finalStage) is submitted.
    
      The final stage construction order is: N -> M L -> H -> F A -> G
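The following toy model (not Spark source; the `parents` map, `StageBuildSketch`, and stage names are purely illustrative) reproduces this backward-driven, memoised construction for the example DAG. Running it prints the creation order N, M, L, H, F, A, G, matching the ordering above; the memo map plays the role of shuffleIdToMapStage.

    import scala.collection.mutable

    object StageBuildSketch {
      // parents(x) = stages that x shuffle-depends on, encoding G -> F A -> H -> M L -> N
      val parents: Map[String, List[String]] = Map(
        "G" -> List("F", "A"),
        "F" -> List("H"), "A" -> List("H"),
        "H" -> List("M", "L"),
        "M" -> List("N"), "L" -> List("N"),
        "N" -> Nil
      )

      val created = mutable.LinkedHashMap[String, Int]()   // memo: stage name -> stage id
      var nextId = 0

      def getOrCreateStage(name: String): Int = created.get(name) match {
        case Some(id) => id   // already built earlier (e.g. H reached via both F and A)
        case None =>
          // Blocking point: all parent stages must exist before this stage is built,
          // so the recursion reaches the top of the DAG (N) before anything downstream.
          parents(name).foreach(getOrCreateStage)
          val id = nextId; nextId += 1
          created(name) = id
          println(s"create stage $id for $name")
          id
      }

      def main(args: Array[String]): Unit = {
        getOrCreateStage("G")   // the final stage; memoisation prevents rebuilding H or N
      }
    }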

Summary

This section is the result of a great deal of the author's time spent polishing it, using the plainest possible language to dissect Spark's task scheduling. It is now 1:22 in the morning. To close, take a look at DAGScheduler.handleTaskCompletion, which submits tasks in forward order. Fly, Spark.

Author: Qin Kaixin, 2018-11-02
