spark_shuffle方式的演進過程

spark shuffle有四種方式,分別是網絡

  1. hashshuffle
  2. 優化後的hashshuffle
  3. sortshuffle
  4. bypass

1、hashshuffle與優化併發

一開始spark的shuffle方式是hashshuffle。hashshuffle有一個嚴重的問題,就是產生的小文件數量比較多。jvm

咱們知道,shuffle分爲map端的shuffle write 和reduce端的shuffle read。優化

hashshuffle,每一個task爲下游每個task都建立了一個文件,因此就產生了M*R的小文件數量,其中M是map的task數量,R是reduce的數量。spa

爲了減小小文件的數量,spark隨後提出了優化後的hashshuffle,即每一個core處理的task文件的結果合併起來排序

好比:task1 write了3個小文件,裏面是keyA keyB keyC,分別叫BlockA BlockB BlockC 吧索引

這時又來了一個task2,優化前task2 也產生三個小文件BlockA BlockB BlockC。這樣就有6個小文件內存

可是task2 和task1是同一個core處理的,因此task2的BlockA BlockB BlockC 寫入了task1 的BlockA BlockB BlockC,因爲同一個文件中是同一個下游task處理的,因此只要追加寫入就能夠了。hash

該優化方法,本質上是小文件的合併(同一個core的小文件數量合併),朝着這個方向,咱們能夠繼續優化成「同一個executor(jvm)產生的同一個下游task的小文件合併」,和同一臺機器合併,不過spark並無實現這些優化。it

因爲同一個stage裏的map的task不分前後順序,可是同一個core裏的task是前後處理的。因此同一個jvm或者同一臺機器的小文件合併,沒有同一個core的合併那麼簡單。

spark也提出過內存共享的優化方法,跨executor(jvm)去共享一臺機器上面的內存。

該優化最終將小文件數量降低了X倍,X爲平均每一個core處理的task數量,

  • 好比,有100個map,下游是500個reduce,那本來是100*500=5w的小文件數量,如今map端分配的core是20個(每一個core處理5個task),就變成了20*500=1w,縮小了5倍。

該優化方法雖然好,可是受限於機器並行的能力,若是機器並行的能力強,分配的core數量接近於map的數量,該優化就十分有限。

 

2、sortshuffle

爲了實現更進一步的小文件合併,spark在隨後提出了sortshuffle。

sortshuffle的目的,仍是實現"更寬"的「共享」。

上述講到hashshuffle對spark的小文件作了同一個core的合併,可是因爲不一樣core的task沒有前後順序,很難合併。sortshuffle就是爲了實現「每臺機器上的map task小文件都合併」。

由於一臺機器上的task之間執行沒有順序,因此要等待全部的task執行完成。沒法有效利用併發能力。

若是按照以前的作法,爲每一個reduce task建立一個文件,而且同一臺機器上的同一個reduce task的文件合併的話,這樣最多能節省的倍數爲平均一臺機器處理的task數量,

  • 好比,有100個map,下游是500個reduce,那本來是100*500=5w的小文件數量,如今map端分配的core是20個,20個core在5臺機器上(機器是4核的),就變成了5*500=2000,縮小了20倍。

顯然優化者不知足於這種程度的優化,因而優化者此次對"*500" 動手了(reduce task的數量)。如何優化"*500", 原來一個map task 會產生500個task,如今將這些都合併了。因此就沒有了"*500"

  • 好比,有100個map,下游是500個reduce,那本來是100*500=5w的小文件數量,如今合併了同一個map task產生的reduce task,那麼文件數量就變成了100個。雖小了500倍,(因爲要增長索引文件,因此實際上要除以2,是250倍)

只不過若是一個map,爲500個reduce task 產生一個文件,這500個reduce task要怎麼使用呢?答案是排序+索引,這就是sortshuffle的由來。

爲了實現500個reduce task對一個文件的高效操做,map write的文件內部是有序的,而且還爲該文件提供了一個索引文件。這樣reduce task 就能夠根據索引文件在該文件中找到本身對應的數據了。

該方式的map write的方式也發生了改變,相似於lsm的操做那樣,每次內存寫滿了就一次性把內存數據進行排序,寫入到磁盤生成一個文件,等到全部的數據都寫入磁盤後,再對全部生成的文件進行一個歸併排序。

這種shuffle的好處是大大減小了小文件的數量,且優化的力度不受限於reduce task的數量,只受限於map的數量。壞處是增長了一個排序的開銷。

既然map write產生的文件都是有序的,爲何再也不對同一臺機器或core的的文件進行歸併呢。答案是沒有必要,太多的小文件會增大網絡IO的開銷,太少的文件對下降併發的利用率。不過,若是上游的數據量很大(map數量不少),而該key的基數(去重後的數量)比較小的話,仍是能夠考慮按core或jvm或機器級別進行合併的(合併還要考慮到索引的合併),不過spark沒有提供這類優化。

3、bypass

bypass是sortshuffle的一種優化。上述提到,map write的文件內部作了全局排序。是爲了多個reduce task能找到各自須要read的數據。可是在一個reduce task內,數據並不須要有序,而reduce task的數量通常遠遠小於數據行數(數據key的基數),因此這就形成了計算的浪費。

爲了中止這種浪費,spark提出了新的shuffle優化bypass,即改變map write的方式,在write的時候,原來要對每個flush到磁盤的小文件進行排序,如今不排序了,複製以前hashshuffle的作法,爲每一個reduce task寫一個小文件。最後,將這些同一個map產生的小文件合併成一個大文件,合併的方式很簡單,就直接追加就能夠了,最後對結果文件增長一個索引,這樣下游每個reduce task都能找到本身要讀的數據。這樣既省下了排序的開銷,又將小文件數量縮小到了2*M的數量(和sortshuffle同樣)。可謂是兼具了hashshuffle和sortshuffle的優勢。

不過該方法也有限制的地方。該shuffle不支持預聚合,map的數量也儘可能要小(和最初的hashshuffle同樣,map數量過大會產生過多的臨時文件)

觸發bypass的map數量上限能夠用參數 spark.shuffle.sort.bypassMergeThreshold 設置

相關文章
相關標籤/搜索