spark shuffle寫操做三部曲之UnsafeShuffleWriter

前言

在前兩篇文章 spark shuffle的寫操做之準備工做 中引出了spark shuffle的三種實現,spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter 講述了BypassMergeSortShuffleWriter 用於shuffle寫操做的具體細節,實現相對比較樸素,值得學習。本篇文章,主要剖析了 UnsafeShuffleWriter用做寫shuffle數據的具體細節,它在 BypassMergeSortShuffleWriter 的思路上更進一步,建議先看 spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter,再來看本篇文章。下面先來看UnsafeShuffleWriter的主要依賴實現類 -- ShuffleExternalSorter。html

sort-based shuffle的外部sorter -- ShuffleExternalSorter

在看本小節以前,建議先參照 spark 源碼分析之二十二-- Task的內存管理 對任務的內存管理作一下詳細的瞭解,由於ShuffleExternalSorter使用了內存的排序。任務在作大數據量的內存操做時,內存是須要管理的。算法

在正式剖析以前,先剖析其依賴類。apache

依賴之記錄block元信息-- SpillInfo

它記錄了block的一些元數據信息。數組

其類結構以下:app

其中,blockId就是shuffle的臨時的blockId,file就是shuffle合併後的文件,partitionLengths表示每個分區的大小。源碼分析

依賴之分區排序器 -- ShuffleInMemorySorter

能夠在任何內存使用的數組--LongArray

它支持堆內內存和堆外內存,它有四個屬性:post

數組裏的一個元素的地址等於:學習

if (baseObj  == null) ? baseOffset(is real os address) + (length - 1) * WIDTH : address(baseObj) +  baseOffset(is relative address 0) + (length - 1) * WIDTH大數據

全部元素設爲0:優化

設置元素

其底層使用unsafe類來設置值

獲取元素

其底層使用unsafe類來獲取值

記錄指針地址壓縮器 -- PackedRecordPointer

全稱:org.apache.spark.shuffle.sort.PackedRecordPointer

成員常量:

壓縮記錄指針和分區:

 

獲取記錄的地址:

獲取記錄的分區:

自定義比較器--SortComparator

思路也很簡單,就是根據分區來排序,即相同分區的數據被排到了一塊兒。

遍歷自定義數組的迭代器 -- ShuffleSorterIterator

其定義以下:

其思路很簡單,hasNext跟JDK標準庫的實現一致,多了一個loadNext,每次都須要把數組中下一個位置的元素放到packetRecordPointer中,而後從packedRecordPointer中取出數據的地址和分區信息。

獲取迭代器

獲取迭代器的源碼以下:

其中 useRadixSort表示是否使用基數排序,默認是使用基數排序的,由參數 spark.shuffle.sort.useRadixSort 配置。

若是不使用基數排序,則會使用Spark的Sorter排序,sorter底層實現是TimSort,TimSort是優化以後的MergeSort。

總之,ShuffleSorterIterator中的數據已是有序的了,只須要迭代式取出便可。

插入數據到自定義的數組中

思路很簡單,插入的數據就是記錄的地址和分區數據,這兩種數據被PackedRecordPointer壓縮編碼以後被存入到數組中。

 

繼承關係

其繼承關係以下:

即它是MemoryConsumer的子類,其實現了spill方法。

成員變量

其成員變量以下:

DISK_WRITE_BUFFER_SIZE:寫到磁盤前的緩衝區大小爲1M

numPartitions:reduce的分區數

taskMemoryManager:負責任務的內存管理。看 spark 源碼分析之二十二-- Task的內存管理 作進一步瞭解。

blockManager:Spark存儲系統的核心類。看 spark 源碼分析之十八 -- Spark存儲體系剖析 作進一步瞭解。

TaskContext:任務執行的上下文對象。

numElementsForSpillThreshold:ShuffleInMemorySorter 數據溢出前的元素閥值。

fileBufferSizeBytes:DiskBlockObjectWriter溢出前的buffer大小。

diskWriteBufferSize:溢出到磁盤前的buffer大小。

allocatedPages:記錄分配的內存頁。

spills:記錄溢出信息

peakMemoryUsedBytes:內存使用峯值。

inMemSorter:內存排序器

currentPage:當前使用內存頁

pageCursor:內存頁遊標,標誌在內存頁的位置。

構造方法

其構造方法以下:

fileBufferSizeBytes:經過參數 spark.shuffle.file.buffer 來配置,默認爲 32k

