Shuffle過程是MapReduce的核心,也被稱爲奇蹟發生的地方。要想理解MapReduce, Shuffle是必需要了解的。我看過不少相關的資料,但每次看完都雲裏霧裏的繞着,很難理清大體的邏輯,反而越攪越混。前段時間在作MapReduce job 性能調優的工做,須要深刻代碼研究MapReduce的運行機制,這纔對Shuffle探了個究竟。考慮到以前我在看相關資料而看不懂時很惱火,因此在這裏我盡最大的可能試着把Shuffle說清楚,讓每一位想了解它原理的朋友都能有所收穫。若是你對這篇文章有任何疑問或建議請留言到後面,謝謝!
Shuffle的正常意思是洗牌或弄亂,可能你們更熟悉的是Java API裏的Collections.shuffle(List)方法,它會隨機地打亂參數list裏的元素順序。若是你不知道MapReduce裏Shuffle是什麼,那麼請看這張圖:
這張是官方對Shuffle過程的描述。但我能夠確定的是,單從這張圖你基本不可能明白Shuffle的過程,由於它與事實相差挺多,細節也是錯亂的。後面我會具體描述Shuffle的事實狀況,因此這裏你只要清楚Shuffle的大體範圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也能夠這樣理解, Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。
在Hadoop這樣的集羣環境中,大部分map task與reduce task的執行是在不一樣的節點上。固然不少狀況下Reduce執行時須要跨節點去拉取其它節點上的map task結果。若是集羣正在運行的job有不少,那麼task的正常執行對集羣內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,咱們不能限制,能作的就是最大化地減小沒必要要的消耗。還有在節點內,相比於內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來講,咱們對Shuffle過程的指望能夠有: 數組
OK,看到這裏時,你們能夠先停下來想一想,若是是本身來設計這段Shuffle過程,那麼你的設計目標是什麼。我想能優化的地方主要在於減小拉取數據的量及儘可能使用內存而不是磁盤。
個人分析是基於Hadoop0.21.0的源碼,若是與你所認識的Shuffle過程有差異,不吝指出。我會以WordCount爲例,並假設它有8個map task和3個reduce task。從上圖看出,Shuffle過程橫跨map與reduce兩端,因此下面我也會分兩部分來展開。
先看看map端的狀況,以下圖:
上圖多是某個map task的運行狀況。拿它與官方圖的左半邊比較,會發現不少不一致。官方圖沒有清楚地說明partition, sort與combiner到底做用在哪一個階段。我畫了這張圖,但願讓你們清晰地瞭解從map數據輸入到map端全部數據準備好的全過程。
整個流程我分了四步。簡單些能夠這樣說,每一個map task都有一個內存緩衝區,存儲着map的輸出結果,當緩衝區快滿的時候須要將緩衝區的數據以一個臨時文件的方式存放到磁盤,當整個map task結束後再對磁盤中這個map task產生的全部臨時文件作合併,生成最終的正式輸出文件,而後等待reduce task來拉數據。
固然這裏的每一步均可能包含着多個步驟與細節,下面我對細節來一一說明:
1. 在map task執行時,它的輸入數據來源於HDFS的block,固然在MapReduce概念中,map task只讀取split。Split與block的對應關係多是多對一,默認是一對一。在WordCount例子裏,假設map的輸入數據都是像「aaa」這樣的字符串。
2. 在通過mapper的運行後,咱們得知mapper的輸出是這樣一個key/value對: key是「aaa」, value是數值1。由於當前map端只作加1的操做,在reduce task裏纔去合併結果集。前面咱們知道這個job有3個reduce task,到底當前的「aaa」應該交由哪一個reduce去作呢,是須要如今決定的。
MapReduce提供Partitioner接口,它的做用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪一個reduce task處理。默認對key hash後再以reduce task數量取模。默認的取模方式只是爲了平均reduce的處理能力,若是用戶本身對Partitioner有需求,能夠訂製並設置到job上。
在咱們的例子中,「aaa」通過Partitioner後返回0,也就是這對值應當交由第一個reducer來處理。接下來,須要將數據寫入內存緩衝區中,緩衝區的做用是批量收集map結果,減小磁盤IO的影響。咱們的key/value對以及Partition的結果都會被寫入緩衝區。固然寫入以前,key與value值都會被序列化成字節數組。
整個內存緩衝區就是一個字節數組,它的字節索引及key/value存儲結構我沒有研究過。若是有朋友對它有研究,那麼請大體描述下它的細節吧。
3. 這個內存緩衝區是有大小限制的,默認是100MB。當map task的輸出結果不少時,就可能會撐爆內存,因此須要在必定條件下將緩衝區中的數據臨時寫入磁盤,而後從新利用這塊緩衝區。這個從內存往磁盤寫數據的過程被稱爲Spill,中文可譯爲溢寫,字面意思很直觀。這個溢寫是由單獨線程來完成,不影響往緩衝區寫map結果的線程。溢寫線程啓動時不該該阻止map的結果輸出,因此整個緩衝區有個溢寫的比例spill.percent。這個比例默認是0.8,也就是當緩衝區的數據已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啓動,鎖定這80MB的內存,執行溢寫過程。Map task的輸出結果還能夠往剩下的20MB內存中寫,互不影響。
當溢寫線程啓動後,須要對這80MB空間內的key作排序(Sort)。排序是MapReduce模型默認的行爲,這裏的排序也是對序列化的字節作的排序。
在這裏咱們能夠想一想,由於map task的輸出是須要發送到不一樣的reduce端去,而內存緩衝區沒有對將發送到相同reduce端的數據作合併,那麼這種合併應該是體現是磁盤文件中的。從官方圖上也能夠看到寫到磁盤中的溢寫文件是對不一樣的reduce端的數值作過合併。因此溢寫過程一個很重要的細節在於,若是有不少個key/value對須要發送到某個reduce端去,那麼須要將這些key/value值拼接到一塊,減小與partition相關的索引記錄。
在針對每一個reduce端而合併數據時,有些數據可能像這樣:「aaa」/1, 「aaa」/1。對於WordCount例子,就是簡單地統計單詞出現的次數,若是在同一個map task的結果中有不少個像「aaa」同樣出現屢次的key,咱們就應該把它們的值合併到一塊,這個過程叫reduce也叫combine。但MapReduce的術語中,reduce只指reduce端執行從多個map task取數據作計算的過程。除reduce外,非正式地合併數據只能算作combine了。其實你們知道的,MapReduce中將Combiner等同於Reducer。
若是client設置過Combiner,那麼如今就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減小溢寫到磁盤的數據量。Combiner會優化MapReduce的中間結果,因此它在整個模型中會屢次使用。那哪些場景才能使用Combiner呢?從這裏分析,Combiner的輸出是Reducer的輸入,Combiner毫不能改變最終的計算結果。因此從個人想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value類型徹底一致,且不影響最終結果的場景。好比累加,最大值等。Combiner的使用必定得慎重,若是用好,它對job執行效率有幫助,反之會影響reduce的最終結果。
4. 每次溢寫會在磁盤上生成一個溢寫文件,若是map的輸出結果然的很大,有屢次這樣的溢寫發生,磁盤上相應的就會有多個溢寫文件存在。當map task真正完成時,內存緩衝區中的數據也所有溢寫到磁盤中造成一個溢寫文件。最終磁盤中會至少有一個這樣的溢寫文件存在(若是map的輸出結果不多,當map執行完成時,只會產生一個溢寫文件),由於最終的文件只有一個,因此須要將這些溢寫文件歸併到一塊兒,這個過程就叫作Merge。Merge是怎樣的?如前面的例子,「aaa」從某個map task讀取過來時值是5,從另一個map 讀取時值是8,由於它們有相同的key,因此得merge成group。什麼是group。對於「aaa」就是像這樣的:{「aaa」, [5, 8, 2, …]},數組中的值就是從不一樣溢寫文件中讀取出來的,而後再把這些值加起來。請注意,由於merge是將多個溢寫文件合併到一個文件,因此可能也有相同的key存在,在這個過程當中若是client設置過Combiner,也會使用Combiner來合併相同的key。
至此,map端的全部工做都已結束,最終生成的這個文件也存放在TaskTracker夠得着的某個本地目錄內。每一個reduce task不斷地經過RPC從JobTracker那裏獲取map task是否完成的信息,若是reduce task獲得通知,獲知某臺TaskTracker上的map task執行完成,Shuffle的後半段過程開始啓動。
簡單地說,reduce task在執行以前的工做就是不斷地拉取當前job裏每一個map task的最終結果,而後對從不一樣地方拉取過來的數據不斷地作merge,也最終造成一個文件做爲reduce task的輸入文件。見下圖:
如map 端的細節圖,Shuffle在reduce端的過程也能用圖上標明的三點來歸納。當前reduce copy數據的前提是它要從JobTracker得到有哪些map task已執行結束,這段過程不表,有興趣的朋友能夠關注下。Reducer真正運行以前,全部的時間都是在拉取數據,作merge,且不斷重複地在作。如前面的方式同樣,下面我也分段地描述reduce 端的Shuffle細節:
1. Copy過程,簡單地拉取數據。Reduce進程啓動一些數據copy線程(Fetcher),經過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map task早已結束,這些文件就歸TaskTracker管理在本地磁盤中。
2. Merge階段。這裏的merge如map端的merge動做,只是數組中存放的是不一樣map端copy來的數值。Copy過來的數據會先放入內存緩衝區中,這裏的緩衝區大小要比map端的更爲靈活,它基於JVM的heap size設置,由於Shuffle階段Reducer不運行,因此應該把絕大部分的內存都給Shuffle用。這裏須要強調的是,merge有三種形式:1)內存到內存 2)內存到磁盤 3)磁盤到磁盤。默認狀況下第一種形式不啓用,讓人比較困惑,是吧。當內存中的數據量到達必定閾值,就啓動內存到磁盤的merge。與map 端相似,這也是溢寫的過程,這個過程當中若是你設置有Combiner,也是會啓用的,而後在磁盤中生成了衆多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,而後啓動第三種磁盤到磁盤的merge方式生成最終的那個文件。
3. Reducer的輸入文件。不斷地merge後,最後會生成一個「最終文件」。爲何加引號?由於這個文件可能存在於磁盤上,也可能存在於內存中。對咱們來講,固然但願它存放於內存中,直接做爲Reducer的輸入,但默認狀況下,這個文件是存放於磁盤中的。至於怎樣才能讓這個文件出如今內存中,以後的性能優化篇我再說。當Reducer的輸入文件已定,整個Shuffle才最終結束。而後就是Reducer執行,把結果放到HDFS上。
上面就是整個Shuffle的過程。細節不少,我不少都略過了,只試着把要點說明白。固然,我可能也有理解或表述上的不少問題,不吝指點。我但願不斷地完善和修改這篇文章,能讓它通俗、易懂,看完就能知道Shuffle的方方面面。至於具體的實現原理,各位有興趣就本身去探索,若是不方便的話,留言給我,我再來研究並反饋。性能優化