Spark HashShuffle 是它之前的版本,如今1.6x 版本默應是 Sort-Based Shuffle,那爲何要講 HashShuffle 呢,由於有分佈式就必定會有 Shuffle,並且 HashShuffle 是 Spark之前的版本,亦便是 Sort-Based Shuffle 的前身,由於有 HashShuffle 的不足,纔會有後續的 Sorted-Based Shuffle,以及如今的 Tungsten-Sort Shuffle,因此咱們有必要去了解它。html
人們對Spark的印象每每是基於內存進行計算,但實際上來說,Spark能夠基於內存、也能夠基於磁盤或者是第三方的儲存空間進行計算,背後有兩層含意,第1、Spark框架的架構設計和設計模式上是傾向於在內存中計算數據的,第2、這也表達了人們對數據處理的一種美好的願望,就是但願計算數據的時候,數據就在內存中。java
爲何再一次強調 Shuffle 是 Spark 的性能殺手啦,那不就是說,Spark中的 「Shuffle「 和 「Spark徹底是基於內存計算「 的願景是相違背的!!!但願這篇文章能爲讀者帶出如下的啓發:算法
Spark 運行分紅兩部份,第一部份是 Driver Program,裏面的核心是 SparkContext,它驅動著一個程序的開始,負責指揮,另一部份是 Worker 節點上的 Task,它是實際運行任務的,當程序運行時,不間斷地由 Driver 與所在的進程進行交互,交互什麼,有幾點,第1、是讓你去幹什麼,第2、是具體告訴 Task 數據在那裏,例如說有三個 Stage,第二個 Task 要拿數據,它就會向 Driver 要數據,因此在整個工做的過程當中,Executor 中的 Task 會不斷地與 Driver 進行溝通,這是一個網絡傳輸的過程。設計模式
在這個過程當中一方面是 Driver 跟 Executor 進行網絡傳輸,另外一方面是Task要從 Driver 抓取其餘上游的 Task 的數據結果,因此有這個過程當中就不斷的產生網絡結果。其中,下一個 Stage 向上一個 Stage 要數據這個過程,咱們就稱之爲 Shuffle。網絡
思考點:上一個 Stage 爲何要向下一個 Stage 發數據?假設如今有一個程序,裏面有五個 Stage,我把它當作爲一個很大的 Stage,在分佈式系統中,數據分佈在不一樣的節點上,每個節點計算一部份數據,若是不對各個節點上獨立的部份進行匯聚的話,咱們是計算不到最終的結果。這就是由於咱們須要利用分佈式來發揮它自己並行計算的能力,然後續又須要計算各節點上最終的結果,因此須要把數據匯彙集中,這就會致使 Shuffle,這也是說爲何 Shuffle 是分佈式不可避免的命運。架構
基於 Mapper 和 Reducer 理解的基礎上,當 Reducer 去抓取數據時,它的 Key 究竟是怎麼分配的,核心思考點是:做爲上游數據是怎麼去分配給下游數據的。在這張圖中你能夠看到有4個 Task 在2個 Executors 上面,它們是並行運行的,Hash 自己有一套 Hash算法,能夠把數據的 Key 進行從新分類,每一個 Task 對數據進行分類而後把它們不一樣類別的數據先寫到本地磁盤,而後再通過網絡傳輸 Shuffle,把數據傳到下一個 Stage 進行匯聚。app
下圖有3個 Reducer,從 Task 開始那邊各自把本身進行 Hash 計算,分類出3個不一樣的類別,每一個 Task 都分紅3種類別的數據,剛剛提過由於分佈式的關係,咱們想把不一樣的數據匯聚而後計算出最終的結果,因此下游的 Reducer 會在每一個 Task 中把屬於本身類別的數據收集過來,匯聚成一個同類別的大集合,抓過來的時候會首先放在內存中,但內存可能放不下,也有可能放在本地 (這也是一個調優勢。能夠參考上一章講過的一些調優參數),每1個 Task 輸出3份本地文件,這裏有4個 Mapper Tasks,因此總共輸出了4個 Tasks x 3個分類文件 = 12個本地小文件。負載均衡
Hash Shuffle 也有它的弱點:框架
在剛纔 HashShuffle 的基礎上思考該如何進行優化,這是優化後的實現:分佈式
這裏仍是有4個Tasks,數據類別仍是分紅3種類型,由於Hash算法會根據你的 Key 進行分類,在同一個進程中,不管是有多少過Task,都會把一樣的Key放在同一個Buffer裏,而後把Buffer中的數據寫入以Core數量爲單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裏有4個Mapper Tasks,因此總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。Consoldiated Hash-Shuffle的優化有一個很大的好處就是假設如今有200個Mapper Tasks在同一個進程中,也只會產生3個本地小文件; 若是用原始的 Hash-Based Shuffle 的話,200個Mapper Tasks 會各自產生3個本地小文件,在一個進程已經產生了600個本地小文件。3個對比600已是一個很大的差別了。
這個優化後的 HashShuffle 叫 ConsolidatedShuffle,在實際生產環境下能夠調如下參數:
spark.shuffle.consolidateFiles=true
Consolidated HashShuffle 也有它的弱點:
Shuffle 不能夠避免是由於在分佈式系統中的基本點就是把一個很大的的任務/做業分紅一百份或者是一千份,這一百份和一千份文件在不一樣的機器上獨自完成各自不一樣的部份,咱們是針對整個做業要結果,因此在後面會進行匯聚,這個匯聚的過程的前一階段到後一階段以致網絡傳輸的過程就叫 Shuffle。在 Spark 中爲了完成 Shuffle 的過程會把真正的一個做業劃分爲不一樣的 Stage,這個Stage 的劃分是跟據依賴關係去決定的,Shuffle 是整個 Spark 中最消耗性能的一個地方。試試想一想若是沒有 Shuffle 的話,Spark能夠完成一個純內存式的操做。
reduceByKey,它會把每一個 Key 對應的 Value 聚合成一個 value 而後生成新的 RDD
Shuffle 是如何破壞了純內存操做呢,由於在不一樣節點上咱們要進行數據傳輸,數據在經過網絡發送以前,要先存儲在內存中,內存達到必定的程度,它會寫到本地磁盤,(在之前 Spark 的版本它沒有Buffer 的限制,會不斷地寫入 Buffer 而後等內存滿了就寫入本地,如今的版本對 Buffer 多少設定了限制,以防止出現 OOM,減小了 IO)
Mapper 端會寫入內存 Buffer,這個便關乎到 GC 的問題,而後 Mapper端的 Block 要寫入本地,大量的磁盤與IO的操做和磁盤與網絡IO的操做,這就構成了分佈式的性能殺手。
若是要對最終計算結果進行排序的話,通常會都會進行 sortByKey,若是以最終結果來思考的話,你能夠認爲是產生了一個很大很大的 partition,你能夠用 reduceByKey 的時候指定它的並行度,例如你把 reduceByKey 的並行度變成爲1,新 RDD 的數據切片就變成1,排序通常都會在不少節點上,若是你把不少節點變成一個節點而後進行排序,有時候會取得更好的效果,由於數據就在一個節點上,技術層面來說就只須要在一個進程裏進行排序。
能夠在調用 reduceByKey()接著調用 mapPartition( ); 也能夠用 repartitionAndSortWithPartitions( );
還有一個很危險的地方就是數據傾斜,在咱們談的 Shuffle 機制中,不斷強調不一樣機器從Mapper端抓取數據並計算結果,但有沒有意會到數據可能會分佈不均衡,何時會致使數據傾斜,答案就是 Shuffle 時會導政數據分佈不均衡,也就是數據傾斜的問題。數據傾斜的問題會引伸不少其餘問題,好比,網絡帶寬、各重硬件故障、內存過分消耗、文件掉失。由於 Shuffle 的過程當中會產生大量的磁盤 IO、網絡 IO、以及壓縮、解壓縮、序列化和反序列化等等。
Shuffle可能面臨的問題,運行 Task 的時候纔會產生 Shuffle (Shuffle 已經融化在 Spark 的算子中)
具體的 Task 進行計算的時候盡一切最大可能使得數據具有 Process Locality 的特性,退而求其次是增長數據分片,減小每一個 Task 處理的數據量,基於Shuffle 和數據傾斜所致使的一系列問題,能夠延伸出不少不一樣的調優勢,好比說: