Mastering Spark 2.x: A Deep Dive into the Job Trigger Flow Source Code (Part 2)

Following on from the previous article, Mastering Spark 2.x: A Deep Dive into the Job Trigger Flow Source Code (Part 1), we continue here with how tasks are created, how the best task locations are computed, and how tasks are submitted, picking up with a detailed walkthrough of the submitMissingTasks() function introduced last time.


  1. Let's look directly at how the submitMissingTasks() function creates tasks. This function mainly does the following things:

1) Compute the best location for each task.

2) Serialize the task data and broadcast it according to the stage type; the Executor deserializes it when it runs the task.

3) Depending on the stage type, create ShuffleMapTask or ResultTask instances, wrap them into the corresponding TaskSet, and submit it to the TaskScheduler for processing (a small driver-side sketch after the source code below shows where these two task types come from).


The code is as follows:

  /** Called when stage's parents are available and we can now do its task. */
  // A batch of tasks is created here; the number of tasks equals the number of partitions.
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    // Call findMissingPartitions() to get the partitions that still need to be computed.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties

    // Add the stage to the set of running stages.
    runningStages += stage

    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }

    // Create one task per partition id: one partition, one task.
    // To preserve data locality, getPreferredLocs() computes the best location for each task.
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      // All final stages are ResultStages; the stages before them are ShuffleMapStages.
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      // The task data is serialized differently depending on the stage type.
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      // Distribute the serialized bytes as a broadcast variable.
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // Create the required number of tasks for the stage.
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        // A ShuffleMapStage produces ShuffleMapTasks.
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        // A ResultStage produces ResultTasks.
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // If there is at least one task, wrap the tasks into a TaskSet and hand it to
    // TaskScheduler.submitTasks for processing.
    // There are two stage types: a ShuffleMapStage produces ShuffleMapTasks and a
    // ResultStage produces ResultTasks.
    // The taskScheduler here was created when SparkContext was initialized (recall the earlier article).
    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)

      submitWaitingChildStages(stage)
    }
  }
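
To make the ShuffleMapTask/ResultTask split concrete, here is a minimal driver-side sketch (not part of the DAGScheduler source; the input path, partition counts, and master setting are assumptions for illustration). The job has one shuffle, so the DAGScheduler builds one ShuffleMapStage whose four partitions become four ShuffleMapTasks, plus a final ResultStage whose two partitions become two ResultTasks; each batch is wrapped into a TaskSet and handed to TaskScheduler.submitTasks():

import org.apache.spark.{SparkConf, SparkContext}

object TwoStageExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("two-stage-example").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 4 input partitions -> the ShuffleMapStage submits 4 ShuffleMapTasks.
    val pairs = sc.textFile("hdfs:///tmp/words.txt", 4)   // hypothetical input path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))

    // reduceByKey introduces the shuffle boundary; the post-shuffle stage has 2 partitions,
    // so the ResultStage submits 2 ResultTasks when collect() triggers the job.
    val counts = pairs.reduceByKey(_ + _, 2).collect()

    counts.foreach(println)
    sc.stop()
  }
}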

  

2. Above, the best computation location for each task is determined through the getPreferredLocs() function, which delegates to getPreferredLocsInternal(). The overall logic of this function is as follows:

    1) First check whether the partition has been cached; if it has, the node holding the cached partition is the best location (a short sketch after the source below illustrates this case).

    2) If it is not cached, check whether it has been checkpointed; if it has, the node holding the checkpoint is the best location.

    3) If it is neither cached nor checkpointed, recurse into the parent RDDs and use their cache and checkpoint locations to determine the best location.

    4) If none of the above applies, the partition has no preferred location.


Let's step into the code:

  // Compute the best location for a task. Overall logic:
  // 1. First check whether the partition is cached; if so, the node holding the cache is the best location.
  // 2. If not cached, check whether it is checkpointed; if so, the node holding the checkpoint is the best location.
  // 3. If neither, recurse into the parent RDDs and use their cache/checkpoint locations.
  // If none of the above applies, the partition has no preferred location.
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }

    // If the partition is cached, return the cache locations
    // Check the cache first.
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }

    // Next, check the checkpoint (exposed through the RDD's placement preferences).
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    // Recurse to check whether a parent RDD is cached or checkpointed.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }

    // If nothing was found, this task has no preferred location; return Nil.
    Nil
  }
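
As a quick illustration of rule 1) above, the following sketch (again not DAGScheduler source; the path and storage level are assumptions for illustration) caches an RDD and then runs a second job on it. For the second job, getCacheLocs() returns non-empty locations for each partition, so the tasks are preferentially scheduled on the executors that hold the cached blocks (visible as PROCESS_LOCAL locality in the Spark UI):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object CacheLocalityExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cache-locality").setMaster("local[2]"))

    // Hypothetical input path; any sizable text file works.
    val parsed = sc.textFile("hdfs:///tmp/events.log", 8).map(_.toUpperCase)

    // Job 1: materialize the RDD into the block managers of the executors that computed it.
    parsed.persist(StorageLevel.MEMORY_ONLY)
    parsed.count()

    // Job 2: getPreferredLocsInternal() now finds cache locations for every partition,
    // so each task's preferred location is the executor holding its cached block.
    val longLines = parsed.filter(_.length > 100).count()
    println(longLines)

    sc.stop()
  }
}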


    This concludes the analysis of task creation and submission. Although only two functions are involved here, they are one of the cores of Spark task scheduling; the stage division in particular is not easy to understand, so it is worth reading through the source code several times.
