Spark的shuffle剖析!

Spark的shuffle剖析!php


1、什麼是shuffle?python

shuffle是洗牌的意思,總的來講,就是分散在各個節點的數據,在通過計算以後,須要從新將數據進行分配,以進行下一步的計算。好比wordcount,顯示在3臺節點上,分別計算了spark的數量、hadoop的數量、scala的數量,結果以下:算法


節點1:   (spark 1)   (hadoop 1)數組

節點2:   (hadoop 1)  (scala 1)網絡

節點3:   (hadoop 1)  (spark 3)數據結構

在通過計算以後,下一步就須要彙總了,那麼彙總就涉及到shuffle,把數據要分類傳輸,好比spark的都到節點1,hadoop 的都到節點2,scala的都到節點3。框架


2、爲何shuffle重要?
ide

Spark大會上,全部的演講嘉賓都認爲shuffle是最影響性能的地方,在MapReduce框架中,shuffle是鏈接Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須通過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。函數


3、從技術上結識shuffle?
oop


Shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果須要按key哈希,而且分發到每個Reducer上去,這個過程就是shuffle。因爲shuffle涉及到了磁盤的讀寫和網絡的傳輸,所以shuffle性能的高低直接影響到了整個程序的運行效率。

下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介於Map phase和Reduce phase之間。

mapreduce running process


4、shuffle階段的劃分

Spark的操做模型是基於RDD的,當調用RDD的reduceByKey、groupByKey等相似的操做的時候,就須要有shuffle了。再拿出reduceByKey這個來說。

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

reduceByKey的時候,咱們能夠手動設定reduce的個數,若是不指定的話,就可能不受控制了。

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse    for (r <- bySize if r.partitioner.isDefined) {      return r.partitioner.get
    }    if (rdd.context.conf.contains("spark.default.parallelism")) {      new HashPartitioner(rdd.context.defaultParallelism)
    } else {      new HashPartitioner(bySize.head.partitions.size)
    }
  }

View Code

若是不指定reduce個數的話,就按默認的走:

一、若是自定義了分區函數partitioner的話,就按你的分區函數來走。

二、若是沒有定義,那麼若是設置了 spark.default.parallelism ,就使用哈希的分區方式,reduce個數就是設置的這個值。

三、若是這個也沒設置,那就按照輸入數據的分片的數量來設定。若是是hadoop的輸入數據的話,這個就多了。。。你們可要當心啊。

設定完以後,它會作三件事情,也就是以前講的3次RDD轉換。

//map端先按照key合併一次val combined = self.mapPartitionsWithContext((context, iter) => {  aggregator.combineValuesByKey(iter, context)
 }, preservesPartitioning = true)//reduce抓取數據val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer)//合併數據,執行reduce計算partitioned.mapPartitionsWithContext((context, iter) => {  new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
 }, preservesPartitioning = true)

View Code

一、在第一個MapPartitionsRDD這裏先作一次map端的聚合操做。

二、SHuffledRDD主要是作從這個抓取數據的工做。

三、第二個MapPartitionsRDD把抓取過來的數據再次進行聚合操做。

四、步驟1和步驟3都會涉及到spill的過程。

怎麼作的聚合操做,回去看RDD那章。

5、Shuffle的中間結果如何存儲

做業提交的時候,DAGScheduler會把Shuffle的過程切分紅map和reduce兩個Stage(以前一直被我叫作shuffle前和shuffle後),具體的切分的位置在上圖的虛線處。

map端的任務會做爲一個ShuffleMapTask提交,最後在TaskRunner裏面調用了它的runTask方法。

override def runTask(context: TaskContext): MapStatus = {
    val numOutputSplits = dep.partitioner.numPartitions
    metrics = Some(context.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    val shuffleBlockManager = blockManager.shuffleBlockManager    var shuffle: ShuffleWriterGroup = null
    var success = false

    try {      // serializer爲空的狀況調用默認的JavaSerializer,也能夠經過spark.serializer來設置成別的
      val ser = Serializer.getSerializer(dep.serializer)      // 實例化Writer,Writer的數量=numOutputSplits=前面咱們說的那個reduce的數量
      shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)      // 遍歷rdd的元素,按照key計算出來它所在的bucketId,而後經過bucketId找到相應的Writer寫入
      for (elem <- rdd.iterator(split, context)) {
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1)
        shuffle.writers(bucketId).write(pair)
      }      // 提交寫入操做. 計算每一個bucket block的大小
      var totalBytes = 0L
      var totalTime = 0L
      val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        writer.close()
        val size = writer.fileSegment().length
        totalBytes += size
        totalTime += writer.timeWriting()
        MapOutputTracker.compressSize(size)
      }      // 更新 shuffle 監控參數.
      val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
      shuffleMetrics.shuffleWriteTime = totalTime
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      success = true
      new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch { case e: Exception =>      // 出錯了,取消以前的操做,關閉writer
      if (shuffle != null && shuffle.writers != null) {        for (writer <- shuffle.writers) {
          writer.revertPartialWrites()
          writer.close()
        }
      }      throw e
    } finally {      // 關閉writer
      if (shuffle != null && shuffle.writers != null) {        try {
          shuffle.releaseWriters(success)
        } catch {          case e: Exception => logError("Failed to release shuffle writers", e)
        }
      }      // 執行註冊的回調函數,通常是作清理工做      context.executeOnCompleteCallbacks()
    }
  }