numElementsForSpillThreshold:經過參數spark.shuffle.spill.numElementsForceSpillThreshold來配置,默認是整數的最大值。

diskWriteBufferSize:經過 spark.shuffle.spill.diskWriteBufferSize 來配置,默認爲 1M

核心方法

主要方法以下:

咱們主要分析其主要方法。

溢出操做

其源碼以下:

思路很簡單,調用writeSortedFile將數據寫入到文件中,釋放內存,重置inMemSorter。

freeMemory方法以下:

 

writeSortedFile 源碼以下:

圖中,我大體把步驟劃分爲四部分。總體思路:遍歷sorter中的全部分區數據,最終同一分區的數據被寫入到同一個FileSegment中,這些FileSegment最終又構成了一個合併的文件,其中FileSegment的大小被存放在SpillInfo中,最後放到了spills集合中。重點說一下第三步的獲取地址信息,若是是堆內地址,recordPage就是base對象,recordOffsetInPage就是記錄相對於base對象的偏移量,若是是堆外地址,recordPage爲null,由於堆外地址沒有base對象,其baseOffset就是其在操做系統內存中的絕對地址,recordOffsetInPage = offsetInPage + baseOffset,具體能夠在 spark 源碼分析之二十二-- Task的內存管理 中看TaskMemoryManager的實現細節。

插入記錄

其源碼以下:

注意:若是是堆內內存,baseObject就是分配的數組,baseOffset就是數組的下標索引。若是是堆外內存,baseObject爲null,baseOffset就是操做系統內存中的地址。

在地址編碼的時候,若是是堆內內存,頁內的偏移量就是baseObject,若是是堆外內存,頁內偏移量爲: 真實偏移量 - baseOffset。

它在插入數據以前,offset作了字節對齊,若是系統支持對齊,則向後錯4位,不然向後錯8位。這跟溢出操做裏取數據是對應的,便可以跟上文中 writeSortedFile 方法對比看。

org.apache.spark.shuffle.sort.ShuffleExternalSorter#growPointerArrayIfNecessary源碼以下:

解釋:首先hasSpaceForAnotherRecord會比較數組中下一個寫的索引位置跟數組的最大容量比較,若是索引位置大於最大容量,那麼就沒有空間來存放下一個記錄了,則須要把擴容,used是指的數組如今使用的大小,擴容倍數爲源數組的一倍。

org.apache.spark.shuffle.sort.ShuffleExternalSorter#acquireNewPageIfNecessary 源碼以下:

解釋:分配內存頁的條件是當前頁的遊標 + 須要的頁大小 大於當前頁的最大容量,則須要從新分配一個內存頁。

 

關閉而且獲取spill信息

其源碼以下:

思路:執行最後一次溢出,而後將數據溢出信息返回。

 

清理資源

 

思路:釋放內存排序器的內存,刪除溢出的臨時文件。

 

獲取內存使用峯值

源碼以下:

思路:當前使用內存大於最大峯值則更新最大峯值,不然直接返回。

總結

這個sorter內部集成的內存sorter會把同一分區的數據排序到一塊兒,數據溢出時,相同分區的數據會彙集到溢出文件的一個segment中。

 

 

使用UnsafeShuffleWriter寫數據

先上源碼,後解釋:

 

思路:流程很簡單,將全部的數據逐一遍歷放入sorter,而後將sorter關閉,獲取輸出文件,結束。

下面咱們具體來看每一步是具體怎麼實現的:

初始化Sorter

在org.apache.spark.shuffle.sort.UnsafeShuffleWriter的構造方法源碼以下:

簡單作一下說明:

DEFAULT_INITIAL_SORT_BUFFER_SIZE爲 4096

DEFAULT_INITIAL_SER_BUFFER_SIZE 大小爲 1M

reduce 分區數量最大爲 16777216

SHUFFLE_FILE_BUFFER_SIZE默認爲32k,大小由參數 spark.shuffle.file.buffer 配置。

SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE 默認大小爲32k,大小由參數 spark.shuffle.unsafe.file.output.buffer 配置。

其open方法以下:

這個方法裏涉及了三個類:ShuffleExternalSorter,MyByteArrayOutputStream以及SerializationStream三個類。ShuffleExternalSorter在上文已經剖析過了,MyByteArrayOutputStream是一個ByteArrayOutputStream子類負責想堆內內存中寫數據,SerializationStream是一個序列化以後的流,數據最終會被寫入到serBuffer內存流中,調用其flush方法後,其內部的buf就是寫入的數據,以下:

 

