假如咱們有個 spark job 依賴關係以下sql
咱們抽象出來其中的rdd和依賴關係:數組
E <-------n------, C <--n---D---n-----F--s---, A <-------s------ B <--n----`-- G緩存
對應的劃分後的RDD結構爲:數據結構
最終咱們獲得了整個執行過程:併發
中間就涉及到shuffle 過程,前一個stage 的 ShuffleMapTask 進行 shuffle write, 把數據存儲在 blockManager 上面, 而且把數據位置元信息上報到 driver 的 mapOutTrack 組件中, 下一個 stage 根據數據位置元信息, 進行 shuffle read, 拉取上個stage 的輸出數據。app
這篇文章講述的就是其中的 shuffle write 過程。ide
Spark 0.8及之前 Hash Based Shuffle性能
Spark 0.8.1 爲Hash Based Shuffle引入File Consolidation機制優化
Spark 0.9 引入ExternalAppendOnlyMapui
Spark 1.1 引入Sort Based Shuffle,但默認仍爲Hash Based Shuffle
Spark 1.2 默認的Shuffle方式改成Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort併入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出歷史舞臺
總結一下, 就是最開始的時候使用的是 Hash Based Shuffle, 這時候每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是M R ,其中M是Map的個數,R是Reduce的個數。這樣會產生大量的小文件,對文件系統壓力很大,並且也不利於IO吞吐量。後面忍不了了就作了優化,把在同一core上運行的多個Mapper 輸出的合併到同一個文件,這樣文件數目就變成了 cores R 個了,
舉個例子:
原本是這樣的,3個 map task, 3個 reducer, 會產生 9個小文件,
是否是很恐怖, 後面改造以後
4個map task, 4個reducer, 若是不使用 Consolidation機制, 會產生 16個小文件。
可是可是如今這 4個 map task 分兩批運行在 2個core上, 這樣只會產生 8個小文件
在同一個 core 上前後運行的兩個 map task的輸出, 對應同一個文件的不一樣的 segment上, 稱爲一個 FileSegment, 造成一個 ShuffleBlockFile,
後面就引入了 Sort Based Shuffle, map端的任務會按照Partition id以及key對記錄進行排序。同時將所有結果寫到一個數據文件中,同時生成一個索引文件, 再後面就就引入了 Tungsten-Sort Based Shuffle, 這個是直接使用堆外內存和新的內存管理模型,節省了內存空間和大量的gc, 是爲了提高性能。
如今2.1 分爲三種writer, 分爲 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter,顧名思義,你們應該能夠對應上,咱們本着過期不講的原則, 本文中只描述這三種 writer 的實現細節, Hash Based Shuffle 已經退出歷史舞臺了,我就不講了。
上面是使用哪一種 writer 的判斷依據, 是否開啓 mapSideCombine 這個判斷,是由於有些算子會在 map 端先進行一次 combine, 減小傳輸數據。 由於 BypassMergeSortShuffleWriter 會臨時輸出Reducer個(分區數目)小文件,因此分區數必需要小於一個閥值,默認是小於200。
UnsafeShuffleWriter須要Serializer支持relocation,Serializer支持relocation:原始數據首先被序列化處理,而且不再須要反序列,在其對應的元數據被排序後,須要Serializer支持relocation,在指定位置讀取對應數據。
BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter實現基本一致, 惟一的區別在於,map端的多個輸出文件會被彙總爲一個文件。 全部分區的數據會合併爲同一個文件,會生成一個索引文件,是爲了索引到每一個分區的起始地址,能夠隨機 access 某個partition的全部數據。
可是須要注意的是,這種方式不宜有太多分區,由於過程當中會併發打開全部分區對應的臨時文件,會對文件系統形成很大的壓力。
具體實現就是給每一個分區分配一個臨時文件,對每一個 record的key 使用分區器(模式是hash,若是用戶自定義就使用自定義的分區器)找到對應分區的輸出文件句柄,直接寫入文件,沒有在內存中使用 buffer。 最後copyStream方法把全部的臨時分區文件拷貝到最終的輸出文件中,而且記錄每一個分區的文件起始寫入位置,把這些位置數據寫入索引文件中。
咱們能夠先考慮一個問題,假如我有 100億條數據,可是咱們的內存只有1M,可是咱們磁盤很大, 咱們如今要對這100億條數據進行排序,是無法把全部的數據一次性的load進行內存進行排序的,這就涉及到一個外部排序的問題,咱們的1M內存只能裝進1億條數據,每次都只能對這 1億條數據進行排序,排好序後輸出到磁盤,總共輸出100個文件,最後怎麼把這100個文件進行merge成一個全局有序的大文件。咱們能夠每一個文件(有序的)都取一部分頭部數據最爲一個 buffer, 而且把這 100個 buffer放在一個堆裏面,進行堆排序,比較方式就是對全部堆元素(buffer)的head元素進行比較大小, 而後不斷的把每一個堆頂的 buffer 的head 元素 pop 出來輸出到最終文件中, 而後繼續堆排序,繼續輸出。若是哪一個buffer 空了,就去對應的文件中繼續補充一部分數據。最終就獲得一個全局有序的大文件。
若是你能想通我上面舉的例子,就差很少搞清楚sortshufflewirter的實現原理了,由於解決的是同一個問題。
SortShuffleWriter 中的處理步驟就是
使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在內存中進行排序, 排序的 K 是(partitionId, hash(key)) 這樣一個元組。
若是超過內存 limit, 我 spill 到一個文件中,這個文件中元素也是有序的,首先是按照 partitionId的排序,若是 partitionId 相同, 再根據 hash(key)進行比較排序
若是須要輸出全局有序的文件的時候,就須要對以前全部的輸出文件 和 當前內存中的數據結構中的數據進行 merge sort, 進行全局排序
和咱們開始提的那個問題基本相似,不一樣的地方在於,須要對 Key 相同的元素進行 aggregation, 就是使用定義的 func 進行聚合, 好比你的算子是 reduceByKey(+), 這個func 就是加法運算, 若是兩個key 相同, 就會先找到全部相同的key 進行 reduce(+) 操做,算出一個總結果 Result,而後輸出數據(K,Result)元素。
SortShuffleWriter 中使用 ExternalSorter 來對內存中的數據進行排序,ExternalSorter內部維護了兩個集合PartitionedAppendOnlyMap、PartitionedPairBuffer, 二者都是使用了 hash table 數據結構, 若是須要進行 aggregation, 就使用 PartitionedAppendOnlyMap(支持 lookup 某個Key,若是以前存儲過相同key的K-V 元素,就須要進行 aggregation,而後再存入aggregation後的 K-V), 不然使用 PartitionedPairBuffer(只進行添K-V 元素),
咱們能夠看上圖, PartitionedAppendOnlyMap 中的 K 是(PatitionId, K)的元組, 這樣就是先按照partitionId進行排序,若是 partitionId 相同,再按照 hash(key)再進行排序。
首先看下 AppendOnlyMap, 這個很簡單就是個 hash table,其中的 K 是(PatitionId, hash(Key))的元組, 當要 put(K, V) 時,先 hash(K) 找存放位置,若是存放位置已經被佔用,就使用 Quadratic probing 探測方法來找下一個空閒位置。對於圖中的 K6 來講,第三次查找找到 K4 後面的空閒位置,放進去便可。get(K6) 的時候相似,找三次找到 K6,取出緊挨着的 V6,與先來的 value 作 func,結果從新放到 V6 的位置。
下面看下 ExternalAppendOnlyMap 結構, 這個就是內存中裝不下全部元素,就涉及到外部排序,
上圖中能夠看到整個原理圖,邏輯也很簡單, 內存不夠的時候,先spill了四次,輸出到文件中的元素都是有序的,讀取的時候都是按序讀取,最後跟內存剩餘的數據進行 全局merge。
merge 過程就是 每一個文件讀取部分數據(StreamBuffer)放到 mergeHeap 裏面, 當前內存中的 PartitionedAppendOnlyMap 也進行 sort,造成一個 sortedMap 放在 mergeHeap 裏面, 這個 heap 是一個 優先隊列 PriorityQueue, 而且自定義了排序方式,就是取出堆元素StreamBuffer的head元素進行比較大小,
val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] { // Use the reverse of comparator.compare because PriorityQueue dequeues the max
override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1) })
這樣的話,每次從堆頂的 StreamBuffer 中 pop 出的 head 元素就是全局最小的元素(記住是按照(partitionId,hash(Key))排序的), 若是須要 aggregation, 就把這些key 相同的元素放在一個一個 mergeBuffers 中, 第一個被放入 mergeBuffers 的 StreamBuffer 被稱爲 minBuffer,那麼 minKey 就是 minBuffer 中第一個 record 的 key。當 merge-combine 的時候,與 minKey 有相同的Key的records 被 aggregate 一塊兒,而後輸出。
若是不須要 aggregation, 那就簡單了, 直接把 堆頂的 StreamBuffer 中 pop 出的 head 元素 返回就行了。
最終讀取的時候,從整個 全局 merge 後的讀取迭代器中讀取的數據,就是按照 partitionId 從小到大排序的數據, 讀取過程當中使用再按照 分區分段, 而且記錄每一個分區的文件起始寫入位置,把這些位置數據寫入索引文件中。
UnsafeShuffleWriter 裏面維護着一個 ShuffleExternalSorter, 用來作外部排序, 外部排序就是要先部分排序數據並把數據輸出到磁盤,而後最後再進行merge 全局排序, 既然這裏也是外部排序,跟 SortShuffleWriter 有什麼區別呢, 這裏只根據 record 的 partition id 先在內存 ShuffleInMemorySorter 中進行排序, 排好序的數據通過序列化壓縮輸出到換一個臨時文件的一段,而且記錄每一個分區段的seek位置,方便後續能夠單獨讀取每一個分區的數據,讀取流通過解壓反序列化,就能夠正常讀取了。
整個過程就是不斷地在 ShuffleInMemorySorter 插入數據,若是沒有內存就申請內存,若是申請不到內存就 spill 到文件中,最終合併成一個 依據 partition id 全局有序 的大文件。
SortShuffleWriter 和 UnsafeShuffleWriter 對比
區別 | UnsafeShuffleWriter | SortShuffleWriter |
---|---|---|
排序方式 | 最終只是 partition 級別的排序 | 先 partition 排序,相同分區 key有序 |
aggregation | 沒有反序列化,沒有aggregation | 支持 aggregation |
沒有指定 aggregation 或者key排序, 由於 key 沒有編碼到排序指針中,因此只有 partition 級別的排序
原始數據首先被序列化處理,而且不再須要反序列,在其對應的元數據被排序後,須要Serializer支持relocation,在指定位置讀取對應數據。 KryoSerializer 和 spark sql 自定義的序列化器 支持這個特性。
分區數目必須小於 16777216 ,由於 partition number 使用24bit 表示的。
由於每一個分區使用 27 位來表示 record offset, 因此一個 record 不能大於這個值。
咱們不妨看向對記錄排序的例子。一個標準的排序步驟須要爲記錄儲存一組的指針,並使用quicksort 來互換指針直到全部記錄被排序。基於順序掃描的特性,排序一般能得到一個不錯的緩存命中率。然而,排序一組指針的緩存命中率卻很低,由於每一個比較運算都須要對兩個指針解引用,而這兩個指針對應的倒是內存中兩個隨機位置的數據。
那麼,咱們該如何提升排序中的緩存本地性?其中一個方法就是經過指針順序地儲存每一個記錄的sort key。咱們使用 8個字節(partition id 做爲 key, 和數據真正的指針)來表明一條數據,放在一個 sort array 中,每次對比排序的操做只須要線性的查找每對pointer-key,從而不會產生任何的隨機掃描。 這樣若是對全部記錄的 partion 進行排序的時候, 直接對這個數據裏面的進行排序,就行了,極大的提升了性能。
固然 這裏對數據排序, UnsafeShuffleWriter 使用的是 RadixSort, 這個很簡單,我就不介紹了, 不一樣清楚的能夠參考下 這個文檔 http://bubkoo.com/2014/01/15/sort-algorithm/radix-sort/
上面是申請內存的過程,申請到的內存做爲 一個 page 記錄在 allocatedPages 中,spill的時候進行 free 這些內存, 有一個當前使用的 currentPage, 若是不夠用了,就繼續去申請。
你們能夠看下上面的圖, 每次插入一條 record 到page 中, 就把 partionId + pageNumber + offset in page, 做爲一個元素插入到 LongArray 中, 最終讀取數據的時候, 對LongArray 進行 RadixSort 排序, 排序後依次根據指針元素索引原始數據,就作到 partition 級別有序了。
spill 文件的時候, UnsafeShuffleInMemorySorter 生成一個數據迭代器, 會返回一個根據partition id 排過序迭代器,該迭代器粒度每一個元素就是一個指針,對應 PackedRecordPointer 這個數據結構, 這個 PackedRecordPointer 定義的數據結構就是 [24 bit partition number][13 bit memory page number][27 bit offset in page]
而後到根據該指針能夠拿到真實的record, 在一開始進入UnsafeShuffleExternalSorter 就已經被序列化了,因此在這裏就純粹變成寫字節數組了。一個文件裏不一樣的partiton的數據用fileSegment來表示,對應的信息存在 SpillInfo 數據結構中。
每一個spill 文件的分區索引都保存在 SpillInfo 數據結構中, Task結束前,咱們要作一次mergeSpills操做, 若是 fastMergeEnabled 而且壓縮方式支持 concatenation of compressed data, 就能夠直接 簡單地鏈接相同分區的壓縮數據到一塊兒,並且不用解壓反序列化。使用一種高效的數據拷貝技術,好比 NIO’s transferTo 就能夠避免解壓和 buffer 拷貝。