1. shuffle過程的數據是如何傳輸過來的,是按文件來傳輸,仍是隻傳輸該reduce對應在文件中的那部分數據?html
2. shuffle讀過程是否有溢出操做?是如何處理的?git
3. shuffle讀過程是否能夠排序、聚合?是如何作的?github
。。。。。。apache
在 spark shuffle的寫操做之準備工做 中的 ResultTask 和 ShuffleMapTask 看到了,rdd讀取數據是調用了其 iterator 方法。緩存
org.apache.spark.rdd.RDD#iterator源碼以下,它是一個final方法,只在此有實現,子類不容許重實現這個方法:網絡
思路:若是是已經緩存下來了,則調用 org.apache.spark.rdd.RDD#getOrCompute 方法,經過底層的存儲系統或者從新計算來獲取父RDD的map數據。不然調用 org.apache.spark.rdd.RDD#computeOrReadCheckpoint ,從checkpoint中讀取或者是經過計算來來獲取父RDD的map數據。app
咱們逐一來看其依賴方法:ide
org.apache.spark.rdd.RDD#getOrCompute 源碼以下:源碼分析
首先先經過Spark底層的存儲系統獲取 block。若是底層存儲沒有則調用 org.apache.spark.rdd.RDD#computeOrReadCheckpoint,其源碼以下:post
主要經過三種途徑獲取數據 -- 經過spark 底層的存儲系統、經過父RDD的checkpoint、直接計算。
讀取完畢以後,數據的處理基本上同樣,都使用 org.apache.spark.InterruptibleIterator 以迭代器的形式返回,org.apache.spark.InterruptibleIterator 源碼以下:
比較簡單,使用委託模式,將迭代下一個行爲委託給受委託類。
下面咱們逐一來看三種獲取數據的實現細節。
其核心源碼以下:
思路:首先先從本地或者是遠程executor中的存儲系統中獲取到block,若是是block存在,則直接返回,若是不存在,則調用 computeOrReadCheckpoint計算或者經過讀取父RDD的checkpoint來獲取RDD的分區信息,而且將根據其持久化級別(即StorageLevel)將數據作持久化。 關於持久化的內容 能夠參考 Spark 源碼分析系列 中的 Spark存儲部分 作深刻了解。
其核心源碼以下:
經過父RDD的checkpoint也是須要經過spark底層存儲系統或者是直接計算來得出數據的。
不作過多的說明。
下面咱們直接進入主題,看shuffle的讀操做是如何進行的。
其核心方法以下:
首先,org.apache.spark.rdd.RDD#compute是一個抽象方法。
咱們來看shuffle過程reduce的讀map數據的實現。
表示shuffle結果的是 org.apache.spark.rdd.ShuffledRDD。
其compute 方法以下:
總體思路:首先從 shuffleManager中獲取一個 ShuffleReader 對象,並調用該reader對象的read方法將數據讀取出來,最後將讀取結果強轉爲Iterator[(K,C)]
該shuffleManager指的是org.apache.spark.shuffle.sort.SortShuffleManager。
其 getReader 源碼以下:
簡單來講明一下參數:
handle:是一個ShuffleHandle的實例,它有三個子類,能夠參照 spark shuffle的寫操做之準備工做 作深刻了解。
startPartition:表示開始partition的index
endPartition:表示結束的partition的index
context:表示Task執行的上下文對象
其返回的是一個 org.apache.spark.shuffle.BlockStoreShuffleReader 對象,下面直接來看這個對象。
這個類的繼承關係以下:
其中ShuffleReader的說明以下:
Obtained inside a reduce task to read combined records from the mappers.
ShuffleReader只有一個read方法,其子類BlockStoreShuffleReader也比較簡單,也只有一個實現了的read方法。
下面咱們直接來看這個方法的源碼。
在上圖,把整個流程劃分爲5個步驟 -- 獲取block輸入流、反序列化輸入流、添加監控、數據聚合、數據排序。
下面咱們分別來看這5個步驟。這5個流程中輸入流和迭代器都沒有把大數據量的數據一次性所有加載到內存中。而且即便在數據聚合和數據排序階段也沒有,可是會有數據溢出的操做。咱們下面具體來看每一步的具體流程是如何進行的。
其核心源碼以下:
咱們先來對 ShuffleBlockFetcherIterator 作進一步瞭解。
這個類就是用來獲取block的輸入流的。
其構造方法以下:
它繼承了Iterator trait,是一個 [(BlockId,InputStream)] 的迭代器。
對構造方法參數作進一步說明:
context:TaskContext,是做業執行的上下文對象
shuffleClieent:默認爲 NettyBlockTransferService,若是使用外部shuffle系統則使用 ExternalShuffleClient
blockManager:底層存儲系統的核心類
blocksByAddress:須要的block的blockManager的信息以及block的信息。
經過 org.apache.spark.MapOutputTracker#getMapSizesByExecutorId 獲取,其源碼以下:
org.apache.spark.MapOutputTrackerWorker#getStatuses 其源碼以下:
思路:若是有shuffleId對應的MapStatus則返回,不然使用 MapOutputTrackerMasterEndpointRef 請求 driver端的 MapOutputTrackerMaster 返回 對應的MapStatus信息。
org.apache.spark.MapOutputTracker#convertMapStatuses 源碼以下:
思路:將MapStatus轉換爲一個能夠迭代查看BlockManagerId、BlockId以及對應大小的迭代器。
streamWrapper:輸入流的解密以及解壓縮操做的包裝器,其依賴方法 org.apache.spark.serializer.SerializerManager#wrapStream 源碼以下:
這部分在 spark 源碼分析之十三 -- SerializerManager剖析 部分有相關剖析,再也不說明。
maxBytesInFlight: max size (in bytes) of remote blocks to fetch at any given point.
maxReqsInFlight: max number of remote requests to fetch blocks at any given point.
maxBlocksInFlightPerAddress: max number of shuffle blocks being fetched at any given point
maxReqSizeShuffleToMem: max size (in bytes) of a request that can be shuffled to memory.
detectCorrupt: whether to detect any corruption in fetched blocks.
在迭代方法next中不斷去讀取遠程的block以及本地的block輸入流。不作詳細剖析,見 ShuffleBlockFetcherIterator.scala 中next 相關方法的剖析。
核心方法以下:
其依賴方法 scala.collection.Iterator#flatMap 源碼以下:
可見,即便是在這裏,數據並無所有落到內存中。流跟管道的概念很相似,數據並無一次性加載到內存中。它只不過是在使用迭代器的不斷銜接,最終造成了新的處理鏈。在這個鏈中的每個環節,數據都是懶加載式的被加載到內存中,這在處理大數據量的時候是一個很好的技巧。固然也是責任鏈的一種具體實現方式。
其實這一步跟上一步本質上區別並不大,都是在責任鏈上添加了一個新的環節,其核心源碼以下:
其中,核心方法 scala.collection.Iterator#map 源碼以下:
又是一個新的迭代器處理環節被加到責任鏈中。
數據聚合其實也很簡單。
其核心源碼以下:
在聚合的過程當中涉及到了數據的溢出操做,若是有溢出操做還涉及 ExternalSorter的溢出合併操做。
其核心源碼不作進一步解釋,有興趣能夠看 spark shuffle寫操做三部曲之SortShuffleWriter 作進一步瞭解。
數據排序其實也很簡單。若是使用了排序,則使用ExternalSorter則在分區內部進行排序。
其核心源碼以下:
其內部使用了ExternalSorter進行排序,其中也涉及到了溢出操做的處理。有興趣能夠看 spark shuffle寫操做三部曲之SortShuffleWriter 作進一步瞭解。
主要從實現細節和設計思路上來講。
首先在實現細節上,先使用ShuffleBlockFetcherIterator獲取本地或遠程節點上的block並轉化爲流,最終返回一小部分數據的迭代器,隨後序列化、解壓縮、解密流操做被放在一個迭代器中該迭代器後執行,而後添加了監控相關的迭代器、數據聚合相關的迭代器、數據排序相關的迭代器等等。這些迭代器保證了處理大量數據的高效性,在數據聚合和排序階段,大數據量被不斷溢出到磁盤中,數據最終仍是以迭代器形式返回,確保了內存不會被大數據量佔用,提升了數據的吞吐量和處理數據的高效性。
在設計上,主要說三點:
至此,spark 的shuffle階段的細節就完全剖析完畢。
最後,明天週末,玩得開心~