MapReduce Shuffle 和 Spark Shuffle 原理概述

Shuffle簡介

Shuffle的本意是洗牌、混洗的意思,把一組有規則的數據儘可能打亂成無規則的數據。而在MapReduce中,Shuffle更像是洗牌的逆過程,指的是將map端的無規則輸出按指定的規則「打亂」成具備必定規則的數據,以便reduce端接收處理。其在MapReduce中所處的工做階段是map輸出後到reduce接收前,具體能夠分爲map端和reduce端先後兩個部分。編程

在shuffle以前,也就是在map階段,MapReduce會對要處理的數據進行分片(split)操做,爲每個分片分配一個MapTask任務。接下來map會對每個分片中的每一行數據進行處理獲得鍵值對(key,value)此時獲得的鍵值對又叫作「中間結果」。此後便進入reduce階段,由此能夠看出Shuffle階段的做用是處理「中間結果」。數組

因爲Shuffle涉及到了磁盤的讀寫和網絡的傳輸,所以Shuffle性能的高低直接影響到了整個程序的運行效率。網絡

MapReduce Shuffle

Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工做是從Map結束到Reduce開始之間的過程。shuffle階段又能夠分爲Map端的shuffle和Reduce端的shuffle。數據結構

Map端的shuffle

下圖是MapReduce Shuffle的官方流程:
app

由於頻繁的磁盤I/O操做會嚴重的下降效率,所以「中間結果」不會立馬寫入磁盤,而是優先存儲到map節點的「環形內存緩衝區」,在寫入的過程當中進行分區(partition),也就是對於每一個鍵值對來講,都增長了一個partition屬性值,而後連同鍵值對一塊兒序列化成字節數組寫入到緩衝區(緩衝區採用的就是字節數組,默認大小爲100M)。oop

當寫入的數據量達到預先設置的闕值後便會啓動溢寫出線程將緩衝區中的那部分數據溢出寫(spill)到磁盤的臨時文件中,並在寫入前根據key進行排序(sort)和合並(combine,可選操做)。性能

溢出寫過程按輪詢方式將緩衝區中的內容寫到mapreduce.cluster.local.dir屬性指定的本地目錄中。當整個map任務完成溢出寫後,會對磁盤中這個map任務產生的全部臨時文件(spill文件)進行歸併(merge)操做生成最終的正式輸出文件,此時的歸併是將全部spill文件中的相同partition合併到一塊兒,並對各個partition中的數據再進行一次排序(sort),生成key和對應的value-list,文件歸併時,若是溢寫文件數量超過參數min.num.spills.for.combine的值(默認爲3)時,能夠再次進行合併。優化

至此map端的工做已經所有結束,最終生成的文件也會存儲在TaskTracker可以訪問的位置。每一個reduce task不間斷的經過RPC從JobTracker那裏獲取map task是否完成的信息,若是獲得的信息是map task已經完成,那麼Shuffle的後半段開始啓動。spa

Reduce端的shuffle

當mapreduce任務提交後,reduce task就不斷經過RPC從JobTracker那裏獲取map task是否完成的信息,若是獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程就開始啓動。Reduce端的shuffle主要包括三個階段,copy、merge和reduce。
線程

每一個reduce task負責處理一個分區的文件,如下是reduce task的處理流程:

  1. reduce task從每一個map task的結果文件中拉取對應分區的數據。由於數據在map階段已是分好區了,而且會有一個額外的索引文件記錄每一個分區的起始偏移量。因此reduce task取數的時候直接根據偏移量去拉取數據就ok。

  2. reduce task從每一個map task拉取分區數據的時候會進行再次合併,排序,按照自定義的reducer的邏輯代碼去處理。

  3. 最後就是Reduce過程了,在這個過程當中產生了最終的輸出結果,並將其寫到HDFS上。

爲何要排序

  1. key存在combine操做,排序以後相同的key放到一塊顯然方便作合併操做。

  2. reduce task是按key去處理數據的。 若是沒有排序那必須從全部數據中把當前相同key的全部value數據拿出來,而後進行reduce邏輯處理。顯然每一個key到這個邏輯都須要作一次全量數據掃描,影響性能,有了排序很方便的獲得一個key對於的value集合。

  3. reduce task按key去處理數據時,若是key按順序排序,那麼reduce task就按key順序去讀取,顯然當讀到的key是文件末尾的key那麼就標誌數據處理完畢。若是沒有排序那還得有其餘邏輯來記錄哪些key處理完了,哪些key沒有處理完。

