淺析 Hadoop 中的數據傾斜

    最近幾回被問到關於數據傾斜的問題,這裏找了些資料也結合一些本身的理解.html

    在並行計算中咱們總但願分配的每個task 都能以差很少的粒度來切分而且完成時間相差不大,可是集羣中可能硬件不一樣,應用的類型不一樣和切分的數據大小不一致總會致使有部分任務極大的拖慢了整個任務的完成時間,硬件不一樣就不說了,應用的類型不一樣其中就好比page rank 或者data mining 裏面一些計算,它的每條記錄消耗的成本不太同樣,這裏只討論關於關係型運算的(通常能用SQL表述的) 數據切分上的數據傾斜問題.算法

    hadoop 中數據傾斜會極大影響性能的一個背景是mapreduce 框架中老是不分條件的進行sort . 在通用狀況下map sort + partition +reduce sort 能夠獲得結果,可是這個過程不必定是最優的.  對於關係型計算,其中數據傾斜影響最大的地方在reduce 的sort , reduce 處理的數據量的大小若是超過給定的reduce jvm 的大小的2倍不到的閾值的時候(這個閾值是我猜想的,具體以實際監控運行狀況爲準),reduce 端會發生multi-pass merge sort 的狀況, 這個時候觀察這些運行較慢的reduce task 的metrics 會發現reduce 跟IO 相關的metrics 會比其餘reduce 大不少. 具體的細節參考今年hadoop summit 上Todd 的performance tuning 的ppt (26頁):sql

http://www.slideshare.net/cloudera/mr-perfapache

這種在reduce 端不分條件的排序只是hadoop 是這種實現,並非mapreduce 框架必定須要排序,其餘的mapreduce 實現或者其餘的分佈式計算框架可能在reduce 上的這種瓶頸會小一些, 好比shark 裏面的group by 就是基於hash 而不是sort 的.性能優化

 

    對於關係型的計算中常見的數據傾斜有兩種:group by 和 join , 其餘有可能的有:app

 

in或exists 的操做尤爲是in或exists 做爲subquery 的返回(in 或exists 有時候會變成left semi join),框架

有相同輸入源的union 或union all 也許也會有(其餘集合類型的操做intersect 之類也許也是). dom

hive 中的udtf 也算一種.jvm

這裏只討論最多見的group by 和join 的狀況.分佈式

    

數據分佈:

    正常的數據分佈理論上都是傾斜的,就是咱們所說的20-80原理:80%的財富集中在20%的人手中, 80%的用戶只使用20%的功能 , 20%的用戶貢獻了80%的訪問量 , 不一樣的數據字段可能的數據傾斜通常有兩種狀況:

一種是惟一值很是少,極少數值有很是多的記錄值(惟一值少於幾千)

一種是惟一值比較多,這個字段的某些值有遠遠多於其餘值的記錄數,可是它的佔比也小於百分之一或千分之一

 

分區:

常見的mapreduce分區方式爲hash 和range ,

hash partition 的好處是比較彈性,跟數據類型無關,實現簡單(設定reduce個數就好,通常不須要本身實現)

range partition 須要實現者本身瞭解數據分佈, 有時候須要手工作sample取樣. 同時也不夠彈性, 表如今幾個方面,1. 對同一個表的不一樣字段都須要實現不一樣的range partition,  對於時間這種字段根據查詢類型的不一樣或者過濾條件的不一樣切分range 的大小都不必定.

2 .有時候可能設計使用多個字段組合的狀況, 這時候又不能使用以前單個字段的partition 類, 而且多個字段組合之間有可能有隱含的聯繫,好比出生日期和星座,商品和季節.

3. 手工作sample 很是耗時間,須要使用者對查詢使用的數據集的分佈有領域知識.

4. 分配方式是死的,reduce 個數是肯定的,一旦某種狀況下發生傾斜,調整參數

其餘的分區類型還有hbase 的hregionpartitioner  或者totalorder partitioner  等.

 

可以想到的關於數據傾斜的一些解決方式(歡迎補充,尤爲是有沒有作搜索或者數據挖掘的朋友有碰到相似問題):

1. 增長reduce 的jvm內存

2. 增長reduce 個數

3. customer partition

4. 其餘優化的討論.

5. reduce sort merge排序算法的討論

6. 正在實現中的hive skewed join.

7. pipeline

8. distinct

9. index 尤爲是bitmap index

 

