[Spark性能調優] 第三章 : Spark 2.1.0 中 Sort-Based Shuffle 產生的內幕

本課主題

  • Sorted-Based Shuffle 的誕生和介紹
  • Shuffle 中六大使人費解的問題 
  • Sorted-Based Shuffle 的排序和源碼鑑賞
  • Shuffle 在運行時的內存管理

 

引言 

在歷史的發展中,爲何 Spark 最終仍是選擇放棄了 HashShuffle 而使用了 Sorted-Based Shuffle,並且做爲後起之秀的 Tungsten-based Shuffle 它到底在什麼樣的背景下產生的。Tungsten-Sort Shuffle 已經併入了 Sorted-Based Shuffle,Spark 的引擎會自動識別程序須要原生的 Sorted-Based Shuffle 仍是用 Tungsten-Sort Shuffle,那識別的依據是什麼,其實 Spark 會檢查相對的應用程序有沒有 Aggregrate 的操做。文章的後續部份會介紹 Tungsten-Sort Shuffle 是如何管理內存和CPU。其實 Sorted-Based Shuffle 也有缺點,其缺點反而是它排序的特性,它強制要求數據在 Mapper 端必需要先進行排序 (注意,這裏沒有說對計算結果進行排序),因此致使它排序的速度有點慢。而 Tungsten-Sort Shuffle 對它的排序算法進行了改進,優化了排序的速度。但願這篇文章能爲讀者帶出如下的啓發:html

  • 瞭解爲何 Spark 最終選擇了 Sorted-Based Shuffle 而放棄了Hash-Based Shuffle
  • 瞭解什麼是 Spark Sorted-Based Shuffle
  • 瞭解 Spark Shuffle 中六大使人費解的問題
  • 瞭解 Sorted-Based Shuffle 具體是如何排序

 

Spark Sorted-Based Shuffle 的誕生

爲何 Spark 用 Sorted-Based Shuffle 而放棄了 Hash-Based Shuffle?在 Spark 裏爲何最終是 Sorted-Based Shuffle 成爲了核心,有基本瞭解過 Spark 的學習者都會知道,Spark會根據寬依賴把它一系列的算子劃分紅不一樣的 Stage,Stage 的內部會進行 Pipeline,Stage 與 Stage 之間進行 Shuffle,Shuffle 的過程包含三部份。算法

 

第一部份是 Shuffle 的 Write;第二部份是網絡的傳輸;第三部份就是 Shuffle 的 Read,這三大部份設置了內存操做、磁盤IO、網絡IO以及 JVM 的管理。而這些東西影響了 Spark 應用程序在 95%以上效率的惟一緣由,假設你程序代碼的質素自己是很是好的狀況下,你性能的95%都消耗在 Shuffle 階段的本地寫磁盤文件,網絡傳輸數據以及抓取數據這樣的生命週期中。
數組

在 Shuffle 寫數據的時候,內存中有一個緩存區叫 Buffer,你能夠想像成一個Map,同時在本地磁盤有相對應的本地文件。若是本地磁盤有文件你在內存中確定也須要有相對應的管理句柄。也就是說,單是從 ShuffleWrite 內存佔用的角度講,已經有一部分內存空間是用在存儲 Buffer 數據的,另外一部份的內存空間是用來管理文件句柄的,回顧 HashShuffle 所產生小文件的個數是 Mapper 分片數量 x Reducer 分片數量 (MxR)。好比Mapper端有1千個數據分片,Reducer端也有1千過數據分片,在 HashShuffle 的機制下,它在本地內存空間中會產生 1000 * 1000 = 1,000,000 個小文件,可想而知的結果會是什麼,這麼多的 IO,這麼多的內存消耗、這麼容易產生 OOM、以及這麼沉重的 CG 負擔。再說,若是Reducer端去讀取 Mapper端的數據時,Mapper 端有這麼多的小文件,要打開不少網絡通道去讀數據,打開 1,000,000 端口不是一件很輕鬆的事。這會致使一個很是經典的錯誤:Reducer 端也就是下一個 Stage 經過 Driver 去抓取上一個 Stage 屬於它本身的數據的時候,說文件找不到。其實這個時候不是真的是磁盤上文件找不到,而是程序不響應,由於它在進行垃圾回收 (GC) 操做。緩存

