默認一個block對應一個分片,對應一個map taskjava
map業務處理算法
map 輸出的數據並非直接寫入磁盤的,而是會預先存儲在一個預約義的buffer中apache
對map的輸出結果進行分區,按照key進行排序和分組緩存
至關於本地端的reduce網絡
將最終的數據進行merge以後輸出到磁盤中,等待shuffleapp
對map結果進行了一次reduce操做,減小了map的輸出結果和reduce的遠程拷貝數據量,使得Map task和 Reduce task的執行時間縮短負載均衡
好比,要處理整型類型數據時,輸出類型IntWritable就比Text高效(須要轉換)
若是輸出整數的大部分能夠用一個或者兩個字節保存,可直接採用VIntWritable,VLongWritable,採用變長整型的編碼方式,減小數據輸出量。socket
map的第一步,從磁盤讀取數據並切片。當讀取海量的小文件時,會啓動大量的map task,效率很是慢,能夠經過CombineInputFormat 自定義分片策略對小文件進行合併,從而減小map task的數量,減小map 時間,此外:ide
當mapred.min.split.size小於dfs.block.size的時候,一個block會被分爲多個分片,也就是對應多個map task。函數
當mapred.min.split.size大於dfs.block.size的時候,一個分片可能對應多個block,也就是一個map task讀取多個block數據。
集羣的網絡、IO等性能很好的時候,建議調高dfs.block.size。
根據數據源的特性,主要調整mapred.min.split.size來控制map task的數量
設置Buffer話能夠減小map任務的IO開銷,從而提升性能。
首先將map結果輸出到buffer,當緩存的使用量超過80%的時候,開始溢出到磁盤(splll),buffer默認100M,能夠經過io.sort.mb設置。
可是若是將io.sort.mb調的很是大的時候,對機器的配置要求就很是高,由於佔用內存過大,因此須要根據狀況進行配置。
map並非等buffer寫滿才spill,經過io.sort.spill.percent能夠調整。這個會影響spill的頻繁程度,進而影響map task
該階段是map產生spill以後,對spill進行處理的過程,經過對其進行配置也能夠達到優化IO開銷的目的。
merge過程是並行處理spill的,每次並行多少個spill是由參數io.sort.factor指定的,默認爲10個
若是產生的spill很是多,merge的時候每次只能處理10個spill,那麼仍是會形成頻繁的IO處理
適當的調大每次並行處理的spill數有利於減小merge數所以能夠影響map的性能
可是若是調整的數值過大,並行處理spill的進程過多會對機器形成很大壓力
咱們知道若是map side設置了Combiner,那麼會根據設定的函數對map輸出的數據進行一次類reduce的預處理
可是和分組、排序分組不同的是,combine發生的階段多是在merge以前,也多是在merge以後
這個時機能夠由一個參數控制:min.num.spill.for.combine,默認值爲3 當job中設定了combiner,而且spill數最少有3個的時候,那麼combiner函數就會在merge產生結果文件以前運行
例如,產生的spill很是多,雖然咱們能夠經過merge階段的io.sort.factor進行優化配置,可是在此以前咱們還能夠經過先執行combine對結果進行處理以後再對數據進行merge 這樣一來,到merge階段的數據量將會進一步減小,IO開銷也會被降到最低
其實不管是spill的時候,仍是最後merge產生的結果文件,都是能夠壓縮的,控制輸出是否使用壓縮的參數是mapred.compress.map.output,值爲true或者false.啓用壓縮以後,會犧牲CPU的一些計算資源,可是能夠節省IO開銷,很是適合IO密集型的做業(若是是CPU密集型的做業不建議設置)
設置壓縮的時候,咱們能夠選擇不一樣的壓縮算法 Hadoop默認提供了GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式
一般來講,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較合適,但也要取決於job的具體狀況
若是想要自行選擇中間結果的壓縮算法,能夠設置配置參數:
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec
//或者其餘用戶自行選擇的壓縮方式
==map端的性能瓶頸都是頻繁的IO操做形成的==,全部的優化也都是針對IO進行的,而優化的瓶頸又很大程度上被機器的配置等外部因素所限制
選項 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.min.split.size | int | 1 | Input Split的最小值 |
mapred.max.split.size | int | . | Input Split的最大值 |
io.sort.mb | int | 100 | map緩衝區大小 |
io.sort.spill.percent | float | 0.8 | 緩衝區閾值 |
io.sort.factor | int | 10 | 並行處理spill的個數 |
min.num.spill.for.combine | int | 3 | 最少有多少個spill的時候combine在merge以前進行 |
mapred.compress.map.output | boolean | false | map中間數據是否採用壓縮 |
mapred.map.output.compression.codec | String | . | 壓縮算法 |
每個job都會將map輸出結果根據reduce(n)分紅n個partition。因此,爲了節省時間,==在第一個map結束後,全部reduce就開始嘗試從完成的map中下載該reduce對應的partition==
在這個shuffle過程當中,因爲map的數量一般是不少個的,而每一個map中又都有可能包含每一個reduce所須要的數據
因此對於每一個reduce來講,去各個map中拿數據也是並行的,能夠經過mapred.reduce.parallel.copies這個參數來調整,默認爲5
當map數量不少的時候,就能夠適當調大這個值,減小shuffle過程使用的時間 還有一種狀況是:reduce從map中拿數據的時候,有可能由於中間結果丟失、網絡等其餘緣由致使map任務失敗
而reduce不會由於map失敗就永無止境的等待下去,它會嘗試去別的地方得到本身的數據(這段時間失敗的map可能會被重跑) 因此設置reduce獲取數據的超時時間能夠避免一些由於網絡很差致使沒法得到數據的狀況
mapred.reduce.copy.backoff,默認300s,通常狀況下不用調整這個值,由於生產環境的網絡都是很流暢的
因爲reduce是並行將map結果下載到本地,因此也是須要進行merge的,因此io.sort.factor的配置選項一樣會影響reduce進行merge時的行爲
和map同樣,reduce下載過來的數據也是存入一個buffer中而不是立刻寫入磁盤的,因此咱們一樣能夠控制這個值來減小IO開銷 控制該值的參數爲: mapred.job.shuffle.input.buffer.percent,默認0.7,這是一個百分比,意思是reduce的可用內存中拿出70%做爲buffer存放數據
reduce的可用內存經過mapred.child.Java.opts來設置,好比置爲-Xmx1024m,該參數是同時設定map和reduce task的可用內存,通常爲map buffer大小的兩倍左右
設置了reduce端的buffer大小,咱們一樣能夠經過一個參數來控制buffer中的數據達到一個閾值的時候開始往磁盤寫數據:mapred.job.shuffle.merge.percent,默認爲0.66
sort的過程通常很是短,由於是邊copy邊merge邊sort的,後面就直接進入真正的reduce計算階段了
以前咱們說過reduc端的buffer,默認狀況下,數據達到一個閾值的時候,buffer中的數據就會寫入磁盤,而後reduce會從磁盤中得到全部的數據
也就是說,buffer和reduce是沒有直接關聯的,中間多個一個寫磁盤->讀磁盤的過程,既然有這個弊端,那麼就能夠經過參數來配置使得buffer中的一部分數據能夠直接輸送到reduce,從而減小IO開銷:==mapred.job.reduce.input.buffer.percent==,默認爲0.0
當值大於0的時候,會保留指定比例的內存讀buffer中的數據直接拿給reduce使用 這樣一來,設置buffer須要內存,讀取數據須要內存,reduce計算也要內存,因此要根據做業的運行狀況進行調整
和map階段差很少,reduce節點的調優也是主要集中在加大內存使用量,減小IO,增大並行數
reduce調優主要參數:
選項 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.reduce.parallel.copies | int | 5 | 每一個reduce去map中拿數據的並行數 |
mapred.reduce.copy.backoff | int | 300 | 獲取map數據最大超時時間 |
mapred.job.shuffle.input.buffer.percent | float | 0.7 | buffer大小佔reduce可用內存的比例 |
mapred.child.java.opts | String | . | -Xmx1024m設置reduce可用內存爲1g |
mapred.job.shuffle.merge.percent | float | 0.66 | buffer中的數據達到多少比例開始寫入磁盤 |
mapred.job.reduce.input.buffer.percent | float | 0.0 | 指定多少比例的內存用來存放buffer中的數據 |
Map Task和Reduce Task調優的一個原則就是
減小數據的傳輸量
儘可能使用內存
減小磁盤IO的次數
增大任務並行數
除此以外還有根據本身集羣及網絡的實際狀況來調優
每一個做業啓動的mapper由輸入的分片數決定,每一個節點啓動的mapper數應該是在10-100之間,且最好每一個map的執行時間至少一分鐘 若是輸入的文件巨大,會產生無數個mapper的狀況,應該使用mapred.tasktracker.map.tasks.maximum參數肯定每一個tasktracker可以啓動的最大mapper數,默認只有2 以避免同時啓動過多的mapper
reducer的啓動數量官方建議是0.95或者1.75節點數每一個節點的container數 使用0.95的時候reduce只須要一輪就能夠完成 使用1.75的時候完成較快的reducer會進行第二輪計算,並進行負載均衡
增長reducer的數量會增長集羣的負擔,可是會獲得較好的負載均衡結果和減低失敗成本
一些詳細的參數:
選項 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.reduce.tasks | int | 1 | reduce task數量 |
mapred.tasktracker.map.tasks.maximum | int | 2 | 每一個節點上可以啓動map task的最大數量 |
mapred.tasktracker.reduce.tasks.maximum | int | 2 | 每一個節點上可以啓動reduce task的最大數量 |
mapred.reduce.slowstart.completed.maps | float | 0.05 | map階段完成5%的時候開始進行reduce計算 |
map和reduce task是同時啓動的,很長一段時間是並存的,共存的時間取決於mapred.reduce.slowstart.completed.maps的設置,若是設置爲0.6.那麼reduce將在map完成60%後進入運行態
若是設置的map和reduce參數都很大,勢必形成map和reduce爭搶資源,形成有些進程飢餓,超時出錯,最大的可能就是socket.timeout的出錯
reduce是在33%的時候完成shuffle過程,因此==確保reduce進行到33%的時候map任務所有完成==,能夠經過觀察任務界面的完成度進行調整。當reduce到達33%的時候,map剛好達到100%設置最佳的比例,可讓map先完成,可是不要讓reduce等待計算資源