因爲 MapReduce 確保每一個 reducer 的輸入都是按鍵排序的,所以在 map 處理完數據以後傳給 reducer 的這個過程當中須要進行一系列操做,這個操做過程就是 shuffle。在《hadoop權威指南》中指出,shuffle 是 MapReduce 的 「心臟」,瞭解 shuffle 工做機制有助於咱們優化 MapReduce 程序,接下來咱們就來看看它的運行機制。緩存
先用一張圖表示 shuffle 的整個過程。從圖中咱們能夠看到 shuffle 流程主要是對 map 的數據進行排序、分組發送給 reduce 後再進行合併的一個過程,咱們將分 map 和 reduce 兩個部分來說解 shuffle 的流程。網絡
map 任務開始產生數據時,會先將這些數據存儲在一個 內存緩衝區 中,這個緩衝區大小默認爲 100MB,能夠經過設置 mapreduce.task.io.sort.mb 來改變其大小。因爲 hadoop 處理的是海量數據,100MB 的內存顯然是不夠用的,所以達到必定 閾值 時(默認爲 0.8,能夠經過設置 mapreduce.map.sort.spill.percent 來改變其大小),會將內存中的內容溢出(spill)到磁盤當中,溢出的路徑是由 mapreduce.cluster.local.dir 屬性指定的。在溢出到磁盤的過程當中,若是緩衝區中還有空間,map 程序會繼續輸出數據到緩衝區中,若是沒有空間的話,map 輸出程序則會阻塞直到數據寫入到磁盤後。函數
在上圖中 buffer in memory(輸出到緩存中) 和 merge on disk(合併到磁盤) 這兩個步驟中間還有一個 分區、排序 的步驟。分區能達到跟分組相似的效果,例如讀取一個含有大量電話號碼的數據時,把 138 的分爲一組,把 135 分爲一組。這個效果能夠經過自定一個類繼承 Partitioner,而後在 Job 中調用 setPartitionerClass 方法設置分區類來完成。在每一個分區中,後臺線程按照鍵的值對數據在內存中進行排序,若是有一個 combiner 方法,則在排序完成以後運行它。combiner 方法會使 map 輸出更緊湊,減小寫到磁盤中的數據和傳給 reducer 的數據。oop
通常狀況下,map 的輸出結果並不會進行壓縮,因爲數據量大,對網絡資源的耗費很大,爲了對 mapreduce 程序進行優化,咱們能夠將 mapreduce.map.output.compress 屬性設置爲 true,這樣當 map 將數據寫到磁盤時就會對數據進行壓縮。具體的壓縮格式能夠經過 mapreduce.map.output.compress.codec 屬性來設置。當全部記錄都寫完以後,map 會合並所有的溢出文件爲一個分區且排序的文件傳給 reduce。優化
reducer 經過 HTTP 的方式獲取 map 的的輸出數據,這是複製階段。reducer 在複製階段把 Map 輸出複製到 Reducer 的內存或磁盤,一個 Map 任務完成後,Reduce 就開始複製輸出。複製完全部的 map 輸出以後,reducer 對這些數據進行合併,使它們仍然保持有序。合併完成以後,直接將這些數據輸入到 reduce 函數中,從而省略一次寫入磁盤的時間。至此,整個 shuffle 流程就完成了。線程
以上即是我對 MR shuffle 機制的理解,若是其中有錯,歡迎指出。3d