雖有千萬種理由須要這麼作,可是很耗資源,而且像排序其實咱們有些業務並不須要排序。

爲何要文件合併

  1. 由於內存放不下就會溢寫文件,就會發生屢次溢寫,造成不少小文件,若是不合並,顯然會小文件氾濫,集羣須要資源開銷去管理這些小文件數據。

  2. 任務去讀取文件的數增多,打開的文件句柄數也會增多。

  3. mapreduce是全局有序。單個文件有序,不表明全局有序,只有把小文件合併一塊兒排序纔會全局有序。

    Spark的Shuffle

    Spark的Shuffle是在MapReduce Shuffle基礎上進行的調優。其實就是對排序、合併邏輯作了一些優化。在Spark中Shuffle write至關於MapReduce 的map,Shuffle read至關於MapReduce 的reduce。

Spark豐富了任務類型,有些任務之間數據流轉不須要經過Shuffle,可是有些任務之間仍是須要經過Shuffle來傳遞數據,好比寬依賴的group by key以及各類by key算子。寬依賴之間會劃分stage,而Stage之間就是Shuffle,以下圖中的stage0,stage1和stage3之間就會產生Shuffle。

在Spark的中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨着Spark的發展有兩種實現的方式,分別爲HashShuffleManager和SortShuffleManager,所以spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。

Spark Shuffle發展史
Spark 0.8及之前 Hash Based Shuffle
Spark 0.8.1 爲Hash Based Shuffle引入File Consolidation機制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默認仍爲Hash Based Shuffle
Spark 1.2 默認的Shuffle方式改成Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort併入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出歷史舞臺

在Spark的版本的發展,ShuffleManager在不斷迭代,變得愈來愈先進。
在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個很是嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操做影響了性能。所以在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。

SortShuffleManager相較於HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,可是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。

Hash Shuffle

HashShuffleManager的運行機制主要分紅兩種,一種是普通運行機制,另外一種是合併的運行機制。合併機制主要是經過複用buffer來優化Shuffle過程當中產生的小文件的數量。Hash shuffle是不具備排序的Shuffle。

普通機制的Hash Shuffle

最開始使用的Hash Based Shuffle,每一個Mapper會根據Reducer的數量建立對應的bucket,bucket的數量是M * R,M是map的數量,R是Reduce的數量。
以下圖所示:2個core 4個map task 3 個reduce task,會產生4*3=12個小文件。

優化後的Hash Shuffle

普通機制Hash Shuffle會產生大量的小文件(M * R),對文件系統的壓力也很大,也不利於IO的吞吐量,後來作了優化(設置spark.shuffle.consolidateFiles=true開啓,默認false),把在同一個core上的多個Mapper輸出到同一個文件,這樣文件數就變成core * R 個了。
以下圖所示:2個core 4個map task 3 個reduce task,會產生2*3=6個小文件。

Hash shuffle合併機制的問題:
若是 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生不少小文件。進而引出了更優化的sort shuffle。
在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。

Sort Shuffle

SortShuffleManager的運行機制主要分紅兩種,一種是普通運行機制,另外一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認爲200),就會啓用bypass機制。

普通機制的Sort Shuffle

這種機制和mapreduce差很少,在該模式下,數據會先寫入一個內存數據結構中,此時根據不一樣的shuffle算子,可能選用不一樣的數據結構。若是是reduceByKey這種聚合類的shuffle算子,那麼會選用Map數據結構,一邊經過Map進行聚合,一邊寫入內存;若是是join這種普通的shuffle算子,那麼會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構以後,就會判斷一下,是否達到了某個臨界閾值。若是達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,而後清空內存數據結構。

在溢寫到磁盤文件以前,會先根據key對內存數據結構中已有的數據進行排序。排序事後,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。
一個task將全部數據寫入內存數據結構的過程當中,會發生屢次磁盤溢寫操做,也會產生多個臨時文件。最後會將以前全部的臨時磁盤文件都進行合併,因爲一個task就只對應一個磁盤文件所以還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。
SortShuffleManager因爲有一個磁盤文件merge的過程,所以大大減小了文件數量,因爲每一個task最終只有一個磁盤文件因此文件個數等於上游shuffle write個數。

bypass機制的Sort Shuffle

