Shuffle本意是 混洗, 洗牌
的意思, 在MapReduce過程當中須要各節點上同一類數據聚集到某一節點進行計算,把這些分佈在不一樣節點的數據按照必定的規則彙集到一塊兒的過程成爲Shuffle.網絡
在Hadoop的MapReduce框架中, Shuffle是鏈接Map和Reduce之間的橋樑, Map的數據要用到Reduce中必須通過Shuffle這個環節. 因爲Shuffle涉及到磁盤的讀寫和網絡的傳輸, 因此Shuffle的性能高低直接影響到整個程序的性能和吞吐量.數據結構

框架
這張圖是官網對Shuffle過程的描述,咱們來分別看下map端和reduce端作了什麼, 如何作的.jvm
上面就是MapReduce的Shuffle過程, 其實Spark2.0以後的Shuffle過程與MapReduce的基本一致,都是基於排序的,早期spark版本中的shuffle是基於hash的,讓咱們來一塊兒看下.函數
Spark有兩種Shuffle機制. 一種是基於Hash的Shuffle, 還有一種是基於Sort的Shuffle.在Shuffle機制轉變的過程當中, 主要的一個優化點就是產生的小文件個數.oop

性能
以上圖爲例,在Spark的算子reduceByKey(_ + _, 2)
產生的shuffle中,咱們先看Shuffle Write階段.fetch

如圖所示, hash-based的Shuffle, 每一個map會根據reduce的個數建立對應的bucket, 那麼bucket的總數量是: M * R (map的個數 * reduce的個數).
(假如分別有1k個map和reduce,將產生1百萬的小文件!)
如上圖所示,2個core, 4個map task, 3個reduce task 產生了4*3 = 12個小文件.(每一個文件中是不排序的)優化
因爲hash-based產生的小文件太多, 對文件系統的壓力很大, 後來作了優化.
把同一個core上的多個map輸出到同一個文件. 這樣文件數就變成了 core * R個.以下圖:
spa
2個core, 4個map task, 3個 reduce task, 產生了2*3 = 6個文件.
(每一個文件中仍然不是排序的)
因爲優化後的hash-based Shuffle的文件數爲: core * R, 產生的小文件仍然過大, 因此引入了 sort-based Shuffle
sort-based Shuffle中, 一個map task 輸出一個文件.
文件在一些到磁盤以前, 會根據key進行排序. 排序後, 分批寫入磁盤. task完成以後會將屢次溢寫的文件合併成一個文件. 因爲一個task只對應一個磁盤文件, 所以還會單獨寫一份索引文件, 標識下游各個task的數據對應在文件中的起始和結束offset.
目前,hash-based 和 sort-based寫方式公用相同的shuffle read.
以下圖所示:
shuffle read task從多個map的輸出文件中fetch本身須要的已排序好的數據.
read task 會先從索引文件中獲取本身須要的數據對應的索引, 在讀文件的時候跳過對應的Block數據區, 只讀當前本身這個task須要的數據.
當 parent stage 的全部ShuffleMapTasks結束後再fetch(這裏和MapReduce不一樣). 理論上講, 一個ShuffleMapTask結束後就能夠fetch, 可是爲了迎合 stage 的概念(即一個stage若是其parent stages沒有執行完,本身是不能被提交執行的),仍是選擇所有ShuffleMapTasks執行完再去 etch.由於fetch來的 FileSegments要先在內存作緩衝(默認48MB緩衝界限), 因此一次fetch的 FileSegments總大小不能太大. 一個 softBuffer裏面通常包含多個 FileSegment,但若是某個FileSegment特別大的話, 這一個就能夠填滿甚至超過 softBuffer 的界限.
邊 fetch 邊處理.本質上,MapReduce shuffle階段就是邊fetch邊使用 combine()進行處理,只是combine()處理的是部分數據. MapReduce爲了讓進入 reduce()的records有序, 必須等到所有數據都shuffle-sort後再開始 reduce(). 由於Spark不要求shuffle後的數據全局有序,所以不必等到所有數據 shuffle完成後再處理.
那麼如何實現邊shuffle邊處理, 並且流入的records是無序的?答案是使用能夠 aggregate 的數據結構, 好比 HashMap. 每從shuffle獲得(從緩衝的 FileSegment中deserialize出來)一個 <key, value="">record, 直接將其放進 HashMap 裏面.若是該HashMap已經存在相應的 Key. 那麼直接進行 aggregate 也就是 func(hashMap.get(Key), Value).
shuffle read task拿到多個map產生的相同的key的數據後,須要對數據進行聚合,把相同key的數據放到一塊兒,這個過程叫作aggregate.
大體過程以下圖: 
task把讀來的 records 被逐個 aggreagte 到 HashMap 中,等到全部 records 都進入 HashMap,就獲得最後的處理結果。
剛 fetch 來的 FileSegment 存放在 softBuffer 緩衝區,通過處理後的數據放在內存 + 磁盤上。
其實MapReduce Suffle 和 Spark的Shuffle在主要方面仍是基本一致的, 好比:都是基於sort的. 細節上有一些區別, 好比 mapreduce完成一個map,就開始reduce, 而spark因爲stage的概念,須要等全部ShuffleMap完成再ShuffleReduce.