由於 Spark 想完成一體化多樣化的數據處理中心或者叫一統大數據領域的一個好夢,確定不甘心於本身只是一個只能處理中小規模的數據計算平臺,因此Spark最根本要優化和逼切要解決的問題是:減小 Mapper 端 ShuffleWriter 所產生的文件數量,這樣即可以能讓 Spark 從幾百臺集羣的規模中瞬間變成能夠支持幾千臺甚至幾萬臺集羣的規模。(一個Task背後多是一個Core去運行、也多是多個Core去運行,但默認狀況下是用一個Core去運行一個Task)。 網絡

減小Mapper端的小文件所帶來的好處是:數據結構

  1. Mapper端的內存佔用變少了;
  2. Spark能夠處理不竟竟是小規模的數據,處理大規模的數據也不會很容易達到性能瓶頸;
  3. Reducer端抓取數據的次數也變少了;
  4. 網絡通道的句柄也變少;
  5. 極大了減小 Reducer 的內存不竟竟是由於數據級別的消耗,並且是框架時要運行的必須消耗。 

 

Spark Sorted-Based Shuffle介紹

Sorted-Based Shuffle 的出現,最顯著的優點就是把 Spark 從只能處理中小規模的數據平臺,變成能夠處理無限大規模的數據平臺。可能你會問規模真是這麼重要嗎?固然有,集羣規模意爲著它處理數據的規模,也意爲著它的運算能力。併發

Sorted-Based Shuffle 不會爲每一個Reducer 中的Task 生產一個單獨的文件,相反Sorted-Based Shuffle 會把Mapper 中每一個ShuffleMapTask 全部的輸出數據Data 只寫到一個文件中,由於每一個ShuffleMapTask 中的數據會被分類,因此Sort-based Shuffle 使用了index 文件存儲具體ShuffleMapTask 輸出數據在同一個Data 文件中是如何分類的信息。因此說基於 Sort-based Shuffle 會在 Mapper 中的每個 ShuffleMapTask 中產生兩個文件 (併發度的個數 x 2)!!!
app

它會產生一個 Data 文件和一個 Index 文件,其中 Data 文件是存儲當前 Task 的 Shuffle 輸出的, 而 Index 文件則存儲了 Data 文件中的數據經過 Partitioner 的分類信息,此時下一個階段的 Stage 中的 Task 就是根據這個 Index 文件獲取本身所須要抓取的上一個 Stage 中 ShuffleMapTask 所產生的數據框架

假設如今 Mapper 端有 1000 個數據分片,Reducer 端也有 1000 個數據分片,它的併發度是100,使用 Sorted-Based Shuffle 會產生多少個 Mapper端的小文件,答案是 100 x 2 = 200 個。它的 MapTask 會獨自運行,每一個 MapTask 在運行的時候寫2個文件,運行成功後就不須要這個 MapTask 的文件句柄,不管是文件自己的句柄仍是索引的句柄都不須要,因此若是它的併發度是 100 個 Core,每次運行 100 個任務的話,它最終只會佔用 200 個文件句柄,這跟 HashShuffle 的機制不同,HashShuffle 最差的狀況是 Hashed 句柄存儲在內存中的。
oop

Sorted-Based Shuffle 主要是在Mapper階段,這個跟Reducer端沒有任何關係,在Mapper階段它要進行排序,你能夠認爲是二次排序,它的原理是有2個Key進行排序,第一個是 PartitionId進行排序,第二個是就是自己數據的Key進行排序。看下圖,它會把 PartitionId 分紅3個,分別是索引爲 0、一、2,這個在Mapper端進行排序的過程實際上是讓Reducer去抓取數據的時候變得更高效,好比說第一個Reducer,它會到Mappr端的索引爲 0 的數據分片中抓取數據。

具體而言,Reducer 首先找 Driver 去獲取父 Stage 中每一個 ShuffleMapTask 輸出的位置信息,跟據位置信息獲取 Index 文件,解析 Index 文件,從解析的 Index 文件中獲取 Data 文件中屬於本身的那部分內容。

一個Mapper任務除了有一個數據文件之外,它也會有一個索引文件,Map Task 把數據寫到文件磁盤是順序根據自身的Key寫進去的,也同時按照 Partition寫進去的,由於它是順序寫數據,記錄每一個 Partition 的大小。

Sort-Based Shuffle 的弱點:

  1. 若是 Mapper 中 Task 的數量過大,依舊會產生不少小文件,此時在 Shuffle 傳數據的過程當中到 Reducer 端,Reducer 會須要同時大量的記錄來進行反序例化,致使大量內存消耗和GC 的巨大負擔,形成系統緩慢甚至崩潰!
  2. 強制了在 Mapper 端必順要排序,這裏的前提是自己數據根本不須要排序的話;
  3. 若是須要在分片內也進行排序的話,此時須要進行 Mapper 端和 Reducer 端的兩次排序!
  4. 它要基於記錄自己進行排序,這就是 Sort-Based Shuffle 最致命的性能消耗;

 

