Spark 2.x (63): How the Driver Ships Tasks (Closures, etc.) to Executors in Spark 2.4

In Spark, an application may contain multiple jobs, each one triggered by a call to SparkContext#runJob(...). A job consists of one or more stages: the last stage of a job is a ResultStage, and all the others are ShuffleMapStages. A ResultStage produces a set of ResultTasks, which return their results to the driver when they finish; a ShuffleMapStage produces a set of ShuffleMapTasks, which, when they finish, partition their output into different buckets according to the RDD's Partitioner.
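As a minimal, self-contained sketch (the local[*] master and the data.txt input path are placeholders, not part of the original example), the word-count job below contains exactly one shuffle, so its single job is split into one ShuffleMapStage and one ResultStage:

import org.apache.spark.{SparkConf, SparkContext}

object JobStageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("job-stage-demo"))

    // One action => one job. reduceByKey introduces a shuffle, so this job is cut into
    // a ShuffleMapStage (textFile + flatMap + map, executed as ShuffleMapTasks) and a
    // ResultStage (the post-shuffle side that serves collect(), executed as ResultTasks).
    val counts = sc.textFile("data.txt")            // hypothetical input path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                           // wide (shuffle) dependency
      .collect()                                    // triggers SparkContext#runJob

    counts.take(10).foreach(println)
    sc.stop()
  }
}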

How is Spark code parsed into RDDs?

1) A Spark program (DataFrame, Dataset, spark.sql(), or RDD code) is parsed, and for the SQL-based APIs optimized by Catalyst, into a chain of parent/child RDDs. When the DAG is divided into stages, the split follows the shuffle dependencies between RDDs: dependencies are either narrow or wide, and whenever a parent/child pair of RDDs is connected by a wide dependency, the chain is cut into two stages, with the parent RDD in one stage and the child in the next (a quick way to observe this is sketched after point 2). When operators such as map and reduce are turned into RDDs, the operator's implementing function (a "closure" or a "user-defined function/class") is assigned to the corresponding RDD's f property.

2) DAGScheduler#submitMissingTasks turns a stage into one of two kinds of task, ShuffleMapTask or ResultTask. These tasks are shipped to the Executors, where a TaskRunner runs them.
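To make point 1 concrete, here is a hedged sketch (reusing sc and the word-count lineage from the sketch above; the exact toDebugString output varies by Spark version) of how the narrow/wide dependencies and the stage boundary can be observed:

// Narrow vs. wide dependencies, observed on the word-count lineage above.
val pairs   = sc.textFile("data.txt").flatMap(_.split("\\s+")).map((_, 1))
val reduced = pairs.reduceByKey(_ + _)

// pairs depends on its parent one-to-one (narrow); reduced depends on pairs through a shuffle (wide).
println(pairs.dependencies.head.getClass.getSimpleName)    // OneToOneDependency
println(reduced.dependencies.head.getClass.getSimpleName)  // ShuffleDependency

// toDebugString prints the lineage; the indentation step marks the shuffle boundary,
// i.e. where the DAGScheduler cuts the chain into a ShuffleMapStage and a ResultStage.
println(reduced.toDebugString)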

At run time, ShuffleMapTask#runTask() and ResultTask#runTask() are invoked. Both contain a call to rdd.iterator(...), and rdd#iterator(...) internally calls rdd.compute(...). compute() pulls its input from the parent RDD's iterator, so if RDDA's child is RDDB and RDDB's child is RDDC, the compute logic is chained like this (the call starts from the last RDD's iterator, but the data is produced parent-first):

RDDA.compute()
        -> RDDB.compute()
                -> RDDC.compute()
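The following toy sketch (plain Scala, deliberately not the real Spark classes) mimics this pull model, where each node's compute() wraps its parent's iterator():

// A toy model of the iterator()/compute() chaining described above.
trait ToyRDD[T] {
  def iterator(): Iterator[T] = compute()   // the real RDD.iterator also checks cache/checkpoint first
  def compute(): Iterator[T]
}

class SourceRDD(data: Seq[Int]) extends ToyRDD[Int] {
  override def compute(): Iterator[Int] = data.iterator            // plays the role of RDDA
}

class MappedRDD[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
  // compute() pulls lazily from the parent's iterator and applies f,
  // much like MapPartitionsRDD#compute pulls from firstParent.iterator.
  override def compute(): Iterator[U] = parent.iterator().map(f)
}

object ToyChain extends App {
  val a = new SourceRDD(1 to 5)                  // RDDA
  val b = new MappedRDD(a, (x: Int) => x * 10)   // RDDB
  val c = new MappedRDD(b, (x: Int) => x + 1)    // RDDC
  // Consuming c's iterator transitively drives b.compute() and a.compute();
  // the data is produced parent-first: A -> B -> C.
  println(c.iterator().toList)                   // List(11, 21, 31, 41, 51)
}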

If the code was written with Spark SQL (Dataset, DataFrame, spark.sql('')), inspecting what Catalyst produces after analysis and optimization shows the same thing: in the end, the code is resolved into a chain of parent/child RDDs.

What gets serialized into taskBinary is the resolved RDD together with its dependency chain (plus, for a ResultStage, the function implementing the ResultTask's final operator). For the non-ResultTask case, the operator's business function is carried inside the RDD itself: when an operator is turned into an RDD, its implementing function (a "closure" or a "user-defined function/class") is assigned to the RDD's f property and used by RDD#compute().

  • If it is a "closure", it usually just captures a few constants defined around the function;
  • If it is a "user-defined function/class", it may reference helper code in an external jar; in that case the TaskRunner loads that jar into the task's class loader at run time so the referenced functions can be called. Both cases are sketched below.
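A small self-contained sketch of both cases (the Squarer helper is made up for illustration; in a real job it would typically come from a jar shipped with the application):

import org.apache.spark.{SparkConf, SparkContext}

// In a real job, Squarer would live in a separate jar shipped to executors (--jars / spark.jars);
// it is inlined here only so the sketch compiles on its own.
object Squarer { def square(x: Int): Int = x * x }

object ClosureVsClassDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("closure-vs-class"))
    val rdd = sc.parallelize(1 to 100)

    // Case 1: a closure -- the lambda captures the local constant `threshold`,
    // and the captured value is serialized into the RDD's f together with the function.
    val threshold = 10
    val big = rdd.filter(x => x > threshold)

    // Case 2: a reference to a class defined outside the closure -- only the reference is
    // serialized; at run time the TaskRunner must be able to load Squarer's bytecode from
    // the jars the driver shipped before the task can call it.
    val squared = big.map(Squarer.square)

    println(squared.count())
    sc.stop()
  }
}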

How is an operator turned into an RDD (taking map as the example)?

There are many kinds of RDD: MapPartitionsRDD, ShuffledRDD, and so on, but every one of them has a compute() and an iterator() method. compute() is what iterates over all the data of a given partition and applies the business function the programmer wrote when calling the operator.

Taking RDD's map operator as the example: RDD#map() turns the map call into a MapPartitionsRDD,

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

Note: the f function here is the one we implement when calling the RDD map operator.

MapPartitionsRDD is driven through iterator() and compute(). Its compute() implementation iterates over all elements of one partition of the parent RDD and applies the f() function; the f() passed to RDD#map() is wrapped and handed to the MapPartitionsRDD.

/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 *
 * @param prev the parent RDD.
 * @param f The function used to map a tuple of (TaskContext, partition index, input iterator) to
 *          an output iterator.
 * @param preservesPartitioning Whether the input function preserves the partitioner, which should
 *                              be `false` unless `prev` is a pair RDD and the input function
 *                              doesn't modify the keys.
 * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
 *                      containing at least one RDDBarrier shall be turned into a barrier stage.
 * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
 *                         sensitive, it may return totally different result when the input order
 *                         is changed. Mostly stateful functions are order-sensitive.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }

  @transient protected lazy override val isBarrier_ : Boolean =
    isFromBarrier || dependencies.exists(_.rdd.isBarrier())

  override protected def getOutputDeterministicLevel = {
    if (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
      DeterministicLevel.INDETERMINATE
    } else {
      super.getOutputDeterministicLevel
    }
  }
}

The iterator() function is defined in RDD, the parent class of MapPartitionsRDD:

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context) // ends up calling compute()
    } else {
      computeOrReadCheckpoint(split, context) // ends up calling compute()
    }
  }

ShuffleMapTask and ResultTask

The ShuffleMapTask class

A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on the partitioner specified in the ShuffleDependency).
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
    serializedTaskMetrics, jobId, appId, appAttemptId, isBarrier)
  with Logging {

  /** A constructor used only in test suites. This does not require passing in an RDD. */
  def this(partitionId: Int) {
    this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null)
  }

  @transient private val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
}

@param stageId id of the stage this task belongs to
@param stageAttemptId attempt id of the stage this task belongs to
@param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized, the type should be (RDD[_], ShuffleDependency[_, _, _]).
@param partition partition of the RDD this task is associated with
@param locs preferred task execution locations for locality scheduling
@param localProperties copy of thread-local properties set by the user on the driver side
@param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side and sent to the executor side
The following parameters are optional:
@param jobId id of the job this task belongs to
@param appId id of the app this task belongs to
@param appAttemptId attempt id of the app this task belongs to
@param isBarrier whether this task belongs to a barrier stage. Spark must launch all the tasks at the same time for a barrier stage.

TaskMetrics is simply a class describing a task's execution metrics:

class TaskMetrics private[spark] () extends Serializable {
  // Each metric is internally represented as an accumulator
  private val _executorDeserializeTime = new LongAccumulator    // executor-side deserialization time
  private val _executorDeserializeCpuTime = new LongAccumulator // executor-side deserialization CPU time
  private val _executorRunTime = new LongAccumulator   // executor-side run time
  private val _executorCpuTime = new LongAccumulator   // executor-side CPU time
  private val _resultSize = new LongAccumulator        // result size
  private val _jvmGCTime = new LongAccumulator         // JVM GC time
  private val _resultSerializationTime = new LongAccumulator // result serialization time
  private val _memoryBytesSpilled = new LongAccumulator      // in-memory bytes spilled
  private val _diskBytesSpilled = new LongAccumulator        // bytes spilled to disk
  private val _peakExecutionMemory = new LongAccumulator     // peak execution memory
  private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)] // updated block statuses
}

The ResultTask class

A Task that sends its output back to the driver application.
https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    locs: Seq[TaskLocation],
    val outputId: Int,
    localProperties: Properties,
    serializedTaskMetrics: Array[Byte],
    jobId: Option[Int] = None,
    appId: Option[String] = None,
    appAttemptId: Option[String] = None,
    isBarrier: Boolean = false)
  extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics,
    jobId, appId, appAttemptId, isBarrier)
  with Serializable {

  @transient private[this] val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

  // This is only callable on the driver side.
  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}

Using a breakpoint to inspect what a serialized Task carries

Write a test program using the RDD API:

package com.dx.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.Map;

public class TestBroadcast {
   public static void main(String[] args) {
      SparkConf conf = new SparkConf();
      conf.setMaster("local[*]");
      conf.setAppName("test application");
      JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
      Map<String, String> resource = new java.util.HashMap<String, String>();
      for (int i = 0; i < 10000; i++) {
         resource.put(String.valueOf(i), String.valueOf(i));
      }
      final Broadcast<Map<String, String>> broadcastMap = javaSparkContext.broadcast(resource);

      StructType resulStructType = new StructType();
      resulStructType = resulStructType.add("int_id", DataTypes.StringType, false);
      resulStructType = resulStructType.add("job_result", DataTypes.StringType, true);
      ExpressionEncoder<Row> resultEncoder = RowEncoder.apply(resulStructType);

      JavaRDD<String> sourceDataset = javaSparkContext.textFile("E:\\test2");
      JavaRDD<Row> dataset = sourceDataset.map(new Function<String, Row>() {
         public Row call(String line) throws Exception {

            String[] fields = line.split(",");
            int int_id = Integer.valueOf(fields[1]);

            Map<String, String> resources = broadcastMap.getValue();
            // the broadcast map is keyed by String, so convert before the lookup
            String job_result = resources.get(String.valueOf(int_id));

            Object[] values = new Object[2];
            values[0] = int_id;
            values[1] = job_result;

            return RowFactory.create(values);
         }
      });
      dataset.saveAsTextFile("E:\\test3");
   }
}

Set a breakpoint in DAGScheduler#submitMissingTasks(stage: Stage, jobId: Int):

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

    // If there are tasks to execute, record the submission time of the stage. Otherwise,
    // post the even without the submission time, which indicates that this stage was
    // skipped.
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    var partitions: Array[Partition] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      var taskBinaryBytes: Array[Byte] = null
      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case e: Throwable =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage

        // Abort execution
        return
    }

    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
  }

with the breakpoint placed at the line:

  if (tasks.size > 0) { // breakpoint

Run TestBroadcast in debug mode in IDEA, and the breakpoint is hit:

From the Stage object you can see the stage#rdd property; that RDD is the parsed result of the execution logic contained in the current stage. A stage wraps one chain of RDDs: whether the program was written with Spark SQL, DataFrame, Dataset, or the RDD API, it is ultimately resolved (after Catalyst analysis for Spark SQL/DataFrame/Dataset) into RDDs, and each RDD can carry an implementing function. For example, the map operator is turned into a MapPartitionsRDD, and during that conversion the implementing function becomes a property of the MapPartitionsRDD instance.

The property list of an RDD as seen in the debugger during execution (screenshot omitted):

Here f is the function we implemented; it is carried as a property of the RDD.

 

When a stage is turned into tasks, the data each Task operates on is one particular partition of the RDD, and that RDD still carries its RDD#f function property. So when the task is serialized, these implementing functions are serialized with it; once the task reaches an Executor, it is deserialized and handed to a TaskRunner for execution.

When the TaskRunner runs, it loads the application.jar and the other jars the driver shipped to the container; if the f of the deserialized RDD depends on code in those jars, the dependent functions are resolved from the loaded jars.
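A small hedged sketch of the flip side of this serialization: if an operator's function captures something that is not serializable, the closure cannot be shipped at all (the Registry class below is made up for illustration):

import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Registry deliberately does NOT extend Serializable. Because the lambda below captures
// `registry`, the closure cannot be serialized, and the job fails on the driver with
// "Task not serializable" before any task is shipped to an executor.
object NonSerializableCaptureDemo {
  class Registry { val name = "lookup" }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("closure-capture"))
    val registry = new Registry

    try {
      sc.parallelize(1 to 10).map(i => s"${registry.name}-$i").collect()
    } catch {
      case e: SparkException => println(s"Expected failure: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}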

Where does the ResultTask's func come from?

Taking the example above,

dataset.saveAsTextFile("E:\\test3");

has the following call-site information:

CallSite(
runJob at 
SparkHadoopWriter.scala:78,org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151))

saveAsTextFile(...) (an RDD method, even though the variable in the example is named dataset) is internally implemented by calling SparkContext.runJob(...) to submit the job:

org.apache.spark.internal.io.SparkHadoopWriter

  def write[K, V: ClassTag](
      rdd: RDD[(K, V)],
      config: HadoopWriteConfigUtil[K, V]): Unit = {
    // Extract context and configuration from RDD.
    val sparkContext = rdd.context
    val commitJobId = rdd.id

    // Set up a job.
    val jobTrackerId = createJobTrackerID(new Date())
    val jobContext = config.createJobContext(jobTrackerId, commitJobId)
    config.initOutputFormat(jobContext)

    // Assert the output format/key/value class is set in JobConf.
    config.assertConf(jobContext, rdd.conf)

    val committer = config.createCommitter(commitJobId)
    committer.setupJob(jobContext)

    // Try to write all RDD partitions as a Hadoop OutputFormat.
    try {
      val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
        // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
        val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber

        executeTask(
          context = context,
          config = config,
          jobTrackerId = jobTrackerId,
          commitJobId = commitJobId,
          sparkPartitionId = context.partitionId,
          sparkAttemptNumber = attemptId,
          committer = committer,
          iterator = iter)
      })

      committer.commitJob(jobContext, ret)
      logInfo(s"Job ${jobContext.getJobID} committed.")
    } catch {
      case cause: Throwable =>
        logError(s"Aborting job ${jobContext.getJobID}.", cause)
        committer.abortJob(jobContext)
        throw new SparkException("Job aborted.", cause)
    }
  }

The second argument passed to SparkContext#runJob(...) here is exactly the func. SparkContext#runJob(...) in turn calls DAGScheduler#runJob(...):

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

Inside the DAGScheduler, func is stored as a property of the ResultStage:

  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

When DAGScheduler#submitMissingTasks(...) builds the serialized taskBinary, if the stage is a ResultStage it serializes stage#func together with stage#rdd, and the result travels to the executor along with the Task.

      var taskBinaryBytes: Array[Byte] = null
      // taskBinaryBytes and partitions are both effected by the checkpoint status. We need
      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
      // consistent view of both variables.
      RDDCheckpointData.synchronized {
        taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

        partitions = stage.rdd.partitions
      }

      taskBinary = sc.broadcast(taskBinaryBytes)

When the Executor launches a task, org.apache.spark.executor.Executor#launchTask() is called; the task is deserialized and run inside org.apache.spark.executor.Executor.TaskRunner. If the task is a ResultTask, ResultTask#runTask() is invoked.

ResultTask#runTask() deserializes taskBinary into func and rdd, then calls func; inside func, the RDD iterator is consumed and the actual computation runs:

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

At this point it should be clear that the func a ResultTask deserializes from taskBinary is exactly the second argument of SparkContext#runJob(...). In a ResultTask, rdd#compute() is invoked as func iterates over the RDD; that is where the operators are really triggered.
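For comparison, RDD#collect() builds its func the same way, handing an iterator-to-array function to SparkContext#runJob; each ResultTask then returns one partition's array to the driver (quoted from memory of the Spark 2.x source, so treat the exact form as approximate):

// org.apache.spark.rdd.RDD (Spark 2.x), slightly abridged:
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}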