View Code

遍歷每個記錄,經過它的key來肯定它的bucketId,再經過這個bucket的writer寫入數據。

下面咱們看看ShuffleBlockManager的forMapTask方法吧。

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {    new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))      private val shuffleState = shuffleStates(shuffleId)      private var fileGroup: ShuffleFileGroup = null

      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
      // 從已有的文件組裏選文件,一個bucket一個文件,即要發送到同一個reduce的數據寫入到同一個文件          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>          // 按照blockId來生成文件,文件數爲map數*reduce數
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)          if (blockFile.exists) {            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }


幾個重要的結論:

一、map的中間結果是寫入到本地硬盤的,而不是內存

二、默認是一個map的中間結果文件是M*R(M=map數量,R=reduce的數量),設置了spark.shuffle.consolidateFiles爲true以後是R個文件,根據bucketId把要分到同一個reduce的結果寫入到一個文件中。

三、consolidateFiles採用的是一個reduce一個文件,它還記錄了每一個map的寫入起始位置,因此查找的時候,先經過reduceId查找到哪一個文件,再同坐mapId查找索引當中的起始位置offset,長度length=(mapId + 1).offset -(mapId).offset,這樣就能夠肯定一個FileSegment(file, offset, length)。

四、Finally,存儲結束以後, 返回了一個new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一塊兒返回。

6、Shuffle的數據如何拉取過來

ShuffleMapTask結束以後,最後在DAGScheduler的handleTaskCompletion方法當中。


一、把結果添加到Stage的outputLocs數組裏,它是按照數據的分區Id來存儲映射關係的partitionId->MapStaus。

二、stage結束以後,經過mapOutputTracker的registerMapOutputs方法,把這次shuffle的結果outputLocs記錄到mapOutputTracker裏面。

這個stage結束以後,就到ShuffleRDD運行了,咱們看一下它的compute函數。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)

它是經過ShuffleFetch的fetch方法來抓取的,具體實如今BlockStoreShuffleFetcher裏面。


一、MapOutputTrackerWorker向MapOutputTrackerMaster獲取shuffle相關的map結果信息。

二、把map結果信息構形成BlockManagerId --> Array(BlockId, size)的映射關係。

三、經過BlockManager的getMultiple批量拉取block。

四、返回一個可遍歷的Iterator接口,並更新相關的監控參數。

咱們繼續看getMultiple方法。分兩種狀況處理,分別是netty的和Basic的,Basic的就不講了,就是經過ConnectionManager去指定的BlockManager那裏獲取數據,上一章恰好說了。

咱們講一下Netty的吧,這個是須要設置的才能啓用的,不知道性能會不會好一些呢?

看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,發現Basic的不能同時抓取超過48Mb的數據。

在NettyBlockFetcherIterator的sendRequest方法裏面,發現它是經過ShuffleCopier來試下的。


這塊接下來就是netty的客戶端調用的方法了,我對這個不瞭解。在服務端的處理是在DiskBlockManager內部啓動了一個ShuffleSender的服務,最終的業務處理邏輯是在FileServerHandler。

它是經過getBlockLocation返回一個FileSegment,下面這段代碼是ShuffleBlockManager的getBlockLocation方法。


獲取的方法前面說了,經過reduceId找到文件,再經過mapId找到它的起始位置。可是這裏有個疑問了,若是啓用了consolidateFiles,一個reduce的所需數據都在一個文件裏,是否是就能夠把整個文件一塊兒返回呢,而不是經過N個map過不須要來分屢次讀取?仍是懼怕一次發送一個大文件容易失敗?這就不得而知了。

到這裏整個過程就講完了。能夠看得出來Shuffle這塊仍是作了一些優化的,可是這些參數並無啓用,有須要的朋友能夠本身啓用一下試試效果。




參考資料:

http://blog.csdn.net/hao707822882/article/details/40581515

http://blog.csdn.net/johnny_lee/article/details/22619585

相關文章
相關標籤/搜索