Spark Shuffle數據處理過程與部分調優(源碼閱讀七)

  shuffle。。。至關重要,爲何咩,由於shuffle的性能優劣直接決定了整個計算引擎的性能和吞吐量。相比於Hadoop的MapReduce,能夠看到Spark提供多種計算結果處理方式,對shuffle過程進行了優化。算法

  那麼咱們從RDD的iterator方法開始:緩存

  

  咱們能夠看到,它調用了cacheManager的getOrCompute方法,若是分區任務第一次執行尚未緩存,那麼會調用computeOrReadCheckpoint。若是某個partition任務執行失敗,能夠利用DAG從新調度,失敗的partition任務將從檢查點恢復狀態,而那些已經成功執行的partition任務因爲其執行結果已經緩存到存儲體系,因此調用CacheManager.getOrCompue方法,不須要再次執行。網絡

  在computeOrReadCheckpoint中,若是存在檢查點時,則進行中間數據的拉取,不然將會從新執行compute,咱們知道RDD具備linkage機制,因此能夠直接找到其父RDD。併發

  那麼compute方法實現了什麼呢?從最底層的HadoopRDD看起,全部類型的RDD都繼承自抽象RDD類。HadoopRDD compute方法以下圖:函數

  

  它實現了一個NextIterator的一個內部類,你有沒有發現那個"input split:"這個日誌很熟悉,沒錯,就是跑任務時在container日誌中打印的日誌信息,也就是第一次數據獲取。而後這個內部類搞了一些事情,從broadcast中獲取jobConf(hadoop的Configuration)建立inputMetrics用於計算字節讀取的測量信息。隨之RecoredReader讀取數據以前建立bytesReadCallback,是用來獲取當前線程從文件系統讀取的字節數。隨後獲取inputFormat:oop

    

  隨後加入hadoop的配置信息,再經過 reader:RecordReader讀取數據。最終會new出一個InterruptibleIterator對象。這個對象用於map結束後的SortShuffleWriter的write方法。由於自己mapReduce的過程就是要寫入磁盤的,如圖:源碼分析

  

  查閱資料,它主要乾了以下事情:性能

  一、建立ExternalSorter,調用insertAll將計算結果寫入緩存。優化

  二、調用shuffleBlockManager.getDataFile方法獲取當前任務要輸出的文件路徑。spa

  三、調用shuffleBlockManager.consolidateId建立blockId。

  四、調用ExternalSorter的writePartitionFile中間結果持久化

  五、調用shuffleBlockManager.writeIndexFile方法建立索引文件。

  六、最終建立MapStatus。

  

  這裏有個重中之重,也就是Hadoop MapReduce過程的問題所在:

  一、Hadoop在reduce任務獲取到map任務的中間輸出後,會對這些數據在磁盤上進行merge sort,產生更多的磁盤I/O.

  二、當數據量很小,可是map任務和reduce任務數目不少時,會產生不少網絡I/O.

  那麼spark的優化在於:

  一、map任務逐條輸出計算結果,而不是一次性輸出到內存,並使用AppendOnlyMap緩存及其聚合算法對中間結果進行聚合,大大減小了中間結果所佔內存的大小。

  二、當超出myMemoryThreshold的大小時,將數據寫入磁盤,防止內存溢出。

  三、reduce任務也是逐條拉取,而且也用了AppendOnlyMap緩存,並在內存中進行聚合和排序,也大大減小了數據佔用的內存。

  四、reduce任務對將要拉取的Block按照BlockManager劃分,而後將同一blockManager地址中的Block累積爲少許網絡請求,減小網絡I/O.

  這裏有個參數,spark.shuffle.sort.bypassMergeThreshold,修改bypassMergeThreshold的大小,在分區數量小的時候提高計算引擎的性能。這個參數主要在partition的數量小於bypassMergeThreshold的值時,就再也不Executor中執行聚合和排序操做,知識將各個partition直接寫入Executor中進行存儲。

  還有一個參數,spark.shuffle.sort.bypassMergeSort,這個參數標記是否傳遞到reduce端再作合併和排序,當沒有定義aggregator、ordering函數,而且partition數量小於等於bypassMergeThreshold時,bypassMergeSort爲true.若是bypassMergeSort爲true,map中間結果將直接輸出到磁盤,就不會佔用內存。

  

  那麼 哪些Block從本地獲取、哪些須要遠程拉取,是獲取中間計算結果的關鍵。那麼reduce端如何處理多個map任務的中間結果?

  這裏有個優化的參數spark.reducer.maxMbInFlight,這是單次航班請求的最大字節數,意思是一批請求,這批請求的字節總數不能超過maxBytesInFlight,並且每一個請求的字節數不能超過maxBytesInfFlight的五分之一,這樣作是爲了提升請求的併發度,容許5個請求分別從5個節點拉取數據。

  調優方案:

  一、在map端溢出分區文件,在reduce端合併組合

  bypassMergeSort不使用緩存,將數據按照paritition寫入不一樣文件,最後按partition順序合併寫入同一文件。但沒有指定聚合、排序函數,且partition數量較小時,通常蠶蛹這種方式。它將多個bucket合併到一個文件,減小map輸出的文件數量,節省磁盤I/O,最終提高了性能。

  

  二、在map端簡單排序、排序分組,在reduce端合併並組合

    在緩存中利用指定的排序函數對數據按照partition或者Key進行排序,按partition順序合併寫入同一文件。當沒有指定聚合函數,且partition數量大時,採用這種方式。

      

  三、在map端緩存中聚合、排序分組,在reduce端組合

    在緩存中對數據按照key聚合,而且利用指定的排序函數對數據按照partition或者key進行排序,最後按partition順序合併寫入同一文件。當指定了聚合函數時,採用這種方式。

 

參考文獻:《深刻理解Spark:核心思想與源碼分析》

相關文章
相關標籤/搜索