問題導讀:
1.map會將已經產生的部分結果先寫入到該buffer中.buffer大小能夠經過那個參數來設置?
2.如何下降map的split的次數?
3.map中的數據什麼狀況下會寫入磁盤?spill是什麼?
4.map實際上是當buffer被寫滿到必定程度(好比80%)時,就開始進行spill有由那個參數來決定?
5.經過哪一個參數能夠控制map中間結果是否使用壓縮的?
6.reduce包含幾個階段,是每一個reduce都必須包含?
7.Reduce task在作shuffle的過程是什麼樣子的?如何調整多個並行map個數的下載?
8.reduce的數據是否所有來源磁盤,如何調整使用內存?
1 Map side tuning參數1.1 MapTask運行內部原理java
1.當map task開始運算,併產生中間數據時,其產生的中間結果並不是直接就簡單的寫入磁盤。這中間的過程比較複雜,而且利用到了內存buffer來進行已經產生的部分結果的緩存,並在內存buffer中進行一些預排序來優化整個map的性能。如上圖所示,每個map都會對應存在一個內存buffer(MapOutputBuffer,即上圖的buffer in memory),map會將已經產生的部分結果先寫入到該buffer中,這個buffer默認是100MB大小,可是這個大小是能夠根據job提交時的參數設定來調整的,算法
該參數即爲:mapreduce.task.io.sort.mb。apache
當map的產生數據很是大時,而且把mapreduce.task.io.sort.mb調大,那麼map在整個計算過程當中spill的次數就勢必會下降,map task對磁盤的操做就會變少,緩存
若是map tasks的瓶頸在磁盤上,這樣調整就會大大提升map的計算性能。網絡
map作sort和spill的內存結構以下如所示:併發
2.map在運行過程當中,不停的向該buffer中寫入已有的計算結果,可是該buffer並不必定能將所有的map輸出緩存下來,當map輸出超出必定閾值(好比100M),那麼map就必須將該buffer中的數據寫入到磁盤中去,這個過程在mapreduce中叫作spill。map並非要等到將該buffer所有寫滿時才進行spill,由於若是所有寫滿了再去寫spill,勢必會形成map的計算部分等待buffer釋放空間的狀況。因此,map實際上是當buffer被寫滿到必定程度(好比80%)時,就開始進行spill。ide
這個閾值也是由一個job的配置參數來控制,函數
即mapreduce.map.sort.spill.percent,默認爲0.80或80%。oop
這個參數一樣也是影響spill頻繁程度,進而影響map task運行週期對磁盤的讀寫頻率的。但非特殊狀況下,一般不須要人爲的調整。調整mapreduce.task.io.sort.mb對用戶來講更加方便。性能
3.當map task的計算部分所有完成後,若是map有輸出,就會生成一個或者多個spill文件,這些文件就是map的輸出結果。map在正常退出以前(cleanup),須要將這些spill合併(merge)成一個,因此map在結束以前還有一個merge的過程。merge的過程當中,有一個參數能夠調整這個過程的行爲,該參數爲:mapreduce.task.io.sort.factor。該參數默認爲10。它表示當merge spill文件時,最多能有多少並行的stream向merge文件中寫入。好比若是map產生的數據很是的大,產生的spill文件大於10,而mapreduce.task.io.sort.factor使用的是默認的10,那麼當map計算完成作merge時,就沒有辦法一次將全部的spill文件merge成一個,而是會分屢次,每次最多10個stream。這也就是說,當map的中間結果很是大,調大mapreduce.task.io.sort.factor,有利於減小merge次數,進而減小map對磁盤的讀寫頻率,有可能達到優化做業的目的。
4.當job指定了combiner的時候,咱們都知道map介紹後會在map端根據combiner定義的函數將map結果進行合併。運行combiner函數的時機有可能會是merge完成以前,或者以後,這個時機能夠由一個參數控制,即mapreduce.map.combine.minspills(default 3),當job中設定了combiner,而且spill數大於等於3的時候,那麼combiner函數就會在merge產生結果文件以前運行。經過這樣的方式,就能夠在spill很是多須要merge,而且不少數據須要作conbine的時候,減小寫入到磁盤文件的數據數量,一樣是爲了減小對磁盤的讀寫頻率,有可能達到優化做業的目的。
5.減小中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。也就是說map的中間,不管是spill的時候,仍是最後merge產生的結果文件,都是能夠壓縮的。壓縮的好處在於,經過壓縮減小寫入讀出磁盤的數據量。對中間結果很是大,磁盤速度成爲map執行瓶頸的job,尤爲有用。控制map中間結果是否使用壓縮的參數爲:mapreduce.map.output.compress(true/false)。 將這個參數設置爲true時,那麼map在寫中間結果時,就會將數據壓縮後再寫入磁盤,讀結果時也會採用先解壓後讀取數據。這樣作的後果就是:寫入磁盤的 中間結果數據量會變少,可是cpu會消耗一些用來壓縮和解壓。因此這種方式一般適合job中間結果很是大,瓶頸不在cpu,而是在磁盤的讀寫的狀況。說的 直白一些就是用cpu換IO。根據觀察,一般大部分的做業cpu都不是瓶頸,除非運算邏輯異常複雜。因此對中間結果採用壓縮一般來講是有收益的。如下是一 個wordcount中間結果採用壓縮和不採用壓縮產生的map中間結果本地磁盤讀寫的數據量對比:
map中間結果不壓縮:
map中間結果壓縮:
能夠看出,一樣的job,一樣的數據,在採用壓縮的狀況下,map中間結果能縮小將近10倍,若是map的瓶頸在磁盤,那麼job的性能提高將會很是可觀。
當採用map中間結果壓縮的狀況下,用戶還能夠選擇壓縮時採用哪一種壓縮格式進行壓縮,如今hadoop支持的壓縮格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。一般來講,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較適合。但也要取決於job的具體狀況。用戶若想要自行選擇中間結果的壓縮算法,能夠設置配置參數:mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec或者其餘用戶自行選擇的壓縮方式。
1.2 Map side相關參數調優
選項 | 類型 | 默認值 | 描述 |
mapreduce.task.io.sort.mb |
int | 100 | 緩存map中間結果的buffer大小(in MB) |
io.sort.record.percent |
float | 0.05 | io.sort.mb中用來保存map output記錄邊界的百分比,其餘緩存用來保存數據 |
io.sort.spill.percent mapreduce.map.sort.spill.percent |
float | 0.80 | map開始作spill操做的閾值 |
io.sort.factor mapreduce.task.io.sort.factor |
int | 10 | 作merge操做時同時操做的stream數上限。 |
min.num.spill.for.combine | int | 3 | combiner函數運行的最小spill數 |
mapred.compress.map.output | boolean | false | map中間結果是否採用壓縮 |
mapred.map.output.compression.codec | class name | org.apache.hadoop.io. compress.DefaultCodec |
map中間結果的壓縮格式 |
2 Reduce side tuning參數2.1 ReduceTask運行內部原理
1.reduce的運行是分紅三個階段的。分別爲copy->sort->reduce。因爲job的每個map都會根據reduce(n)數將數據分紅map 輸出結果分紅n個partition,因此map的中間結果中是有可能包含每個reduce須要處理的部分數據的。因此,爲了優化reduce的執行時間,hadoop中是等job的第一個map結束後,全部的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分數據。這個過程就是一般所說的shuffle,也就是copy過程。
2.Reduce task在作shuffle時,實際上就是從不一樣的已經完成的map上去下載屬於本身這個reduce的部分數據,因爲map一般有許多個,因此對一個reduce來講,下載也能夠是並行的從多個map下載,這個並行度是能夠調整的,調整參數爲:mapreduce.reduce.shuffle.parallelcopies(default 5)。默 認狀況下,每一個只會有5個並行的下載線程在從map下數據,若是一個時間段內job完成的map有100個或者更多,那麼reduce也最多隻能同時下載 5個map的數據,因此這個參數比較適合map不少而且完成的比較快的job的狀況下調大,有利於reduce更快的獲取屬於本身部分的數據。
3.reduce 的每個下載線程在下載某個map數據的時候,有可能由於那個map中間結果所在機器發生錯誤,或者中間結果的文件丟失,或者網絡瞬斷等等狀況,這樣 reduce的下載就有可能失敗,因此reduce的下載線程並不會無休止的等待下去,當必定時間後下載仍然失敗,那麼下載線程就會放棄此次下載,並在隨 後嘗試從另外的地方下載(由於這段時間map可能重跑)。因此reduce下載線程的這個最大的下載時間段是能夠調整的,調整參數爲:mapred.reduce.copy.backoff(default 300秒)。若是集羣環境的網絡自己是瓶頸,那麼用戶能夠經過調大這個參數來避免reduce下載線程被誤判爲失敗的狀況。不過在網絡環境比較好的狀況下,沒有必要調整。一般來講專業的集羣網絡不該該有太大問題,因此這個參數須要調整的狀況很少。
4.Reduce將map結果下載到本地時,一樣也是須要進行merge的,因此mapreduce.task.io.sort.factor的配置選項一樣會影響reduce進行merge時的行爲,該參數的詳細介紹上文已經提到,當發現reduce在shuffle階段iowait很是的高的時候,就有可能經過調大這個參數來加大一次merge時的併發吞吐,優化reduce效率。
5.Reduce在shuffle階段對下載來的map數據,並非馬上就寫入磁盤的,而是會先緩存在內存中,而後當使用內存達到必定量的時候才刷入磁盤。這個內存大小的控制就不像map同樣能夠經過mapreduce.task.io.sort.mb來設定了,而是經過另一個參數來設置:mapreduce.reduce.shuffle.input.buffer.percent(default 0.7),這個參數實際上是一個百分比,意思是說,shuffile在reduce內存中的數據最多使用內存量爲:0.7 × maxHeap of reduce task。也就是說,若是該reduce task的最大heap使用量(一般經過mapreduce.reduce.java.opts來設置,好比設置爲-Xmx1024m)的必定比例用來緩存數據。默認狀況下,reduce會使用其heapsize的70%來在內存中緩存數據。若是reduce的heap因爲業務緣由調整的比較大,相應的緩存大小也會變大,這也是爲何reduce用來作緩存的參數是一個百分比,而不是一個固定的值了。
6.假設mapreduce.reduce.shuffle.input.buffer.percent爲0.7,reduce task的max heapsize爲1G,那麼用來作下載數據緩存的內存就爲大概700MB左右,這700M的內存,跟map端同樣,也不是要等到所有寫滿纔會往磁盤刷的,而是當這700M中被使用到了必定的限度(一般是一個百分比),就會開始往磁盤刷。這個限度閾值也是能夠經過job參數來設定的,設定參數爲:mapreduce.reduce.shuffle.merge.percent(default 0.66)。若是下載速度很快,很容易就把內存緩存撐大,那麼調整一下這個參數有可能會對reduce的性能有所幫助。
7.當reduce將全部的map上對應本身partition的數據下載完成後,就會開始真正的reduce計算階段(中間有個sort階段一般時間很是短,幾秒鐘就完成了,由於整個下載階段就已是邊下載邊sort,而後邊merge的)。當reduce task真正進入reduce函數的計算階段的時候,有一個參數也是能夠調整reduce的計算行爲。也就是:mapreduce.reduce.input.buffer.percent(default 0.0)。 因爲reduce計算時確定也是須要消耗內存的,而在讀取reduce須要的數據時,一樣是須要內存做爲buffer,這個參數是控制,須要多少的內存百 分比來做爲reduce讀已經sort好的數據的buffer百分比。默認狀況下爲0,也就是說,默認狀況下,reduce是所有從磁盤開始讀處理數據。 若是這個參數大於0,那麼就會有必定量的數據被緩存在內存並輸送給reduce,當reduce計算邏輯消耗內存很小時,能夠分一部份內存用來緩存數據, 反正reduce的內存閒着也是閒着。
2.2 Reduce side相關參數調優
選項 | 類型 | 默認值 | 描述 |
mapred.reduce.parallel.copies | int | 5 | 每一個reduce並行下載map結果的最大線程數 |
mapred.reduce.copy.backoff | int | 300 | reduce下載線程最大等待時間(in sec) |
io.sort.factor | int | 10 | 同上 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | 用來緩存shuffle數據的reduce task heap百分比 |
mapred.job.shuffle.merge.percent | float | 0.66 | 緩存的內存中多少百分比後開始作merge操做 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | sort完成後reduce計算階段用來緩存數據的百分比 |