1. spark shuffle的預聚合操做是如何作的,其中底層的數據結構是什麼?在數據寫入到內存中有預聚合,在讀溢出文件合併到最終的文件時是否也有預聚合操做?html
2. shuffle數據的排序是如何作的? 分區內的數據是不是有序的?如有序,spark 內部是按照什麼排序算法來排序每個分區上的key的?算法
3. shuffle的溢出操做和TaskMemoryManager的關係?apache
4. 在數據溢出階段,內存中數據的排序是使用算法進行排序的?數組
5. 在溢出文件數據合併階段,內存中的數據的排序是使用的什麼算法?緩存
6. 爲何在讀取溢出文件到內存中時,返回的結果是迭代器而不是直接的數據結果?數據結構
。。。。。。還有不少的細節。函數
咱們先來回首前幾篇文章的關係: spark 源碼分析之二十一 -- Task的執行流程 從調度的角度說明了TaskScheduler是如何調度任務的,其中任務的執行目前爲止寫了三篇文章,分別是 剖析Task運行時內存的管理的 spark 源碼分析之二十二-- Task的內存管理,剖析shuffle寫操做執行前的準備工做,引出了三種shuffle的寫方式,前兩篇文章分別介紹了 spark shuffle寫操做三部曲之UnsafeShuffleWriter 和 spark shuffle寫操做三部曲之BypassMergeSortShuffleWriter 前兩種shuffle的寫的方式。本篇文章來剖析最後一種 shuffle 寫的方式。源碼分析
咱們先來看第三種shuffle的相關依賴類。post
這個類繼承了AppendOnlyMap並實現了SizeTracker trait。fetch
其內部方法以下:
它依賴的類都是其父類,他只是它的兩個父類的拼湊,因此要想了解真正的動做,仍是須要去看其父類AppendOnlyMap和trait SizeTracker。
這個類繼承了Iterable trait和 Serializable 接口。
其類結構以下:
成員變量以下:
LOAD_FACTOR:負載因子,爲0.7,實際存儲數據佔比大於負載因子則須要擴容。
mask的做用:將任意的數映射到[0,mask]的範圍內。
data:是真正保存數據的數組。
haveNullValue:是否有null值,由於數組中的null值還有一個做用,那就是表示該索引位置沒有元素存在。
nullValue:null值。
destoryed:表示數據是否已經被銷燬。
理論最大容量爲:512MB
成員方法以下:
解釋:
1.若是是null值,則返回null值,由於約定 null值key對應null值value。
2. 首先先把原來的hashcode再求一次hash碼,而後和掩碼作與操做將其映射到 [0,mask] 範圍內。
3. 嘗試取出數據若是取出來的key是指定的key,則返回數據,若取出的key是null,表示以前沒有保存過,返回null,若取出的數據的key不是當前key,則使用再散列法 先有pos + delta逐步散列,求得下一次的pos,而後再重複第三步,直至找匹配的值或null值後返回。
更新鍵值思路:跟查找的思路同樣,只不過找到以後不返回,是執行更新操做。
更新鍵值思路:跟查找的思路同樣,只不過找到以後不返回,若是找的的值是null值,則執行賦值操做,不然更新value爲執行更新函數後的值。
本質上是遍歷數組,只不過這裏的元素是稀疏的,只返回有元素的數據,不作過多說明。
先整理數組,將數組的數據變爲緊湊的數據。再按照key來進行排序。最後返回一個迭代器,這個迭代器裏的數據是有序的。
若是當前使用容量佔比大於負載因子,則開始擴容。
新容量是舊容量的一倍。遍歷舊的數組中的每個非null元素,將其映射到新的數組中。
A general interface for collections to keep track of their estimated sizes in bytes. We sample with a slow exponential back-off using the SizeEstimator to amortize the time, as each call to SizeEstimator is somewhat expensive (order of a few milliseconds).
集合的通用接口,用於跟蹤其估計的大小(以字節爲單位)。 咱們使用SizeEstimator以緩慢的指數退避進行採樣以分攤時間,由於每次調用SizeEstimator都有點昂貴。
SAMPLE_GROWTH_RATE指數增加因子,好比是2,則是 1,2,4,8,16,......
核心方法以下:
主要用於數據佔用內存的估算。
其繼承關係以下:
其父類是Spillable抽象類。
先來看父類Spillable
類說明:當內存不足時,這個類會把內存裏的集合溢出到磁盤中。
其成員變量以下,不作過多解釋。
主要方法以下:
它實現了父類的抽象方法 spill方法,源碼以下:
思路:若是consumer不是這個類而且內存模式是堆內內存才支持內存溢出。
其依賴方法以下:
org.apache.spark.util.collection.Spillable#forceSpill源碼以下,它是一個抽象方法,沒有具體實現。
釋放內存方法,其調用了 父類的freeMemory方法:
org.apache.spark.util.collection.Spillable#maybeSpill 源碼以下:
其依賴方法spill方法以下,注意這個方法是用來溢出集合的數據到內存的,它是抽象方法,待子類實現。
這個類留給子類兩個方法來實現,forceSpill和spill方法。
ExternalAppendOnlyMap這個類裏面的是對 SizeTrackingAppendOnlyMap 的進一步封裝,下面咱們先看 SizeTrackingAppendOnlyMap。
其源碼以下:
總之,它是根據哈希碼進行比較的。
首先,它是org.apache.spark.util.collection.ExternalAppendOnlyMap的內部類,實現了Iterator trait,它是跟ExternalAppendOnlyMap一塊兒使用的,也使用了 ExternalAppendOnlyMap 裏的方法。
其成員變量以下:
SPILL_LOCK是一個對象鎖,每次執行溢出操做都會先獲取鎖再執行溢出操做,執行完畢後釋放鎖。
cur表示下一個未讀的元素。
hasSpilled表示是否有溢出。
1.溢出
其源碼以下:
2.銷燬數據釋放內存
其依賴方法 org.apache.spark.util.collection.ExternalAppendOnlyMap#freeCurrentMap 以下:
3. 讀取下一個
4. 是否有下一個
5. 獲取下一個元素
6. 轉換爲CompletionIterator
從本質來來講,它是一個包裝類,數據從構造方法以Iterator的形式傳遞過來,而它本身也是一個Iterator,除了實現了Iterator自己的方法外,還具有了溢出到磁盤、銷燬內存數據、轉換爲CompletionIterator的功能。
這個類就是用來讀取文件的數據的,只不過文件被劃分爲了多個文件段,有一個數組專門記錄這多個文件段的段大小,如構造函數所示:
其中file就是要讀取的數據文件,blockId表示文件在shuffle系統中對應的blockId,batchSize就是指的每個文件段的大小。
成員變量以下:
下面從Iterator的主要方法入手,去剖析整個類。
其依賴方法 org.apache.spark.util.collection.ExternalAppendOnlyMap.DiskMapIterator#readNextItem 源碼以下:
思路:首先先讀取下一個key-value對,若讀取完畢後,發現這個批次的數據已經讀取完畢,則調用 nextBatchStream 方法,關閉現有反序列化流,初始化讀取下一個文件段的反序列化流。
其依賴方法 org.apache.spark.util.collection.ExternalAppendOnlyMap.DiskMapIterator#nextBatchStream 以下:
思路:首先先肯定該批次的數據是否讀取完畢,若讀取完畢,則作完清理操做後,返回null值,不然先關閉現有的反序列化流,而後獲取下一個反序列化流的開始和結束offset,最後初始化一個反序列化流返回給調用端。
其依賴方法 org.apache.spark.util.collection.ExternalAppendOnlyMap.DiskMapIterator#cleanup 方法以下:
思路:首先關閉現有的反序列化流和文件流,最後若是文件存在,則刪除之。
思路很簡單,其中,nextItem已經在是否有下一個元素的時候反序列化出來了。
它有兩個重載的構造方法:
和
解釋一下其中的參數:
createCombiner:是根據一個原值來建立其combine以後的值的函數。
mergeValue:是根據一個combine以後的值和一個原值求combine以後的值的函數。
mergeCombiner:是根據兩個combine以後的值求combine以後的值函數。
本質上這幾個函數就是逐步歸併聚合的體現。
serializerBatchSize:表示每次溢出時,寫入文件的批次大小,這個批次是指的寫入的對象的次數,而不是一般意義上的buffer的緩衝區大小。
_diskBytesSpilled :表示總共溢出的字節大小
fileBufferSize: 文件緩存大小,默認爲 32k
_peakMemoryUsedBytes: 表示內存使用峯值
keyComparater:表示內存排序的比較器
思路:首先先調用currentMap的destructiveSortedIterator方法,先整理其內部的數據成緊湊的數據,而後對數據進行排序,最終有序數據以Iterator的結果返回。而後調用
將數據溢出到磁盤,最後將溢出的信息記錄到spilledMaps中,其依賴方法 org.apache.spark.util.collection.ExternalAppendOnlyMap#spillMemoryIteratorToDisk 源碼以下:
思路:建立本地臨時block,並獲取其writer,最終遍歷內存數組的迭代器,將數據都經過writer寫入到file中,其中寫文件是分批寫入的,即每次知足serializerBatchSize大小以後,執行flush寫入,最後執行一次flush寫入,關閉文件,最終返回DiskMapIterator對象。
其源碼以下:
這個類的兩個方法 combineValuesByKey 和 combineCombinersByKey 都依賴於 ExternalAppendOnlyMap類。
下面繼續來看ExternalSorter類的內部實現。
Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner pairs of type (K, C). Uses a Partitioner to first group the keys into partitions, and then optionally sorts keys within each partition using a custom Comparator. Can output a single partitioned file with a different byte range for each partition, suitable for shuffle fetches. If combining is disabled, the type C must equal V -- we'll cast the objects at the end. Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied to its use in sort-based shuffle (for example, its block compression is controlled by spark.shuffle.compress). We may need to revisit this if ExternalSorter is used in other non-shuffle contexts where we might want to use different configuration settings.
對類型(K,V)的多個鍵值對進行排序並可能合併,以生成類型(K,C)的鍵組合對。使用分區程序首先將key分組到分區中,而後能夠選擇使用自定義Comparator對每一個分區中的key進行排序。能夠爲每一個分區輸出具備不一樣字節範圍的單個分區文件,適用於隨機提取。若是禁用了組合,則類型C必須等於V - 咱們將在末尾轉換對象。注意:雖然ExternalSorter是一個至關通用的排序器,但它的一些配置與基於排序的shuffle的使用有關(例如,它的塊壓縮由spark.shuffle.compress控制)。若是在咱們可能想要使用不一樣配置設置的其餘非隨機上下文中使用ExternalSorter,咱們可能須要從新審視這一點。
下面,先來看其構造方法:
參數以下:
aggregator:可選的聚合器,能夠用於歸併數據
partitioner :可選的分區器,若是有的話,先按分區Id排序,再按key排序
ordering : 可選的排序,它在每個分區內按key進行排序,它也能夠是全局排序
serializer :用於溢出內存數據到磁盤的序列化器
其成員變量和核心方法,先不作剖析,其方法圍繞兩個核心展開,一部分是跟數據的插入有關的方法,一部分是跟多個溢出文件的合併操做有關的方法。
下面來看看它的一些內部類。
這個類實現了Iterator trait,只負責迭代讀取一個特定分區的數據,其定義以下:
比較簡單,不作過多說明。
這個類是一個 case class ,它記錄了溢出文件的一些關鍵信息,構造方法的各個字段以下:
file:溢出文件
blockId:溢出文件對應的blockId
serializerBatchSizes:表示每個序列化類對應的batch的大小。
elementsPerPartition:表示每個分區的元素的個數。
比較簡單,沒有類的方法定義。
它負責讀取一個按分區作文件分區的文件,但願按分區順序讀取分區文件的內容。
其類結構以下:
先看其成員變量:
batchOffsets:序列化類的每個批次的offset
partitionId:分區id
indexInPartition:在分區內的索引信息
batchId:batch的id
indexInBatch:在batch中的索引信息
lastPartitionId:上一個partition ID
nextPartitionToRead:下一個要讀取的partition的id
fileStream:文件輸入流
deserializeStream:分序列化流
nextItem:下一個鍵值對
finished:是否讀取完畢
下面,來看其核心方法:
思路跟DiskMapIterator的獲取下一個流的思路很相似,不作過多解釋。
其返回的是一個迭代器,org.apache.spark.util.collection.ExternalSorter.SpillReader#readNextPartition源碼以下:
思路:其返回迭代器中,的hasNext中先去讀取下一個item,若是讀取到的下一個元素爲null,則返回false,表示沒有數據能夠返回。
其依賴方法 org.apache.spark.util.collection.ExternalSorter.SpillReader#readNextItem 源碼以下:
思路:首先該批次數據讀取完畢,則關閉掉讀取該批次數據的流,繼續讀取下一個批次的流。
其依賴方法 org.apache.spark.util.collection.ExternalSorter.SpillReader#skipToNextPartition 方法以下:
下面,整理一下思路:
每次讀取一個文件的分區,該分區讀取完畢,關閉分區文件,讀取下一個文件的下一個分區數據。只不過它在讀文件的分區的時候,會有batch操做,一個分區可能會對應多個batch,可是一個batch有且只能有一個分區。
首先它跟 org.apache.spark.util.collection.ExternalAppendOnlyMap.SpillableIterator 很像, 實現方法也很相似,都是實現了一個Iterator trait,構造方法以一個Iterator對象傳入,而且對其作了封裝,能夠跟上文的 SpillableIterator 對比剖析。
其成員變量以下:
nextUpStream:下一個批次的stream
先來看Iterator的方法實現:
其源碼以下:
思路以下:首先建立內存迭代器,而後遍歷內存迭代器,將數據溢出到磁盤中,其關鍵方法 spillMemoryIteratorToDisk。
這個類底層是數組,數據按照Map的形式稀疏排列,它還支持多個key的預聚合操做。
它是SizeTrackingAppendOnlyMap和 WritablePartitionPairCollection的子類。
其源碼以下:
這個類底層是數組,數據按數組的形式緊湊排列。不支持多個相同key的預聚合操做。
它是SizeTracker 和 WritablePartitionPairCollection的子類。
其源碼以下:
下面來看最後一種shuffle數據寫的方式。
這種shuffle方式支持預聚合操做。
其下操做源碼以下:
若是須要在map段作combine操做,則須要指定 aggragator和 keyOrdering,即map端的數據會作預聚合操做,而且分區內的數據有序,其排序規則是按照hashCode作排序的。
不然這兩個參數爲null,即map端的數據沒有預聚合,而且分區內數據無序。
其源碼以下:
org.apache.spark.util.collection.ExternalSorter#insertAll的源碼以下:
思路:首先若是數據須要執行map端的combine操做,則使用 PartitionedAppendOnlyMap 類來操做,這個類能夠支持數據的combine操做。若是不須要 執行map 端的combine 操做,則使用 PartitionedPairBuffer 來實現,這個類不會對數據進行預聚合。每次數據寫入以後,都要查看是否須要執行溢出內存數據到磁盤的操做。
這兩個類在上文中已經作了詳細的說明。
其依賴方法 addElementsRead 源碼以下:
溢出內存數據到磁盤的核心方法 maybeSpillCollection 源碼以下:
思路:它有一個標誌位 usingMap表示是否使用的是map的數據結構,便是否是 PartitionedAppendOnlyMap,其思路幾乎同樣,只不過在調用 mayBeSpill 方法中傳入的參數不同。其中使用的內存的大小,都是通過採樣評估計算過的。其依賴方法 org.apache.spark.util.collection.Spillable#maybeSpill 以下:
思路:若是讀取的數據是 32 的整數倍而且當前使用的內存比初始內存大,則開始向TaskMemoryManager申請分配內存,若是申請成功,則返回申請的大小,注意:在向TaskMemoryManager申請內存的過程當中,若是內存不夠,也會去調用 org.apache.spark.util.collection.Spillable#spill 方法,在其內部也會去調用 org.apache.spark.util.collection.ExternalSorter#forceSpill 方法其源碼以下,其中readingIterator是SpillableIterator類型的對象。
其依賴方法 org.apache.spark.util.collection.Spillable#logSpillage 會打印一些溢出日誌。再也不過多說明。
其依賴方法 org.apache.spark.util.collection.ExternalSorter#spill 源碼以下:
思路相對比較簡單,主要是先獲取排序後集合的迭代器,而後將迭代器傳入 org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk ,將內存數據溢出到臨時的磁盤文件後返回一個SpilledFile對象,將其記錄到 spills中,spills這個變量主要記錄了內存數據的溢出過程當中的溢出文件的信息。
其溢出磁盤方法 org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk 源碼以下:
首先獲取寫序列化文件的writer,而後遍歷數據的迭代器,將數據迭代寫入到磁盤中,在寫入過程當中,不斷將每個分區的大小信息以及每個分區內元素的個數記錄下來,最終將溢出文件、分區元素個數,以及每個segment的大小信息封裝到SpilledFile對象中返回。
其核心代碼以下:
思路:首先先初始化一個臨時的最終文件(以uuid做爲後綴),而後初始化blockId,最後調用 org.apache.spark.util.collection.ExternalSorter的writePartitionedFile 方法。將數據寫入一個臨時文件,並將該文件中每個分區對應的FileSegment的大小返回。
其關鍵方法 org.apache.spark.util.collection.ExternalSorter#writePartitionedFile 源碼以下:
思路:首先若是歷來沒有過溢出文件,則首先先看一下是否須要map端聚合,如果須要,則數據已經被寫入到了map中,不然是buffer中。而後調用集合的轉成迭代器的方法,將內存的數據排序後輸出,最終迭代遍歷這個迭代器,將數據不斷寫入到最終的臨時文件中,更新分區大小返回。
若是以前已經有溢出文件了,則先調用 org.apache.spark.util.collection.ExternalSorter的partitionedIterator 方法將數據合併後返回合併後的迭代器。
最終遍歷每個分區的數據,將分區的數據寫入到最終的臨時文件,更新分區大小;最後返回分區大小。
下面重點剖析一下合併方法 org.apache.spark.util.collection.ExternalSorter#partitionedIterator,其源碼以下:
首先,要說明的是,經過咱們上面的程序分支進入該程序,此時歷史溢出文件集合是空的,即它不會執行第一個分支的處理流程,但仍是要作一下簡單的說明。
它有三個依賴方法分別以下:
依賴方法 org.apache.spark.util.collection.ExternalSorter#destructiveIterator 源碼以下:
思路:首先 isShuffleSort爲 true,咱們如今就是走的 shuffle sort的流程,確定是須要走第一個分支的,即它不會返回一個SpillableIterator迭代器。
值得注意的是,這裏的comparator跟內存排序使用的comparator是同樣的,即排序方式是同樣的。
依賴方法 org.apache.spark.util.collection.ExternalSorter#groupByPartition 源碼以下:
思路:遍歷每個分區返回一個IteratorForPartition的分區迭代器。
注意:因爲歷史溢出文件集合此時不爲空,將不會調用這個方法。
依賴方法 org.apache.spark.util.collection.ExternalSorter#merge 源碼以下:
思路:傳給merge方法的有兩個參數,一個是表明溢出文件的SpiiledFile集合,一個是表明內存數據的迭代器。
首先遍歷每個溢出文件,建立一個讀取該溢出文件的SpillReader對象,而後遍歷每個分區建立一個IteratorForPartition迭代器,而後讀取每個溢出文件的分區的迭代器,最終和 做爲參數傳入merge 方法的內存迭代器合併到一個迭代器集合中。
若是是須要預聚合的,則調用 mergeWithAggregation 方法,若是是須要排序的,則調用mergeSort 方法,對其進行排序,最後若是不知足前兩種狀況,調用集合的flatten 方法,將打平到一個迭代器中返回。
它有兩個依賴方法,分別以下:
org.apache.spark.util.collection.ExternalSorter#mergeSort 源碼以下:
思路:使用堆排序構造優先隊列,對數據進行排序,最終返回一個迭代器。每次先從堆中根據partitionID排序,將同一個partition的排到前面,每次取出一個Iterator,而後取出該Iterator中的一個元素,再放入堆中,由於可能取出一個元素後,Iterator的頭節點的partitionId改變了,因此須要再次排序,就這樣動態的出堆入堆,讓不一樣Iterator的相同partition的數據老是在一塊兒被迭代取出。注意這裏的comparator在指定ordering或aggragator的時候,是支持二級排序的,即不只僅支持分區排序,還支持分區內的數據按key進行排序,其排序器源碼以下:
若是ordering和aggragator沒有指定,則數據排序器爲:
即只按分區排序,跟第二種shuffle的最終格式很相似,分區內部數據無序。
org.apache.spark.util.collection.ExternalSorter#mergeWithAggregation源碼以下:
思路:若是數據總體並不要求有序,則會使用combiner將數據總體進行combine操做,最終相同key的數據被聚合在一塊兒。若是數據總體要求有序,則直接對有序的數據按照順序一邊聚合一邊迭代輸出下一個元素,最終數據是總體有序的。
其關鍵源碼以下:
其思路很簡單,能夠參考 spark shuffle寫操做三部曲之UnsafeShuffleWriter 對應部分的說明。
在本篇文章中,剖析了spark shuffle的最後一種寫方式。溢出前數據使用數組自定義的Map或者是列表來保存,若是指定了aggerator,則使用Map結構,Map數據結構支持map端的預聚合操做,可是列表方式的不支持預聚合。
數據每次溢出數據都進行排序,若是指定了ordering,則先按分區排序,再按每一個分區內的key排序,最終數據溢出到磁盤中的臨時文件中,在merge階段,數據被SpillReader讀取出來和未溢出的數據總體排序,最終數據能夠總體有序的落到最終的數據文件中。
至此,spark shuffle的三種寫方式都剖析完了。以後會有文章來剖析shuffle的讀取操做。
不足之處:這篇文章歷時比較久,也因爲平時工做緣由,用的都是碎片時間,可能有一些部分思路接不上,可能還有部分類沒有剖析,望見諒,雖然本文有諸多問題,可是對預總體理解第三種shuffle的寫方式來講,都無足輕重了。