在歷史的發展中,爲何 Spark 最終仍是選擇放棄了 HashShuffle 而使用了 Sorted-Based Shuffle,並且做爲後起之秀的 Tungsten-based Shuffle 它到底在什麼樣的背景下產生的。Tungsten-Sort Shuffle 已經併入了 Sorted-Based Shuffle,Spark 的引擎會自動識別程序須要原生的 Sorted-Based Shuffle 仍是用 Tungsten-Sort Shuffle,那識別的依據是什麼,其實 Spark 會檢查相對的應用程序有沒有 Aggregrate 的操做。文章的後續部份會介紹 Tungsten-Sort Shuffle 是如何管理內存和CPU。其實 Sorted-Based Shuffle 也有缺點,其缺點反而是它排序的特性,它強制要求數據在 Mapper 端必需要先進行排序 (注意,這裏沒有說對計算結果進行排序),因此致使它排序的速度有點慢。而 Tungsten-Sort Shuffle 對它的排序算法進行了改進,優化了排序的速度。但願這篇文章能爲讀者帶出如下的啓發:html
爲何 Spark 用 Sorted-Based Shuffle 而放棄了 Hash-Based Shuffle?在 Spark 裏爲何最終是 Sorted-Based Shuffle 成爲了核心,有基本瞭解過 Spark 的學習者都會知道,Spark會根據寬依賴把它一系列的算子劃分紅不一樣的 Stage,Stage 的內部會進行 Pipeline,Stage 與 Stage 之間進行 Shuffle,Shuffle 的過程包含三部份。算法
第一部份是 Shuffle 的 Write;第二部份是網絡的傳輸;第三部份就是 Shuffle 的 Read,這三大部份設置了內存操做、磁盤IO、網絡IO以及 JVM 的管理。而這些東西影響了 Spark 應用程序在 95%以上效率的惟一緣由,假設你程序代碼的質素自己是很是好的狀況下,你性能的95%都消耗在 Shuffle 階段的本地寫磁盤文件,網絡傳輸數據以及抓取數據這樣的生命週期中。
數組
在 Shuffle 寫數據的時候,內存中有一個緩存區叫 Buffer,你能夠想像成一個Map,同時在本地磁盤有相對應的本地文件。若是本地磁盤有文件你在內存中確定也須要有相對應的管理句柄。也就是說,單是從 ShuffleWrite 內存佔用的角度講,已經有一部分內存空間是用在存儲 Buffer 數據的,另外一部份的內存空間是用來管理文件句柄的,回顧 HashShuffle 所產生小文件的個數是 Mapper 分片數量 x Reducer 分片數量 (MxR)。好比Mapper端有1千個數據分片,Reducer端也有1千過數據分片,在 HashShuffle 的機制下,它在本地內存空間中會產生 1000 * 1000 = 1,000,000 個小文件,可想而知的結果會是什麼,這麼多的 IO,這麼多的內存消耗、這麼容易產生 OOM、以及這麼沉重的 CG 負擔。再說,若是Reducer端去讀取 Mapper端的數據時,Mapper 端有這麼多的小文件,要打開不少網絡通道去讀數據,打開 1,000,000 端口不是一件很輕鬆的事。這會致使一個很是經典的錯誤:Reducer 端也就是下一個 Stage 經過 Driver 去抓取上一個 Stage 屬於它本身的數據的時候,說文件找不到。其實這個時候不是真的是磁盤上文件找不到,而是程序不響應,由於它在進行垃圾回收 (GC) 操做。緩存
由於 Spark 想完成一體化多樣化的數據處理中心或者叫一統大數據領域的一個好夢,確定不甘心於本身只是一個只能處理中小規模的數據計算平臺,因此Spark最根本要優化和逼切要解決的問題是:減小 Mapper 端 ShuffleWriter 所產生的文件數量,這樣即可以能讓 Spark 從幾百臺集羣的規模中瞬間變成能夠支持幾千臺甚至幾萬臺集羣的規模。(一個Task背後多是一個Core去運行、也多是多個Core去運行,但默認狀況下是用一個Core去運行一個Task)。 網絡
減小Mapper端的小文件所帶來的好處是:數據結構
Sorted-Based Shuffle 的出現,最顯著的優點就是把 Spark 從只能處理中小規模的數據平臺,變成能夠處理無限大規模的數據平臺。可能你會問規模真是這麼重要嗎?固然有,集羣規模意爲著它處理數據的規模,也意爲著它的運算能力。併發
Sorted-Based Shuffle 不會爲每一個Reducer 中的Task 生產一個單獨的文件,相反Sorted-Based Shuffle 會把Mapper 中每一個ShuffleMapTask 全部的輸出數據Data 只寫到一個文件中,由於每一個ShuffleMapTask 中的數據會被分類,因此Sort-based Shuffle 使用了index 文件存儲具體ShuffleMapTask 輸出數據在同一個Data 文件中是如何分類的信息。因此說基於 Sort-based Shuffle 會在 Mapper 中的每個 ShuffleMapTask 中產生兩個文件 (併發度的個數 x 2)!!!
app
它會產生一個 Data 文件和一個 Index 文件,其中 Data 文件是存儲當前 Task 的 Shuffle 輸出的, 而 Index 文件則存儲了 Data 文件中的數據經過 Partitioner 的分類信息,此時下一個階段的 Stage 中的 Task 就是根據這個 Index 文件獲取本身所須要抓取的上一個 Stage 中 ShuffleMapTask 所產生的數據;框架
假設如今 Mapper 端有 1000 個數據分片,Reducer 端也有 1000 個數據分片,它的併發度是100,使用 Sorted-Based Shuffle 會產生多少個 Mapper端的小文件,答案是 100 x 2 = 200 個。它的 MapTask 會獨自運行,每一個 MapTask 在運行的時候寫2個文件,運行成功後就不須要這個 MapTask 的文件句柄,不管是文件自己的句柄仍是索引的句柄都不須要,因此若是它的併發度是 100 個 Core,每次運行 100 個任務的話,它最終只會佔用 200 個文件句柄,這跟 HashShuffle 的機制不同,HashShuffle 最差的狀況是 Hashed 句柄存儲在內存中的。
oop
Sorted-Based Shuffle 主要是在Mapper階段,這個跟Reducer端沒有任何關係,在Mapper階段它要進行排序,你能夠認爲是二次排序,它的原理是有2個Key進行排序,第一個是 PartitionId進行排序,第二個是就是自己數據的Key進行排序。看下圖,它會把 PartitionId 分紅3個,分別是索引爲 0、一、2,這個在Mapper端進行排序的過程實際上是讓Reducer去抓取數據的時候變得更高效,好比說第一個Reducer,它會到Mappr端的索引爲 0 的數據分片中抓取數據。
具體而言,Reducer 首先找 Driver 去獲取父 Stage 中每一個 ShuffleMapTask 輸出的位置信息,跟據位置信息獲取 Index 文件,解析 Index 文件,從解析的 Index 文件中獲取 Data 文件中屬於本身的那部分內容。
一個Mapper任務除了有一個數據文件之外,它也會有一個索引文件,Map Task 把數據寫到文件磁盤是順序根據自身的Key寫進去的,也同時按照 Partition寫進去的,由於它是順序寫數據,記錄每一個 Partition 的大小。
Sort-Based Shuffle 的弱點:
spark.shuffle.spill=false
若是設置了這個運行模式,在生產環境下建義對內存的數據做2份備份,由於在默認狀況下內存數據只有1份,它不像HDFS那樣,自然有3份備份。使用 ExternalAppendOnlyMap 的方式時,若是內存佔用率達到必定的臨界值後會首先嚐試在內存中擴大 ExternalAppendOnlyMap (內部有實現算法),若是不能擴容的話纔會 spill 到磁盤。
第三大問題:Shuffle 的數據在 Mapper 端如何存儲,在 Reducer 端如何知道數據具體在那裏的?在Spark的實現上每個Stage (裏面是 ShuffleMapTask) 中的 Task 在 Stage 的最後一個 RDD 上必定會註冊給 Driver 上的 MapOutputTrackerMaster,Mapper 經過和 MapOutputTrackerMaster 來彙報 ShuffleMapTask 具體輸出數據的位置 (具體的輸出文件及內容是和 Reducer 有關的),Reducer 是向 Driver 中的 MapOutputTrackerMaster 請求數據的元數據信息,而後和 Mapper 所在的 Executor 進行通訊。
人們會對 Sorted-Based Shuffle 有一種誤解,就是它產出的結果是有序的,這一節會講解 Sorted-Based Shuffle 是如何工做的並配合源碼看看它具體的實現,Sorted-Based Shuffle 的核心是藉助於 ExternalSorter 把每一個 ShuffleMapTask 的輸出排序到一個文件中 (FileSegmentGroup),爲了區分下一個階段 Reducer Task 不一樣的內容,它還須要有一個索引文件 (Index) 來告訴下游 Stage 的並行任務,那一部份是屬於你的。
上圖在 Reducer 端有4個Reducer Task,它會產生一組 File Group 和 一個索引文件,File Group 裏的 FileSegement 會進行排序,下游的 Task 能夠很容易跟據索引 (index) 定位到這個 Fie 中的那一部份 FileSegement 是屬於下游的,它至關於一個指針,下游的 Task 要向 Driver 去碓定文件在那裏,而後到了這個 File 文件所在的地方,實際上會跟 BlockManager 進行溝通,BlockManager 首先會讀一個 Index 文件,根據它的命名則進行解析,好比說下一個階段的第一個 Task,通常就是抓取第一個 Segment,這是一個指針定位的過程。
再次強調 Sort-Based Shuffle 最大的意義是減小臨時文件的輸出數量,且只會產生兩個文件:一個是包含不一樣內容劃分紅不一樣 FileSegment 構成的單一文件 File,另一個是索引文件 Index。上圖在 Sort-Based Shuffle 的介紹中看見了一個 Sort and Spill 的過程 (它是 Spill 到磁盤的時候再進行排序的),如今咱們從源碼的角度去看看到底它這個排序其實是在幹什麼的。
Sorted-Based Shuffle 的誕生和出現意味著 Spark 從只能處理〝中小規模數據的數據處理平臺"重新定位爲可以處理〝大規模數據的數據處理平臺",更進一步鞏故它在數據處理領域的龍頭地位。它最大的優化就是:減小了由於 HashShuffle 機制不管是原生 HashShuffle、仍是 Consolidated Shuffle 在 Mapper 端所產生的海量小文件 (這是應用程序運行時的一箇中間過程),中間文件數量從 M x R 個數 變成 2M 的小文件。數據量愈大,這個所優化帶來的效果便越來越強烈,這是優化的第一步。
雖然 Sort-Based Shuffle 已經大大提高了序程運行時的效率,但若是 Mapper 端並行度的數據分片過多的話,也會致使大量內存消耗和GC的巨大負擔,形成系統緩慢甚至崩潰。基於這觀點 Spark 再一次突破本身,推出了 Tungsten-Based Shuffle,提高了在Mapper端進行排序的速度,充分利用了的 CPU 等資源。(博客後績會更新 Tungsten 的資料)
Spark 正在它技術發展的黃金十年,當你把一切事情都以藝術的角度去看待的時候,你會發如今學習 Spark 的過程當中會有一種莫明奇妙的幸福感~
資料來源來至 DT大數據夢工廠 大數據商業案例以及性能調優
第26課:電光石火間從根本上理解Spark中Sort-Based Shuffle產生的內幕及其tungsten-sort 背景解密
第27課:完全解密Spark Shuffle使人費解的6大經典問題(課程內容全球獨家)
第28課:完全解密Spark Sort-Based Shuffle排序具體實現內幕和源碼詳解
想了解 JVM 在 Spark 中是如何分配內存空間能夠參考:第四章 : Spark Shuffle 中 JVM 內存使用及配置內幕詳情
Spark源碼圖片取自於 Spark 2.1.0版本