spark shuffle讀操做

提出問題

1. shuffle過程的數據是如何傳輸過來的,是按文件來傳輸,仍是隻傳輸該reduce對應在文件中的那部分數據?html

2. shuffle讀過程是否有溢出操做?是如何處理的?git

3. shuffle讀過程是否能夠排序、聚合?是如何作的?github

。。。。。。apache

概述

在 spark shuffle的寫操做之準備工做 中的 ResultTask 和 ShuffleMapTask 看到了,rdd讀取數據是調用了其 iterator 方法。緩存

計算或者讀取RDD

org.apache.spark.rdd.RDD#iterator源碼以下,它是一個final方法,只在此有實現,子類不容許重實現這個方法:網絡

思路:若是是已經緩存下來了,則調用 org.apache.spark.rdd.RDD#getOrCompute 方法,經過底層的存儲系統或者從新計算來獲取父RDD的map數據。不然調用 org.apache.spark.rdd.RDD#computeOrReadCheckpoint ,從checkpoint中讀取或者是經過計算來來獲取父RDD的map數據。app

咱們逐一來看其依賴方法:ide

org.apache.spark.rdd.RDD#getOrCompute 源碼以下:源碼分析

首先先經過Spark底層的存儲系統獲取 block。若是底層存儲沒有則調用 org.apache.spark.rdd.RDD#computeOrReadCheckpoint,其源碼以下:post

主要經過三種途徑獲取數據 -- 經過spark 底層的存儲系統、經過父RDD的checkpoint、直接計算。

處理返回的數據

讀取完畢以後,數據的處理基本上同樣,都使用 org.apache.spark.InterruptibleIterator 以迭代器的形式返回,org.apache.spark.InterruptibleIterator 源碼以下:

比較簡單,使用委託模式,將迭代下一個行爲委託給受委託類。

 

下面咱們逐一來看三種獲取數據的實現細節。

經過spark 底層的存儲系統

其核心源碼以下:

思路:首先先從本地或者是遠程executor中的存儲系統中獲取到block,若是是block存在,則直接返回,若是不存在,則調用 computeOrReadCheckpoint計算或者經過讀取父RDD的checkpoint來獲取RDD的分區信息,而且將根據其持久化級別(即StorageLevel)將數據作持久化。 關於持久化的內容 能夠參考 Spark 源碼分析系列 中的 Spark存儲部分 作深刻了解。

經過父RDD的checkpoint

其核心源碼以下:

經過父RDD的checkpoint也是須要經過spark底層存儲系統或者是直接計算來得出數據的。

不作過多的說明。

下面咱們直接進入主題,看shuffle的讀操做是如何進行的。

直接計算

其核心方法以下:

首先,org.apache.spark.rdd.RDD#compute是一個抽象方法。

咱們來看shuffle過程reduce的讀map數據的實現。

表示shuffle結果的是 org.apache.spark.rdd.ShuffledRDD。

其compute 方法以下:

總體思路:首先從 shuffleManager中獲取一個 ShuffleReader 對象,並調用該reader對象的read方法將數據讀取出來,最後將讀取結果強轉爲Iterator[(K,C)]

該shuffleManager指的是org.apache.spark.shuffle.sort.SortShuffleManager。

其 getReader 源碼以下:

簡單來講明一下參數:

handle:是一個ShuffleHandle的實例,它有三個子類,能夠參照 spark shuffle的寫操做之準備工做 作深刻了解。

startPartition:表示開始partition的index

endPartition:表示結束的partition的index

context:表示Task執行的上下文對象

其返回的是一個 org.apache.spark.shuffle.BlockStoreShuffleReader 對象,下面直接來看這個對象。

BlockStoreShuffleReader

這個類的繼承關係以下:

其中ShuffleReader的說明以下:

Obtained inside a reduce task to read combined records from the mappers.

ShuffleReader只有一個read方法,其子類BlockStoreShuffleReader也比較簡單,也只有一個實現了的read方法。

下面咱們直接來看這個方法的源碼。

在上圖,把整個流程劃分爲5個步驟 -- 獲取block輸入流、反序列化輸入流、添加監控、數據聚合、數據排序。

下面咱們分別來看這5個步驟。這5個流程中輸入流和迭代器都沒有把大數據量的數據一次性所有加載到內存中。而且即便在數據聚合和數據排序階段也沒有,可是會有數據溢出的操做。咱們下面具體來看每一步的具體流程是如何進行的。 

獲取block輸入流

其核心源碼以下:

咱們先來對 ShuffleBlockFetcherIterator 作進一步瞭解。

使用ShuffleBlockFetcherIterator獲取輸入流

這個類就是用來獲取block的輸入流的。

blockId等相關信息傳入構造方法

其構造方法以下:

它繼承了Iterator trait,是一個 [(BlockId,InputStream)] 的迭代器。

對構造方法參數作進一步說明:

context:TaskContext,是做業執行的上下文對象

