Spark 源碼分析(八):DAGScheduler 源碼分析2(task 最佳位置計算)

前面一篇文章已經講了 DAGScheduler 中的 stage 劃分算法。java

實際上就是每當執行到 RDD 的 action 算子時會去調用 DAGScheduler 的 handleJobSubmitted 方法,這個方法內部會根據當前的 RDD 建立一個 ResultStage,而後根據這個 ResultStage 對象建立一個 Job。算法

再將這個 stage 對象傳入 submitStage 方法,這個方法內部會調用一些其它方法,會根據當前 stage 中的那個 RDD 的依賴鏈往前推,依據 RDD 之間的依賴關係,碰到寬依賴就建立一個新的 stage,窄依賴就將當前 RDD 加入當前 stage 中,一直到全部 RDD 都遍歷完。緩存

至此全部的 stage 就劃分完了。app

在前面的 submitStage 方法中會找到劃分出的 stage 中的第一個 stage,而後調用 submitMissingTasks 方法。post

if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // 找到第一個 stage 去調用 submitMissingTasks 方法
          submitMissingTasks(stage, jobId.get)
        }
複製代碼

submitMissingTasks 方法中作了這些事:spa

1,拿到 stage 中沒有計算的 partition;debug

2,獲取 task 對應的 partition 的最佳位置,這個是這裏主要講解的算法;code

3,獲取 taskBinary,將 stage 的 RDD 和 ShuffleDependency(或 func)廣播到 Executor;對象

4,爲 stage 建立 task;遞歸

這個方法的代碼不少,咱們主要分析下怎麼分配 task 到最優的 partition 上去的,也就是計算 partitionId 和 taskId 的對應關係。

// 計算 taskId 和 partition 的對應關係 
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        // 若是是 ShuffleMapStage
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        // 若是是 ResultStage
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
複製代碼

能夠看出主要是調用了 getPreferredLocs 這個方法,這個方法其實是調用了 getPreferredLocsInternal 這個方法。

private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }
複製代碼

下面主要看下 getPreferredLocsInternal 這個方法作了哪些操做:

1,判斷 RDD 的 partition 是否被操做過了,若是被操做過了就什麼都不作;

2,查看當前 RDD 的 partition 的最佳計算位置是否被緩存過,若是被緩存過直接返回對應的緩存位置;

3,若是沒有緩存,就調用 RDD 的 preferredLocations 去計算最佳位置,實際上就是看看當前 RDD 是否被 checkpoint 了,若是有就返回 checkpoint 的位置;

4,若是當前 RDD 既沒有被緩存又沒有 checkpoint 的話,就去遍歷 RDD 的依賴鏈,若是有窄依賴,就去遍歷父 RDD 的全部 partition,遞歸調用 getPreferredLocsInternal 方法。

這裏實際上就是找出當前 stage 中是否存在某個 RDD 被緩存或者 checkpoint了,若是有就返回其緩存或者 checkpoint 的位置,添加到序列中,而後返回。若是當前 stage 中的全部 RDD 都沒有被緩存或者 checkpoint 的話,那麼 task 的最佳計算位置就返回 Nil。

private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // 若是這個 rdd 的 partition 已經計算過了位置了就忽略
  	// 由於這個方法是被遞歸調用的
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // 若是這個 partition 被緩存過就返回緩存的位置
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 調用 RDD 內部的 preferredLocations 方法去找最佳計算位置,實際上內部是看當前
    // RDD 是否 checkpoint 了,若是作了 checkpoint 就會返回 checkpoint 的位置
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    /** * 若是該 RDD 既沒有沒緩存有沒有 checkpoint 的話那麼就會去遍歷他的依賴鏈,發現是窄依賴的時候 * 去就去遞歸調用 getPreferredLocsInternal 去看看該 RDD 是否被緩存或者 checkpoint 了。若是 * 是,就返回緩存或者 checkpoint 的位置。若是一直沒找到的話就返回 Nil **/
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
複製代碼

當獲取到 task 的最佳位置後,根據 stage 的類型匹配,爲每一個 partition 的數據建立一個 task,若是是 ShuffleMapStage 就建立 ShuffleMapTask,若是是 ResultStage 就建立 ResultTask。而後將整個 stage 建立的全部 task都放到一個 Seq 中。

建立 task 的過程會將每一個 task前面計算出來的最佳位置和 taskBinary 等參數帶進去。

val tasks: Seq[Task[_]] = try {
      stage match {
        // 若是是 ShuffleMapStage
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            // 建立 ShuffleMapTask
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        // 若是是 ResultStage
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            // 建立 ResultTask
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
複製代碼

建立好該 stage 的 tasks 後,若是 tasks 的長度大於 0,會將這些 task 建立一個 TaskSet ,而後調用 TaskScheduler 的 submitTasks 方法,提交 TaskSet 給 TaskScheduler。

若是 tasks 的長度小於等於 0 的話,會將當前 stage 標記完成,而後調用 submitWaitingChildStages 方法,提交當前 stage 的子 stage。

// 若是 tasks 長度大於 0
if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      // 將 tasks 封裝到 TaskSet 內部,而後經過 taskScheduler 的 submitTasks 方法提交
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {// tasks 長度小於等於 0
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
  		// 標記當前 stage 已完成
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
			// 提交當前 stage 的子 stage
      submitWaitingChildStages(stage)
    }
複製代碼

至此 Stage 的 TaskSet 已經提交給 TaskScheduler 了,下面就是看 TaskScheduler 怎麼對 Task 進行調度處理了。

相關文章
相關標籤/搜索