- 本文首發於 vivo互聯網技術 微信公衆號 https://mp.weixin.qq.com/s/lqMu6lfk-Ny1ZHYruEeBdA
- 做者簡介:鄭志彬,畢業於華南理工大學計算機科學與技術(雙語班)。前後從事過電子商務、開放平臺、移動瀏覽器、推薦廣告和大數據、人工智能等相關開發和架構。目前在vivo智能平臺中心從事 AI中臺建設以及廣告推薦業務。擅長各類業務形態的業務架構、平臺化以及各類業務解決方案。
本文從數據傾斜的危害、現象、緣由等方面,由淺入深闡述Spark數據傾斜及其解決方案。sql
對 Spark/Hadoop 這樣的分佈式大數據系統來說,數據量大並不可怕,可怕的是數據傾斜。segmentfault
對於分佈式系統而言,理想狀況下,隨着系統規模(節點數量)的增長,應用總體耗時線性降低。若是一臺機器處理一批大量數據須要120分鐘,當機器數量增長到3臺時,理想的耗時爲120 / 3 = 40分鐘。可是,想作到分佈式狀況下每臺機器執行時間是單機時的1 / N,就必須保證每臺機器的任務量相等。不幸的是,不少時候,任務的分配是不均勻的,甚至不均勻到大部分任務被分配到個別機器上,其它大部分機器所分配的任務量只佔總得的小部分。好比一臺機器負責處理 80% 的任務,另外兩臺機器各處理 10% 的任務。瀏覽器
『不患多而患不均』,這是分佈式環境下最大的問題。意味着計算能力不是線性擴展的,而是存在短板效應: 一個 Stage 所耗費的時間,是由最慢的那個 Task 決定。性能優化
因爲同一個 Stage 內的全部 task 執行相同的計算,在排除不一樣計算節點計算能力差別的前提下,不一樣 task 之間耗時的差別主要由該 task 所處理的數據量決定。因此,要想發揮分佈式系統並行計算的優點,就必須解決數據傾斜問題。微信
當出現數據傾斜時,小量任務耗時遠高於其它任務,從而使得總體耗時過大,未能充分發揮分佈式系統的並行計算優點。 網絡
另外,當發生數據傾斜時,部分任務處理的數據量過大,可能形成內存不足使得任務失敗,並進而引進整個應用失敗。 架構
當發現以下現象時,十有八九是發生數據傾斜了:併發
絕大多數 task 執行得都很是快,但個別 task 執行極慢,總體任務卡在某個階段不能結束。app
TIPS負載均衡
在 Spark streaming 程序中,數據傾斜更容易出現,特別是在程序中包含一些相似 sql 的 join、group 這種操做的時候。由於 Spark Streaming 程序在運行的時候,咱們通常不會分配特別多的內存,所以一旦在這個過程當中出現一些數據傾斜,就十分容易形成 OOM。
在進行 shuffle 的時候,必須將各個節點上相同的 key 拉取到某個節點上的一個 task 來進行處理,好比按照 key 進行聚合或 join 等操做。此時若是某個 key 對應的數據量特別大的話,就會發生數據傾斜。好比大部分 key 對應10條數據,可是個別 key 卻對應了100萬條數據,那麼大部分 task 可能就只會分配到10條數據,而後1秒鐘就運行完了;可是個別 task 可能分配到了100萬數據,要運行一兩個小時。
所以出現數據傾斜的時候,Spark 做業看起來會運行得很是緩慢,甚至可能由於某個 task 處理的數據量過大致使內存溢出。
經過 Spark Web UI 來查看當前運行的 stage 各個 task 分配的數據量(Shuffle Read Size/Records),從而進一步肯定是否是 task 分配的數據不均勻致使了數據傾斜。
知道數據傾斜發生在哪個 stage 以後,接着咱們就須要根據 stage 劃分原理,推算出來發生傾斜的那個 stage 對應代碼中的哪一部分,這部分代碼中確定會有一個 shuffle 類算子。能夠經過 countByKey 查看各個 key 的分佈。
TIPS
數據傾斜只會發生在 shuffle 過程當中。這裏給你們羅列一些經常使用的而且可能會觸發 shuffle 操做的算子: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所致使的。
也能夠經過抽樣統計 key 的出現次數驗證。
因爲數據量巨大,能夠採用抽樣的方式,對數據進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個:
df.select("key").sample(false, 0.1) // 數據採樣 .(k => (k, 1)).reduceBykey(_ + _) // 統計 key 出現的次數 .map(k => (k._2, k._1)).sortByKey(false) // 根據 key 出現次數進行排序 .take(10) // 取前 10 個。
若是發現多數數據分佈都較爲平均,而個別數據比其餘數據大上若干個數量級,則說明發生了數據傾斜。
業務邏輯: 咱們從業務邏輯的層面上來優化數據傾斜,好比要統計不一樣城市的訂單狀況,那麼咱們單獨對這一線城市來作 count,最後和其它城市作整合。
程序實現: 好比說在 Hive 中,常常遇到 count(distinct)操做,這樣會致使最終只有一個 reduce,咱們能夠先 group 再在外面包一層 count,就能夠了;在 Spark 中使用 reduceByKey 替代 groupByKey 等。
若是致使數據傾斜的 key 是異常數據,那麼簡單的過濾掉就能夠了。
首先要對 key 進行分析,判斷是哪些 key 形成數據傾斜。具體方法上面已經介紹過了,這裏不贅述。
而後對這些 key 對應的記錄進行分析:
空值或者異常值之類的,大可能是這個緣由引發
無效數據,大量重複的測試數據或是對結果影響不大的有效數據
解決方案
對於第 1,2 種狀況,直接對數據進行過濾便可。
第3種狀況則須要特殊的處理,具體咱們下面詳細介紹。
Spark 在作 Shuffle 時,默認使用 HashPartitioner(非 Hash Shuffle)對數據進行分區。若是並行度設置的不合適,可能形成大量不相同的 Key 對應的數據被分配到了同一個 Task 上,形成該 Task 所處理的數據遠大於其它 Task,從而形成數據傾斜。
若是調整 Shuffle 時的並行度,使得本來被分配到同一 Task 的不一樣 Key 發配到不一樣 Task 上處理,則可下降原 Task 所需處理的數據量,從而緩解數據傾斜問題形成的短板效應。
(1)操做流程
RDD 操做 可在須要 Shuffle 的操做算子上直接設置並行度或者使用 spark.default.parallelism 設置。若是是 Spark SQL,還可經過 SET spark.sql.shuffle.partitions=[num_tasks] 設置並行度。默認參數由不一樣的 Cluster Manager 控制。
dataFrame 和 sparkSql 能夠設置 spark.sql.shuffle.partitions=[num_tasks] 參數控制 shuffle 的併發度,默認爲200。
(2)適用場景
大量不一樣的 Key 被分配到了相同的 Task 形成該 Task 數據量過大。
(3)解決方案
調整並行度。通常是增大並行度,但有時如減少並行度也可達到效果。
(4)優點
實現簡單,只須要參數調優。可用最小的代價解決問題。通常若是出現數據傾斜,均可以經過這種方法先試驗幾回,若是問題未解決,再嘗試其它方法。
(5)劣勢
適用場景少,只是讓每一個 task 執行更少的不一樣的key。沒法解決個別key特別大的狀況形成的傾斜,若是某些 key 的大小很是大,即便一個 task 單獨執行它,也會受到數據傾斜的困擾。而且該方法通常只能緩解數據傾斜,沒有完全消除問題。從實踐經驗來看,其效果通常。
TIPS 能夠把數據傾斜類比爲 hash 衝突。提升並行度就相似於 提升 hash 表的大小。
(1)原理
使用自定義的 Partitioner(默認爲 HashPartitioner),將本來被分配到同一個 Task 的不一樣 Key 分配到不一樣 Task。
例如,咱們在 groupByKey 算子上,使用自定義的 Partitioner:
.groupByKey(new Partitioner() { @Override public int numPartitions() { return 12; } @Override public int getPartition(Object key) { int id = Integer.parseInt(key.toString()); if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) { return (id - 9500000) / 12; } else { return id % 12; } } })
TIPS 這個作法至關於自定義 hash 表的 哈希函數。
(2)適用場景
大量不一樣的 Key 被分配到了相同的 Task 形成該 Task 數據量過大。
(3)解決方案
使用自定義的 Partitioner 實現類代替默認的 HashPartitioner,儘可能將全部不一樣的 Key 均勻分配到不一樣的 Task 中。
(4)優點
不影響原有的並行度設計。若是改變並行度,後續 Stage 的並行度也會默認改變,可能會影響後續 Stage。
(5)劣勢
適用場景有限,只能將不一樣 Key 分散開,對於同一 Key 對應數據集很是大的場景不適用。效果與調整並行度相似,只能緩解數據傾斜而不能徹底消除數據傾斜。並且須要根據數據特色自定義專用的 Partitioner,不夠靈活。
經過 Spark 的 Broadcast 機制,將 Reduce 端 Join 轉化爲 Map 端 Join,這意味着 Spark 如今不須要跨節點作 shuffle 而是直接經過本地文件進行 join,從而徹底消除 Shuffle 帶來的數據傾斜。
from pyspark.sql.functions import broadcast result = broadcast(A).join(B, ["join_col"], "left")
其中 A 是比較小的 dataframe 而且可以整個存放在 executor 內存中。
(1)適用場景
參與Join的一邊數據集足夠小,可被加載進 Driver 並經過 Broadcast 方法廣播到各個 Executor 中。
(2)解決方案
在 Java/Scala 代碼中將小數據集數據拉取到 Driver,而後經過 Broadcast 方案將小數據集的數據廣播到各 Executor。或者在使用 SQL 前,將 Broadcast 的閾值調整得足夠大,從而使 Broadcast 生效。進而將 Reduce Join 替換爲 Map Join。
(3)優點
避免了 Shuffle,完全消除了數據傾斜產生的條件,可極大提高性能。
(4)劣勢
由於是先將小數據經過 Broadcase 發送到每一個 executor 上,因此須要參與 Join 的一方數據集足夠小,而且主要適用於 Join 的場景,不適合聚合的場景,適用條件有限。
NOTES
使用Spark SQL時須要經過 SET spark.sql.autoBroadcastJoinThreshold=104857600 將 Broadcast 的閾值設置得足夠大,纔會生效。
思路很簡單,就是將一個 join 拆分紅 傾斜數據集 Join 和 非傾斜數據集 Join,最後進行 union:
對包含少數幾個數據量過大的 key 的那個 RDD (假設是 leftRDD),經過 sample 算子採樣出一份樣原本,而後統計一下每一個 key 的數量,計算出來數據量最大的是哪幾個 key。具體方法上面已經介紹過了,這裏不贅述。
而後將這 k 個 key 對應的數據從 leftRDD 中單獨過濾出來,並給每一個 key 都打上 1~n 之內的隨機數做爲前綴,造成一個單獨的 leftSkewRDD;而不會致使傾斜的大部分 key 造成另一個 leftUnSkewRDD。
接着將須要 join 的另外一個 rightRDD,也過濾出來那幾個傾斜 key 並經過 flatMap 操做將該數據集中每條數據均轉換爲 n 條數據(這 n 條數據都按順序附加一個 0~n 的前綴),造成單獨的 rightSkewRDD;不會致使傾斜的大部分 key 也造成另一個 rightUnSkewRDD。
如今將 leftSkewRDD 與 膨脹 n 倍的 rightSkewRDD 進行 join,且在 Join 過程當中將隨機前綴去掉,獲得傾斜數據集的 Join 結果 skewedJoinRDD。注意到此時咱們已經成功將原先相同的 key 打散成 n 份,分散到多個 task 中去進行 join 了。
對 leftUnSkewRDD 與 rightUnRDD 進行Join,獲得 Join 結果 unskewedJoinRDD。
TIPS
- rightRDD 與傾斜 Key 對應的部分數據,須要與隨機前綴集 (1~n) 做笛卡爾乘積 (即將數據量擴大 n 倍),從而保證不管數據傾斜側傾斜 Key 如何加前綴,都能與之正常 Join。
- skewRDD 的 join 並行度能夠設置爲 n * k (k 爲 topSkewkey 的個數)。
- 因爲傾斜Key與非傾斜Key的操做徹底獨立,可並行進行。
(1)適用場景
兩張表都比較大,沒法使用 Map 端 Join。其中一個 RDD 有少數幾個 Key 的數據量過大,另一個 RDD 的 Key 分佈較爲均勻。
(2)解決方案
將有數據傾斜的 RDD 中傾斜 Key 對應的數據集單獨抽取出來加上隨機前綴,另一個 RDD 每條數據分別與隨機前綴結合造成新的RDD(至關於將其數據增到到原來的N倍,N即爲隨機前綴的總個數),而後將兩者Join並去掉前綴。而後將不包含傾斜Key的剩餘數據進行Join。最後將兩次Join的結果集經過union合併,便可獲得所有Join結果。
(3)優點
相對於 Map 則 Join,更能適應大數據集的 Join。若是資源充足,傾斜部分數據集與非傾斜部分數據集可並行進行,效率提高明顯。且只針對傾斜部分的數據作數據擴展,增長的資源消耗有限。
(4)劣勢
若是傾斜 Key 很是多,則另外一側數據膨脹很是大,此方案不適用。並且此時對傾斜 Key 與非傾斜 Key 分開處理,須要掃描數據集兩遍,增長了開銷。
若是出現數據傾斜的 Key 比較多,上一種方法將這些大量的傾斜 Key 分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集所有加上隨機前綴,而後對另一個不存在嚴重數據傾斜的數據集總體與隨機前綴集做笛卡爾乘積(即將數據量擴大N倍)。
其實就是上一個方法的特例或者簡化。少了拆分,也就沒有 union。
(1)適用場景
一個數據集存在的傾斜 Key 比較多,另一個數據集數據分佈比較均勻。
(2)優點
對大部分場景都適用,效果不錯。
(3)劣勢
須要將一個數據集總體擴大 N 倍,會增長資源消耗。
在 map 端加個 combiner 函數進行局部聚合。加上 combiner 至關於提早進行 reduce ,就會把一個 mapper 中的相同 key 進行聚合,減小 shuffle 過程當中數據量 以及 reduce 端的計算量。這種方法能夠有效的緩解數據傾斜問題,可是若是致使數據傾斜的 key 大量分佈在不一樣的 mapper 的時候,這種方法就不是頗有效了。
TIPS 使用 reduceByKey 而不是 groupByKey。
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個 key 都打上一個 1~n 的隨機數,好比 3 之內的隨機數,此時原先同樣的 key 就變成不同的了,好比 (hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成 (1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行 reduceByKey 等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了 (1_hello, 2) (2_hello, 2) (3_hello, 1)。而後將各個 key 的前綴給去掉,就會變成 (hello, 2) (hello, 2) (hello, 1),再次進行全局聚合操做,就能夠獲得最終結果了,好比 (hello, 5)。
def antiSkew(): RDD[(String, Int)] = { val SPLIT = "-" val prefix = new Random().nextInt(10) pairs.map(t => ( prefix + SPLIT + t._1, 1)) .reduceByKey((v1, v2) => v1 + v2) .map(t => (t._1.split(SPLIT)(1), t2._2)) .reduceByKey((v1, v2) => v1 + v2) }
不過進行兩次 mapreduce,性能稍微比一次的差些。
Hadoop 中直接貼近用戶使用的是 Mapreduce 程序和 Hive 程序,雖然說 Hive 最後也是用 MR 來執行(至少目前 Hive 內存計算並不普及),可是畢竟寫的內容邏輯區別很大,一個是程序,一個是Sql,所以這裏稍做區分。
Hadoop 中的數據傾斜主要表如今 ruduce 階段卡在99.99%,一直99.99%不能結束。
這裏若是詳細的看日誌或者和監控界面的話會發現:
有一個多幾個 reduce 卡住
各類 container報錯 OOM
讀寫的數據量極大,至少遠遠超過其它正常的 reduce
經驗: Hive的數據傾斜,通常都發生在 Sql 中 Group 和 On 上,並且和數據邏輯綁定比較深。
優化方法
這裏列出來一些方法和思路,具體的參數和用法在官網看就好了。
map join 方式
count distinct 的操做,先轉成 group,再 count
參數調優
set hive.map.aggr=true
set hive.groupby.skewindata=true
left semi jion 的使用
說明
hive.map.aggr=true: 在map中會作部分彙集操做,效率更高但須要更多的內存。
hive.groupby.skewindata=true: 數據傾斜時負載均衡,當選項設定爲true,生成的查詢計劃會有兩個MRJob。第一個MRJob 中,Map的輸出結果集合會隨機分佈到Reduce中,每一個Reduce作部分聚合操做,並輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不一樣的Reduce中,從而達到負載均衡的目的;第二個MRJob再根據預處理的數據結果按照GroupBy Key分佈到Reduce中(這個過程能夠保證相同的GroupBy Key被分佈到同一個Reduce中),最後完成最終的聚合操做。