sparkCore shuffle解析

轉載:時延軍.http://shiyanjun.cnhtml

Spark在Map階段調度運行的ShuffleMapTask,最後會生成.data和.index文件,能夠經過個人這篇文章 Spark Shuffle過程分析:Map階段處理流程 瞭解具體流程和詳情。同時,在Executor上運行一個ShuffleMapTask,返回了一個MapStatus對象,下面是ShuffleMapTask執行後返回結果的相關代碼片斷:數組

var writer: ShuffleWriter[Any, Any] = null
try {
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
} catch {
  case e: Exception =>
    try {
      if (writer != null) {
        writer.stop(success = false)
      }
    } catch {
      case e: Exception =>
        log.debug("Could not stop writer", e)
    }
    throw e
}

若是ShuffleMapTask執行過程沒有發生異常,則最後執行的調用爲:網絡

writer.stop(success = true).get

這裏返回了一個MapStatus類型的對象,MapStatus的定義以下所示:app

private[spark] sealed trait MapStatus {
  def location: BlockManagerId
  def getSizeForBlock(reduceId: Int): Long
}

其中包含了運行ShuffleMapTask所在的BlockManager的地址,以及後續Reduce階段每一個ResultTask計算須要Map輸出的大小(Size)。咱們能夠看下MapStatus如何建立的,在SortShuffleWriter的write()方法中,能夠看到MapStatus的建立,以下代碼所示:ide

mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)

繼續跟蹤能夠看到,調用了MapStatus的伴生對象的apply()方法:函數

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > 2000) {
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}

uncompressedSizes表示Partition的個數,若是大於2000則建立HighlyCompressedMapStatus對象,不然建立CompressedMapStatus對象,他們具體的實現能夠參考源碼。oop

含有Shuffle過程的Spark Application示例優化

咱們先給出一個簡單的Spark Application程序代碼,以下所示:ui

al rdd = sc.textFile("/temp/*.h")
val finalRdd = rdd.flatMap(line => line.split("\\s+")).map(w => (w, 1)).reduceByKey(_ + _)
finalRdd.toDebugString
finalRdd.saveAsTextFile("/temp/output")

經過RDD的toDebugString()方法,打印調試信息:this

