Continuing from the previous article, Spark 2.x In Depth: Job Trigger Flow Source Code Analysis (Part 1), this post covers how tasks are created, how each task's preferred location is computed, and how tasks are submitted, picking up with a detailed look at the submitMissingTasks() function where the previous article left off.
1. Let's look directly at how submitMissingTasks() creates tasks. The function does the following:
1) Computes the best (preferred) location for each task
2) Serializes the task data and broadcasts it, with the payload depending on the stage type; executors deserialize it when they run the task (see the short sketch after this list)
3) Creates ShuffleMapTasks or ResultTasks depending on the stage type, wraps them into a TaskSet, and submits it to the TaskScheduler
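Before diving into the source, here is a minimal, user-level sketch of the "serialize once, broadcast, deserialize per task" pattern from step 2. This is not the DAGScheduler code itself; it assumes a running SparkContext named sc and uses a throwaway payload purely for illustration.

    import java.nio.ByteBuffer
    import org.apache.spark.serializer.JavaSerializer

    val ser = new JavaSerializer(sc.getConf).newInstance()

    // Serialize the payload once on the driver; the DAGScheduler does this
    // for (rdd, func) or (rdd, shuffleDep)
    val buf = ser.serialize(("some task payload", 42): AnyRef)
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)

    // Broadcast the bytes; executors fetch one copy per machine
    val taskBinary = sc.broadcast(payload)

    // Each task deserializes its own copy, giving tasks isolated object graphs
    sc.parallelize(1 to 4, 4).foreach { _ =>
      val localSer = new JavaSerializer(new org.apache.spark.SparkConf).newInstance()
      val copy = localSer.deserialize[AnyRef](ByteBuffer.wrap(taskBinary.value))
      println(copy)
    }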
The code is as follows:
    /** Called when stage's parents are available and we can now do its task. */
    // Creates a batch of tasks; the number of tasks equals the number of partitions
    private def submitMissingTasks(stage: Stage, jobId: Int) {
      logDebug("submitMissingTasks(" + stage + ")")

      // First figure out the indexes of partition ids to compute.
      // findMissingPartitions returns the partitions that still need to be computed
      val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

      // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
      // with this Stage
      val properties = jobIdToActiveJob(jobId).properties

      // Add the stage to the running set
      runningStages += stage
      // SparkListenerStageSubmitted should be posted before testing whether tasks are
      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
      // event.
      stage match {
        case s: ShuffleMapStage =>
          outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
        case s: ResultStage =>
          outputCommitCoordinator.stageStart(
            stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
      }

      // Create one task per partition id.
      // To preserve data locality, getPreferredLocs() computes the best location for each task
      val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
        // The final stage is always a ResultStage; all upstream stages are ShuffleMapStages
        stage match {
          case s: ShuffleMapStage =>
            partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
          case s: ResultStage =>
            partitionsToCompute.map { id =>
              val p = s.partitions(id)
              (id, getPreferredLocs(stage.rdd, p))
            }.toMap
        }
      } catch {
        case NonFatal(e) =>
          stage.makeNewStageAttempt(partitionsToCompute.size)
          listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
          abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
          runningStages -= stage
          return
      }

      stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

      // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
      // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
      // the serialized copy of the RDD and for each task we will deserialize it, which means each
      // task gets a different copy of the RDD. This provides stronger isolation between tasks that
      // might modify state of objects referenced in their closures. This is necessary in Hadoop
      // where the JobConf/Configuration object is not thread-safe.
      var taskBinary: Broadcast[Array[Byte]] = null
      try {
        // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
        // For ResultTask, serialize and broadcast (rdd, func).
        // Different task types get different serialized payloads
        val taskBinaryBytes: Array[Byte] = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        // Distribute the serialized bytes via a broadcast variable
        taskBinary = sc.broadcast(taskBinaryBytes)
      } catch {
        // In the case of a failure during serialization, abort the stage.
        case e: NotSerializableException =>
          abortStage(stage, "Task not serializable: " + e.toString, Some(e))
          runningStages -= stage

          // Abort execution
          return
        case NonFatal(e) =>
          abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
          runningStages -= stage
          return
      }

      // Create the required number of tasks for this stage
      val tasks: Seq[Task[_]] = try {
        val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
        stage match {
          // A ShuffleMapStage produces ShuffleMapTasks
          case stage: ShuffleMapStage =>
            stage.pendingPartitions.clear()
            partitionsToCompute.map { id =>
              val locs = taskIdToLocations(id)
              val part = stage.rdd.partitions(id)
              stage.pendingPartitions += id
              new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                Option(sc.applicationId), sc.applicationAttemptId)
            }

          // A ResultStage produces ResultTasks
          case stage: ResultStage =>
            partitionsToCompute.map { id =>
              val p: Int = stage.partitions(id)
              val part = stage.rdd.partitions(p)
              val locs = taskIdToLocations(id)
              new ResultTask(stage.id, stage.latestInfo.attemptId,
                taskBinary, part, locs, id, properties, serializedTaskMetrics,
                Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
            }
        }
      } catch {
        case NonFatal(e) =>
          abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
          runningStages -= stage
          return
      }

      // If there are tasks to run, wrap them into a TaskSet and hand it to TaskScheduler.submitTasks.
      // A ShuffleMapStage produces ShuffleMapTasks and a ResultStage produces ResultTasks.
      // Recall that taskScheduler was created during SparkContext initialization.
      if (tasks.size > 0) {
        logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
          s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
        taskScheduler.submitTasks(new TaskSet(
          tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
        stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
      } else {
        // Because we posted SparkListenerStageSubmitted earlier, we should mark
        // the stage as completed here in case there are no tasks to run
        markStageAsFinished(stage, None)

        val debugString = stage match {
          case stage: ShuffleMapStage =>
            s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})"
          case stage : ResultStage =>
            s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
        }
        logDebug(debugString)

        submitWaitingChildStages(stage)
      }
    }
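To connect this back to a concrete job, here is a small, hedged example of how the two stage types above show up for ordinary user code; the dataset and partition counts are only illustrative, and a local SparkContext named sc is assumed.

    val words = sc.parallelize(Seq("a", "b", "a", "c"), 4)   // 4 partitions
    val counts = words.map((_, 1)).reduceByKey(_ + _, 2)     // shuffle into 2 partitions

    // collect() triggers one job with two stages:
    //  - a ShuffleMapStage for the map side   -> 4 ShuffleMapTasks (one per input partition)
    //  - a ResultStage for the reduce side    -> 2 ResultTasks (one per shuffle partition)
    // submitMissingTasks() above is what turns each stage into its TaskSet and hands it
    // to taskScheduler.submitTasks().
    counts.collect()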
2. The best location for each task above is computed by getPreferredLocs(). The rough logic of this function is as follows:
1) First check whether the partition has been cached; if so, the node holding the cached partition is the best location
2) If it is not cached, check whether it has been checkpointed (more precisely, whether the RDD reports its own placement preferences, as checkpointed and input RDDs do); if so, that node is the best location
3) If it is neither cached nor checkpointed, recurse into the parent RDDs over narrow dependencies and apply the same cache/checkpoint checks to determine the best location
4) If none of the above yields a location, the partition has no preferred location
Let's step into the code:
    // Compute the best location for a task. Rough logic:
    // 1. Check whether the partition is cached; if so, the cache location is the best location
    // 2. Otherwise check whether it is checkpointed (i.e. the RDD's own placement preferences)
    // 3. Otherwise recurse into the parent RDDs and repeat the cache/checkpoint checks
    // If none of the above applies, the partition has no preferred location
    private def getPreferredLocsInternal(
        rdd: RDD[_],
        partition: Int,
        visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
      // If the partition has already been visited, no need to re-visit.
      // This avoids exponential path exploration.  SPARK-695
      if (!visited.add((rdd, partition))) {
        // Nil has already been returned for previously visited partitions.
        return Nil
      }

      // If the partition is cached, return the cache locations
      // First check the cache
      val cached = getCacheLocs(rdd)(partition)
      if (cached.nonEmpty) {
        return cached
      }

      // Then check the checkpoint / placement preferences
      // If the RDD has some placement preferences (as is the case for input RDDs), get those
      val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
      if (rddPrefs.nonEmpty) {
        return rddPrefs.map(TaskLocation(_))
      }

      // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
      // that has any placement preferences. Ideally we would choose based on transfer sizes,
      // but this will do for now.
      // Recurse into parent RDDs to see whether they are cached or checkpointed
      rdd.dependencies.foreach {
        case n: NarrowDependency[_] =>
          for (inPart <- n.getParents(partition)) {
            val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
            if (locs != Nil) {
              return locs
            }
          }

        case _ =>
      }

      // If nothing matched, this task has no preferred location; return Nil
      Nil
    }
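Finally, here is a hedged, user-side sketch of the locality sources this function consults. It assumes a SparkContext named sc and an existing HDFS path; both are placeholders for illustration.

    // Case 2 above: an input RDD reports the hosts that hold each of its blocks
    val input = sc.textFile("hdfs:///data/events.log")
    input.partitions.take(3).foreach { p =>
      println(s"partition ${p.index} prefers: " + input.preferredLocations(p).mkString(", "))
    }

    // Case 1 above: once a partition is cached, getCacheLocs() reports the executors
    // holding the cached blocks, so later tasks over it are scheduled there (e.g. PROCESS_LOCAL)
    val cached = input.map(_.toUpperCase).cache()
    cached.count()   // materializes the cache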
That concludes the analysis of task creation and submission. It is only two functions, but it is one of the cores of Spark's task scheduling; the stage-splitting logic in particular is not easy to grasp, so it is worth going over the source a few more times.