Shuffle 中六大使人費解的問題 

  1. 第一大問題:何時進行 Shuffle 的 fetch 操做?Shuffle 是在一邊進行 Mapper 端 map 操做的同時,一邊進行 Reducer 端的 shuffle 和 reduce 操做嗎?
    錯誤的觀點是:Spark 是一邊 Mapper 一邊 Shuffle 的,而 Hadoop MapReduce 是先完成 Mapper 而後才進行 Reducer 的 Shuffle。正確的觀點是 Spark 必定是先完成 Mapper 端全部的 Tasks,纔會進行 Reducer 端的 Shuffle 過程。這是由於 Spark Job 是按照 Stage 線性執行的,前面的 Stage 必須執行完畢,纔可以執行後面 Reducer 端的 Shuffle 過程。

    • 更準確來講 Spark Shuffle 的過程是邊拉取數據邊進行 Aggregrate 操做的,其實與 Hadoop MapReduce 相比其優點確實是在速度上,可是也會致使一些算法很差實現,例如求平均值等,爲何呢?由於邊拉取數據邊進行 Aggregrate 這個過程當中,後面的Stage依賴於前面的Stage,Spark 是以 Stage 爲單位進行計算的,若是裏面的任務沒有計算完,後面你怎麼計算呢。但若是你是求和的話,它就會計算的特別快; 
    • Hadoop MapReduce 是把數據拉過來以後,而後進行計算,若是用 MapReduce 求平均值的話,它的算法就會很好實現。
  2. 第二大問題:Shuffle fetch 過來的數據到底放在了那裏?
    Spark 這種很靈活地使用並行度以及傾向於優先使用內存的計算模型,若是不正常地使用這些特徵的話會很容易致使 Spark 的應用程序出現 OOM 的狀況,Spark 在不一樣的版本 fetch 過來的數據放在哪裏是有不一樣的答案。抓過來的數據首先會放在 Reducer 端的內存緩存區中,Spark曾經有版本要求只能放在內存緩存中,其數據結構相似於 HashMap (AppendOnlyMap),顯然這個設計特別消耗內存和極易出現OOM,同時這也極大的限制了 Spark 集羣的規模,如今的實現都是內存 + 磁盤的方式 (數據結構類使用了 ExternalAppendOnlyMap),固然也能夠經過調如下參數來設置只能使用內存。
    spark.shuffle.spill=false
    

    若是設置了這個運行模式,在生產環境下建義對內存的數據做2份備份,由於在默認狀況下內存數據只有1份,它不像HDFS那樣,自然有3份備份。使用 ExternalAppendOnlyMap 的方式時,若是內存佔用率達到必定的臨界值後會首先嚐試在內存中擴大 ExternalAppendOnlyMap (內部有實現算法),若是不能擴容的話纔會 spill 到磁盤。

  3. 第三大問題:Shuffle 的數據在 Mapper 端如何存儲,在 Reducer 端如何知道數據具體在那裏的?在Spark的實現上每個Stage (裏面是 ShuffleMapTask) 中的 Task 在 Stage 的最後一個 RDD 上必定會註冊給 Driver 上的 MapOutputTrackerMaster,Mapper 經過和 MapOutputTrackerMaster 來彙報 ShuffleMapTask 具體輸出數據的位置 (具體的輸出文件及內容是和 Reducer 有關的),Reducer 是向 Driver 中的 MapOutputTrackerMaster 請求數據的元數據信息,而後和 Mapper 所在的 Executor 進行通訊。

  4. 第四大問題:竟竟從 HashShuffle 的角度來說,咱們在 Shuffle 的時候到底能夠產生多少 Mapper 端的中間文件?
    這裏有一個很重要的調優參數 (能夠在 TaskSchedulerImpl.scala 中找到此參數),該參數決定了 Spark 在運行時每一個 Task 所須要的 Core 的個數,默認狀況是1個,如今假設 spark.task.cpus=T。
    • 問題1、例如說有M個Mapper、R個Reducer 和 C個Core,那麼 HashShuffle 能夠產生多少個 Mapper 的中間文件?
      • HashShuffle 會產生 C x R 的小文件
      • Consolidated HashShuffle 有可能產生 C x R 個小文件。由於設置了 spark.task.cpus 的參數,那麼真實的答案是 (C / T) x R 個小文件
    • 問題2、例如在生產環境下有 E 個 Executors (例如100個),每一個 Executor上有 C 個Cores (例如10個),同時也有 R 個Reducer,那麼 HashShuffle 能夠產生多少個 Mapper 的中間文件?
      • HashShuffle 會產生實際 Task 的個數 x R 個的小文件
      • Consolidated HashShuffle 會產生 (E x (C / T)) x R 個的小文件
  5. 第五大問題:Spark中Sorted-Based Shuffle 數據結果默認是排序的嗎?Sorted-Based Shuffle 採用了什麼的排序算法?這個排序算法的好處是什麼?
    Spark Sorted-Based Shuffle 在 Mapper 端是排序的,包括 partition 的排序和每一個 partition 內部元素的排序!但在 Reducer 端是沒有進行排序,因此 Job 的結果默認不是排序的。Sorted-Based Shuffle 採用了 Tim-Sort 排序算法,好處是能夠極爲高效的使用 Mapper 端的排序成果全局排序。
  6. 第六大問題:Spark Tungsten-Sorted Shuffle 在 Mapper 中會對內部元素進行排序嗎?Tungsten-Sorted Shuffle不適用於什麼狀況?說出具體的緣由。
    Tungsten-Sorted Shuffle 在 Mapper 中不會對內部元素進行排序 (它只會對Partition進行排序),緣由是它本身管理的二進制序列化後的數據,問題來啦:數據是進入 Buffer 時或者是進入磁盤的時才進行排序呢?答案是數據的排序是發生在 Buffer 要滿了 spill 到磁盤時才進行排序的。因此 Tungsten-Sorted Shuffle 它對內部不會進行排序
    Tungsten-Sorted Shuffle 何時會退化成爲 Sorted-Based Shuffle?它是在程序有 Aggregrate 操做的時候;或者是 Mapper 端輸出的 partition 大於 16777216;或者是一條 Record 大於128M的時候,緣由也是由於它本身管理的二進制序列化後的數據以及數組指針管理範圍。

 