數據寫入概述

核心方法write源碼以下:

其主要有兩步,一步是遍歷每一條記錄,將數據寫入到sorter中;第二步是關閉sorter,並將數據寫入到一個shuffle 文件中同時更新shuffle索引信息;最後清除shuffle過程當中sorter使用的資源。

先來看第一步:數據寫入到sorter中。

數據插入到Sorter

記錄中的鍵值被序列化到serBuffer的buf字節數組中,而後被寫入到 sorter(ShuffleExternalSorter)中。在sorter中序列化數據被寫入到內存中(內存不足會溢出到磁盤中),其地址信息被寫入到 ShuffleInMemorySorter 中,具體能夠看上文介紹。

溢出文件歸併爲一個文件

一步是遍歷每一條記錄,將數據寫入到sorter中後會調用sorter的closeAndGetSpills方法執行最後一次spill操做,而後獲取到整個shuffle過程當中全部的SpillInfo信息。而後使用ShuffleBlockResolver獲取到shuffle的blockId對應的shuffle文件,最終調用mergeSpills 方法合併全部的溢出文件到最終的shuffle文件,而後更新shuffle索引文件,設置Shuffle結果的MapStatus信息,結束。

org.apache.spark.shuffle.sort.UnsafeShuffleWriter#closeAndWriteOutput 源碼以下:

其關鍵方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼以下:

若是溢出文件爲0,直接返回全是0的分區數組。

若是溢出文件爲1,文件重命名後返回只有一個元素的分區數組。

若是溢出文件多於1個則,多個溢出文件開始merge。

 

首先先看一下五個變量:

encryptionEnabled:是否啓用加密,默認爲false,經過 spark.io.encryption.enabled 參數來設置。

transferToEnabled:是否可使用nio的transferTo傳輸,默認爲true,經過 spark.file.transferTo 參數來設置。

compressionEnabled:是否使用壓縮,默認爲true,經過 spark.shuffle.compress 參數來設置。

compressionCodec:默認壓縮類,默認爲LZ4CompressionCodec,經過 spark.io.compression.codec 參數來設置。

fastMergeEnabled:是否啓用fast merge,默認爲true,經過 spark.shuffle.unsafe.fastMergeEnabled 參數來設置。

fastMergeIsSupported:是否支持 fast merge,若是不使用壓縮或者是壓縮算法是 org.apache.spark.io.SnappyCompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.ZStdCompressionCodec這四種支持鏈接的壓縮算法中的一種都是可使用 fast merge的。

 

三種merge多個文件的方式:transfered-based fast merge、fileStream-based fast merge以及slow merge三種方式。

使用transfered-based fast merge條件:使用 fast merge而且壓縮算法支持fast merge,而且啓用了nio的transferTo傳輸且不啓用文件加密。

使用fileStream-based fast merge條件:使用 fast merge而且壓縮算法支持fast merge,而且未啓用nio的transferTo傳輸或啓用了文件加密。

使用slow merge條件:未使用 fast merge或壓縮算法不支持fast merge。

下面咱們來看三種合併溢出的方式。

transfered-based fast merge

其核心方法org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithTransferTo 源碼以下:

其依賴方法 org.apache.spark.util.Utils#copyFileStreamNIO 以下:

很簡單,底層依賴於Java的NIO的transferTo方法實現。

fileStream-based fast merge

其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 源碼以下,這裏不傳入任何壓縮類,見 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼。

slow merge

其其核心方法 org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpillsWithFileStream 源碼跟 fileStream-based fast merge 裏的同樣,不作過多解釋,只不過這裏多傳入了一個壓縮類,見  org.apache.spark.shuffle.sort.UnsafeShuffleWriter#mergeSpills 源碼。

更新shuffle索引

這部分更詳細的能夠看 org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit 源碼。在上篇文章 spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter 中使用BypassMergeSortShuffleWriter寫數據已經剖析過,再也不剖析。

總結

ShuffleExternalSorter將數據不斷溢出到溢出小文件中,溢出文件內的數據是按分區規則排序的,分區內的數據是亂序的。

多個分區的數據同時溢出到一個溢出文件,最後使用三種歸併方式中的一種將多個溢出文件歸併到一個文件,分區內的數據是亂序的。最終數據的格式跟第一種shuffle寫操做的結果是同樣的,即有分區的shuffle數據文件和記錄分區大小的shuffle索引文件。

相關文章
相關標籤/搜索