Spark HashShuffle 是它之前的版本,如今1.6x 版本默應是 Sort-Based Shuffle,那爲何要講 HashShuffle 呢,由於有分佈式就必定會有 Shuffle,並且 HashShuffle 是 Spark之前的版本,亦便是 Sort-Based Shuffle 的前身,由於有 HashShuffle 的不足,纔會有後續的 Sorted-Based Shuffle,以及如今的 Tungsten-Sort Shuffle,因此咱們有必要去了解它。html
人們對Spark的印象每每是基於內存進行計算,但實際上來說,Spark能夠基於內存、也能夠基於磁盤或者是第三方的儲存空間進行計算,背後有兩層含意,第1、Spark框架的架構設計和設計模式上是傾向於在內存中計算數據的,第2、這也表達了人們對數據處理的一種美好的願望,就是但願計算數據的時候,數據就在內存中。算法
爲何再一次強調 Shuffle 是 Spark 的性能殺手啦,那不就是說,Spark中的 「Shuffle「 和 「Spark徹底是基於內存計算「 的願景是相違背的!!!但願這篇文章能爲讀者帶出如下的啓發:設計模式
Spark 運行分紅兩部份,第一部份是 Driver Program,裏面的核心是 SparkContext,它驅動着一個程序的開始,負責指揮,另一部份是 Worker 節點上的 Task,它是實際運行任務的,當程序運行時,不間斷地由 Driver 與所在的進程進行交互,交互什麼,有幾點,第1、是讓你去幹什麼,第2、是具體告訴 Task 數據在那裏,例如說有三個 Stage,第二個 Task 要拿數據,它就會向 Driver 要數據,因此在整個工做的過程當中,Executor 中的 Task 會不斷地與 Driver 進行溝通,這是一個網絡傳輸的過程。緩存
[下圖是 Spark 官方網站上的經典Spark架框圖]
性能優化
在這個過程當中一方面是 Driver 跟 Executor 進行網絡傳輸,另外一方面是Task要從 Driver 抓取其餘上游的 Task 的數據結果,因此有這個過程當中就不斷的產生網絡結果。其中,下一個 Stage 向上一個 Stage 要數據這個過程,咱們就稱之爲 Shuffle。bash
思考點:上一個 Stage 爲何要向下一個 Stage 發數據?假設如今有一個程序,裏面有五個 Stage,我把它當作爲一個很大的 Stage,在分佈式系統中,數據分佈在不一樣的節點上,每個節點計算一部份數據,若是不對各個節點上獨立的部份進行匯聚的話,咱們是計算不到最終的結果。這就是由於咱們須要利用分佈式來發揮它自己並行計算的能力,然後續又須要計算各節點上最終的結果,因此須要把數據匯彙集中,這就會致使 Shuffle,這也是說爲何 Shuffle 是分佈式不可避免的命運。網絡
基於 Mapper 和 Reducer 理解的基礎上,當 Reducer 去抓取數據時,它的 Key 究竟是怎麼分配的,核心思考點是:做爲上游數據是怎麼去分配給下游數據的。在這張圖中你能夠看到有4個 Task 在2個 Executors 上面,它們是並行運行的,Hash 自己有一套 Hash算法,能夠把數據的 Key 進行從新分類,每一個 Task 對數據進行分類而後把它們不一樣類別的數據先寫到本地磁盤,而後再通過網絡傳輸 Shuffle,把數據傳到下一個 Stage 進行匯聚。架構
下圖有3個 Reducer,從 Task 開始那邊各自把本身進行 Hash 計算,分類出3個不一樣的類別,每一個 Task 都分紅3種類別的數據,剛剛提過由於分佈式的關係,咱們想把不一樣的數據匯聚而後計算出最終的結果,因此下游的 Reducer 會在每一個 Task 中把屬於本身類別的數據收集過來,匯聚成一個同類別的大集合,抓過來的時候會首先放在內存中,但內存可能放不下,也有可能放在本地 (這也是一個調優勢。能夠參考上一章講過的一些調優參數),每1個 Task 輸出3份本地文件,這裏有4個 Mapper Tasks,因此總共輸出了4個 Tasks x 3個分類文件 = 12個本地小文件。app
[下圖是 Spark 最原始的 Hash-Based Shuffle 概念圖]
負載均衡
HashShuffle 也有它的弱點:
在剛纔 HashShuffle 的基礎上思考該如何進行優化,這是優化後的實現:
[下圖是 Spark Consolidated Hash-Based Shuffle 概念圖]
這裏仍是有4個Tasks,數據類別仍是分紅3種類型,由於Hash算法會根據你的 Key 進行分類,在同一個進程中,不管是有多少過Task,都會把一樣的Key放在同一個Buffer裏,而後把Buffer中的數據寫入以Core數量爲單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裏有4個Mapper Tasks,因此總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。Consoldiated Hash-Shuffle的優化有一個很大的好處就是假設如今有200個Mapper Tasks在同一個進程中,也只會產生3個本地小文件; 若是用原始的 Hash-Based Shuffle 的話,200個Mapper Tasks 會各自產生3個本地小文件,在一個進程已經產生了600個本地小文件。3個對比600已是一個很大的差別了。
這個優化後的 HashShuffle 叫 ConsolidatedShuffle,在實際生產環境下能夠調如下參數:
1
|
spark.shuffle.consolidateFiles=
true
|
Consolidated HashShuffle 也有它的弱點:
Shuffle 不能夠避免是由於在分佈式系統中的基本點就是把一個很大的的任務/做業分紅一百份或者是一千份,這一百份和一千份文件在不一樣的機器上獨自完成各自不一樣的部份,咱們是針對整個做業要結果,因此在後面會進行匯聚,這個匯聚的過程的前一階段到後一階段以致網絡傳輸的過程就叫 Shuffle。在 Spark 中爲了完成 Shuffle 的過程會把真正的一個做業劃分爲不一樣的 Stage,這個Stage 的劃分是跟據依賴關係去決定的,Shuffle 是整個 Spark 中最消耗性能的一個地方。試試想一想若是沒有 Shuffle 的話,Spark能夠完成一個純內存式的操做。
1
|
reduceByKey,它會把每一個 Key 對應的 Value 聚合成一個 value 而後生成新的 RDD
|
Shuffle 是如何破壞了純內存操做呢,由於在不一樣節點上咱們要進行數據傳輸,數據在經過網絡發送以前,要先存儲在內存中,內存達到必定的程度,它會寫到本地磁盤,(在之前 Spark 的版本它沒有Buffer 的限制,會不斷地寫入 Buffer 而後等內存滿了就寫入本地,如今的版本對 Buffer 多少設定了限制,以防止出現 OOM,減小了 IO)
Mapper 端會寫入內存 Buffer,這個便關乎到 GC 的問題,而後 Mapper端的 Block 要寫入本地,大量的磁盤與IO的操做和磁盤與網絡IO的操做,這就構成了分佈式的性能殺手。
若是要對最終計算結果進行排序的話,通常會都會進行 sortByKey,若是以最終結果來思考的話,你能夠認爲是產生了一個很大很大的 partition,你能夠用 reduceByKey 的時候指定它的並行度,例如你把 reduceByKey 的並行度變成爲1,新 RDD 的數據切片就變成1,排序通常都會在不少節點上,若是你把不少節點變成一個節點而後進行排序,有時候會取得更好的效果,由於數據就在一個節點上,技術層面來說就只須要在一個進程裏進行排序。
1
2
|
能夠在調用 reduceByKey()接著調用 mapPartition( );
也能夠用 repartitionAndSortWithPartitions( );
|
還有一個很危險的地方就是數據傾斜,在咱們談的 Shuffle 機制中,不斷強調不一樣機器從Mapper端抓取數據並計算結果,但有沒有意會到數據可能會分佈不均衡,何時會致使數據傾斜,答案就是 Shuffle 時會導政數據分佈不均衡,也就是數據傾斜的問題。數據傾斜的問題會引伸不少其餘問題,好比,網絡帶寬、各重硬件故障、內存過分消耗、文件掉失。由於 Shuffle 的過程當中會產生大量的磁盤 IO、網絡 IO、以及壓縮、解壓縮、序列化和反序列化等等。
Shuffle可能面臨的問題,運行 Task 的時候纔會產生 Shuffle (Shuffle 已經融化在 Spark 的算子中)
具體的 Task 進行計算的時候盡一切最大可能使得數據具有 Process Locality 的特性,退而求其次是增長數據分片,減小每一個 Task 處理的數據量,基於Shuffle 和數據傾斜所致使的一系列問題,能夠延伸出不少不一樣的調優勢,好比說:
咱們說 Shuffle 的過程是Mapper和Reducer以及網絡傳輸構成的,Mapper 這一端會把本身的數據寫入本地磁盤,Reducer 這一端會經過網絡把數據抓取過來。Mapper 會先把數據緩存在內存中,在默應狀況下緩存空間是 32K,數據從內存到本地磁盤的一個過程就是寫數據的一個過程。
這裏有兩個 Stage,上一個 Stage 叫 ShuffleMapTask,下面的一個 Stage 多是 ShuffleMapTask,也有多是 ResultsTask,取決於它這個任務是否是最後一個Stage所產生的。ShuffleMapTask會把咱們處理的RDD的數據分紅苦幹個 Bucket,即一個又一個的 Buffer。一個Task怎麼去切分具體要看你的 partitioner,ShuffleMapTask確定是屬於具體的 Stage。
咱們從 Reducer端藉助了 HashShuffleReader 從遠程抓取數據,抓取數據過來以後進行 Aggregrate 操做匯聚,匯聚具體是進行分組或者是什麼樣的算法是開發者本身決定的。reduceByKey和Hadoop中的mapper與reducer相比,有一個缺點,在 Hadoop 的世界,不管你的數據的什麼樣的類型你均可以自定義,Mapper和Reducer的業務邏輯能夠完成不同。
Reducer端若是內存不夠寫磁盤的代價是雙倍的,在 Mapper端不管內存夠不夠它都須要先寫磁盤,由於Reducer端在計算的時候須要又一次的把數據從磁盤上抓回來,因此實際生產環境下須要適當地把 Shuffle 內存調大一點。
由於想利用分佈式的計算能力,因此要把數據分散到不一樣節點上運行,上游階段數據是並行運行的,下游階段要進行匯聚,因此出現Shuffle,若是下游分紅三類,上游也須要每一個Task把數據分紅三類,雖然有可能有一類是沒有數據,這無所謂,只要在實際運行時按照這套規則就能夠了,這就是最原始的 Shuffle 過程。
Hash-based Shuffle 默認Mapper 階段會爲Reducer 階段的每個Task單首創建一個文件來保存該Task中要使用的數據,可是在一些狀況下(例如說數據量很是龐大的狀況) 會形成大量文件的隨機磁盤IO操做且會性成大量的Memory消耗(極易形成OOM)。
Spark Shuffle 說到底都是離不開讀文件、寫文件、爲了高效咱們須要緩存,因爲有不少不一樣的進程,就須要一個管理者。HashShuffle 適合的埸景是小數據的埸景,對小規模數據的處理效率會比排序後的 Shuffle 高。
資料來源來至