Spark 源碼系列(六)Shuffle 的過程解析

Spark 大會上,全部的演講嘉賓都認爲 shuffle 是最影響性能的地方,可是又迫不得已。以前去百度面試 hadoop 的時候,也被問到了這個問題,直接回答了不知道。html

這篇文章主要是沿着下面幾個問題來開展:面試

一、shuffle 過程的劃分?數組

二、shuffle 的中間結果如何存儲?dom

三、shuffle 的數據如何拉取過來?ide

Shuffle 過程的劃分

Spark 的操做模型是基於 RDD 的,當調用 RDD 的 reduceByKey、groupByKey 等相似的操做的時候,就須要有 shuffle 了。再拿出 reduceByKey 這個來說。函數

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }
複製代碼

reduceByKey 的時候,咱們能夠手動設定 reduce 的個數,若是不指定的話,就可能不受控制了。oop

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
複製代碼

若是不指定 reduce 個數的話,就按默認的走:性能

一、若是自定義了分區函數 partitioner 的話,就按你的分區函數來走。fetch

二、若是沒有定義,那麼若是設置了 spark.default.parallelism,就使用哈希的分區方式,reduce 個數就是設置的這個值。優化

三、若是這個也沒設置,那就按照輸入數據的分片的數量來設定。若是是 hadoop 的輸入數據的話,這個就多了,你們可要當心啊。

設定完以後,它會作三件事情,也就是以前講的 3 次 RDD 轉換。

//map端先按照key合併一次
val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
 }, preservesPartitioning = true)
//reduce抓取數據
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer)
//合併數據,執行reduce計算
partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
 }, preservesPartitioning = true)
複製代碼

View Code

img

一、在第一個 MapPartitionsRDD 這裏先作一次 map 端的聚合操做。

二、ShuffledRDD 主要是作從這個抓取數據的工做。

三、第二個 MapPartitionsRDD 把抓取過來的數據再次進行聚合操做。

四、步驟 1 和步驟 3 都會涉及到 spill 的過程。

怎麼作的聚合操做,回去看 RDD 那章。

Shuffle 的中間結果如何存儲

做業提交的時候,DAGScheduler 會把 Shuffle 的過程切分紅 map 和 reduce 兩個 Stage(以前一直被我叫作 shuffle 前和 shuffle 後),具體的切分的位置在上圖的虛線處。

map 端的任務會做爲一個 ShuffleMapTask 提交,最後在 TaskRunner 裏面調用了它的 runTask 方法。

override def runTask(context: TaskContext): MapStatus = {
    val numOutputSplits = dep.partitioner.numPartitions
    metrics = Some(context.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    val shuffleBlockManager = blockManager.shuffleBlockManager
    var shuffle: ShuffleWriterGroup = null
    var success = false

    try {
      // serializer爲空的狀況調用默認的JavaSerializer,也能夠經過spark.serializer來設置成別的
      val ser = Serializer.getSerializer(dep.serializer)
      // 實例化Writer,Writer的數量=numOutputSplits=前面咱們說的那個reduce的數量
      shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

      // 遍歷rdd的元素,按照key計算出來它所在的bucketId,而後經過bucketId找到相應的Writer寫入
      for (elem <- rdd.iterator(split, context)) {
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1)
        shuffle.writers(bucketId).write(pair)
      }

      // 提交寫入操做. 計算每一個bucket block的大小
      var totalBytes = 0L
      var totalTime = 0L
      val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        writer.close()
        val size = writer.fileSegment().length
        totalBytes += size
        totalTime += writer.timeWriting()
        MapOutputTracker.compressSize(size)
      }

      // 更新 shuffle 監控參數.
      val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
      shuffleMetrics.shuffleWriteTime = totalTime
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      success = true
      new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch { case e: Exception =>
      // 出錯了,取消以前的操做,關閉writer
      if (shuffle != null && shuffle.writers != null) {
        for (writer <- shuffle.writers) {
          writer.revertPartialWrites()
          writer.close()
        }
      }
      throw e
    } finally {
      // 關閉writer
      if (shuffle != null && shuffle.writers != null) {
        try {
          shuffle.releaseWriters(success)
        } catch {
          case e: Exception => logError("Failed to release shuffle writers", e)
        }
      }
      // 執行註冊的回調函數,通常是作清理工做
      context.executeOnCompleteCallbacks()
    }
  }
