詳細探究Spark的shuffle實現

Background

在MapReduce框架中,shuffle是鏈接Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須通過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark做爲MapReduce框架的一種實現,天然也實現了shuffle的邏輯,本文就深刻研究Spark的shuffle是如何實現的,有什麼優缺點,與Hadoop MapReduce的shuffle有什麼不一樣。git

Shuffle

Shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果須要按key哈希,而且分發到每個Reducer上去,這個過程就是shuffle。因爲shuffle涉及到了磁盤的讀寫和網絡的傳輸,所以shuffle性能的高低直接影響到了整個程序的運行效率。github

下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介於Map phase和Reduce phase之間。算法

mapreduce running process

概念上shuffle就是一個溝通數據鏈接的橋樑,那麼實際上shuffle這一部分是如何實現的的呢,下面咱們就以Spark爲例講一下shuffle在Spark中的實現。apache

Spark Shuffle進化史

先以圖爲例簡單描述一下Spark中shuffle的整一個流程:緩存

spark shuffle process

  • 首先每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是M×R,其中M是Map的個數,R是Reduce的個數。網絡

  • 其次Mapper產生的結果會根據設置的partition算法填充到每一個bucket中去。這裏的partition算法是能夠自定義的,固然默認的算法是根據key哈希到不一樣的bucket中去。app

  • 當Reducer啓動時,它會根據本身task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket做爲Reducer的輸入進行處理。框架

這裏的bucket是一個抽象概念,在實現中每一個bucket能夠對應一個文件,能夠對應文件的一部分或是其餘等。socket

接下來咱們分別從shuffle writeshuffle fetch這兩塊來說述一下Spark的shuffle進化史。ide

Shuffle Write

在Spark 0.6和0.7的版本中,對於shuffle數據的存儲是以文件的方式存儲在block manager中,與rdd.persist(StorageLevel.DISk_ONLY)採起相同的策略,能夠參看:

