不可不知的spark shuffle

640?wx_fmt=pngshuffle概覽html

一個spark的RDD有一組固定的分區組成,每一個分區有一系列的記錄組成。對於由窄依賴變換(例如map和filter)返回的RDD,會延續父RDD的分區信息,以pipeline的形式計算。每一個對象僅依賴於父RDD中的單個對象。諸如coalesce之類的操做可能致使任務處理多個輸入分區,但轉換仍然被認爲是窄依賴的,由於一個父RDD的分區只會被一個子RDD分區繼承。網絡

Spark還支持寬依賴的轉換,例如groupByKey和reduceByKey。在這些依賴項中,計算單個分區中的記錄所需的數據能夠來自於父數據集的許多分區中。要執行這些轉換,具備相同key的全部元組必須最終位於同一分區中,由同一任務處理。爲了知足這一要求,Spark產生一個shuffle,它在集羣內部傳輸數據,併產生一個帶有一組新分區的新stage。分佈式

能夠看下面的代碼片斷:性能

 

上面的代碼片斷只有一個action操做,count,從輸入textfile到action通過了三個轉換操做。這段代碼只會在一個stage中運行,由於,三個轉換操做沒有shuffle,也便是三個轉換操做的每一個分區都是隻依賴於它的父RDD的單個分區。大數據

可是,下面的單詞統計就跟上面有很大區別:優化

 

這段代碼裏有兩個reducebykey操做,三個stage。spa

下面圖更復雜,由於有一個join操做:.net

640?wx_fmt=png

粉框圈住的就是整個DAG的stage劃分。線程

640?wx_fmt=png

在每一個stage的邊界,父stage的task會將數據寫入磁盤,子stage的task會將數據經過網絡讀取。因爲它們會致使很高的磁盤和網絡IO,因此shuffle代價至關高,應該儘可能避免。父stage的數據分區每每和子stage的分區數不一樣。觸發shuffle的操做算子每每能夠指定分區數的,也便是numPartitions表明下個stage會有多少個分區。就像mr任務中reducer的數據是很是重要的一個參數同樣,shuffle的時候指定分區數也將在很大程度上決定一個應用程序的性能。orm

640?wx_fmt=png優化shuffle

一般狀況能夠選擇使用產生相同結果的action和transform相互替換。可是並非產生相同結果的算子就會有相同的性能。一般避免常見的陷阱並選擇正確的算子能夠顯著提升應用程序的性能。

當選擇轉換操做的時候,應最小化shuffle次數和shuffle的數據量。shuffle是很是消耗性能的操做。全部的shuffle數據都會被寫入磁盤,而後經過網絡傳輸。repartition , join, cogroup, 和 ?*By 或者 *ByKey 類型的操做都會產生shuffle。咱們能夠對一下幾個操做算子進行優化:

1. groupByKey某些狀況下能夠被reducebykey代替。

2. reduceByKey某些狀況下能夠被 aggregatebykey代替。

3. flatMap-join-groupBy某些狀況下能夠被cgroup代替。

具體細節,知識星球球友能夠點擊閱讀原文進入知識星球閱讀。

640?wx_fmt=pngno shuffle

在某些狀況下,前面描述的轉換操做不會致使shuffle。當先前的轉換操做已經使用了和shuffle相同的分區器分區數據的時候,spark就不會產生shuffle。

舉個例子:

 

因爲使用redcuebykey的時候沒有指定分區器,因此都是使用的默認分區器,會致使rdd1和rdd2都採用的是hash分區器。兩個reducebykey操做會產生兩個shuffle過程。若是,數據集有相同的分區數,執行join操做的時候就不須要進行額外的shuffle。因爲數據集的分區相同,所以rdd1的任何單個分區中的key集合只能出如今rdd2的單個分區中。 所以,rdd3的任何單個輸出分區的內容僅取決於rdd1中單個分區的內容和rdd2中的單個分區,而且不須要第三個shuffle。

例如,若是someRdd有四個分區,someOtherRdd有兩個分區,而reduceByKeys都使用三個分區,運行的任務集以下所示:

640?wx_fmt=png

若是rdd1和rdd2使用不一樣的分區器或者相同的分區器不一樣的分區數,僅僅一個數據集在join的過程當中須要從新shuffle

640?wx_fmt=png

 

在join的過程當中爲了不shuffle,可使用廣播變量。當executor內存能夠存儲數據集,在driver端能夠將其加載到一個hash表中,而後廣播到executor。而後,map轉換能夠引用哈希表來執行查找。

640?wx_fmt=png增長shuffle

有時候須要打破最小化shuffle次數的規則。

當增長並行度的時候,額外的shuffle是有利的。例如,數據中有一些文件是不可分割的,那麼該大文件對應的分區就會有大量的記錄,而不是說將數據分散到儘量多的分區內部來使用全部已經申請cpu。在這種狀況下,使用reparition從新產生更多的分區數,以知足後面轉換算子所需的並行度,這會提高很大性能。

使用reduce和aggregate操做將數據聚合到driver端,也是修改區數的很好的例子。

在對大量分區執行聚合的時候,在driver的單線程中聚合會成爲瓶頸。要減driver的負載,雅思報名條件能夠首先使用reducebykey或者aggregatebykey執行一輪分佈式聚合,同時將結果數據集分區數減小。實際思路是首先在每一個分區內部進行初步聚合,同時減小分區數,而後再將聚合的結果發到driver端實現最終聚合。典型的操做是treeReduce?和?treeAggregate。

當聚合已經按照key進行分組時,此方法特別適用。例如,假如一個程序計算語料庫中每一個單詞出現的次數,並將結果使用map返回到driver。一種方法是可使用聚合操做完成在每一個分區計算局部map,而後在driver中合併map。能夠用aggregateByKey以徹底分佈的方式進行統計,而後簡單的用collectAsMap將結果返回到driver。

更多spark技巧,大數據技巧,歡迎點擊閱讀原文加入知識星球

推薦閱讀:

經驗|如何設置Spark資源

戳破 | hive on spark 調優勢

640?wx_fmt=png


文章來源:http://www.javashuo.com/article/p-xcnyrpde-kg.html

相關文章
相關標籤/搜索