上一篇博客《剖析Hadoop和Spark的Shuffle過程差別(一)》剖析了Hadoop MapReduce的Shuffle過程,那麼本篇博客,來聊一聊Spark shuffle。算法
Spark shuffle相對來講更簡單,由於不要求全局有序,因此沒有那麼多排序合併的操做。Spark shuffle分爲write和read兩個過程。咱們先來看shuffle write。緩存
1、shuffle write函數
shuffle write的處理邏輯會放到該ShuffleMapStage的最後(由於spark以shuffle發生與否來劃分stage,也就是寬依賴),final RDD的每一條記錄都會寫到對應的分區緩存區bucket,以下圖所示:oop
說明:fetch
一、上圖有2個CPU,能夠同時運行兩個ShuffleMapTaskspa
二、每一個task將寫一個buket緩衝區,緩衝區的數量和reduce任務的數量相等.net
三、 每一個buket緩衝區會生成一個對應ShuffleBlockFileblog
四、ShuffleMapTask 如何決定數據被寫到哪一個緩衝區呢?這個就是跟partition算法有關係,這個分區算法能夠是hash的,也能夠是range的 排序
五、最終產生的ShuffleBlockFile會有多少呢?就是ShuffleMapTask 數量乘以reduce的數量,這個是很是巨大的get
那麼有沒有辦法解決生成文件過多的問題呢?有,開啓FileConsolidation便可,開啓FileConsolidation以後的shuffle過程以下:
在同一核CPU執行前後執行的ShuffleMapTask能夠共用一個bucket緩衝區,而後寫到同一份ShuffleFile裏去,上圖所示的ShuffleFile其實是用多個ShuffleBlock構成,那麼,那麼每一個worker最終生成的文件數量,變成了cpu核數乘以reduce任務的數量,大大縮減了文件量。
2、Shuffle read
Shuffle write過程將數據分片寫到對應的分片文件,這時候萬事具有,只差去拉取對應的數據過來計算了。
那麼Shuffle Read發送的時機是什麼?是要等全部ShuffleMapTask執行完,再去fetch數據嗎?理論上,只要有一個 ShuffleMapTask執行完,就能夠開始fetch數據了,實際上,spark必須等到父stage執行完,才能執行子stage,因此,必須等到全部 ShuffleMapTask執行完畢,纔去fetch數據。fetch過來的數據,先存入一個Buffer緩衝區,因此這裏一次性fetch的FileSegment不能太大,固然若是fetch過來的數據大於每個閥值,也是會spill到磁盤的。
fetch的過程過來一個buffer的數據,就能夠開始聚合了,這裏就遇到一個問題,每次fetch部分數據,怎麼能實現全局聚合呢?以word count的reduceByKey(《Spark RDD操做之ReduceByKey 》)爲例,假設單詞hello有十個,可是一次fetch只拉取了2個,那麼怎麼全局聚合呢?Spark的作法是用HashMap,聚合操做其實是map.put(key,map.get(key)+1),將map中的聚合過的數據get出來相加,而後put回去,等到全部數據fetch完,也就完成了全局聚合。
3、總結
Hadoop的MapReduce Shuffle和Spark Shuffle差異總結以下:
一、Hadoop的有一個Map完成,Reduce即可以去fetch數據了,沒必要等到全部Map任務完成,而Spark的必須等到父stage完成,也就是父stage的map操做所有完成才能去fetch數據。
二、Hadoop的Shuffle是sort-base的,那麼不論是Map的輸出,仍是Reduce的輸出,都是partion內有序的,而spark不要求這一點。
三、Hadoop的Reduce要等到fetch徹底部數據,纔將數據傳入reduce函數進行聚合,而spark是一邊fetch一邊聚合。