再上一篇文章 spark shuffle的寫操做之準備工做 中,主要介紹了 spark shuffle的準備工做,本篇文章主要介紹spark shuffle使用BypassMergeSortShuffleWriter寫數據詳細細節。html
在本篇文章中若是有不瞭解的術語,也能夠參照 spark shuffle的寫操做之準備工做 作進一步瞭解。apache
這種shuffle寫數據的方式是最簡單的,spark計劃在之後會移除這種shuffle機制。數組
先上源碼,後解釋:函數
流程以下:工具
若是沒有數據要寫,那麼數據文件爲空,索引文件中各個segment的大小爲0,返回初始化的MapStatus。post
若是有數據要寫到各個reducer的文件中,首先初始化序列化工具實例,遍歷初始化各個partition的partitionWriter數組中的DiskBlockObjectWriter對象,初始化各個partition的FileSegment數組。ui
而後遍歷每個要寫入的記錄值,而且取出記錄的key值,根據Partitioner的getPartition函數肯定其reduce到的目標分區索引,而後根據計算出的索引肯定負責寫數據的DiskBlockObjectWriter對象,而後根據該對象將鍵值對寫入到臨時分區文件。url
當每個要寫入的記錄值遍歷操做完畢,遍歷每個分區,將該分區對應的partitionWriter執行commitAndGet操做,返回該分區的FileSegment對象。spa
其依賴方法commitAndGet源碼以下:設計
至此,大多數狀況下,reduce的每個partition的數據有被寫入到一個單獨的文件。明明是FileSegment,爲何是單獨的文件呢?緣由就在於DiskBlockManager返回的臨時ShuffleBlockId是不重複的,org.apache.spark.storage.DiskBlockManager#createTempShuffleBlock源碼以下:
又由於建立臨時文件,只是建立臨時文件的句柄,此時對應的物理文件,並不存在,因此,這個方法不能保證建立的臨時文件不重複。因此多個partition數據寫入到一個臨時文件的機率仍是有的,只不過是小几率事件。
最後小的分區文件會被合併爲一個文件。
首先調用ShuffleBlockResolver(它是IndexShuffleBlockResolver實例)的getDataFile方法獲取數據文件的句柄File對象,org.apache.spark.util.Utils的tempFileWith獲取臨時文件,org.apache.spark.util.Utils#tempFileWith源碼以下,即得到一個帶uuid後綴的文件:
最後調用org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter的writePartitionedFile方法將多個小文件合併爲一個大文件並返回包含每個partition
對應的文件段的大小的數組,源碼以下:
最後更新索引文件,給數據文件重命名後整個寫過程就完全結束了,源碼再也不作過多解釋,在 spark shuffle的寫操做之準備工做 中 IndexShuffleBlockResolver類中有說明。
BypassMergeSortShuffleWriter是基於文件作的分區,沒有sort操做,最後分區數據被寫入一個完整文件,而且有一個索引文件記錄文件中每個分區對應的FileSegment的大小。這種設計是比較樸素的,也很簡單,易實現。