在前兩篇文章 spark shuffle的寫操做之準備工做 中引出了spark shuffle的三種實現,spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter 講述了BypassMergeSortShuffleWriter 用於shuffle寫操做的具體細節,實現相對比較樸素,值得學習。本篇文章,主要剖析了 UnsafeShuffleWriter用做寫shuffle數據的具體細節,它在 BypassMergeSortShuffleWriter 的思路上更進一步,建議先看 spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter,再來看本篇文章。下面先來看UnsafeShuffleWriter的主要依賴實現類 -- ShuffleExternalSorter。html
在看本小節以前,建議先參照 spark 源碼分析之二十二-- Task的內存管理 對任務的內存管理作一下詳細的瞭解,由於ShuffleExternalSorter使用了內存的排序。任務在作大數據量的內存操做時,內存是須要管理的。算法
在正式剖析以前,先剖析其依賴類。apache
它記錄了block的一些元數據信息。數組
其類結構以下:app
其中,blockId就是shuffle的臨時的blockId,file就是shuffle合併後的文件,partitionLengths表示每個分區的大小。源碼分析
它支持堆內內存和堆外內存,它有四個屬性:post
數組裏的一個元素的地址等於:學習
if (baseObj == null) ? baseOffset(is real os address) + (length - 1) * WIDTH : address(baseObj) + baseOffset(is relative address 0) + (length - 1) * WIDTH大數據
全部元素設爲0:優化
設置元素
其底層使用unsafe類來設置值
獲取元素
其底層使用unsafe類來獲取值
全稱:org.apache.spark.shuffle.sort.PackedRecordPointer
成員常量:
壓縮記錄指針和分區:
獲取記錄的地址:
獲取記錄的分區:
思路也很簡單,就是根據分區來排序,即相同分區的數據被排到了一塊兒。
其定義以下:
其思路很簡單,hasNext跟JDK標準庫的實現一致,多了一個loadNext,每次都須要把數組中下一個位置的元素放到packetRecordPointer中,而後從packedRecordPointer中取出數據的地址和分區信息。
獲取迭代器的源碼以下:
其中 useRadixSort表示是否使用基數排序,默認是使用基數排序的,由參數 spark.shuffle.sort.useRadixSort 配置。
若是不使用基數排序,則會使用Spark的Sorter排序,sorter底層實現是TimSort,TimSort是優化以後的MergeSort。
總之,ShuffleSorterIterator中的數據已是有序的了,只須要迭代式取出便可。
思路很簡單,插入的數據就是記錄的地址和分區數據,這兩種數據被PackedRecordPointer壓縮編碼以後被存入到數組中。
其繼承關係以下:
即它是MemoryConsumer的子類,其實現了spill方法。
其成員變量以下:
DISK_WRITE_BUFFER_SIZE:寫到磁盤前的緩衝區大小爲1M
numPartitions:reduce的分區數
taskMemoryManager:負責任務的內存管理。看 spark 源碼分析之二十二-- Task的內存管理 作進一步瞭解。
blockManager:Spark存儲系統的核心類。看 spark 源碼分析之十八 -- Spark存儲體系剖析 作進一步瞭解。
TaskContext:任務執行的上下文對象。
numElementsForSpillThreshold:ShuffleInMemorySorter 數據溢出前的元素閥值。
fileBufferSizeBytes:DiskBlockObjectWriter溢出前的buffer大小。
diskWriteBufferSize:溢出到磁盤前的buffer大小。
allocatedPages:記錄分配的內存頁。
spills:記錄溢出信息
peakMemoryUsedBytes:內存使用峯值。
inMemSorter:內存排序器
currentPage:當前使用內存頁
pageCursor:內存頁遊標,標誌在內存頁的位置。
其構造方法以下:
fileBufferSizeBytes:經過參數 spark.shuffle.file.buffer 來配置,默認爲 32k
numElementsForSpillThreshold:經過參數spark.shuffle.spill.numElementsForceSpillThreshold來配置,默認是整數的最大值。
diskWriteBufferSize:經過 spark.shuffle.spill.diskWriteBufferSize 來配置,默認爲 1M
主要方法以下:
咱們主要分析其主要方法。
其源碼以下:
思路很簡單,調用writeSortedFile將數據寫入到文件中,釋放內存,重置inMemSorter。
freeMemory方法以下:
writeSortedFile 源碼以下:
圖中,我大體把步驟劃分爲四部分。總體思路:遍歷sorter中的全部分區數據,最終同一分區的數據被寫入到同一個FileSegment中,這些FileSegment最終又構成了一個合併的文件,其中FileSegment的大小被存放在SpillInfo中,最後放到了spills集合中。重點說一下第三步的獲取地址信息,若是是堆內地址,recordPage就是base對象,recordOffsetInPage就是記錄相對於base對象的偏移量,若是是堆外地址,recordPage爲null,由於堆外地址沒有base對象,其baseOffset就是其在操做系統內存中的絕對地址,recordOffsetInPage = offsetInPage + baseOffset,具體能夠在 spark 源碼分析之二十二-- Task的內存管理 中看TaskMemoryManager的實現細節。
其源碼以下:
注意:若是是堆內內存,baseObject就是分配的數組,baseOffset就是數組的下標索引。若是是堆外內存,baseObject爲null,baseOffset就是操做系統內存中的地址。
在地址編碼的時候,若是是堆內內存,頁內的偏移量就是baseObject,若是是堆外內存,頁內偏移量爲: 真實偏移量 - baseOffset。
它在插入數據以前,offset作了字節對齊,若是系統支持對齊,則向後錯4位,不然向後錯8位。這跟溢出操做裏取數據是對應的,便可以跟上文中 writeSortedFile 方法對比看。
org.apache.spark.shuffle.sort.ShuffleExternalSorter#growPointerArrayIfNecessary源碼以下:
解釋:首先hasSpaceForAnotherRecord會比較數組中下一個寫的索引位置跟數組的最大容量比較,若是索引位置大於最大容量,那麼就沒有空間來存放下一個記錄了,則須要把擴容,used是指的數組如今使用的大小,擴容倍數爲源數組的一倍。
org.apache.spark.shuffle.sort.ShuffleExternalSorter#acquireNewPageIfNecessary 源碼以下:
解釋:分配內存頁的條件是當前頁的遊標 + 須要的頁大小 大於當前頁的最大容量,則須要從新分配一個內存頁。
其源碼以下:
思路:執行最後一次溢出,而後將數據溢出信息返回。
思路:釋放內存排序器的內存,刪除溢出的臨時文件。
源碼以下:
思路:當前使用內存大於最大峯值則更新最大峯值,不然直接返回。
這個sorter內部集成的內存sorter會把同一分區的數據排序到一塊兒,數據溢出時,相同分區的數據會彙集到溢出文件的一個segment中。
先上源碼,後解釋:
思路:流程很簡單,將全部的數據逐一遍歷放入sorter,而後將sorter關閉,獲取輸出文件,結束。
下面咱們具體來看每一步是具體怎麼實現的:
在org.apache.spark.shuffle.sort.UnsafeShuffleWriter的構造方法源碼以下:
簡單作一下說明:
DEFAULT_INITIAL_SORT_BUFFER_SIZE爲 4096
DEFAULT_INITIAL_SER_BUFFER_SIZE 大小爲 1M
reduce 分區數量最大爲 16777216
SHUFFLE_FILE_BUFFER_SIZE默認爲32k,大小由參數 spark.shuffle.file.buffer 配置。
SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE 默認大小爲32k,大小由參數 spark.shuffle.unsafe.file.output.buffer 配置。
其open方法以下:
這個方法裏涉及了三個類:ShuffleExternalSorter,MyByteArrayOutputStream以及SerializationStream三個類。ShuffleExternalSorter在上文已經剖析過了,MyByteArrayOutputStream是一個ByteArrayOutputStream子類負責想堆內內存中寫數據,SerializationStream是一個序列化以後的流,數據最終會被寫入到serBuffer內存流中,調用其flush方法後,其內部的buf就是寫入的數據,以下:
核心方法write源碼以下:
其主要有兩步,一步是遍歷每一條記錄,將數據寫入到sorter中;第二步是關閉sorter,並將數據寫入到一個shuffle 文件中同時更新shuffle索引信息;最後清除shuffle過程當中sorter使用的資源。
先來看第一步:數據寫入到sorter中。
記錄中的鍵值被序列化到serBuffer的buf字節數組中,而後被寫入到 sorter(ShuffleExternalSorter)中。在sorter中序列化數據被寫入到內存中(內存不足會溢出到磁盤中),其地址信息被寫入到 ShuffleInMemorySorter 中,具體能夠看上文介紹。
一步是遍歷每一條記錄,將數據寫入到sorter中後會調用sorter的closeAndGetSpills方法執行最後一次spill操做,而後獲取到整個shuffle過程當中全部的SpillInfo信息。而後使用ShuffleBlockResolver獲取到shuffle的blockId對應的shuffle文件,最終調用mergeSpills 方法合併全部的溢出文件到最終的shuffle文件,而後更新shuffle索引文件,設置Shuffle結果的MapStatus信息,結束。
org.apache.spark.shuffle.sort.UnsafeShuffleWriter#closeAndWriteOutput 源碼以下:
其關鍵方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼以下:
若是溢出文件爲0,直接返回全是0的分區數組。
若是溢出文件爲1,文件重命名後返回只有一個元素的分區數組。
若是溢出文件多於1個則,多個溢出文件開始merge。
首先先看一下五個變量:
encryptionEnabled:是否啓用加密,默認爲false,經過 spark.io.encryption.enabled 參數來設置。
transferToEnabled:是否可使用nio的transferTo傳輸,默認爲true,經過 spark.file.transferTo 參數來設置。
compressionEnabled:是否使用壓縮,默認爲true,經過 spark.shuffle.compress 參數來設置。
compressionCodec:默認壓縮類,默認爲LZ4CompressionCodec,經過 spark.io.compression.codec 參數來設置。
fastMergeEnabled:是否啓用fast merge,默認爲true,經過 spark.shuffle.unsafe.fastMergeEnabled 參數來設置。
fastMergeIsSupported:是否支持 fast merge,若是不使用壓縮或者是壓縮算法是 org.apache.spark.io.SnappyCompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.ZStdCompressionCodec這四種支持鏈接的壓縮算法中的一種都是可使用 fast merge的。
三種merge多個文件的方式:transfered-based fast merge、fileStream-based fast merge以及slow merge三種方式。
使用transfered-based fast merge條件:使用 fast merge而且壓縮算法支持fast merge,而且啓用了nio的transferTo傳輸且不啓用文件加密。
使用fileStream-based fast merge條件:使用 fast merge而且壓縮算法支持fast merge,而且未啓用nio的transferTo傳輸或啓用了文件加密。
使用slow merge條件:未使用 fast merge或壓縮算法不支持fast merge。
下面咱們來看三種合併溢出的方式。
其核心方法org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithTransferTo 源碼以下:
其依賴方法 org.apache.spark.util.Utils#copyFileStreamNIO 以下:
很簡單,底層依賴於Java的NIO的transferTo方法實現。
其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 源碼以下,這裏不傳入任何壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼。
其其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 源碼跟 fileStream-based fast merge 裏的同樣,不作過多解釋,只不過這裏多傳入了一個壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼。
這部分更詳細的能夠看 org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit 源碼。在上篇文章 spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter 中使用BypassMergeSortShuffleWriter寫數據已經剖析過,再也不剖析。
ShuffleExternalSorter將數據不斷溢出到溢出小文件中,溢出文件內的數據是按分區規則排序的,分區內的數據是亂序的。
多個分區的數據同時溢出到一個溢出文件,最後使用三種歸併方式中的一種將多個溢出文件歸併到一個文件,分區內的數據是亂序的。最終數據的格式跟第一種shuffle寫操做的結果是同樣的,即有分區的shuffle數據文件和記錄分區大小的shuffle索引文件。