方式1:既然reduce 自己的計算須要以合適的內存做爲支持,在硬件環境允許的狀況下,增長reduce 的內存大小顯然有改善數據傾斜的可能,這種方式尤爲適合數據分佈第一種狀況,單個值有大量記錄, 這種值的全部紀錄已經超過了分配給reduce 的內存,不管你怎麼樣分區這種狀況都不會改變. 固然這種狀況的限制也很是明顯, 1.內存的限制存在,2.可能會對集羣其餘任務的運行產生不穩定的影響.

 

方式2:  這個對於數據分佈第二種狀況有效,惟一值較多,單個惟一值的記錄數不會超過度配給reduce 的內存. 若是發生了偶爾的數據傾斜狀況,增長reduce 個數能夠緩解偶然狀況下的某些reduce 不當心分配了多個較多記錄數的狀況. 可是對於第一種數據分佈無效.

 

方式3: 一種狀況是某個領域知識告訴你數據分佈的顯著類型,好比hadoop definitive guide 裏面的溫度問題,一個固定的組合(觀測站點的位置和溫度) 的分佈是固定的, 對於特定的查詢若是前面兩種方式都沒用,實現本身的partitioner 也許是一個好的方式.

 

方式4: 目前有的一些針對數據傾斜的優化好比pig 的skewed join

http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins

pig 文檔上面說是根據數據輸入的統計信息來肯定分區(也就是range partition?),另外不清楚這個行爲是不是動態運行時候才決定的,也就是運行以前有一步pig 自動作sample 的工做,由於pig 是沒有統計信息這一說的.

hive 中的group by

<property> 
  <name>hive.groupby.skewindata</name> 
  <value>false</value> 
  <description>Whether there is skew in data to optimize group by queries</description> 
</property> 
<property> 
  <name>hive.optimize.groupby</name> 
  <value>true</value> 
  <description>Whether to enable the bucketed group by from bucketed partitions / tables.</description> 
</property>

<property> 
  <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name> 
  <value>0.3</value> 
  <description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description> 
</property> 
<property> 
  <name>hive.groupby.mapaggr.checkinterval</name> 
  <value>100000</value> 
  <description>Number of rows after which size of the grouping keys/aggregation classes is performed</description> 
</property>

其中最後一個參數hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 類似, in-memeory combiner  是發生在mapper 端sort 以前,而不是如今的combiner發生在mapper sort 以後甚至在寫入磁盤以後從新讀磁盤而後排序合併. in-memeory combiner 最先好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介紹ppt 裏面好像提到它們也有這個優化. mapper 端減小數據的機會比reduce 端的要大,因此通常不會看到reduce 端的combiner 的討論,可是這種思路也有,好比google tenzing 的join 討論裏面有一個prev-next 的小優化就是基於reduce 端的combiner, 但那個前提是基於block shuffle 實現的基礎上,數據已經排過序了,因此join 時候前一條數據跟後一條數據相同的機率很大.

hive 中的skewed join :  以前的文章已經介紹過兩表join 中hive 的幾個優化,其中的skewed join 的相似思路就是上面介紹的skewed 的第二種:增長reduce 的個數,hive 中是經過判斷閾值若是大於一個reduce 須要處理的數據量,從新起額外的task 來處理這些超額的reduce 自己須要處理的數據, 這是一種較晚的補救措施,自己hive 開始分區的時候已經傾斜(partition 的方式不合理), 當運行的時候經過運行時監控reduce 發現傾斜的特殊key 而後額外的起task 去處理,效果比較通常,感興趣的同窗能夠參考HIVE-3086 裏面我和facebook 團隊對這種優化思路的討論. 第六節我會討論一下我所認爲的思路和facebook 正在作的思路之間的差異.

 

方式5 :  reduce 分配的內存遠小於處理的數據量時,會產生multi-pass sort 的狀況是瓶頸,那麼就要問

1. 這種排序是有必要的嘛?

2. 是否有其餘排序算法或優化能夠根據特定狀況下降他瓶頸的閾值?

3. map reduce 適合處理這種狀況嘛?

關於問題1. 若是是group by , 那麼對於數據分佈狀況1 ,hash 比sort 好很是多,即便某一個reduce 比其餘reduce 處理多的多的數據,hash 的計算方式也不會差距太大.

問題2. 一個是若是實現block shuffle 確定會極大的減小排序自己的成本, 另外,若是分區以後的reduce 不是使用copy –> sort-merge –> reduce 的計算方式, 在copy 以後將每一個block 的頭部信息保存在內存中,不用sort – merge 也能夠直接計算reduce, 只不過這時候變成了隨機訪問,而不是如今的sort-merge 以後的順序訪問. block shuffle 的實現有兩種類型,一種是當hadoop 中真正有了列數據格式的時候,數據有更大的機會已經排過序而且按照block 來切分,通常block 爲1M ( 能夠關注avro-806 )  , 這時候的mapper 什麼都不作,甚至連計算分區的開銷都小了不少倍,直接進入reduce 最後一步,第二種類型爲沒有列數據格式的支持,須要mapper 排序獲得以後的block 的最大最小值,reduce 端在內存中保存最大最小值,copy  完成後直接用這個值來作隨機讀而後進行reduce. ( block shuffle  的實現能夠關注 MAPREDUCE-4039 , hash 計算能夠關注 MAPREDUCE-1639)

