Hive、Inceptor數據傾斜詳解及解決


1、傾斜形成的緣由

正常的數據分佈理論上都是傾斜的,就是咱們所說的20-80原理:80%的財富集中在20%的人手中, 80%的用戶只使用20%的功能 , 20%的用戶貢獻了80%的訪問量。sql

俗話是,一我的累死,其餘人閒死的局面性能

這也違背了並行計算的初衷,首先一個節點要承受着巨大的壓力,而其餘節點計算完畢後要一直等待這個忙碌的節點,也拖累了總體的計算時間,能夠說效率是十分低下的。優化

下面舉個簡單的例子:spa

舉個 word count 的入門例子: blog

它的map 階段就是造成 (「aaa」,1)的形式,而後在reduce 階段進行 value 相加,得出 「aaa」 出現的次數。內存

若進行 word count 的文本有100G,其中 80G 所有是 「aaa」 剩下 20G 是其他單詞,那就會造成 80G 的數據量交給一個 reduce 進行相加,其他 20G 根據 key 不一樣分散到不一樣 reduce 進行相加的狀況。如此就形成了數據傾斜,臨牀反應就是 reduce 跑到 99%而後一直在原地等着 那80G 的reduce 跑完。資源

表現以下:get

有一個多幾個reduce卡住it

各類container報錯OOMspark

讀寫的數據量極大,至少遠遠超過其它正常的reduce

伴隨着數據傾斜,會出現任務被kill等各類詭異的表現。

2、傾斜原理

原理剖析

在進行shuffle的時候,必須將各個節點上相同的Key拉取到某個節點上的一個task來進行處理,好比按照key進行聚合或者join操做。若是某個key對應的數據量特別大的話,會發生數據傾斜。

3、傾斜解決

  1. 將reduce join 轉爲map join-----通常用於直接sql查詢的場景
什麼是MapJoin?

MapJoin顧名思義,就是在Map階段進行表之間的鏈接。而不須要進入到Reduce階段才進行鏈接。這樣就節省了在Shuffle階段時要進行的大量數據傳輸。從而起到了優化做業的做用。

MapJoin的原理:

一般狀況下,要鏈接的各個表裏面的數據會分佈在不一樣的Map中進行處理。即同一個Key對應的Value可能存在不一樣的Map中。這樣就必須等到Reduce中去鏈接。

要使MapJoin可以順利進行,那就必須知足這樣的條件:除了一份表的數據分佈在不一樣的Map中外,其餘鏈接的表的數據必須在每一個Map中有完整的拷貝。

MapJoin適用的場景:

經過上面分析你會發現,並非全部的場景都適合用MapJoin. 它一般會用在以下的一些情景:在二個要鏈接的表中,有一個很大,有一個很小,這個小表能夠存放在內存中而不影響性能。

這樣咱們就把小表文件複製到每個Map任務的本地,再讓Map把文件讀到內存中待用。

MapJoin的實現方法

     1)在Map-Reduce的驅動程序中使用靜態方法DistributedCache.addCacheFile()增長要拷貝的小表文件,。JobTracker在做業啓動以前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。

     2)在Map類的setup方法中使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標準的文件讀寫API讀取相應的文件。

Hive內置提供的優化機制之一就包括MapJoin。

在Hive v0.7以前,須要使用hint提示 /*+ mapjoin(table) */纔會執行MapJoin  。Hive v0.7以後的版本已經不須要給出MapJoin的指示就進行優化。它是經過以下配置參數來控制的:

hive> set hive.auto.convert.join=true;

不然須要經過sql代碼進行修改

select /*+ mapjoin(A)*/ f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802)

Hive還提供另一個參數--表文件的大小做爲開啓和關閉MapJoin的閾值。

hive.mapjoin.smalltable.filesize=25000000 即25M

實現原理:
普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是mao join ,而此時不會發生shuffle操做,也就不會發生數據傾斜。
方案優勢:
對join操做致使的數據傾斜,效果很是好,由於根本就不會發生shuffle,也就根本不會發生數據傾斜。
方案缺點:
適用場景較少,由於這個方案只適用於一個大表和一個小表的狀況。畢竟咱們須要將小表進行廣播,此時會比較消耗內存資源,driver和每一個Executor內存中都會駐留一份小RDD的全量數據。若是咱們廣播出去的RDD數據比較大,好比10G以上,那麼就可能發生內存溢出了。所以並不適合兩個都是大表的狀況。

    2.提升shuffle操做的並行度

方案使用場景:
若咱們必需要面對數據傾斜問題,要這麼使用。
思路:
在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,如reduceByKey(1000),該參數設置了這個shuffle算子執行時shuffle read task 的數量。對於Spark SQL中的shuffle類語句,如 groupBy 、join 等須要設置一個參數,即spark.sql.shuffle.partitions。該參數表明了shuffle read task 的並行度,默認值是200。
原理:
增長shuffle read task 的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了。

實現起來比較簡單,能夠有效緩解和減輕數據傾斜的影響。
只是緩解了數據傾斜而已,沒有完全根除問題,根據實踐經驗來看,其效果有限。

3. 兩階段聚合(局部聚合+全局聚合)

方案使用場景:
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
思路:
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個key都打上一個隨機數,好比10之內的隨機數,此時原先同樣的key就變成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行reduceByKey等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。而後將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操做,就能夠獲得最終結果了,好比(hello, 4)。

方案優勢:
對於聚合類的shuffle操做致使的數據傾斜,效果是很是不錯的。一般均可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark做業的性能提高數倍以上。
方案缺點: 僅僅適用於聚合類的shuffle操做,適用範圍相對較窄。若是是join類的shuffle操做,還得用其餘的解決方案。

相關文章
相關標籤/搜索