Spark源碼之任務調度

主要部分包括:任務調度(Schedule),Shuffle機制,Executor,Task,BlockManager,DAG,ScheduleBackEnd,TasksetManager,數組

    1. Job運行流程

SparkContext的runJob提交—》  DagSchedule –》 dag.runJob --> 服務器

dag.handleJobSummitted  --> 建立finalResultStage,而後submitStage(裏面循環提交父stage)--》 dag.submitMissingTasks --> 判斷是shuffleMapTask仍是ResultTask,這是Spark的兩種task類型 –》 生成taskSet --> taskScheduler.submitTasks提交執行,其中taskSchedule是taskScheduleImpl –》app

taskScheduleImpl.submitTasks的流程:ide

createTaskSetManager –》 SchedulerBackend.reviveOffers() –》 經過CoarseGrainedSchedulerBackend(集羣狀態)oop

{post

override def reviveOffers() {ui

    driverEndpoint.send(ReviveOffers)this

    }spa

}線程

ReviveOffsers --> makeOffers() --> LauchTasks -->  exectorEndpoint.send(LaunchTask)

經過driverEntPoint發送ReviveOffers到Exector執行。

具體執行:CoarseGrainedExecutorBackend

{

    case LaunchTask(data) =>

      if (executor == null) {

        exitExecutor(1, "Received LaunchTask command but executor was null")

      } else {

        val taskDesc = TaskDescription.decode(data.value)

        logInfo("Got assigned task " + taskDesc.taskId)

        executor.launchTask(this, taskDesc)

      }

}

    1. ScheduleBackEnd

典型的集羣環境下的ScheduleBackEnd實現之一:CoarseGrainedScheduleBackEnd

首先看看CoarseGrained集羣服務器之間的消息類型有哪些:

大致上能夠分紅兩大類:Executor相關的消息和Task相關的消息。Executor相關的消息包括Executor的註冊、刪除、狀態更新等。Task的消息包括LaunchTask,KillTask,狀態更新等。集羣間Task的調度和執行主要是經過ScheduleBackEnd來維護的。

除了CoarseGrainedScheduleBackEnd還有LocalScheduleBackEnd和StandaloneScheduleBackEnd等種類。

      1. Executor端

啓動CoarseGrainedExecutorBackend,和主節點的ScheduleBackEnd通訊。首先建立Driver端和主站聯繫,獲取主站conf信息。而後建立SparkEnv。最後啓動CoarseGrainedExecutorBackend消息處理主線程,接收ScheduleBackEnd的建立Task,關閉Exector等消息。

若是收到從ScheduleBackEnd來的註冊成功消息(也就是RegisteredExecutor),則建立Executor,執行Task操做。CoarseGrainedExecutorBackend只是負責和ScheduleBackEnd之間的通訊,並非具體執行Task的類。

 

    1. DagSchedule

Dag調度類,對一個RDD進行shuffle分析,分解成多個Stage,從最後一個Stage逆向執行。Stage分紅ResultStage和ShuffleMapStage兩類。每一個Stage根據分區分解成多個任務,用一個taskSetManager來管理。

DagSchedule用EventLoopProcess處理交互消息。有的消息時調用TaskScheduleImpl的方法;有的消息執行DagSchdule本身的私有方法。

      1. DAGSchedulerEvent消息類型

名稱

說明

JobSubmitted

建立finalResultStage,最後執行submitStage。

必定是最後一個stage,也就是ResultStage來觸發job的提交,並建立ActiveJob對應它。

stage和他的父stage的jobId是同一個值。

MapStageSubmitted

處理ShuffleMapStage,和jobSubmitted是對應的。

clearCacheLocs()

StageCancelled

對該Stage的每一個job執行handleJobCancellation方法。

handleJobCancellation方法對job的每一個stage,執行:

{

taskScheduler.cancelTasks(stageId, shouldInterruptThread)

markStageAsFinished(stage, Some(failureReason))

}

running Stages中刪除,並通知listenerBus

JobCancelled

執行failJobAndIndependentStages

清除runningStage,調用TaskSchedule對應的消息處理,通知listenerBus等。

JobGroupCancelled

批量處理JobCancelled

AllJobsCancelled

批量處理JobCancelled

BeginEvent

很簡單:

listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))

GettingResultEvent

很簡單:

listenerBus.post(SparkListenerTaskGettingResult(taskInfo))

