Spark源碼系列(三)做業運行過程

做業執行

上一章講了RDD的轉換,可是沒講做業的運行,它和Driver Program的關係是啥,和RDD的關係是啥?html

官方給的例子裏面,一執行collect方法就能出結果,那咱們就從collect開始看吧,進入RDD,找到collect方法。node

  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

它進行了兩個操做:apache

一、調用SparkContext的runJob方法,把自身的引用傳入去,再傳了一個匿名函數(把Iterator轉換成Array數組)數組

二、把result結果合併成一個Array,注意results是一個Array[Array[T]]類型,因此第二句的那個寫法纔會那麼奇怪。這個操做是很重的一個操做,若是結果很大的話,這個操做是會報OOM的,由於它是把結果保存在Driver程序的內存當中的result數組裏面。數據結構

咱們點進去runJob這個方法吧。app

    val callSite = getCallSite
    val cleanedFunc = clean(func)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)
    rdd.doCheckpoint()

追蹤下去,咱們會發現通過多個不一樣的runJob同名函數調用以後,執行job做業靠的是dagScheduler,最後把結果經過resultHandler保存返回。dom

DAGScheduler如何劃分做業

好的,咱們繼續看DAGScheduler的runJob方法,提交做業,而後等待結果,成功什麼都不作,失敗拋出錯誤,咱們接着看submitJob方法。jvm

    val jobId = nextJobId.getAndIncrement()
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
// 記錄做業成功與失敗的數據結構,一個做業的Task數量是和分片的數量一致的,Task成功以後調用resultHandler保存結果。 val waiter
= new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)

走到這裏,感受有點兒繞了,爲何到了這裏,還不直接運行呢,還要給eventProcessActor發送一個JobSubmitted請求呢,new一個線程和這個區別有多大?ide

無論了,搜索一下eventProcessActor吧,結果發現它是一個DAGSchedulerEventProcessActor,它的定義也在DAGScheduler這個類裏面。它的receive方法裏面定義了12種事件的處理方法,這裏咱們只須要看函數

JobSubmitted的就行,它也是調用了自身的handleJobSubmitted方法。可是這裏很奇怪,沒辦法打斷點調試,可是它的結果卻是能返回的,所以咱們得用另一種方式,打開test工程,找到scheduler目錄下的DAGSchedulerSuite這個類,咱們本身寫一個test方法,首先咱們要在import那裏加上import org.apache.spark.SparkContext._  ,而後加上這一段測試代碼。

  test("run shuffle") {
    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
    val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
    val rdd4 = rdd3.reduceByKey(_ + _)
    submit(rdd4, Array(0,1,2,3))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 1))))
    complete(taskSets(1), Seq((Success, 42)))
    complete(taskSets(2), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))
    complete(taskSets(3), Seq((Success, 68)))
  }
View Code

這個例子的重點仍是shuffle那塊,另外也包括了map的多個轉換,你們能夠按照這個例子去測試下。

咱們接着看handleJobSubmitted吧。

    var finalStage: Stage = null
    try {
      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
    } catch {
      // 錯誤處理,告訴監聽器做業失敗,返回....
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
        // 很短、沒有父stage的本地操做,好比 first() or take() 的操做本地執行.
        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
        runLocally(job)
      } else {
        // collect等操做走的是這個過程,更新相關的關係映射,用監聽器監聽,而後提交做業
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        resultStageToJob(finalStage) = job
        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
        // 提交stage
        submitStage(finalStage)
      }
    }
    // 提交stage
    submitWaitingStages()
View Code

從上面這個方法來看,咱們應該重點關注newStage方法、submitStage方法和submitWaitingStages方法。

咱們先看newStage,它獲得的結果叫作finalStage,挺奇怪的哈,爲啥?先看吧

    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stageToInfos(stage) = StageInfo.fromStage(stage)
    stage

能夠看出來Stage也沒有太多的東西可言,它就是把rdd給傳了進去,tasks的數量,shuffleDep是空,parentStage。

那它的parentStage是啥呢?

  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // 在visit函數裏面,只有存在ShuffleDependency的,parent才經過getShuffleMapStage計算出來
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_,_] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              visit(dep.rdd)
          }
        }
      }
    }
    visit(rdd)
    parents.toList
  }
View Code

它是經過不停的遍歷它以前的rdd,若是碰到有依賴是ShuffleDependency類型的,就經過getShuffleMapStage方法計算出來它的Stage來。