bypass運行機制的觸發條件以下:
1)shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值,默認值200。
2)不是聚合類的shuffle算子(好比reduceByKey)。

此時task會爲每一個reduce端的task都建立一個臨時磁盤文件,並將數據按key進行hash而後根據key的hash值,將key寫入對應的磁盤文件之中。固然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿以後再溢寫到磁盤文件的。最後,一樣會將全部臨時磁盤文件都合併成一個磁盤文件,並建立一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是如出一轍的,由於都要建立數量驚人的磁盤文件,只是在最後會作一個磁盤文件的合併而已。所以少許的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來講,shuffle read的性能會更好。

而該機制與普通SortShuffleManager運行機制的不一樣在於:
第一,磁盤寫機制不一樣;
第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write過程當中,不須要進行數據的排序操做,也就節省掉了這部分的性能開銷。

Spark Shuffle總結

Shuffle 過程本質上都是將 Map 端得到的數據使用分區器進行劃分,並將數據發送給對應的 Reducer 的過程。

Shuffle做爲處理鏈接map端和reduce端的樞紐,其shuffle的性能高低直接影響了整個程序的性能和吞吐量。map端的shuffle通常爲shuffle的Write階段,reduce端的shuffle通常爲shuffle的read階段。Hadoop和spark的shuffle在實現上面存在很大的不一樣,spark的shuffle分爲兩種實現,分別爲HashShuffle和SortShuffle。

HashShuffle又分爲普通機制和合並機制,普通機制由於其會產生MR個數的巨量磁盤小文件而產生大量性能低下的Io操做,從而性能較低,由於其巨量的磁盤小文件還可能致使OOM,HashShuffle的合併機制經過重複利用buffer從而將磁盤小文件的數量下降到CoreR個,可是當Reducer 端的並行任務或者是數據分片過多的時候,依然會產生大量的磁盤小文件。

SortShuffle也分爲普通機制和bypass機制,普通機制在內存數據結構(默認爲5M)完成排序,會產生2M個磁盤小文件。而當shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。或者算子不是聚合類的shuffle算子(好比reduceByKey)的時候會觸發SortShuffle的bypass機制,SortShuffle的bypass機制不會進行排序,極大的提升了其性能。

在Spark 1.2之前,默認的shuffle計算引擎是HashShuffleManager,由於HashShuffleManager會產生大量的磁盤小文件而性能低下,在Spark 1.2之後的版本中,默認的ShuffleManager改爲了SortShuffleManager。

SortShuffleManager相較於HashShuffleManager來講,有了必定的改進。主要就在於,每一個Task在進行shuffle操做時,雖然也會產生較多的臨時磁盤文件,可是最後會將全部的臨時文件合併(merge)成一個磁盤文件,所以每一個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取本身的數據時,只要根據索引讀取每一個磁盤文件中的部分數據便可。

Spark與MapReduce Shuffle的異同

  1. 從總體功能上看,二者並無大的差異。 都是將 mapper(Spark 裏是 ShuffleMapTask)的輸出進行 partition,不一樣的 partition 送到不一樣的 reducer(Spark 裏 reducer 多是下一個 stage 裏的 ShuffleMapTask,也多是 ResultTask)。Reducer 之內存做緩衝區,邊 shuffle 邊 aggregate 數據,等到數據 aggregate 好之後進行 reduce(Spark 裏多是後續的一系列操做)。

  2. 從流程的上看,二者差異不小。 Hadoop MapReduce 是 sort-based,進入 combine和 reduce的 records 必須先 sort。這樣的好處在於 combine/reduce能夠處理大規模的數據,由於其輸入數據能夠經過外排獲得(mapper 對每段數據先作排序,reducer 的 shuffle 對排好序的每段數據作歸併)。之前 Spark 默認選擇的是 hash-based,一般使用 HashMap 來對 shuffle 來的數據進行合併,不會對數據進行提早排序。若是用戶須要通過排序的數據,那麼須要本身調用相似 sortByKey的操做。在Spark 1.2以後,sort-based變爲默認的Shuffle實現。

  3. 從流程實現角度來看,二者也有很多差異。 Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map, spill, merge, shuffle, sort, reduce等。每一個階段各司其職,能夠按照過程式的編程思想來逐一實現每一個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不一樣的 stage 和一系列的 transformation,因此 spill, merge, aggregate 等操做須要蘊含在 transformation中。

相關文章
相關標籤/搜索