數據傾斜問題剖析
數據傾斜是分佈式系統不可避免的問題,任何分佈式系統都有概率發生數據傾斜,但有些小夥伴在平時工做中感知不是很明顯。這裏要注意本篇文章的標題—「千億級數據」,爲何說千億級,由於若是一個任務的數據量只有幾百萬,它即便發生了數據傾斜,全部數據都跑到一臺機器去執行,對於幾百萬的數據量,一臺機器執行起來仍是毫無壓力的,這時數據傾斜對咱們感知不大,只有數據達到一個量級時,一臺機器應付不了這麼多數據,這時若是發生數據傾斜,最後就很難算出結果。算法
因此就須要咱們對數據傾斜的問題進行優化,儘可能避免或減輕數據傾斜帶來的影響。sql
在解決數據傾斜問題以前,還要再提一句:沒有瓶頸時談論優化,都是自尋煩惱。數組
你們想一想,在map和reduce兩個階段中,最容易出現數據傾斜的就是reduce階段,由於map到reduce會通過shuffle階段,在shuffle中默認會按照key進行hash,若是相同的key過多,那麼hash的結果就是大量相同的key進入到同一個reduce中,致使數據傾斜。緩存
那麼有沒有可能在map階段就發生數據傾斜呢,是有這種可能的。網絡
一個任務中,數據文件在進入map階段以前會進行切分,默認是128M一個數據塊,可是若是當對文件使用GZIP壓縮等不支持文件分割操做的壓縮方式時,MR任務讀取壓縮後的文件時,是對它切分不了的,該壓縮文件只會被一個任務所讀取,若是有一個超大的不可切分的壓縮文件被一個map讀取時,就會發生map階段的數據傾斜。分佈式
因此,從本質上來講,發生數據傾斜的緣由有兩種:一是任務中須要處理大量相同的key的數據。二是任務讀取不可分割的大文件。函數
數據傾斜解決方案
MapReduce和Spark中的數據傾斜解決方案原理都是相似的,如下討論Hive使用MapReduce引擎引起的數據傾斜,Spark數據傾斜也能夠此爲參照。性能
1. 空值引起的數據傾斜
實際業務中有些大量的null值或者一些無心義的數據參與到計算做業中,表中有大量的null值,若是表之間進行join操做,就會有shuffle產生,這樣全部的null值都會被分配到一個reduce中,必然產生數據傾斜。優化
以前有小夥伴問,若是A、B兩表join操做,假如A表中須要join的字段爲null,可是B表中須要join的字段不爲null,這兩個字段根本就join不上啊,爲何還會放到一個reduce中呢?spa
這裏咱們須要明確一個概念,數據放到同一個reduce中的緣由不是由於字段能不能join上,而是由於shuffle階段的hash操做,只要key的hash結果是同樣的,它們就會被拉到同一個reduce中。
解決方案:
第一種:能夠直接不讓null值參與join操做,即不讓null值有shuffle階段
SELECT * FROM log a JOIN users b ON a.user_id IS NOT NULL AND a.user_id = b.user_id UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL;
第二種:由於null值參與shuffle時的hash結果是同樣的,那麼咱們能夠給null值隨機賦值,這樣它們的hash結果就不同,就會進到不一樣的reduce中:
SELECT * FROM log a LEFT JOIN users b ON CASE WHEN a.user_id IS NULL THEN concat('hive_', rand()) ELSE a.user_id END = b.user_id;
- 不一樣數據類型引起的數據傾斜
對於兩個表join,表a中須要join的字段key爲int,表b中key字段既有string類型也有int類型。當按照key進行兩個表的join操做時,默認的Hash操做會按int型的id來進行分配,這樣全部的string類型都被分配成同一個id,結果就是全部的string類型的字段進入到一個reduce中,引起數據傾斜。
解決方案:
若是key字段既有string類型也有int類型,默認的hash就都會按int類型來分配,那咱們直接把int類型都轉爲string就行了,這樣key字段都爲string,hash時就按照string類型分配了:
SELECT * FROM users a LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);
- 不可拆分大文件引起的數據傾斜
當集羣的數據量增加到必定規模,有些數據須要歸檔或者轉儲,這時候每每會對數據進行壓縮;當對文件使用GZIP壓縮等不支持文件分割操做的壓縮方式,在往後有做業涉及讀取壓縮後的文件時,該壓縮文件只會被一個任務所讀取。若是該壓縮文件很大,則處理該文件的Map須要花費的時間會遠多於讀取普通文件的Map時間,該Map任務會成爲做業運行的瓶頸。這種狀況也就是Map讀取文件的數據傾斜。
解決方案:
這種數據傾斜問題沒有什麼好的解決方案,只能將使用GZIP壓縮等不支持文件分割的文件轉爲bzip和zip等支持文件分割的壓縮方式。
因此,咱們在對文件進行壓縮時,爲避免因不可拆分大文件而引起數據讀取的傾斜,在數據壓縮的時候能夠採用bzip2和Zip等支持文件分割的壓縮算法。
- 數據膨脹引起的數據傾斜
在多維聚合計算時,若是進行分組聚合的字段過多,以下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:對於最後的with rollup關鍵字不知道你們用過沒,with rollup是用來在分組統計數據的基礎上再進行統計彙總,即用來獲得group by的彙總信息。
若是上面的log表的數據量很大,而且Map端的聚合不能很好地起到數據壓縮的狀況下,會致使Map端產出的數據急速膨脹,這種狀況容易致使做業內存溢出的異常。若是log表含有數據傾斜key,會加重Shuffle過程的數據傾斜。
解決方案:
能夠拆分上面的sql,將with rollup拆分紅以下幾個sql:
SELECT a, b, c, COUNT(1) FROM log GROUP BY a, b, c; SELECT a, b, NULL, COUNT(1) FROM log GROUP BY a, b; SELECT a, NULL, NULL, COUNT(1) FROM log GROUP BY a; SELECT NULL, NULL, NULL, COUNT(1) FROM log;
可是,上面這種方式不太好,由於如今是對3個字段進行分組聚合,那若是是5個或者10個字段呢,那麼須要拆解的SQL語句會更多。
在Hive中能夠經過參數 hive.new.job.grouping.set.cardinality 配置的方式自動控制做業的拆解,該參數默認值是30。表示針對grouping sets/rollups/cubes這類多維聚合的操做,若是最後拆解的鍵組合大於該值,會啓用新的任務去處理大於該值以外的組合。若是在處理數據時,某個分組聚合的列有較大的傾斜,能夠適當調小該值。
- 錶鏈接時引起的數據傾斜
兩表進行普通的repartition join時,若是錶鏈接的鍵存在傾斜,那麼在 Shuffle 階段必然會引發數據傾斜。
解決方案:
一般作法是將傾斜的數據存到分佈式緩存中,分發到各個Map任務所在節點。在Map階段完成join操做,即MapJoin,這避免了 Shuffle,從而避免了數據傾斜。
MapJoin是Hive的一種優化操做,其適用於小表JOIN大表的場景,因爲表的JOIN操做是在Map端且在內存進行的,因此其並不須要啓動Reduce任務也就不須要通過shuffle階段,從而能在必定程度上節省資源提升JOIN效率。
在Hive 0.11版本以前,若是想在Map階段完成join操做,必須使用MAPJOIN來標記顯示地啓動該優化操做,因爲其須要將小表加載進內存因此要注意小表的大小。
如將a表放到Map端內存中執行,在Hive 0.11版本以前須要這樣寫:
select /* +mapjoin(a) */ a.id , a.name, b.age
from a join b
on a.id = b.id;
若是想將多個表放到Map端內存中,只需在mapjoin()中寫多個表名稱便可,用逗號分隔,如將a表和c表放到Map端內存中,則 / +mapjoin(a,c) / 。
在Hive 0.11版本及以後,Hive默認啓動該優化,也就是不在須要顯示的使用MAPJOIN標記,其會在必要的時候觸發該優化操做將普通JOIN轉換成MapJoin,能夠經過如下兩個屬性來設置該優化的觸發時機:
hive.auto.convert.join=true 默認值爲true,自動開啓MAPJOIN優化。 hive.mapjoin.smalltable.filesize=2500000 默認值爲2500000(25M),經過配置該屬性來肯定使用該優化的表的大小,若是表的大小小於此值就會被加載進內存中。
注意:使用默認啓動該優化的方式若是出現莫名其妙的BUG(好比MAPJOIN並不起做用),就將如下兩個屬性置爲fase手動使用MAPJOIN標記來啓動該優化:
hive.auto.convert.join=false (關閉自動MAPJOIN轉換操做) hive.ignore.mapjoin.hint=false (不忽略MAPJOIN標記)
再提一句:將表放到Map端內存時,若是節點的內存很大,但仍是出現內存溢出的狀況,咱們能夠經過這個參數 mapreduce.map.memory.mb 調節Map端內存的大小。
- 確實沒法減小數據量引起的數據傾斜
在一些操做中,咱們沒有辦法減小數據量,如在使用 collect_list 函數時:
select s_age,collect_list(s_score) list_score from student group by s_age
collect_list:將分組中的某列轉爲一個數組返回。
在上述sql中,s_age若是存在數據傾斜,當數據量大到必定的數量,會致使處理傾斜的reduce任務產生內存溢出的異常。
注:collect_list輸出一個數組,中間結果會放到內存中,因此若是collect_list聚合太多數據,會致使內存溢出。
有小夥伴說這是 group by 分組引發的數據傾斜,能夠開啓hive.groupby.skewindata參數來優化。咱們接下來分析下:
開啓該配置會將做業拆解成兩個做業,第一個做業會盡量將Map的數據平均分配到Reduce階段,並在這個階段實現數據的預聚合,以減小第二個做業處理的數據量;第二個做業在第一個做業處理的數據基礎上進行結果的聚合。
hive.groupby.skewindata的核心做用在於生成的第一個做業可以有效減小數量。可是對於collect_list這類要求全量操做全部數據的中間結果的函數來講,明顯起不到做用,反而由於引入新的做業增長了磁盤和網絡I/O的負擔,而致使性能變得更爲低下。
解決方案:
這類問題最直接的方式就是調整reduce所執行的內存大小。
調整reduce的內存大小使用mapreduce.reduce.memory.mb這個配置。