那咱們就開始看submitStage方法吧。

  private def submitStage(stage: Stage) {
        //...
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) {
          // 沒有父stage,執行這stage的tasks
          submitMissingTasks(stage, jobId.get)
          runningStages += stage
        } else {
         // 提交父stage的task,這裏是個遞歸,真正的提交在上面的註釋的地方
          for (parent <- missing) {
            submitStage(parent)
          }
          // 暫時不能提交的stage,先添加到等待隊列
          waitingStages += stage
        }
      }
  }
View Code

這個提交stage的過程是一個遞歸的過程,它是先要把父stage先提交,而後把本身添加到等待隊列中,直到沒有父stage以後,就提交該stage中的任務。等待隊列在最後的submitWaitingStages方法中提交。

這裏我引用一下上一章當中我所畫的那個圖來表示這個過程哈。

getParentStages方法能夠看出來,RDD當中存在ShuffleDependency的Stage纔會有父Stage, 也就是圖中的虛線的位置!

因此咱們只須要記住凡是涉及到shuffle的做業都會至少有兩個Stage,即shuffle前和shuffle後。

TaskScheduler提交Task

那咱們接着看submitMissingTasks方法,下面是主體代碼。

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
    myPending.clear()
    var tasks = ArrayBuffer[Task[_]]()
    if (stage.isShuffleMap) {
      // 這是shuffle stage的狀況
      for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
        val locs = getPreferredLocs(stage.rdd, p)
        tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
      }
    } else {
      // 這是final stage的狀況
      val job = resultStageToJob(stage)
      for (id <- 0 until job.numPartitions if !job.finished(id)) {
        val partition = job.partitions(id)
        val locs = getPreferredLocs(stage.rdd, partition)
        tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
      }
    }
    if (tasks.size > 0) {
      myPending ++= tasks
      taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
    } else {
      runningStages -= stage
    }
  }
View Code

Task也是有兩類的,一種是ShuffleMapTask,一種是ResultTask,咱們須要注意這兩種Task的runTask方法。最後Task是經過taskScheduler.submitTasks來提交的。

咱們找到TaskSchedulerImpl裏面看這個方法。

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasksthis.synchronized {
      val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

調度器有兩種模式,FIFO和FAIR,默認是FIFO, 能夠經過spark.scheduler.mode來設置,schedulableBuilder也有相應的兩種FIFOSchedulableBuilder和FairSchedulableBuilder。

那backend是啥?聽說是爲了給TaskSchedulerImpl提供插件式的調度服務的。

它是怎麼實例化出來的,這裏咱們須要追溯回到SparkContext的createTaskScheduler方法,下面我直接把經常使用的3中類型的TaskScheduler給列出來了。

mode            Scheduler                          Backend

cluster          TaskSchedulerImpl             SparkDeploySchedulerBackend

yarn-cluster  YarnClusterScheduler          CoarseGrainedSchedulerBackend

yarn-client    YarnClientClusterScheduler  YarnClientSchedulerBackend

好,咱們回到以前的代碼上,schedulableBuilder.addTaskSetManager比較簡單,把做業集添加到調度器的隊列當中。

咱們接着看backend的reviveOffers,裏面只有一句話driverActor ! ReviveOffers。真是頭暈,搞那麼多Actor,只是爲了接收消息。。。

照舊吧,找到它的receive方法,找到ReviveOffers這個case,發現它調用了makeOffers方法,咱們繼續追殺!

def makeOffers() {
    launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

從executorHost中隨機抽出一些來給調度器,而後調度器返回TaskDescription,executorHost怎麼來的,待會兒再說,咱們接着看resourceOffers方法。

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    SparkEnv.set(sc.env)

    // 遍歷worker提供的資源,更新executor相關的映射
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
      }
    }

    // 從worker當中隨機選出一些來,防止任務都堆在一個機器上
    val shuffledOffers = Random.shuffle(offers)
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue

    // 隨機遍歷抽出來的worker,經過TaskSetManager的resourceOffer,把本地性最高的Task分給Worker
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
      do {
        launchedTask = false
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          if (availableCpus(i) >= CPUS_PER_TASK) {
            // 把本地性最高的Task分給Worker
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              tasks(i) += task
              val tid = task.taskId
              taskIdToTaskSetId(tid) = taskSet.taskSet.id
              taskIdToExecutorId(tid) = execId
              activeExecutorIds += execId
              executorsByHost(host) += execId
              availableCpus(i) -= CPUS_PER_TASK
              assert (availableCpus(i) >= 0)
              launchedTask = true
            }
          }
        }
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
View Code

resourceOffers主要作了3件事:

一、從Workers裏面隨機抽出一些來執行任務。

二、經過TaskSetManager找出和Worker在一塊兒的Task,最後編譯打包成TaskDescription返回。