scala> finalRdd.toDebugString
res0: String = 
(133) ShuffledRDD[6] at reduceByKey at <console>:30 []
  +-(133) MapPartitionsRDD[5] at map at <console>:30 []
      |   MapPartitionsRDD[4] at flatMap at <console>:30 []
      |   /temp/*.h MapPartitionsRDD[3] at textFile at <console>:29 []
      |   /temp/*.h HadoopRDD[2] at textFile at <console>:29 []

能夠看到這個過程當中,調用了reduceByKey(),建立了一個ShuffledRDD,這在計算過程當中會執行Shuffle操做。

ShuffleMapTask執行結果上報處理流程

Spark Application提交之後,會生成ShuffleMapStage和/或ResultStage,而一個ShuffleMapStage對應一組實際須要運行的ShuffleMapTask,ResultStage對應一組實際須要運行ResultTask,每組Task都是有TaskSetManager來管理的,而且只有ShuffleMapStage對應的一組ShuffleMapTask都運行成功結束之後,纔會調度ResultStage。 因此,咱們這裏關注的是,當ShuffleMapStage中最後一個ShuffleMapTask運行成功後,如何將Map階段的信息上報給調度器(Driver上的TaskScheduler和DAGScheduler),瞭解這個處理流程對理解後續的Reduce階段處理相當重要,這個過程的詳細處理流程,以下圖所示: ShuffleMapTask.succeeded 咱們將整個流程按照順序分爲以下幾個過程來描述:

ShuffleMapTask完成後處理結果 Executor會啓動一個TaskRunner線程來運行ShuffleMapTask,ShuffleMapTask完成後,會對結果進行序列化處理,代碼以下所示:

val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit

根據序列化後結果serializedDirectResult的大小resultSize,會進行一些優化,代碼以下所示:

val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize > maxDirectResultSize) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
      blockId,
      new ChunkedByteBuffer(serializedDirectResult.duplicate()),
      StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(
      s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult
  }
}

若是結果大小沒有超過指定的DirectTaskResult的最大限制值maxDirectResultSize,就直接將上面的DirectTaskResult的序列化結果發送給Driver;若是結果大小超過了Task結果的最大限制值maxResultSize,則直接丟棄結果;不然,當結果大小介於maxDirectResultSize與maxResultSize之間時,會基於Task ID建立一個TaskResultBlockId,而後經過BlockManager將結果暫時保存在Executor上(DiskStore或MemoryStore),以便後續計算直接請求獲取該數據。 最後,結果會調用CoarseGrainedExecutorBackend的statusUpdate方法,以下所示:

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

將Task對應的運行狀態、運行結果發送給Driver。

Driver獲取Task運行結果 集羣模式下,Driver端負責接收Task運行結果的是CoarseGrainedSchedulerBackend,它內部有一個DriverEndpoint來負責實際網絡通訊,以及接收Task狀態及其結果,代碼以下所示:

case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }

若是消息類型爲StatusUpdate,則首先直接調用了TaskSchedulerImpl的statusUpdate()方法,來獲取Task的運行狀態及其結果,代碼以下所示:

case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }

若是Task狀態是TaskState.FINISHED,則經過TaskResultGetter來獲取Task運行返回的結果,這裏存在DirectTaskResult和IndirectTaskResult兩種類型的結果,他們的處理方式不一樣:對於DirectTaskResult類型的結果,以下所示:

case directResult: DirectTaskResult[_] =>
  if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
    return
  }
  // deserialize "value" without holding any lock so that it won't block other threads.
  directResult.value(taskResultSerializer.get())

直接從DirectTaskResult中就能夠經過反序列化獲得結果,而對於IndirectTaskResult類型的結果,邏輯相對複雜一些,以下所示:

case directResult: DirectTaskResult[_] =>
  if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
    return
  }
  // deserialize "value" without holding any lock so that it won't block other threads.
  directResult.value(taskResultSerializer.get())

結果大小超過指定的限制值,在ShuffleMapTask運行過程當中會直接經過BlockManager存儲到Executor的內存/磁盤上,這裏就會根據結果Block ID,經過BlockManager來獲取到結果對應的Block數據。

更新Driver端Task、Stage狀態,並調度Stage運行 獲取到ShuffleMapTask運行的結果數據後,須要更新TaskSetManager中對應的狀態信息,以便爲後續調度Task運行提供決策支持,代碼以下所示:

scheduler.handleSuccessfulTask(taskSetManager, tid, result)

上面代碼調用了TaskSetManager的handleSuccessfulTask()方法,更新相關狀態,同時繼續更新DAGScheduler中對應的狀態,代碼片斷以下所示:

sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
maybeFinishTaskSet()

調用DAGScheduler的taskEnded()方法,更新Stage信息。若是一個ShuffleMapTask運行完成後,並且是對應的ShuffleMapStage中最後一個ShuffleMapTask,則該ShuffleMapStage也完成了,則會註冊該ShuffleMapStage運行獲得的全部Map輸出結果,代碼以下所示:

mapOutputTracker.registerMapOutputs(
  shuffleStage.shuffleDep.shuffleId,
  shuffleStage.outputLocInMapOutputTrackerFormat(),
  changeEpoch = true)

上面MapOutputTracker維護了一個ConcurrentHashMap[Int, Array[MapStatus]]內存結構,用來管理每一個ShuffleMapTask運行完成返回的結果數據,其中Key是Shuffle ID,Value使用數組記錄每一個Map ID對應的輸出結果信息。 下面代碼判斷ShuffleMapStage是否可用,從而進行相應的處理:

if (!shuffleStage.isAvailable) {
  // Some tasks had failed; let's resubmit this shuffleStage.
  // TODO: Lower-level scheduler should also deal with this
  logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
    ") because some of its tasks had failed: " +
    shuffleStage.findMissingPartitions().mkString(", "))
  submitStage(shuffleStage)
} else {
  // Mark any map-stage jobs waiting on this stage as finished
  if (shuffleStage.mapStageJobs.nonEmpty) {
    val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
    for (job <- shuffleStage.mapStageJobs) {
      markMapStageJobAsFinished(job, stats)
    }
  }
  submitWaitingChildStages(shuffleStage)
}

若是ShuffleMapStage不可用,說明還有某些Partition對應的結果沒有計算(或者某些計算失敗),Spark會從新提交該ShuffleMapStage;若是可用,則說明當前ShuffleMapStage已經運行完成,更新對應的狀態和結果信息:標記ShuffleMapStage已經完成,同時提交Stage依賴關係鏈中相鄰下游的Stage運行。若是後面是ResultStage,則會提交該ResultStage運行。

釋放資源、從新調度Task運行 一個ShuffleMapTask運行完成,要釋放掉對應的Executor佔用的資源,在Driver端會增長對應的資源列表,同時調度Task到該釋放的Executor上運行,可見CoarseGrainedSchedulerBackend.DriverEndpoint中對應的處理邏,代碼以下所示:

if (TaskState.isFinished(state)) {
  executorDataMap.get(executorId) match {
    case Some(executorInfo) =>
      executorInfo.freeCores += scheduler.CPUS_PER_TASK
      makeOffers(executorId)

上面makeOffers()方法,會調度一個Task到該executorId標識的Executor上運行。若是ShuffleMapStage已經完成,那麼這裏可能會調度ResultStage階段的ResultTask運行。

Reduce階段處理流程

上面咱們給出的例子中,執行reduceByKey後,因爲上游的RDD沒有按照key執行分區操做,因此一定會建立一個ShuffledRDD,能夠在PairRDDFunctions類的源碼中看到combineByKeyWithClassTag方法,實現代碼以下所示:

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

這裏,由於咱們給出的例子的上下文中,self.partitioner == Some(partitioner)不成立,因此最終建立了一個ShuffledRDD對象。因此,對於Reduce階段的處理流程,咱們基於ShuffledRDD的處理過程來進行分析。 咱們從ResultTask類開始,該類中實現了runTask()方法,代碼以下所示:

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
  func(context, rdd.iterator(partition, context))
}

其中,最核心的就是上面的rdd.iterator()調用,具體處理過程,以下圖所示: RDD.iterator 最終,它用來計算一個RDD,即對應ShuffledRDD的計算。iterator()方法是在RDD類中給出的,以下所示:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

跟蹤getOrCompute()方法,最終應該是在ShuffledRDD類的compute()方法中定義。

ShuffledRDD計算 ShuffledRDD對應的compute方法的實現代碼,以下所示:

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}

上面主要是經過BlockStoreShuffleReader的read()方法,來實現ShuffledRDD的計算,咱們經過下面的序列圖來看一下詳細的執行流程: Reduce.run 跟蹤Map的輸出結果,是基於Executor端的MapOutputTracker與Driver端的MapOutputTrackerMaster來實現的,其中MapOutputTrackerMaster做爲Server端,MapOutputTracker做爲Client端。Driver端管理了一個Spark Application計算程序的ShuffleMapStage中全部ShuffleMapTask的輸出,因此在Reduce過程當中Executor會經過MapOutputTracker與Driver的MapOutputTrackerMaster進行通訊獲取。 調用BlockStoreShuffleReader的read()方法,最終獲得了Reduce過程當中須要的輸入,即ShuffleMapTask的輸出結果所在的位置。一般,爲了可以使計算在數據本地進行,每一個ResultTask運行所在的Executor節點會存在對應的Map輸出,是經過BlockManager來管理這些數據的,經過Block ID來標識。因此,上圖中最後返回了一個BlockManager ID及受其管理的一個Block ID列表,而後Executor上的ResultTask就可以根據BlockManager ID來獲取到對應的Map輸出數據,從而進行數據的計算。 ResultTask運行完成後,最終返回一個記錄的迭代器,此時計算獲得的最終結果數據,是在各個ResultTask運行所在的Executor上的,而數據又是按Block來存儲的,是經過BlockManager來管理的。

保存結果RDD 根據前面的程序示例,最後調用了RDD的saveAsTextFile(),這會又生成一個ResultStage,進而對應着一組ResultTask。保存結果RDD的處理流程,以下圖所示: RDD.saveAsTextFile 上面整個流程,會執行設置RDD輸出到HDFS的Writer(一個寫文件的函數)、提交ResultStage、構建包含ResultTask的TaskSet、調度ResultTask到指定Executor上執行這幾個核心的過程。實際上,在每一個Executor上運行的ResultTask的核心處理邏輯,主要是下面這段函數代碼:

val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()
 
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
  val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
  val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
  writer.setup(context.stageId, context.partitionId, taskAttemptId)
  writer.open()
  var recordsWritten = 0L
 
  Utils.tryWithSafeFinallyAndFailureCallbacks {
    while (iter.hasNext) {
      val record = iter.next()
      writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
 
      // Update bytes written metric every few records
      SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
      recordsWritten += 1
    }
  }(finallyBlock = writer.close())
  writer.commit()
  outputMetrics.setBytesWritten(callback())
  outputMetrics.setRecordsWritten(recordsWritten)
}

還記得咱們在計算ShuffledRDD的過程當中,最終的ResultTask生成了一個結果的迭代器。當調用saveAsTextFile()時,ResultStage對應的一組ResultTask會在Executor上運行,將每一個迭代器對應的結果數據保存到HDFS上。

相關文章
相關標籤/搜索