問題3 . map reduce 只有兩個函數,一個map 一個 reduce, 一旦發生數據傾斜就是partition 失效了,對於join 的例子,某一個key 分配了過多的記錄數,對於只有一次partittion的機會,分配錯了數據傾斜的傷害就已經形成了,這種狀況很難調試,可是若是你是基於map-reduce-reduce 的方式計算,那麼對於同一個key 不須要分配到同一個reduce 中,在第一個reduce 中獲得的結果能夠在第二個reduce 才彙總去重,第二個reduce 不須要sort – merge 的步驟,由於前一個reduce 已經排過序了,中間的reduce 處理的數據不用關心partition 怎麼分,處理的數據量都是同樣大,而第二個reduce 又不使用sort-merge 來排序,不會遇到如今的內存大小的問題,對於skewed join 這種狀況瓶頸天然小不少.

 

方式6:  目前hive 有幾個正在開發中的處理skewed join 狀況的jira case,  HIVE-3086 , HIVE-3286 ,HIVE-3026 . 簡單介紹一下就是facebook 但願經過手工處理提早枚舉的方式列出單個傾斜的值,在join 的時候將這些值特殊列出看成map join 來處理,對於其餘值使用原來的方式. 我我的以爲這太不伸縮了,值自己沒有考慮應用過濾條件和優化方式以後的數據量大小問題,他們提早列出的值都是基於整個分區的. join key 若是爲組合key 的狀況也應該沒有考慮,對metastore 的儲存問題有限制,對輸入的大表和小表都會scan 兩次( 一次處理非skew key , 一次處理skew key 作map join), 對輸出表也會scan 兩次(將兩個結果進行merge) , skew key 必須提早手工列出這又存在額外維護的成本,目前由於尚未完整的開發完到可以投入生產的狀況,因此等全部特性處理完了有了文檔在看看這個處理方式是否有效,我我的認爲的思路應該是接着bucked map join 的思路往下走,只不過不用提早處理cluster key 的問題, 這時候cluster key 的選擇應該是join key + 某個能分散join key 的列, 這等於將大表的同一個key 的值分散到了多個不一樣的reduce 中,而小表的join key 也必須cluster 到跟大表對應的同一個key , join 中對於數據分佈第二種狀況不用太難,增長reduce 個數就好,主要是第一種,須要大表的join key 可以分散,對於一樣join key 的小表又可以匹配到全部大表中的記錄. 這種思路就是不用掃描大表兩遍或者結果輸出表,不須要提早手工處理,數據是動態sample 的應用了過濾條件以後的數據,而不是提早基於統計數據的不許確結果. 這個基本思路跟tenzing 裏面描述的distributed hash join 是同樣的,想辦法切成合適的大小而後用hash 和 map join .

 

方式7: 當同時出現join 和group 的時候, 那麼這兩個操做應該是以pipeline (管道) 的方式執行. 在join 的時候就能夠直接使用group 的操做符減小大量的數據,而不是等待join 完成,而後寫入磁盤,group 又讀取磁盤作group操做. HIVE-2206 正在作這個優化. hive 裏面是沒有pipeline 這個概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有這種概念的.

 

方式8: distinct 自己就是group by 的一種簡寫,我原先覺得count(distinct x)這種跟group by 是同樣的,可是發現hive 裏面distinct 明顯比group by 要慢,可能跟group by 會有map 端的combiner有關, 另外觀察到hive 在預估count(distinct x) 的reduce 個數比group by 的個數要少 , 因此hive 中使用count(distinct x) , 要麼儘可能把reduce 個數設置大,直接設置reduce 個數或者hive.exec.reducers.bytes.per.reducer 調小,我我的比較喜歡調後面一個,hive 目前的reduce 個數沒有統計信息的狀況下就是用map端輸入以前的數值, 若是你是join 以後還用count(distinct x) 的話,這個默認值通常都會悲劇,若是有where 條件並能過濾必定數量的數據,那麼默認reduce 個數可能就還好一點. 無論怎樣,多浪費一點reduce slot 總比等十幾甚至幾十分鐘要好, 或者轉換成group by 的寫法也不錯,寫成group by 的時候distributed by 也頗有幫助.

 