三、將Worker-->Array[TaskDescription]的映射關係返回。

咱們繼續看TaskSetManager的resourceOffer,看看它是怎麼找到和host再起的Task,而且包裝成TaskDescription。

經過查看代碼,我發現以前我解釋的和它具體實現的差異比較大,它所謂的本地性是根據當前的等待時間來肯定的任務本地性的級別。

它的本地性主要是包括四類:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。

  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
        currentLocalityIndex < myLocalityLevels.length - 1)
    {
      // 成立條件是當前時間-上次發佈任務的時間 > 當前本地性級別的,條件成立就跳到下一個級別
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    myLocalityLevels(currentLocalityIndex)
  }
View Code

等待時間是能夠經過參數去設置的,具體的本身查下面的代碼。

  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = conf.get("spark.locality.wait", "3000")
    level match {
      case TaskLocality.PROCESS_LOCAL =>
        conf.get("spark.locality.wait.process", defaultWait).toLong
      case TaskLocality.NODE_LOCAL =>
        conf.get("spark.locality.wait.node", defaultWait).toLong
      case TaskLocality.RACK_LOCAL =>
        conf.get("spark.locality.wait.rack", defaultWait).toLong
      case TaskLocality.ANY =>
        0L
    }
  }
View Code

下面繼續看TaskSetManager的resourceOffer的方法,經過findTask來從Task集合裏面找到相應的Task。

      findTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality)) => {
val task = tasks(index) val serializedTask
= Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)   val timeTaken = clock.getTime() - startTime   addRunningTask(taskId)   val taskName = "task %s:%d".format(taskSet.id, index)   sched.dagScheduler.taskStarted(task, info)   return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) }

它的findTask方法以下:

  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
   // 同一個Executor,經過execId來查找相應的等待的task
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }
   // 經過主機名找到相應的Task,不過比以前的多了一步判斷
    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }
  // 經過Rack的名稱查找Task
    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }
   // 查找那些preferredLocations爲空的,不指定在哪裏執行的Task來執行
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }
  // 查找那些preferredLocations爲空的,不指定在哪裏執行的Task來執行
    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }
    // 最後沒辦法了,拖的時間太長了,只能啓動推測執行了
    findSpeculativeTask(execId, host, locality)
  }
View Code

從這個方面能夠看得出來,Spark對運行時間仍是很注重的,等待的時間越長,它就可能越飢不擇食,從PROCESS_LOCAL一直讓步到ANY,最後的最後,推測執行都用到了。

找到任務以後,它就調用dagScheduler.taskStarted方法,通知dagScheduler任務開始了,taskStarted方法就不詳細講了,它觸發dagScheduler的BeginEvent事件,裏面只作了2件事:

一、檢查Task序列化的大小,超過100K就警告。

二、提交等待的Stage。

好,咱們繼續回到發佈Task上面來,中間過程講完了,咱們應該是要回到CoarseGrainedSchedulerBackend的launchTasks方法了。

def makeOffers() {
    launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}

它的方法體是:

    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
        executorActor(task.executorId) ! LaunchTask(task)       }
    }

經過executorId找到相應的executorActor,而後發送LaunchTask過去,一個Task佔用一個Cpu。

註冊Application

那這個executorActor是怎麼來的呢?找唄,最後發現它是在receive方法裏面接受到RegisterExecutor消息的時候註冊的。經過搜索,咱們找到CoarseGrainedExecutorBackend這個類,在它的preStart方法裏面赫然找到了driver ! RegisterExecutor(executorId, hostPort, cores)  帶的這三個參數都是在初始化的時候傳入的,那是誰實例化的它呢,再逆向搜索找到SparkDeploySchedulerBackend!以前的backend一直都是它,咱們看reviveOffers是在它的父類CoarseGrainedSchedulerBackend裏面。

關係清楚了,在這個backend的start方法裏面啓動了一個AppClient,AppClient的其中一個參數ApplicationDescription就是封裝的運行CoarseGrainedExecutorBackend的命令。AppClient內部啓動了一個ClientActor,這個ClientActor啓動以後,會嘗試向Master發送一個指令actor ! RegisterApplication(appDescription) 註冊一個Application。

別廢話了,Ctrl +Shift + N吧,定位到Master吧。

    case RegisterApplication(description) => {
        val app = createApplication(description, sender)
        registerApplication(app)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
    }

它作了5件事:

一、createApplication爲這個app構建一個描述App數據結構的ApplicationInfo。

二、註冊該Application,更新相應的映射關係,添加到等待隊列裏面。

三、用persistenceEngine持久化Application信息,默認是不保存的,另外還有兩種方式,保存在文件或者Zookeeper當中。

