大話Spark(4)-一文理解MapReduce Shuffle和Spark Shuffle

Shuffle本意是 混洗, 洗牌的意思, 在MapReduce過程當中須要各節點上同一類數據聚集到某一節點進行計算,把這些分佈在不一樣節點的數據按照必定的規則彙集到一塊兒的過程成爲Shuffle.網絡

在Hadoop的MapReduce框架中, Shuffle是鏈接Map和Reduce之間的橋樑, Map的數據要用到Reduce中必須通過Shuffle這個環節. 因爲Shuffle涉及到磁盤的讀寫和網絡的傳輸, 因此Shuffle的性能高低直接影響到整個程序的性能和吞吐量.數據結構

MapReduce中的Shuffle

框架

 

這張圖是官網對Shuffle過程的描述,咱們來分別看下map端和reduce端作了什麼, 如何作的.jvm

Map端

  1. map執行task時, 輸入數據來源於HDFS的block, 在MapReduce概念中, map的task只讀取split. split與block的對應關係多是多對一, 默認是一對一. 
  2. map在寫磁盤以前, 會根據最終要傳給的reduce把數據劃分紅相應的分區(partition). 每一個分區中,後臺線程按鍵進行排序,若是有combiner,它在排序後的輸出上運行.(combiner可使map的結果更緊湊,減小寫磁盤的數據和傳遞給reduce的數據[省空間和io])
  3. map產生文件時, 並非簡單地將它寫到磁盤. 它利用緩衝的方式把數據寫到內存並處於效率的考慮進行與排序.(如圖中 buffer in memory). 每個map都有一個環形內存緩衝區用於存儲任務輸出.緩衝區大小默認100MB, 一旦達到閾值(默認80%), 一個後臺線程便開始把內容溢出(split)到磁盤.(若是在此期間[split期間]緩衝區被填滿,map會被阻塞,直到寫磁盤過程完成. 
  4. 每次內存緩衝區達到閾值移出,就會新建一個溢出文件(split file)(上圖 partition,sort and split to disk). 所以在map任務最後一個記錄輸出以後,任務完成以前會把一出的多個文件合併成一個已分區且已排序的輸出文件.(上圖 merge on task)

Reduce端

  1. map的輸出文件在map運行的機器的本地磁盤(reduce通常不寫本地), map的輸出文件包括多個分區須要的數據, reduce的輸入須要集羣上多個map的輸出. 每一個map的完成時間可能不一樣, 所以只要有一個map任務完成, reduce就開始複製其輸出.(上圖 fetch階段) reduce有少許複製線程(默認5個),所以可以並行取得map輸出(帶寬瓶頸).
  2. reduce如何知道從哪臺機器獲取map結果? map執行完會通知master, reduce總有一個線程按期輪詢master(心跳)能夠得到map輸出的位置. master會等到全部reduce完成以後再通知map刪除其輸出.
  3. 若是map的輸出很小,會被複制到reduce任務jvm的內存.不然map輸出會被複制到磁盤(又寫磁盤)
  4. 複製完全部map輸出後,reduce任務進入排序合併階段(實際上是合併階段,由於map的輸出上有序的).這個維持順序的合併過程是循環進行的.(好比50個map輸出,合併因子是10(默認值), 則合併將進行5次, 每次合併10個文件, 最終有5箇中間文件)
  5. 在最後reduce階段,直接把數據輸入reduce函數(上面的5箇中間文件不會再合併成一個已排序的中間文件). 輸出直接寫到文件系統, 通常爲HDFS. 

map輸出爲何要排序?

  1. key存在combine操做,排序以後相同的key在一塊兒方便合併.
  2. reduce按照key讀數據時, 按照key的順序去讀, 遇到不同的 key時便可知道以前的key的數據是否讀取完畢. 若是沒排序,則須要把所有數據都作處理. 

上面就是MapReduce的Shuffle過程, 其實Spark2.0以後的Shuffle過程與MapReduce的基本一致,都是基於排序的,早期spark版本中的shuffle是基於hash的,讓咱們來一塊兒看下.函數

Spark中的Shuffle

Spark有兩種Shuffle機制. 一種是基於Hash的Shuffle, 還有一種是基於Sort的Shuffle.在Shuffle機制轉變的過程當中, 主要的一個優化點就是產生的小文件個數.oop

性能

 


以上圖爲例,在Spark的算子reduceByKey(_ + _, 2)產生的shuffle中,咱們先看Shuffle Write階段.fetch

Shuffle Write (Hash-based)

 


如圖所示, hash-based的Shuffle, 每一個map會根據reduce的個數建立對應的bucket, 那麼bucket的總數量是: M * R (map的個數 * reduce的個數).
(假如分別有1k個map和reduce,將產生1百萬的小文件!)
如上圖所示,2個core, 4個map task, 3個reduce task 產生了4*3 = 12個小文件.(每一個文件中是不排序的)優化

Shuffle Write (Hash-based) 優化!

因爲hash-based產生的小文件太多, 對文件系統的壓力很大, 後來作了優化. 
把同一個core上的多個map輸出到同一個文件. 這樣文件數就變成了 core * R個.以下圖:
spa

 

2個core, 4個map task, 3個 reduce task, 產生了2*3 = 6個文件.
(每一個文件中仍然不是排序的)

Shuffle Write (Sort-based)

因爲優化後的hash-based Shuffle的文件數爲: core * R, 產生的小文件仍然過大, 因此引入了 sort-based Shuffle

 


sort-based Shuffle中, 一個map task 輸出一個文件.
文件在一些到磁盤以前, 會根據key進行排序. 排序後, 分批寫入磁盤. task完成以後會將屢次溢寫的文件合併成一個文件. 因爲一個task只對應一個磁盤文件, 所以還會單獨寫一份索引文件, 標識下游各個task的數據對應在文件中的起始和結束offset.

Shuffle Read

 


目前,hash-based 和 sort-based寫方式公用相同的shuffle read. 
以下圖所示: 

 

 


shuffle read task從多個map的輸出文件中fetch本身須要的已排序好的數據. 
read task 會先從索引文件中獲取本身須要的數據對應的索引, 在讀文件的時候跳過對應的Block數據區, 只讀當前本身這個task須要的數據. 

何時開始fetch數據?

當 parent stage 的全部ShuffleMapTasks結束後再fetch(這裏和MapReduce不一樣). 理論上講, 一個ShuffleMapTask結束後就能夠fetch, 可是爲了迎合 stage 的概念(即一個stage若是其parent stages沒有執行完,本身是不能被提交執行的),仍是選擇所有ShuffleMapTasks執行完再去 etch.由於fetch來的 FileSegments要先在內存作緩衝(默認48MB緩衝界限), 因此一次fetch的 FileSegments總大小不能太大. 一個 softBuffer裏面通常包含多個 FileSegment,但若是某個FileSegment特別大的話, 這一個就能夠填滿甚至超過 softBuffer 的界限.

邊 fetch 邊處理仍是一次性 fetch 完再處理?

邊 fetch 邊處理.本質上,MapReduce shuffle階段就是邊fetch邊使用 combine()進行處理,只是combine()處理的是部分數據. MapReduce爲了讓進入 reduce()的records有序, 必須等到所有數據都shuffle-sort後再開始 reduce(). 由於Spark不要求shuffle後的數據全局有序,所以不必等到所有數據 shuffle完成後再處理. 
那麼如何實現邊shuffle邊處理, 並且流入的records是無序的?答案是使用能夠 aggregate 的數據結構, 好比 HashMap. 每從shuffle獲得(從緩衝的 FileSegment中deserialize出來)一個 <key, value="">record, 直接將其放進 HashMap 裏面.若是該HashMap已經存在相應的 Key. 那麼直接進行 aggregate 也就是 func(hashMap.get(Key), Value).

Shuffle aggregate

shuffle read task拿到多個map產生的相同的key的數據後,須要對數據進行聚合,把相同key的數據放到一塊兒,這個過程叫作aggregate.

 


大體過程以下圖: 

 


task把讀來的 records 被逐個 aggreagte 到 HashMap 中,等到全部 records 都進入 HashMap,就獲得最後的處理結果。

fetch 來的數據存放到哪裏?

剛 fetch 來的 FileSegment 存放在 softBuffer 緩衝區,通過處理後的數據放在內存 + 磁盤上。

小結:

其實MapReduce Suffle 和 Spark的Shuffle在主要方面仍是基本一致的, 好比:都是基於sort的. 細節上有一些區別, 好比 mapreduce完成一個map,就開始reduce, 而spark因爲stage的概念,須要等全部ShuffleMap完成再ShuffleReduce.

相關文章
相關標籤/搜索