複製代碼

遍歷每個記錄,經過它的 key 來肯定它的 bucketId,再經過這個 bucket 的 writer 寫入數據。

下面咱們看看 ShuffleBlockManager 的 forMapTask 方法吧。

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
    new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
      private val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null

      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;// 從已有的文件組裏選文件,一個bucket一個文件,即要發送到同一個reduce的數據寫入到同一個文件
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          // 按照blockId來生成文件,文件數爲map數*reduce數
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }
複製代碼

一、map 的中間結果是寫入到本地硬盤的,而不是內存。

二、默認是一個 Executor 的中間結果文件是 M*R(M=map 數量,R=reduce 的數量),設置了 spark.shuffle.consolidateFiles 爲 true 以後是 R 個文件,根據 bucketId 把要分到同一個 reduce 的結果寫入到一個文件中。

三、consolidateFiles 採用的是一個 reduce 一個文件,它還記錄了每一個 map 的寫入起始位置,因此查找的時候先經過 reduceId 查找到哪一個文件,再經過 mapId 查找索引當中的起始位置 offset,長度 length=(mapId + 1).offset -(mapId).offset,這樣就能夠肯定一個 FileSegment(file, offset, length)。

四、Finally,存儲結束以後, 返回了一個 new MapStatus(blockManager.blockManagerId, compressedSizes),把 blockManagerId 和 block 的大小都一塊兒返回。

我的想法,shuffle 這塊和 hadoop 的機制差異不大,tez 這樣的引擎會遇上 spark 的速度呢?仍是讓咱們拭目以待吧!

Shuffle 的數據如何拉取過來

ShuffleMapTask 結束以後,最後走到 DAGScheduler 的 handleTaskCompletion 方法當中(關於中間的過程,請看《圖解做業生命週期》)。

case smt: ShuffleMapTask =>
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
    stage.addOutputLoc(smt.partitionId, status)
}
if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
    markStageAsFinished(stage)
    if (stage.shuffleDep.isDefined) {
         // 真的map過程纔會有這個依賴,reduce過程None
         mapOutputTracker.registerMapOutputs(
   &emsp;&emsp;stage.shuffleDep.get.shuffleId,
         stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
         changeEpoch = true)
     }
      clearCacheLocs()
      if (stage.outputLocs.exists(_ == Nil)) {
          // 一些任務失敗了,須要從新提交stage
          submitStage(stage)
       } else {
          // 提交下一批任務 
&emsp;&emsp;&emsp;}
}
複製代碼

一、把結果添加到 Stage 的 outputLocs 數組裏,它是按照數據的分區 Id 來存儲映射關係的 partitionId->MapStaus。

二、stage 結束以後,經過 mapOutputTracker 的 registerMapOutputs 方法,把這次 shuffle 的結果 outputLocs 記錄到 mapOutputTracker 裏面。

這個 stage 結束以後,就到 ShuffleRDD 運行了,咱們看一下它的 compute 函數。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
複製代碼

它是經過 ShuffleFetch 的 fetch 方法來抓取的,具體實如今 BlockStoreShuffleFetcher 裏面。

override def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
{
    val blockManager = SparkEnv.get.blockManager
    val startTime = System.currentTimeMillis
&emsp;&emsp; // mapOutputTracker也分Master和Worker,Worker向Master請求獲取reduce相關的MapStatus,主要是(BlockManagerId和size)
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    // 一個BlockManagerId對應多個文件的大小
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }
    // 構造BlockManagerId 和 BlockId的映射關係,想不到ShffleBlockId的mapId,竟然是1,2,3,4的序列...
    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }
    // 名爲updateBlock,實際是檢驗函數,每一個Block都對應着一個Iterator接口,若是該接口爲空,則應該報錯
    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Some(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case None => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
            case _ =>
              throw new SparkException("Failed to get block " + blockId + ", which is not a shuffle block")
          }
        }
      }
    }
    // 從blockManager獲取reduce所須要的所有block,並添加校驗函數
    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
    val itr = blockFetcherItr.flatMap(unpackBlock)
    