Sorted-Based Shuffle 的排序算法

人們會對 Sorted-Based Shuffle 有一種誤解,就是它產出的結果是有序的,這一節會講解 Sorted-Based Shuffle 是如何工做的並配合源碼看看它具體的實現,Sorted-Based Shuffle 的核心是藉助於 ExternalSorter 把每一個 ShuffleMapTask 的輸出排序到一個文件中 (FileSegmentGroup),爲了區分下一個階段 Reducer Task 不一樣的內容,它還須要有一個索引文件 (Index) 來告訴下游 Stage 的並行任務,那一部份是屬於你的。

上圖在 Reducer 端有4個Reducer Task,它會產生一組 File Group 和 一個索引文件,File Group 裏的 FileSegement 會進行排序,下游的 Task 能夠很容易跟據索引 (index) 定位到這個 Fie 中的那一部份 FileSegement 是屬於下游的,它至關於一個指針,下游的 Task 要向 Driver 去碓定文件在那裏,而後到了這個 File 文件所在的地方,實際上會跟 BlockManager 進行溝通,BlockManager 首先會讀一個 Index 文件,根據它的命名則進行解析,好比說下一個階段的第一個 Task,通常就是抓取第一個 Segment,這是一個指針定位的過程。

再次強調 Sort-Based Shuffle 最大的意義是減小臨時文件的輸出數量,且只會產生兩個文件:一個是包含不一樣內容劃分紅不一樣 FileSegment 構成的單一文件 File,另一個是索引文件 Index。上圖在 Sort-Based Shuffle 的介紹中看見了一個 Sort and Spill 的過程 (它是 Spill 到磁盤的時候再進行排序的),如今咱們從源碼的角度去看看到底它這個排序其實是在幹什麼的。

