本文內容主要轉至如下兩篇文章:html
http://www.cnblogs.com/skyl/p/4776083.htmlgit
map/reduce程序執行時,reduce節點大部分執行完畢,可是有一個或者幾個reduce節點運行很慢(或卡在完成率99%左右),致使整個程序的處理時間很長,這是由於某一個key的條數比其餘key多不少(有時是百倍或者千倍之多),這條key所在的reduce節點所處理的數據量比其餘節點就大不少,從而致使某幾個節點遲遲運行不完,此稱之爲數據傾斜。sql
傾斜分紅group by形成的傾斜和join形成的傾斜,須要分開看。apache
group by形成的傾斜相對來講比較容易解決。hive提供兩個參數能夠解決,一個是hive.map.aggr
,默認值已經爲true,他的意思是作map aggregation,也就是在mapper裏面作聚合。這個方法不一樣於直接寫mapreduce的時候能夠實現的combiner,可是卻實現了相似combiner的效果。事實上各類基於mr的框架如pig,cascading等等用的都是map aggregation(或者叫partial aggregation)而非combiner的策略,也就是在mapper裏面直接作聚合操做而不是輸出到buffer給combiner作聚合。對於map aggregation,hive還會作檢查,若是aggregation的效果很差,那麼hive會自動放棄map aggregation。判斷效果的依據就是通過一小批數據的處理以後,檢查聚合後的數據量是否減少到必定的比例,默認是0.5,由hive.map.aggr.hash.min.reduction
這個參數控制。因此若是確認數據裏面確實有個別取值傾斜,可是大部分值是比較稀疏的,這個時候能夠把比例強制設爲1,避免極端狀況下map aggr失效。編程
hive.map.aggr
還有一些相關參數,好比map aggr的內存佔用等,具體能夠參考這篇文章。http://dev.bizo.com/2013/02/map-side-aggregations-in-apache-hive.htmlapp
另外一個參數是hive.groupby.skewindata
。這個參數的意思是作reduce操做的時候,拿到的key並非全部相同值給同一個reduce,而是隨機分發,而後reduce作聚合,作完以後再作一輪MR,拿前面聚合過的數據再算結果。因此這個參數其實跟hive.map.aggr
作的是相似的事情,只是拿到reduce端來作,並且要額外啓動一輪job,因此其實不怎麼推薦用,效果不明顯。負載均衡
另外須要注意的是count distinct操做每每須要改寫SQL,能夠按照下面這麼作:框架
/*改寫前*/ select a, count(distinct b) as c from tbl group by a; /*改寫後*/ select a, count(*) as c from (select a, b from tbl group by a, b) group by a;
join形成的傾斜,常見狀況是不能作map join的兩個表(能作map join的話基本上能夠避免傾斜),其中一個是行爲表,另外一個應該是屬性表。好比咱們有三個表,一個用戶屬性表users
,一個商品屬性表items
,還有一個用戶對商品的操做行爲表日誌表logs
。假設如今須要將行爲表關聯用戶表:ide
select * from logs a join users b on a.user_id = b.user_id;
其中logs表裏面會有一個特殊用戶user_id = 0
,表明未登陸用戶,假如這種用戶佔了至關的比例,那麼個別reduce會收到比其餘reduce多得多的數據,由於它要接收全部user_id = 0
的記錄進行處理,使得其處理效果會很是差,其餘reduce都跑完好久了它還在運行。
hive給出的解決方案叫skew join,其原理把這種user_id = 0
的特殊值先不在reduce端計算掉,而是先寫入hdfs,而後啓動一輪map join專門作這個特殊值的計算,指望能提升計算這部分值的處理速度。固然你要告訴hive這個join是個skew join,即:
set hive.optimize.skewjoin = true;
還有要告訴hive如何判斷特殊值,根據hive.skewjoin.key
設置的數量hive能夠知道,好比默認值是100000,那麼超過100000條記錄的值就是特殊值。總結起來,skew join的流程能夠用下圖描述:
不過,這種方法還要去考慮閾值之類的狀況,其實也不夠通用。因此針對join傾斜的問題,通常都是經過改寫sql解決。對於上面這個問題,咱們已經知道user_id = 0
是一個特殊key,那麼能夠把特殊值隔離開來單獨作join,這樣特殊值確定會轉化成map join,非特殊值就是沒有傾斜的普通join了:
select * from (select * from logs where user_id = 0) a join (select * from users where user_id = 0) b on a.user_id = b.user_id union all select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
上面這種個別key傾斜的狀況只是一種傾斜狀況。最多見的傾斜是由於數據分佈自己就具備長尾性質,好比咱們將日誌表和商品表關聯:
select * from logs a join items b on a.item_id = b.item_id;
這個時候,分配到熱門商品的reducer就會很慢,由於熱門商品的行爲日誌確定是最多的,並且咱們也很難像上面處理特殊user那樣去處理item。這個時候就會用到加隨機數的方法,也就是在join的時候增長一個隨機數,隨機數的取值範圍n至關於將item給分散到n個reducer:
select a.*, b.* from (select *, cast(rand() * 10 as int) as r_id from logs)a join (select *, r_id from items lateral view explode(range_list(1,10)) rl as r_id )b on a.item_id = b.item_id and a.r_id = b.r_id
上面的寫法裏,對行爲表的每條記錄生成一個1-10的隨機整數,對於item屬性表,每一個item生成10條記錄,隨機key分別也是1-10,這樣就能保證行爲表關聯上屬性表。其中range_list(1,10)表明用udf實現的一個返回1-10整數序列的方法。這個作法是一個解決join傾斜比較根本性的通用思路,就是如何用隨機數將key進行分散。固然,能夠根據具體的業務場景作實現上的簡化或變化。
除了上面兩類狀況,還有一類狀況是由於業務設計致使的問題,也就是說即便行爲日誌裏面join key的數據分佈自己並不明顯傾斜,可是業務設計致使其傾斜。好比對於商品item_id
的編碼,除了自己的id序列,還人爲的把item的類型也做爲編碼放在最後兩位,這樣若是類型1(電子產品)的編碼是00,類型2(家居產品)的編碼是01,而且類型1是主要商品類,將會形成以00爲結尾的商品總體傾斜。這時,若是reduce的數量剛好是100的整數倍,會形成partitioner把00結尾的item_id
都hash到同一個reducer,引爆問題。這種特殊狀況能夠簡單的設置合適的reduce值來解決,可是這種坑對於不瞭解業務的狀況下就會比較隱蔽。
1.萬能膏藥:hive.groupby.skewindata=true
當選項設定爲 true,生成的查詢計劃會有兩個 MR Job。
第一個 MR Job 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每一個 Reduce 作部分聚合操做,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不一樣的 Reduce 中,從而達到負載均衡的目的
第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分佈到 Reduce 中(這個過程能夠保證相同的 Group By Key 被分佈到同一個 Reduce 中),最後完成最終的聚合操做。
1.1.參數調優:hive.map.aggr=true. Map端部分聚合,至關於Combiner 。
2. 大小表關聯:
可使用Map Join讓小的維度表(1000條如下的記錄條數)先進內存。在map端完成reduce.
3. 大表和大表關聯:
把空值NULL的key變成一個字符串加上隨機數,把傾斜的數據分到不一樣的reduce上,因爲null值關聯不上,處理後並不影響最終結果。例如:Demo1.空值數據傾斜 (下面的例子)。
4. count distinct大量相同特殊值:
count distinct時,將值爲空的狀況單獨處理。若是是計算count distinct,能夠不用處理,直接過濾,在最後結果中加1。若是還有其餘計算,須要進行group by,能夠先將值爲空的記錄單獨處理,再和其餘計算結果進行union。
Demo1.空值數據傾斜
場景:如日誌中,常會有信息丟失的問題,好比全網日誌中的user_id,若是取其中的user_id和bmw_users關聯,會碰到數據傾斜的問題。
解決方法1: user_id爲空的不參與關聯
Select * From log a Join bmw_users b On a.user_id is not nullAnd a.user_id = b.user_id Union all Select * from log a where a.user_id is null;
解決方法2 :賦予空值新的key值
Select * from log a left outer Join bmw_users b on case when a.user_id is null then concat(‘dp_hive’,rand()) else a.user_id end = b.user_id;
結論:方法2比方法1效率更好,不但io少了,並且做業數也少了。
方法1的log讀取兩次,jobs是2。方法2的job數是1。這個優化適合無效id(好比-99,’’,null等)產生的傾斜問題。把空值的key變成一個字符串加上隨機數,就能把傾斜的數據分到不一樣的reduce上 ,解決數據傾斜問題。
Demo2.不一樣數據類型關聯產生數據傾斜
場景:一張表s8的日誌,每一個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題,s8的日誌中有字符串商品id,也有數字的商品id,類型是string的,但商品中的數字id是bigint的。
問題緣由:把s8的商品id轉成數字id作hash來分配reduce,因此字符串id的s8日誌,都到一個reduce上了,解決的方法驗證了這個猜想。
解決方法:把數字類型轉換成字符串類型;
Select * from s8_log a Left outer join r_auction_auctions b On a.auction_id = cast(b.auction_id as string);
Demo3.大表Join的數據偏斜
MapReduce編程模型下開發代碼須要考慮數據偏斜的問題,Hive代碼也是同樣。數據偏斜的緣由包括如下兩點:
1. Map輸出key數量極少,致使reduce端退化爲單機做業。
2. Map輸出key分佈不均,少許key對應大量value,致使reduce端單機瓶頸。
Hive中咱們使用MapJoin解決數據偏斜的問題,即將其中的某個小表(全量)分發到全部Map端的內存進行Join,從而避免了reduce。這要求分發的表能夠被全量載入內存。
極限狀況下,Join兩邊的表都是大表,就沒法使用MapJoin。這種問題最爲棘手,目前已知的解決思路有兩種:
1. 若是是上述狀況1,考慮先對Join中的一個表去重,以此結果過濾無用信息。
這樣通常會將其中一個大表轉化爲小表,再使用MapJoin 。一個實例是廣告投放效果分析,
例如將廣告投放者信息表i中的信息填充到廣告曝光日誌表w中,使用投放者id關聯。由於實際廣告投放者數量不多(可是投放者信息表i很大),所以能夠考慮先在w表中去重查詢全部實際廣告投放者id列表,以此Join過濾表i,這一結果必然是一個小表,就可使用MapJoin。
select /*+mapjoin(x)*/ * from log a left outer join ( select /*+mapjoin(c)*/ d.* from (select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
2. 若是是上述狀況2,考慮切分Join中的一個表爲多片,以便將切片所有載入內存,而後採用屢次MapJoin獲得結果。
一個實例是商品瀏覽日誌分析,例如將商品信息表i中的信息填充到商品瀏覽日誌表w中,使用商品id關聯。可是某些熱賣商品瀏覽量很大,形成數據偏斜。例如,如下語句實現了一個inner join邏輯,將商品信息表拆分紅2個表:
select * from ( select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat from w left outer join i sampletable(1 out of 2 on id) i1 ) union all ( select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat from w left outer join i sampletable(1 out of 2 on id) i2 );