override def run(attemptId: Long): MapStatus = {  val numOutputSplits = dep.partitioner.numPartitions     ...    // Partition the map output.    val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])    for (elem <- rdd.iterator(split, taskContext)) {      val pair = elem.asInstanceOf[(Any, Any)]      val bucketId = dep.partitioner.getPartition(pair._1)      buckets(bucketId) += pair    }    ...    val blockManager = SparkEnv.get.blockManager    for (i <- 0 until numOutputSplits) {      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i      // Get a Scala iterator from Java map      val iter: Iterator[(Any, Any)] = buckets(i).iterator      val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)      totalBytes += size    }  ...}

我已經將一些干擾代碼刪去。能夠看到Spark在每個Mapper中爲每一個Reducer建立一個bucket,並將RDD計算結果放進bucket中。須要注意的是每一個bucket是一個ArrayBuffer,也就是說Map的輸出結果是會先存儲在內存。

接着Spark會將ArrayBuffer中的Map輸出結果寫入block manager所管理的磁盤中,這裏文件的命名方式爲:shuffle_ + shuffle_id + "_" + map partition id + "_" + shuffle partition id

早期的shuffle write有兩個比較大的問題:

  1. Map的輸出必須先所有存儲到內存中,而後寫入磁盤。這對內存是一個很是大的開銷,當內存不足以存儲全部的Map output時就會出現OOM。

  2. 每個Mapper都會產生Reducer number個shuffle文件,若是Mapper個數是1k,Reducer個數也是1k,那麼就會產生1M個shuffle文件,這對於文件系統是一個很是大的負擔。同時在shuffle數據量不大而shuffle文件又很是多的狀況下,隨機寫也會嚴重下降IO的性能。

在Spark 0.8版本中,shuffle write採用了與RDD block write不一樣的方式,同時也爲shuffle write單首創建了ShuffleBlockManager,部分解決了0.6和0.7版本中遇到的問題。

首先咱們來看一下Spark 0.8的具體實現:

override def run(attemptId: Long): MapStatus = {  ...  val blockManager = SparkEnv.get.blockManager  var shuffle: ShuffleBlocks = null  var buckets: ShuffleWriterGroup = null  try {    // Obtain all the block writers for shuffle blocks.    val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)    shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)    buckets = shuffle.acquireWriters(partition)    // Write the map output to its associated buckets.    for (elem <- rdd.iterator(split, taskContext)) {      val pair = elem.asInstanceOf[Product2[Any, Any]]      val bucketId = dep.partitioner.getPartition(pair._1)      buckets.writers(bucketId).write(pair)    }    // Commit the writes. Get the size of each bucket block (total block size).    var totalBytes = 0L    val compressedSizes: Array[Byte] = buckets.writers.map { writer:   BlockObjectWriter =>      writer.commit()      writer.close()      val size = writer.size()      totalBytes += size      MapOutputTracker.compressSize(size)    }    ...  } catch { case e: Exception =>    // If there is an exception from running the task, revert the partial writes    // and throw the exception upstream to Spark.    if (buckets != null) {      buckets.writers.foreach(_.revertPartialWrites())    }    throw e  } finally {    // Release the writers back to the shuffle block manager.    if (shuffle != null && buckets != null) {      shuffle.releaseWriters(buckets)    }    // Execute the callbacks on task completion.    taskContext.executeOnCompleteCallbacks()    }  }}

在這個版本中爲shuffle write添加了一個新的類ShuffleBlockManager,由ShuffleBlockManager來分配和管理bucket。同時ShuffleBlockManager爲每個bucket分配一個DiskObjectWriter,每一個write handler擁有默認100KB的緩存,使用這個write handler將Map output寫入文件中。能夠看到如今的寫入方式變爲buckets.writers(bucketId).write(pair),也就是說Map output的key-value pair是逐個寫入到磁盤而不是預先把全部數據存儲在內存中在總體flush到磁盤中去。

ShuffleBlockManager的代碼以下所示:

private[spark]class ShuffleBlockManager(blockManager: BlockManager) {  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {    new ShuffleBlocks {      // Get a group of writers for a map task.      override def acquireWriters(mapId: Int): ShuffleWriterGroup = {        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>          val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)          blockManager.getDiskBlockWriter(blockId, serializer, bufferSize)        }        new ShuffleWriterGroup(mapId, writers)      }      override def releaseWriters(group: ShuffleWriterGroup) = {        // Nothing really to release here.      }    }  }}

Spark 0.8顯著減小了shuffle的內存壓力,如今Map output不須要先所有存儲在內存中,再flush到硬盤,而是record-by-record寫入到磁盤中。同時對於shuffle文件的管理也獨立出新的ShuffleBlockManager進行管理,而不是與rdd cache文件在一塊兒了。

可是這一版Spark 0.8的shuffle write仍然有兩個大的問題沒有解決:

  • 首先依舊是shuffle文件過多的問題,shuffle文件過多一是會形成文件系統的壓力過大,二是會下降IO的吞吐量。

  • 其次雖然Map output數據再也不須要預先在內存中evaluate顯著減小了內存壓力,可是新引入的DiskObjectWriter所帶來的buffer開銷也是一個不容小視的內存開銷。假定咱們有1k個Mapper和1k個Reducer,那麼就會有1M個bucket,於此同時就會有1M個write handler,而每個write handler默認須要100KB內存,那麼總共須要100GB的內存。這樣的話僅僅是buffer就須要這麼多的內存,內存的開銷是驚人的。固然實際狀況下這1k個Mapper是分時運行的話,所需的內存就只有cores * reducer numbers * 100KB大小了。可是reducer數量不少的話,這個buffer的內存開銷也是蠻厲害的。

爲了解決shuffle文件過多的狀況,Spark 0.8.1引入了新的shuffle consolidation,以期顯著減小shuffle文件的數量。

首先咱們以圖例來介紹一下shuffle consolidation的原理。

spark shuffle  consolidation process

假定該job有4個Mapper和4個Reducer,有2個core,也就是能並行運行兩個task。咱們能夠算出Spark的shuffle write共須要16個bucket,也就有了16個write handler。在以前的Spark版本中,每個bucket對應的是一個文件,所以在這裏會產生16個shuffle文件。

而在shuffle consolidation中每個bucket並不是對應一個文件,而是對應文件中的一個segment,同時shuffle consolidation所產生的shuffle文件數量與Spark core的個數也有關係。在上面的圖例中,job的4個Mapper分爲兩批運行,在第一批2個Mapper運行時會申請8個bucket,產生8個shuffle文件;而在第二批Mapper運行時,申請的8個bucket並不會再產生8個新的文件,而是追加寫到以前的8個文件後面,這樣一共就只有8個shuffle文件,而在文件內部這有16個不一樣的segment。所以從理論上講shuffle consolidation所產生的shuffle文件數量爲C×R,其中C是Spark集羣的core number,R是Reducer的個數。

須要注意的是當 M=C時shuffle consolidation所產生的文件數和以前的實現是同樣的。

Shuffle consolidation顯著減小了shuffle文件的數量,解決了以前版本一個比較嚴重的問題,可是writer handler的buffer開銷過大依然沒有減小,若要減小writer handler的buffer開銷,咱們只能減小Reducer的數量,可是這又會引入新的問題,下文將會有詳細介紹。

講完了shuffle write的進化史,接下來要講一下shuffle fetch了,同時還要講一下Spark的aggregator,這一塊對於Spark實際應用的性能相當重要。

Shuffle Fetch and Aggregator

Shuffle write寫出去的數據要被Reducer使用,就須要shuffle fetcher將所需的數據fetch過來,這裏的fetch包括本地和遠端,由於shuffle數據有可能一部分是存儲在本地的。Spark對shuffle fetcher實現了兩套不一樣的框架:NIO經過socket鏈接去fetch數據;OIO經過netty server去fetch數據。分別對應的類是BasicBlockFetcherIteratorNettyBlockFetcherIterator

在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle數據量比較大的狀況下performance始終不是很好,沒法充分利用網絡帶寬,爲了解決這個問題,添加了新的shuffle fetcher來試圖取得更好的性能。對於早期shuffle性能的評測能夠參看Spark usergroup。固然如今BasicBlockFetcherIterator的性能也已經好了不少,使用的時候能夠對這兩種實現都進行測試比較。

接下來講一下aggregator。咱們都知道在Hadoop MapReduce的shuffle過程當中,shuffle fetch過來的數據會進行merge sort,使得相同key下的不一樣value按序歸併到一塊兒供Reducer使用,這個過程能夠參看下圖:

mapreduce shuffle process

全部的merge sort都是在磁盤上進行的,有效地控制了內存的使用,可是代價是更多的磁盤IO。

那麼Spark是否也有merge sort呢,仍是以別的方式實現,下面咱們就細細說明。

首先雖然Spark屬於MapReduce體系,可是對傳統的MapReduce算法進行了必定的改變。Spark假定在大多數用戶的case中,shuffle數據的sort不是必須的,好比word count,強制地進行排序只會使性能變差,所以Spark並不在Reducer端作merge sort。既然沒有merge sort那Spark是如何進行reduce的呢?這就要說到aggregator了。

aggregator本質上是一個hashmap,它是以map output的key爲key,以任意所要combine的類型爲value的hashmap。當咱們在作word count reduce計算count值的時候,它會將shuffle fetch到的每個key-value pair更新或是插入到hashmap中(若在hashmap中沒有查找到,則插入其中;若查找到則更新value值)。這樣就不須要預先把全部的key-value進行merge sort,而是來一個處理一個,省下了外部排序這一步驟。但同時須要注意的是reducer的內存必須足以存放這個partition的全部key和count值,所以對內存有必定的要求。

在上面word count的例子中,由於value會不斷地更新,而不須要將其所有記錄在內存中,所以內存的使用仍是比較少的。考慮一下若是是group by key這樣的操做,Reducer須要獲得key對應的全部value。在Hadoop MapReduce中,因爲有了merge sort,所以給予Reducer的數據已是group by key了,而Spark沒有這一步,所以須要將key和對應的value所有存放在hashmap中,並將value合併成一個array。能夠想象爲了可以存放全部數據,用戶必須確保每個partition足夠小到內存可以容納,這對於內存是一個很是嚴峻的考驗。所以Spark文檔中建議用戶涉及到這類操做的時候儘可能增長partition,也就是增長Mapper和Reducer的數量。

增長Mapper和Reducer的數量當然能夠減少partition的大小,使得內存能夠容納這個partition。可是咱們在shuffle write中提到,bucket和對應於bucket的write handler是由Mapper和Reducer的數量決定的,task越多,bucket就會增長的更多,由此帶來write handler所需的buffer也會更多。在一方面咱們爲了減小內存的使用採起了增長task數量的策略,另外一方面task數量增多又會帶來buffer開銷更大的問題,所以陷入了內存使用的兩難境地。

爲了減小內存的使用,只能將aggregator的操做從內存移到磁盤上進行,Spark社區也意識到了Spark在處理數據規模遠遠大於內存大小時所帶來的問題。所以PR303提供了外部排序的實現方案,相信在Spark 0.9 release的時候,這個patch應該能merge進去,到時候內存的使用量能夠顯著地減小。

End

本文詳細地介紹了Spark的shuffle實現是如何進化的,以及遇到問題解決問題的過程。shuffle做爲Spark程序中很重要的一個環節,直接影響了Spark程序的性能,現現在的Spark版本雖然shuffle實現還存在着種種問題,可是相比於早期版本,已經有了很大的進步。開源代碼就是如此不停地迭代推動,隨着Spark的普及程度愈來愈高,貢獻的人愈來愈多,相信後續的版本會有更大的提高。

相關文章
相關標籤/搜索