shuffleClieent:默認爲 NettyBlockTransferService,若是使用外部shuffle系統則使用 ExternalShuffleClient

blockManager:底層存儲系統的核心類

blocksByAddress:須要的block的blockManager的信息以及block的信息。

經過 org.apache.spark.MapOutputTracker#getMapSizesByExecutorId 獲取,其源碼以下:

org.apache.spark.MapOutputTrackerWorker#getStatuses 其源碼以下:

思路:若是有shuffleId對應的MapStatus則返回,不然使用 MapOutputTrackerMasterEndpointRef 請求 driver端的 MapOutputTrackerMaster 返回 對應的MapStatus信息。

 org.apache.spark.MapOutputTracker#convertMapStatuses 源碼以下:

思路:將MapStatus轉換爲一個能夠迭代查看BlockManagerId、BlockId以及對應大小的迭代器。

streamWrapper:輸入流的解密以及解壓縮操做的包裝器,其依賴方法 org.apache.spark.serializer.SerializerManager#wrapStream 源碼以下:

這部分在 spark 源碼分析之十三 -- SerializerManager剖析 部分有相關剖析,再也不說明。

maxBytesInFlight: max size (in bytes) of remote blocks to fetch at any given point. 
maxReqsInFlight: max number of remote requests to fetch blocks at any given point.
maxBlocksInFlightPerAddress: max number of shuffle blocks being fetched at any given point
maxReqSizeShuffleToMem: max size (in bytes) of a request that can be shuffled to memory.
detectCorrupt: whether to detect any corruption in fetched blocks.

讀取數據

在迭代方法next中不斷去讀取遠程的block以及本地的block輸入流。不作詳細剖析,見 ShuffleBlockFetcherIterator.scala 中next 相關方法的剖析。

反序列化輸入流

核心方法以下:

其依賴方法 scala.collection.Iterator#flatMap 源碼以下:

 可見,即便是在這裏,數據並無所有落到內存中。流跟管道的概念很相似,數據並無一次性加載到內存中。它只不過是在使用迭代器的不斷銜接,最終造成了新的處理鏈。在這個鏈中的每個環節,數據都是懶加載式的被加載到內存中,這在處理大數據量的時候是一個很好的技巧。固然也是責任鏈的一種具體實現方式。

添加監控

其實這一步跟上一步本質上區別並不大,都是在責任鏈上添加了一個新的環節,其核心源碼以下:

其中,核心方法 scala.collection.Iterator#map 源碼以下:

又是一個新的迭代器處理環節被加到責任鏈中。

數據聚合

數據聚合其實也很簡單。

其核心源碼以下:

在聚合的過程當中涉及到了數據的溢出操做,若是有溢出操做還涉及 ExternalSorter的溢出合併操做。

其核心源碼不作進一步解釋,有興趣能夠看 spark shuffle寫操做三部曲之SortShuffleWriter 作進一步瞭解。

數據排序

數據排序其實也很簡單。若是使用了排序,則使用ExternalSorter則在分區內部進行排序。

其核心源碼以下:

其內部使用了ExternalSorter進行排序,其中也涉及到了溢出操做的處理。有興趣能夠看 spark shuffle寫操做三部曲之SortShuffleWriter 作進一步瞭解。

總結

主要從實現細節和設計思路上來講。

實現細節

首先在實現細節上,先使用ShuffleBlockFetcherIterator獲取本地或遠程節點上的block並轉化爲流,最終返回一小部分數據的迭代器,隨後序列化、解壓縮、解密流操做被放在一個迭代器中該迭代器後執行,而後添加了監控相關的迭代器、數據聚合相關的迭代器、數據排序相關的迭代器等等。這些迭代器保證了處理大量數據的高效性,在數據聚合和排序階段,大數據量被不斷溢出到磁盤中,數據最終仍是以迭代器形式返回,確保了內存不會被大數據量佔用,提升了數據的吞吐量和處理數據的高效性。

設計思路

在設計上,主要說三點:

  1. 責任鏈和迭代器的混合使用,即便得程序易擴展,處理環節可插拔,處理流程清晰易懂。
  2. 關於聚合和排序的使用,在前面文章中shuffle寫操做也提到了,聚合和排序的類是獨立出來的,跟shuffle的處理耦合性很低,這使得在shuffle的讀和寫階段的數據內存排序聚合溢出操做的處理類能夠重複使用。
  3. shuffle數據的設計也很巧妙,shuffle的數據是按reduceId分區的,分區信息被保存在索引文件中,這使得每個reduce task只須要取得一個文件中屬於它分區的那部分shuffle數據就能夠了,極大地減小無用了數據量的網絡傳輸,提升了shuffle的效率。還值得說的是,shuffle數據的格式是一個約定,無論map階段的數據是如何被處理,最終數據形式確定是約定好的,這使得map和reduce階段的處理類之間的耦合性大大地下降。

至此,spark 的shuffle階段的細節就完全剖析完畢。

最後,明天週末,玩得開心~

相關文章
相關標籤/搜索