四、經過發送方註冊成功。

五、開始做業調度。

關於調度的問題,在第一章《spark-submit提交做業過程》已經介紹過了,建議回去再看看,搞清楚Application和Executor之間的關係。

Application一旦得到資源,Master會發送launchExecutor指令給Worker去啓動Executor。

進到Worker裏面搜索LaunchExecutor。

  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
  executors(appId + "/" + execId) = manager
  manager.start()
   coresUsed += cores_
   memoryUsed += memory_
   masterLock.synchronized {
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
   }

原來ExecutorRunner還不是傳說中的Executor,它內部是執行了appDesc內部的那個命令,啓動了CoarseGrainedExecutorBackend,它纔是咱們的真命天子Executor。

啓動以後ExecutorRunner報告ExecutorStateChanged事件給Master。

Master幹了兩件事:

一、轉發給Driver,這個Driver是以前註冊Application的那個AppClient

二、若是是Executor運行結束,從相應的映射關係裏面刪除

發佈Task

上面又花了那麼多時間講Task的運行環境ExecutorRunner是怎麼註冊,那咱們仍是回到咱們的主題,Task的發佈。

發佈任務是發送LaunchTask指令給CoarseGrainedExecutorBackend,接受到指令以後,讓它內部的executor來發布這個任務。

這裏咱們看一下Executor的launchTask。

  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

TaskRunner是這裏的重頭戲啊!看它的run方法吧。

    override def run() {
      // 準備工做若干...那天咱們放學回家通過一片玉米地,以上省略一百字

      try {
        // 反序列化Task
        SparkEnv.set(env)
        Accumulators.clear()
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

       // 命令爲嘗試運行,和hadoop的mapreduce做業是一致的 
        attemptedTask = Some(task)
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        // 運行Task, 具體能夠去看以前讓你們關注的ResultTask和ShuffleMapTask
        taskStart = System.currentTimeMillis()
        val value = task.run(taskId.toInt)
        val taskFinish = System.currentTimeMillis()

     // 對結果進行序列化
        val resultSer = SparkEnv.get.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()
     // 更新任務的相關監控信息,會反映到監控頁面上的
        for (m <- task.metrics) {
          m.hostname = Utils.localHostName()
          m.executorDeserializeTime = taskStart - startTime
          m.executorRunTime = taskFinish - taskStart
          m.jvmGCTime = gcTime - startGCTime
          m.resultSerializationTime = afterSerialization - beforeSerialization
        }

        val accumUpdates = Accumulators.values
     // 對結果進行再包裝,包裝完再進行序列化
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
        val serializedDirectResult = ser.serialize(directResult)
        // 若是中間結果的大小超過了spark.akka.frameSize(默認是10M)的大小,就要提高序列化級別了,超過內存的部分要保存到硬盤的
        val serializedResult = {
          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            ser.serialize(new IndirectTaskResult[Any](blockId))
          } else {
            serializedDirectResult
          }
        }
     // 返回結果
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
      } catch {
        // 這部分是錯誤處理,被我省略掉了,主要內容是通關相關負責人處理後事
      } finally {
        // 清理爲ResultTask註冊的shuffle內存,最後把task從正在運行的列表當中刪除
        val shuffleMemoryMap = env.shuffleMemoryMap
        shuffleMemoryMap.synchronized {
          shuffleMemoryMap.remove(Thread.currentThread().getId)
        }
        runningTasks.remove(taskId)
      }
    }
  }
View Code

以上代碼被我這些了,可是建議你們看看註釋吧。

最後結果是經過statusUpdate返回的。

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    driver ! StatusUpdate(executorId, taskId, state, data)
  }

這回這個Driver又不是剛纔那個AppClient,而是它的家長SparkDeploySchedulerBackend,是在SparkDeploySchedulerBackend的父類CoarseGrainedSchedulerBackend接受了這個StatusUpdate消息。

這關係真他娘夠亂的。。

繼續,Task裏面走的是TaskSchedulerImpl這個方法。

scheduler.statusUpdate(taskId, state, data.value)

到這裏,一個Task就運行結束了,後面就再也不擴展了,做業運行這塊是Spark的核心,再擴展基本就能寫出來一本書了,限於文章篇幅,這裏就再也不深究了。

以上的過程應該是和下面的圖一致的。

 

看完這篇文章,估計你們會雲裏霧裏的,在下一章《做業生命週期》會把剛纔描述的整個過程從新梳理出來,便於你們記憶,敬請期待!

 

岑玉海

轉載請註明出處,謝謝!

相關文章
相關標籤/搜索