shuffle。。。至關重要,爲何咩,由於shuffle的性能優劣直接決定了整個計算引擎的性能和吞吐量。相比於Hadoop的MapReduce,能夠看到Spark提供多種計算結果處理方式,對shuffle過程進行了優化。算法
那麼咱們從RDD的iterator方法開始:緩存
咱們能夠看到,它調用了cacheManager的getOrCompute方法,若是分區任務第一次執行尚未緩存,那麼會調用computeOrReadCheckpoint。若是某個partition任務執行失敗,能夠利用DAG從新調度,失敗的partition任務將從檢查點恢復狀態,而那些已經成功執行的partition任務因爲其執行結果已經緩存到存儲體系,因此調用CacheManager.getOrCompue方法,不須要再次執行。網絡
在computeOrReadCheckpoint中,若是存在檢查點時,則進行中間數據的拉取,不然將會從新執行compute,咱們知道RDD具備linkage機制,因此能夠直接找到其父RDD。併發
那麼compute方法實現了什麼呢?從最底層的HadoopRDD看起,全部類型的RDD都繼承自抽象RDD類。HadoopRDD compute方法以下圖:函數
它實現了一個NextIterator的一個內部類,你有沒有發現那個"input split:"這個日誌很熟悉,沒錯,就是跑任務時在container日誌中打印的日誌信息,也就是第一次數據獲取。而後這個內部類搞了一些事情,從broadcast中獲取jobConf(hadoop的Configuration)、建立inputMetrics用於計算字節讀取的測量信息。隨之RecoredReader讀取數據以前建立bytesReadCallback,是用來獲取當前線程從文件系統讀取的字節數。隨後獲取inputFormat:oop
隨後加入hadoop的配置信息,再經過 reader:RecordReader讀取數據。最終會new出一個InterruptibleIterator對象。這個對象用於map結束後的SortShuffleWriter的write方法。由於自己mapReduce的過程就是要寫入磁盤的,如圖:源碼分析
查閱資料,它主要乾了以下事情:性能
一、建立ExternalSorter,調用insertAll將計算結果寫入緩存。優化
二、調用shuffleBlockManager.getDataFile方法獲取當前任務要輸出的文件路徑。spa
三、調用shuffleBlockManager.consolidateId建立blockId。
四、調用ExternalSorter的writePartitionFile將中間結果持久化。
五、調用shuffleBlockManager.writeIndexFile方法建立索引文件。
六、最終建立MapStatus。
這裏有個重中之重,也就是Hadoop MapReduce過程的問題所在:
一、Hadoop在reduce任務獲取到map任務的中間輸出後,會對這些數據在磁盤上進行merge sort,產生更多的磁盤I/O.
二、當數據量很小,可是map任務和reduce任務數目不少時,會產生不少網絡I/O.
那麼spark的優化在於:
一、map任務逐條輸出計算結果,而不是一次性輸出到內存,並使用AppendOnlyMap緩存及其聚合算法對中間結果進行聚合,大大減小了中間結果所佔內存的大小。
二、當超出myMemoryThreshold的大小時,將數據寫入磁盤,防止內存溢出。
三、reduce任務也是逐條拉取,而且也用了AppendOnlyMap緩存,並在內存中進行聚合和排序,也大大減小了數據佔用的內存。
四、reduce任務對將要拉取的Block按照BlockManager劃分,而後將同一blockManager地址中的Block累積爲少許網絡請求,減小網絡I/O.
這裏有個參數,spark.shuffle.sort.bypassMergeThreshold,修改bypassMergeThreshold的大小,在分區數量小的時候提高計算引擎的性能。這個參數主要在partition的數量小於bypassMergeThreshold的值時,就再也不Executor中執行聚合和排序操做,知識將各個partition直接寫入Executor中進行存儲。
還有一個參數,spark.shuffle.sort.bypassMergeSort,這個參數標記是否傳遞到reduce端再作合併和排序,當沒有定義aggregator、ordering函數,而且partition數量小於等於bypassMergeThreshold時,bypassMergeSort爲true.若是bypassMergeSort爲true,map中間結果將直接輸出到磁盤,就不會佔用內存。
那麼 哪些Block從本地獲取、哪些須要遠程拉取,是獲取中間計算結果的關鍵。那麼reduce端如何處理多個map任務的中間結果?
這裏有個優化的參數spark.reducer.maxMbInFlight,這是單次航班請求的最大字節數,意思是一批請求,這批請求的字節總數不能超過maxBytesInFlight,並且每一個請求的字節數不能超過maxBytesInfFlight的五分之一,這樣作是爲了提升請求的併發度,容許5個請求分別從5個節點拉取數據。
調優方案:
一、在map端溢出分區文件,在reduce端合併組合
bypassMergeSort不使用緩存,將數據按照paritition寫入不一樣文件,最後按partition順序合併寫入同一文件。但沒有指定聚合、排序函數,且partition數量較小時,通常蠶蛹這種方式。它將多個bucket合併到一個文件,減小map輸出的文件數量,節省磁盤I/O,最終提高了性能。
二、在map端簡單排序、排序分組,在reduce端合併並組合
在緩存中利用指定的排序函數對數據按照partition或者Key進行排序,按partition順序合併寫入同一文件。當沒有指定聚合函數,且partition數量大時,採用這種方式。
三、在map端緩存中聚合、排序分組,在reduce端組合
在緩存中對數據按照key聚合,而且利用指定的排序函數對數據按照partition或者key進行排序,最後按partition順序合併寫入同一文件。當指定了聚合函數時,採用這種方式。
參考文獻:《深刻理解Spark:核心思想與源碼分析》