&emsp;&emsp;val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      // CompelteIterator迭代結束以後,會執行如下這部分代碼,提交它記錄的各類參數
      val shuffleMetrics = new ShuffleReadMetrics
      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
      context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
    })

    new InterruptibleIterator[T](context, completionIter)
  }
}
複製代碼

一、MapOutputTrackerWorker 向 MapOutputTrackerMaster 獲取 shuffle 相關的 map 結果信息。

二、把 map 結果信息構形成 BlockManagerId --> Array(BlockId, size) 的映射關係。

三、經過 BlockManager 的 getMultiple 批量拉取 block。

四、返回一個可遍歷的 Iterator 接口,並更新相關的監控參數。

咱們繼續看 getMultiple 方法。

def getMultiple(
      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
      serializer: Serializer): BlockFetcherIterator = {
    val iter =
      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
        new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
      } else {
        new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
      }

    iter.initialize()
    iter
  }
複製代碼

分兩種狀況處理,分別是 netty 的和 Basic 的,Basic 的就不講了,就是經過 ConnectionManager 去指定的 BlockManager 那裏獲取數據,上一章恰好說了。

咱們講一下 Netty 的吧,這個是須要設置的才能啓用的,不知道性能會不會好一些呢?

看 NettyBlockFetcherIterator 的 initialize 方法,再看 BasicBlockFetcherIterator 的 initialize 方法,發現 Basic 的不能同時抓取超過 48Mb 的數據。

override def initialize() {
      // 分開本地請求和遠程請求,返回遠程的FetchRequest
      val remoteRequests = splitLocalRemoteBlocks()
      // 抓取順序隨機
      for (request <- Utils.randomize(remoteRequests)) {
        fetchRequestsSync.put(request)
      }
      // 默認是開6個線程去進行抓取
      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 讀取本地的block
      getLocalBlocks()
   }
複製代碼

在 NettyBlockFetcherIterator 的 sendRequest 方法裏面,發現它是經過 ShuffleCopier 來試下的。

&emsp;&emsp;val cpier = new ShuffleCopier(blockManager.conf)
   cpier.getBlocks(cmId, req.blocks, putResult)
複製代碼

這塊接下來就是 netty 的客戶端調用的方法了,我對這個不瞭解。在服務端的處理是在 DiskBlockManager 內部啓動了一個 ShuffleSender 的服務,最終的業務處理邏輯是在 FileServerHandler。

它是經過 getBlockLocation 返回一個 FileSegment,下面這段代碼是 ShuffleBlockManager 的 getBlockLocation 方法。

def getBlockLocation(id: ShuffleBlockId): FileSegment = {
    // Search all file groups associated with this shuffle.
    val shuffleState = shuffleStates(id.shuffleId)
    for (fileGroup <- shuffleState.allFileGroups) {
      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
      if (segment.isDefined) { return segment.get }
    }
    throw new IllegalStateException("Failed to find shuffle block: " + id)
  }
複製代碼

先經過 shuffleId 找到 ShuffleState,再經過 reduceId 找到文件,最後經過 mapId 肯定它的文件分片的位置。可是這裏有個疑問了,若是啓用了 consolidateFiles,一個 reduce 的所需數據都在一個文件裏,是否是就能夠把整個文件一塊兒返回呢,而不是經過 N 個 map 來屢次讀取?仍是懼怕一次發送一個大文件容易失敗?這就不得而知了。

到這裏整個過程就講完了。能夠看得出來 Shuffle 這塊仍是作了一些優化的,可是這些參數並無啓用,有須要的朋友能夠本身啓用一下試試效果。

參考文獻

Shuffle

相關文章
相關標籤/搜索