方式9: hive 中的index 就是物化視圖,對於group by 和distinct 的狀況等於變成了map 端在作計算,天然不存在傾斜. 尤爲是bitmap index , 對於惟一值比較少的列優點更大,不過index 麻煩的地方在於須要判斷你的sql 是否是經常使用sql , 另外若是create index 的時候沒有選你查詢的時候用的字段,這個index 是不能用的( hive 中是永遠不可能有DBMS中的用index 去lookup 或者join 原始表這種概念的)

 

 

 

其餘建議:

網上能找到的另一份很好的描述數據傾斜的資料是

http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf

裏面的map side skew 和expensive record 都不是關係型計算中的問題,因此不是這篇文章關注點. 對於關係型計算,其中數據傾斜影響最大的地方在reduce 的sort. 這篇文章裏面最後總結的5點好的建議值得參考,

其中第三條須要你知道應用combiner 和特殊優化方式是否帶來了性能的提高,hive 的map aggr 在數據分佈狀況1效果會比較好,數據分佈狀況2效果就不大,還有combiner 應用的時候是消耗了系統資源的,確認這種消耗是否值得而不是任何狀況下都使用combiner. 

對於第四點關係型計算中map 傾斜狀況不太常見. 一種能夠舉出來的例子是分區不合理,或者hive 中的cluster by 的key 選擇不合理(都是使用目錄的方式分區, 目錄是最小處理單元了).

 

  • Use domain knowledge when choosing the 
    map output partitioning scheme if the reduce operation is 
    expensive: Range partition or some other form of explicit 
    partition may be better than the default hash-partition
  • Try different partitioning schemes on sample 
    workloads or collect the data distribution at the reduce input 
    if a MapReduce job is expected to run several times
  • Implement a combiner to reduce the amount 
    of data going into the reduce-phase and, as such, significantly 
    dampen the effects of any type of reduce-skew
  • Use a pre-processing MapReduce job that 
    extracts properties of the input data in the case of a longruning, 
    skew-prone map phase. Appropriately partitioning the 
    data before the real application runs can significantly reduce 
    skew problems in the map phase.
  • Best Practice 5. Design algorithms whose runtime depends 
    only on the amount of input data and not the data distribution.

    另一份是淘寶的數據傾斜總結:

    http://www.alidata.org/archives/2109

    不過我我的以爲幫助不是太大,裏面第一個解決方式空值產生的影響第一個Union All 的方式我的是極力反對的,同一個表尤爲是大表掃描兩遍這額外的成本跟收益太不匹配,不推薦,第二個將特殊值變成random 的方式, 這個產生的結果是正確的嘛? 尤爲是在各類狀況下輸出結果是正確的嘛?裏面背景好像是那個小表users 的主鍵爲userid, 而後userid 又是join key , 並且還不爲空? 不太推薦,背景條件和輸出的正確性與否存疑.

    第二個數據類型不一樣的問題我以爲跟HIVE-3445 都算是數據建模的問題,提早修改好是同樣的.

    第三個是由於淘寶的hadoop 版本中沒有map side hash aggr 的參數吧. 並且寫成distinct 還多了一個MR 步驟,不太推薦.

     

    數據傾斜在MPP 中也是一個課題,這也設計到一個數據重分配的問題,可是相對於MPP 中有比較成熟的機制,一個是mpp 在處理數據初始分佈的時候老是會指定segmented by 或者distributed by 這種顯示分配到不一樣物理機器上的建表語句. 還有就是統計信息會幫助執行引擎選擇合適的從新分佈.可是統計信息也不是萬能的,好比

    1:統計信息的粒度和更新問題.

    2: 應用了過濾條件以後的數據也許不符合原始指望的數據分佈.

    3: 統計信息是基於採樣的,總於真實全部數據存在偏差.

    4: 統計信息是基於partittion 的, 對於查詢沒有涉及到partition 字段的切分就不能使用各partition 只和來表示整體的統計信息.

    5. 臨時表或者多步驟查詢的中間過程數據沒有統計信息的狀況.

    6. 各類其餘的算法優化好比in-mapper combiner 或者google Tenzing 的prev – next combine 都會影響統計信息對於算法選擇的不一樣.

     

    總結:

    數據傾斜沒有一勞永逸的方式能夠解決,瞭解你的數據集的分佈狀況,而後瞭解你所使用計算框架的運行機制和瓶頸,針對特定的狀況作特定的優化,作多種嘗試,觀察是否有效.

Refer:

[1] 【技術博客】Spark性能優化指南——高級篇:數據傾斜調優

http://bit.ly/2286KL3

相關文章
相關標籤/搜索