CompletionEvent

task執行完成事件,根據完成的狀態和結果來決定是否要從新提交,是否觸發整個stage結束等狀態更迭。

最後將事件發送給listenerBus。

這段代碼比較長。

ExecutorAdded

Executor事件,新的Executor啓動了。

從failedEpoch中刪除該Executor

ExecutorLost

Executor事件,Executor關閉了。

刪除該Executor的blockManager信息,更新ShuffleStage的輸出outputMapper信息,清除CacheLocs

 

TaskSetFailed

對依賴Stage和Job執行failJobAndIndependentStages

ResubmitFailedStages

從新submitStage,針對已經失敗的stage。

executorHeartbeatReceived

通知blockmanager發送心跳,通知listenerBus

 

 

    1. TaskScheduleImp

做爲任務調度系統的重要類,(DagSchedule、TaskScheduleImpl、TaskSetManager)。

主要方法:

名稱

說明

start

啓動backend

啓動SpeculatableTasks

executorLost

刪除executor

通知DagSchedule

 

submitTasks

建立taskSetManager,並添加到ScheduleBuilder,等待下一步調度;

backend的reviveOffers方法進行調度。

 

cancelTasks

向backend發送KillTask消息

stop

中止backend,中止taskResultGetter

executorHeartbeatReceived

更新matrics,通知dagschedule

killTaskAttempt

backend發送KillTask消息。(backend向對應的executor發送KillTask消息)

applicationId

生成新的applicaionId,每一個application對應一個TaskScheduleImpl。

 

 

其餘方法(不是TaskSchedule接口中的方法):

名稱

說明

resourceOffers

對每個taskSet,執行resourceOfferSingleTaskSet方法,直到不能找到知足條件的task爲止:

搜索待執行的task。

backend會調用該方法獲取待運行的task,根據本地化task優先級,獲取指定本地化級別的task,最後生成待執行的task數組。

最後一步:提交執行task數組。

 

 

 

    1. CoarseGrainedSchedulerBackend

SchedulerBackend接口的一個實現。調度後臺,負責集羣間調度消息的傳遞。CoarseGrainedSchedulerBackend有一個DriverEndpoint,經過DriverEndPoint的receive方法接收消息,執行實際的消息處理。

主要方法:

名稱

說明

start

建立並啓動DriverEndPoint

stop

中止DriverEndPoint

makeOffers

(1)調用TaskScheduleImpl的resourcesOffer方法,從全部Executor中尋找能夠分配的task數組。

(2)執行launchTask方法,向Executor發送LauchTask消息。

 

    1. CoarseGrainedExecutorBackend

Executor端,接收ScheduleEndpoint的消息,主要是LaunchTask消息。經過Executor執行Executor啓動時建立本地SparkEnv

初始化參數:

driverUrl

driver端的鏈接地址

executorId

executor的編號,惟一

cores

Executorcpu數量

 

核心示例代碼:

override def receive: PartialFunction[Any, Unit] = {

    case RegisteredExecutor =>

      logInfo("Successfully registered with driver")

      try {

        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

      } catch {

        case NonFatal(e) =>

          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)

      }

 

    case RegisterExecutorFailed(message) =>

      exitExecutor(1, "Slave registration failed: " + message)

 

    case LaunchTask(data) =>

      if (executor == null) {

        exitExecutor(1, "Received LaunchTask command but executor was null")

      } else {

        val taskDesc = TaskDescription.decode(data.value)

        logInfo("Got assigned task " + taskDesc.taskId)

        executor.launchTask(this, taskDesc)

      }

 

    case KillTask(taskId, _, interruptThread, reason) =>

      if (executor == null) {

        exitExecutor(1, "Received KillTask command but executor was null")

      } else {

        executor.killTask(taskId, interruptThread, reason)

      }

 

    case StopExecutor =>

      stopping.set(true)

      logInfo("Driver commanded a shutdown")

      // Cannot shutdown here because an ack may need to be sent back to the caller. So send

      // a message to self to actually do the shutdown.

      self.send(Shutdown)

 

    case Shutdown =>

      stopping.set(true)

      new Thread("CoarseGrainedExecutorBackend-stop-executor") {

        override def run(): Unit = {

          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.

          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to

          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).

          // Therefore, we put this line in a new thread.

          executor.stop()

        }

      }.start()

  }

相關文章
相關標籤/搜索