Sort-Based Shuffle 排序源碼

  1. 首先從 ShuffleMapTask 的 runTask方法中找出當前的 ShuffleManager,而後找出 writer 的方法,而後把 shuffleHandle、partitionId 以及內容做爲參數傳進來,並調用它的 getWriter 方法。
    [下圖是一個 ShuffleMapTask.scala 中 writer 方法]

  2. 找出 ShuffleManager,在Spark2.X版本中只有 SortShuffleManager,已經沒有了 Hash-Based Shuffle Manager 了
    [下圖是一個 SparkEnv.scala 中 shuffleManager 成員]

  3. 定義 ShuffleManager 的 Handler 方式,經過調用 registerShuffle 方法來定義要用那種排序策略。
    [下圖是一個 Dependency.scala 中 shuffleHandle 成員]

    [下圖是一個 SortShuffleManager.scala 中 registerShuffle 方法]

  4. 在這裏看到有三種 ShuffleWriter 的實現方式,第一種是 UnsafeShuffleWriter,第二種是 BypassMergeSortShuffleWriter; 第三種是 SortShuffleWriter。如今咱們深刻看看 SortShuffleWriter 的具體實現方式。
    [下圖是一個 SortShuffleManager.scala 中 getWriter 方法]

  5. 而後建立了 SortShuffleWriter 的實例對象,在這裏你看見它建立了一個 ExternalSorter 的實例,write 方法會首先判斷一下會不會進行 combine 操做,其實就是本地需不須要有 aggregrator 的問題,建立了 sorter 以後,它會先對咱們的 sorted 進行 insertAll 的操做,它會根據 ShuffleId 和 StageId 去得到 output,而後它經過 sorter 中的 writePartitionedFie 方法來得到 partitionLengths。最後是一個 mapStatus,它是咱們運製程序最後返回的一個數據結構,它會告訴你數據在那裏。
    [下圖是一個 SortShuffleManager.scala 中 write 方法]

  6. 這種基於Sort的Shuffle實現機制中引入了外部排序器(ExternalSorter),ExternalSorter繼承了 spillable,由於內存攸用在達到必定值時,會spill到磁盤中,這樣的設計能夠減小內存的開銷。在 External Sorter 中會定義 fileBufferSize 的大小,默應是 32k x 1024 的大小。
    [下圖是一個 ExternalSorter.scala 中 fileBufferSize 大小]

  7. 經過查看外部排序器(ExternalSorter)的insertAll方法,而後 sorter 調用了 insertAll( ) 的方法。
    [下圖是一個 ExternalSorter.scala 中 insertAll 方法]

  8. 對於外部排序器(External Sorter),除了 insertAll 方法外,它的 writePartitionedFile 方法也很是重要,其中BlockId是數據塊的邏輯位置,File 參數則是對應邏輯位置的物理存儲位置,這兩個參數值獲取方法和使用 BypassMergeSOrtShuffleHandle及其對應的 ShuffleWriter 是同樣的。

  9. xxxxx

  10. xxx

  11. 建立了一個 Sorter 的實例

  12. 代碼追跡到這裏能夠看見它使用了什麼排序的方法

  

總結

Sorted-Based Shuffle 的誕生和出現意味著 Spark 從只能處理〝中小規模數據的數據處理平臺"重新定位爲可以處理〝大規模數據的數據處理平臺",更進一步鞏故它在數據處理領域的龍頭地位。它最大的優化就是:減小了由於 HashShuffle 機制不管是原生 HashShuffle、仍是 Consolidated Shuffle 在 Mapper 端所產生的海量小文件 (這是應用程序運行時的一箇中間過程),中間文件數量從 M x R 個數 變成 2M 的小文件。數據量愈大,這個所優化帶來的效果便越來越強烈,這是優化的第一步。

雖然 Sort-Based Shuffle 已經大大提高了序程運行時的效率,但若是 Mapper 端並行度的數據分片過多的話,也會致使大量內存消耗和GC的巨大負擔,形成系統緩慢甚至崩潰。基於這觀點 Spark 再一次突破本身,推出了 Tungsten-Based Shuffle,提高了在Mapper端進行排序的速度,充分利用了的 CPU 等資源。(博客後績會更新 Tungsten 的資料)

Spark 正在它技術發展的黃金十年,當你把一切事情都以藝術的角度去看待的時候,你會發如今學習 Spark 的過程當中會有一種莫明奇妙的幸福感~

 

參考資料 

資料來源來至 DT大數據夢工廠 大數據商業案例以及性能調優 

第26課:電光石火間從根本上理解Spark中Sort-Based Shuffle產生的內幕及其tungsten-sort 背景解密

第27課:完全解密Spark Shuffle使人費解的6大經典問題(課程內容全球獨家)

第28課:完全解密Spark Sort-Based Shuffle排序具體實現內幕和源碼詳解

TimSort 排序原理介紹

想了解 JVM 在 Spark 中是如何分配內存空間能夠參考:第四章 : Spark Shuffle 中 JVM 內存使用及配置內幕詳情

 

Spark源碼圖片取自於 Spark 2.1.0版本

相關文章
相關標籤/搜索