要知道怎麼對MapReduce做業進行調優前提條件是須要對Map-Reduce的過程瞭然於胸。
Map-Reduce運行原理圖:
java
1.從磁盤讀取數據並分片node
默認每一個block對應一個分片,一個map task算法
2.進行map處理apache
運行自定義的map業務過程緩存
3.輸出數據到緩衝區中markdown
map輸出的數據並非直接寫入磁盤的,而是會先存儲在一個預約義的buffer中網絡
四、分區、排序分組的過程app
對map輸出的數據進行分區,按照key進行排序和分組負載均衡
五、歸約(可選)socket
至關於本地端的reduce過程
六、合併寫入磁盤
對map的最終數據進行merge以後輸出到磁盤中等待shuffle過程
1.從map端複製數據
2.對數據進行合併
以上兩個步驟即爲shuffle過程
3.對數據進行排序
4.進行reduce操做
5.輸出到磁盤
詳細的過程將會在調優技巧中體現出來
Combiner在Map端提早進行了一次Reduce處理。
可減小Map Task中間輸出的結果,從而減小各個Reduce Task的遠程拷貝數據量,最終表現爲Map Task和Reduce Task執行時間縮短。
爲應用程序處理的數據選擇合適的Writable類型可大大提高性能。
好比處理整數類型數據時,直接採用IntWritable比先以Text類型讀入在轉換爲整數類型要高效。
若是輸出整數的大部分可用一個或兩個字節保存,那麼直接採用VIntWritable或者VLongWritable,它們採用了變長整型的編碼方式,能夠大大減小輸出數據量。
假設集羣有1個Namenode+8個Datanode節點,HDFS默認的副本數爲3
那麼map端讀取數據的時候,在啓動map task的機器上讀取本地的數據爲3/8,一部分數據是經過網絡從其餘節點拿到的
那麼若是副本數設置爲8會是什麼狀況?
至關於每一個子節點上都會有一份完整的數據,map讀取的時候直接從本地拿,不須要經過網絡這一層了
可是在實際狀況中設置副本數爲8是不可行的,由於數據自己很是龐大,副本數超過5對集羣的磁盤就很是有壓力了,因此這項設置須要酌情處理
該配置在hdfs-side.xml的dfs.replication項中設置
這是map階段的第一步,從磁盤讀取數據並切片,每一個分片由一個map task處理
當輸入的是海量的小文件的時候,會啓動大量的map task,效率及其之慢,有效的解決方式是使用CombineInputFormat自定義分片策略對小文件進行合併處理
從而減小map task的數量,減小map過程使用的時間
詳情請看:自定義分片策略解決大量小文件問題
另外,map task的啓動數量也和下面這幾個參數有關係:
- mapred.min.split.size:Input Split的最小值 默認值1
- mapred.max.split.size:Input Split的最大值
- dfs.block.size:HDFS 中一個block大小,默認值128MB
當mapred.min.split.size小於dfs.block.size的時候,一個block會被分爲多個分片,也就是對應多個map task
當mapred.min.split.size大於dfs.block.size的時候,一個分片可能對應多個block,也就是一個map task讀取多個block數據
集羣的網絡、IO等性能很好的時候,建議調高dfs.block.size
根據數據源的特性,主要調整mapred.min.split.size來控制map task的數量
該階段是map side中將結果輸出到磁盤以前的一個處理方式,經過對其進行設置的話能夠減小map任務的IO開銷,從而提升性能
因爲map任務運行時中間結果首先存儲在buffer中,默認當緩存的使用量達到80%的時候就開始寫入磁盤,這個過程叫作spill(溢出)
這個buffer默認的大小是100M能夠經過設定io.sort.mb的值來進行調整
當map產生的數據很是大時,若是默認的buffer大小不夠看,那麼勢必會進行很是屢次的spill,進行spill就意味着要寫磁盤,產生IO開銷
這時候就能夠把io.sort.mb調大,那麼map在整個計算過程當中spill的次數就勢必會下降,map task對磁盤的操做就會變少
若是map tasks的瓶頸在磁盤上,這樣調整就會大大提升map的計算性能
可是若是將io.sort.mb調的很是大的時候,對機器的配置要求就很是高,由於佔用內存過大,因此須要根據狀況進行配置
map並非要等到buffer所有寫滿時才進行spill,由於若是所有寫滿了再去寫spill,勢必會形成map的計算部分等待buffer釋放空間的狀況。
因此,map實際上是當buffer被寫滿到必定程度(好比80%)時,纔開始進行spill
能夠經過設置io.sort.spill.percent的值來調整這個閾值
這個參數一樣也是影響spill頻繁程度,進而影響map task運行週期對磁盤的讀寫頻率
可是一般狀況下只須要對io.sort.mb進行調整便可
該階段是map產生spill以後,對spill進行處理的過程,經過對其進行配置也能夠達到優化IO開銷的目的
map產生spill以後必須將些spill進行合併,這個過程叫作merge
merge過程是並行處理spill的,每次並行多少個spill是由參數io.sort.factor指定的,默認爲10個
若是產生的spill很是多,merge的時候每次只能處理10個spill,那麼仍是會形成頻繁的IO處理
適當的調大每次並行處理的spill數有利於減小merge數所以能夠影響map的性能
可是若是調整的數值過大,並行處理spill的進程過多會對機器形成很大壓力
咱們知道若是map side設置了Combiner,那麼會根據設定的函數對map輸出的數據進行一次類reduce的預處理
可是和分組、排序分組不同的是,combine發生的階段多是在merge以前,也多是在merge以後
這個時機能夠由一個參數控制:min.num.spill.for.combine,默認值爲3
當job中設定了combiner,而且spill數最少有3個的時候,那麼combiner函數就會在merge產生結果文件以前運行
例如,產生的spill很是多,雖然咱們能夠經過merge階段的io.sort.factor進行優化配置,可是在此以前咱們還能夠經過先執行combine對結果進行處理以後再對數據進行merge
這樣一來,到merge階段的數據量將會進一步減小,IO開銷也會被降到最低
這個階段是map side的最後一個步驟,在這個步驟中也能夠經過壓縮選項的配置來獲得任務的優化
其實不管是spill的時候,仍是最後merge產生的結果文件,都是能夠壓縮的
壓縮的好處在於,經過壓縮減小寫入讀出磁盤的數據量。對中間結果很是大,磁盤速度成爲map執行瓶頸的job,尤爲有用
控制輸出是否使用壓縮的參數是mapred.compress.map.output,值爲true或者false
啓用壓縮以後,會犧牲CPU的一些計算資源,可是能夠節省IO開銷,很是適合IO密集型的做業(若是是CPU密集型的做業不建議設置)
設置壓縮的時候,咱們能夠選擇不一樣的壓縮算法
Hadoop默認提供了GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式
一般來講,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較合適,但也要取決於job的具體狀況
若是想要自行選擇中間結果的壓縮算法,能夠設置配置參數:
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec
//或者其餘用戶自行選擇的壓縮方式
從上面提到的幾點能夠看到,map端的性能瓶頸都是頻繁的IO操做形成的,全部的優化也都是針對IO進行的,而優化的瓶頸又很大程度上被機器的配置等外部因素所限制
map端調優的相關參數:
選項 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.min.split.size | int | 1 | Input Split的最小值 |
mapred.max.split.size | int | . | Input Split的最大值 |
io.sort.mb | int | 100 | map緩衝區大小 |
io.sort.spill.percent | float | 0.8 | 緩衝區閾值 |
io.sort.factor | int | 10 | 並行處理spill的個數 |
min.num.spill.for.combine | int | 3 | 最少有多少個spill的時候combine在merge以前進行 |
mapred.compress.map.output | boolean | false | map中間數據是否採用壓縮 |
mapred.map.output.compression.codec | String | . | 壓縮算法 |
1.Copy
因爲job的每個map都會根據reduce(n)數將數據分紅map 輸出結果分紅n個partition,因此map的中間結果中是有可能包含每個reduce須要處理的部分數據的
爲了優化reduce的執行時間,hadoop中等第一個map結束後,全部的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分數據
在這個shuffle過程當中,因爲map的數量一般是不少個的,而每一個map中又都有可能包含每一個reduce所須要的數據
因此對於每一個reduce來講,去各個map中拿數據也是並行的,能夠經過mapred.reduce.parallel.copies這個參數來調整,默認爲5
當map數量不少的時候,就能夠適當調大這個值,減小shuffle過程使用的時間
還有一種狀況是:reduce從map中拿數據的時候,有可能由於中間結果丟失、網絡等其餘緣由致使map任務失敗
而reduce不會由於map失敗就永無止境的等待下去,它會嘗試去別的地方得到本身的數據(這段時間失敗的map可能會被重跑)
因此設置reduce獲取數據的超時時間能夠避免一些由於網絡很差致使沒法得到數據的狀況
mapred.reduce.copy.backoff,默認300s
通常狀況下不用調整這個值,由於生產環境的網絡都是很流暢的
2.Merge
因爲reduce是並行將map結果下載到本地,因此也是須要進行merge的,因此io.sort.factor的配置選項一樣會影響reduce進行merge時的行爲
和map同樣,reduce下載過來的數據也是存入一個buffer中而不是立刻寫入磁盤的,因此咱們一樣能夠控制這個值來減小IO開銷
控制該值的參數爲:
mapred.job.shuffle.input.buffer.percent,默認0.7,這是一個百分比,意思是reduce的可用內存中拿出70%做爲buffer存放數據
reduce的可用內存經過mapred.child.java.opts來設置,好比置爲-Xmx1024m,該參數是同時設定map和reduce task的可用內存,通常爲map buffer大小的兩倍左右
設置了reduce端的buffer大小,咱們一樣能夠經過一個參數來控制buffer中的數據達到一個閾值的時候開始往磁盤寫數據:mapred.job.shuffle.merge.percent,默認爲0.66
sort的過程通常很是短,由於是邊copy邊merge邊sort的,後面就直接進入真正的reduce計算階段了
以前咱們說過reduc端的buffer,默認狀況下,數據達到一個閾值的時候,buffer中的數據就會寫入磁盤,而後reduce會從磁盤中得到全部的數據
也就是說,buffer和reduce是沒有直接關聯的,中間多個一個寫磁盤->讀磁盤的過程,既然有這個弊端,那麼就能夠經過參數來配置
使得buffer中的一部分數據能夠直接輸送到reduce,從而減小IO開銷:mapred.job.reduce.input.buffer.percent,默認爲0.0
當值大於0的時候,會保留指定比例的內存讀buffer中的數據直接拿給reduce使用
這樣一來,設置buffer須要內存,讀取數據須要內存,reduce計算也要內存,因此要根據做業的運行狀況進行調整
和map階段差很少,reduce節點的調優也是主要集中在加大內存使用量,減小IO,增大並行數
reduce調優主要參數:
選項 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.reduce.parallel.copies | int | 5 | 每一個reduce去map中拿數據的並行數 |
mapred.reduce.copy.backoff | int | 300 | 獲取map數據最大超時時間 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | buffer大小佔reduce可用內存的比例 |
mapred.child.java.opts | String | . | -Xmx1024m設置reduce可用內存爲1g |
mapred.job.shuffle.merge.percent | float | 0.66 | buffer中的數據達到多少比例開始寫入磁盤 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | 指定多少比例的內存用來存放buffer中的數據 |
Map Task和Reduce Task調優的一個原則就是
減小數據的傳輸量
儘可能使用內存
減小磁盤IO的次數
增大任務並行數
除此以外還有根據本身集羣及網絡的實際狀況來調優
在集羣部署完畢以後,根據機器的配置狀況,咱們就能夠經過必定的公式知道每一個節點上container的大小和數量
1.mapper數量
每一個做業啓動的mapper由輸入的分片數決定,每一個節點啓動的mapper數應該是在10-100之間,且最好每一個map的執行時間至少一分鐘
若是輸入的文件巨大,會產生無數個mapper的狀況,應該使用mapred.tasktracker.map.tasks.maximum參數肯定每一個tasktracker可以啓動的最大mapper數,默認只有2
以避免同時啓動過多的mapper
2.reducer數量
reducer的啓動數量官方建議是0.95或者1.75*節點數*每一個節點的container數
使用0.95的時候reduce只須要一輪就能夠完成
使用1.75的時候完成較快的reducer會進行第二輪計算,並進行負載均衡
增長reducer的數量會增長集羣的負擔,可是會獲得較好的負載均衡結果和減低失敗成本
一些詳細的參數:
選項 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.reduce.tasks | int | 1 | reduce task數量 |
mapred.tasktracker.map.tasks.maximum | int | 2 | 每一個節點上可以啓動map task的最大數量 |
mapred.tasktracker.reduce.tasks.maximum | int | 2 | 每一個節點上可以啓動reduce task的最大數量 |
mapred.reduce.slowstart.completed.maps | float | 0.05 | map階段完成5%的時候開始進行reduce計算 |
map和reduce task是同時啓動的,很長一段時間是並存的
共存的時間取決於mapred.reduce.slowstart.completed.maps的設置
若是設置爲0.6.那麼reduce將在map完成60%後進入運行態
若是設置的map和reduce參數都很大,勢必形成map和reduce爭搶資源,形成有些進程飢餓,超時出錯,最大的可能就是socket.timeout的出錯
reduce是在33%的時候完成shuffle過程,因此確保reduce進行到33%的時候map任務所有完成,能夠經過觀察任務界面的完成度進行調整
當reduce到達33%的時候,map剛好達到100%設置最佳的比例,可讓map先完成,可是不要讓reduce等待計算資源
做者:@小黑