spark-shuffle調優

什麼狀況下會發生shuffle,而後shuffle的原理是什麼?網絡

  • 在spark中,主要是如下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。
  • groupByKey,要把分佈在集羣各個節點上的數據中的同一個key,對應的values,都給集中到一起,集中到集羣中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。而後呢,集中一個key對應的values以後,才能交給咱們來進行處理,<key, Iterable<value>>;reduceByKey,算子函數去對values集合進行reduce操做,最後變成一個value;countByKey,須要在一個task中,獲取到一個key對應的全部的value,而後進行計數,統計總共有多少個value;join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value,都能到一個節點的executor的task中,給咱們進行處理。
  • 問題在於,同一個單詞,好比說(hello, 1),可能散落在不一樣的節點上;對每一個單詞進行累加計數,就必須讓全部單詞都跑到同一個節點的一個task中,給一個task來進行處理;
  • 每個shuffle的前半部分stage的task,每一個task都會建立下一個stage的task數量相同的文件,好比下一個stage會有100個task,那麼當前stage每一個task都會建立100份文件;會將同一個key對應的values,必定是寫入同一個文件中的;
  • shuffle的後半部分stage的task,每一個task都會從各個節點上的task寫的屬於本身的那一份文件中,拉取key, value對;而後task會有一個內存緩衝區,而後會用HashMap,進行key, values的匯聚;(key ,values);
  • task會用咱們本身定義的聚合函數,好比reduceByKey(_+_),把全部values進行一對一的累加,聚合出來最終的值。就完成了shuffle;
  • shuffle,必定是分爲兩個stage來完成的。由於這實際上是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
  • reduceByKey(_+_),在某個action觸發job的時候,DAGScheduler,會負責劃分job爲多個stage。劃分的依據,就是,若是發現有會觸發shuffle操做的算子,好比reduceByKey,就將這個操做的前半部分,以及以前全部的RDD和transformation操做,劃分爲一個stage;shuffle操做的後半部分,以及後面的,直到action爲止的RDD和transformation操做,劃分爲另一個stage;
  • shuffle前半部分的task在寫入數據到磁盤文件以前,都會先寫入一個一個的內存緩衝,內存緩衝滿溢以後,再spill溢寫到磁盤文件中。

若是不合並map端輸出文件的話,會怎麼樣?app

  1. 減小網絡傳輸、disk io、減小reduce端內存緩衝
    • 問題來了:默認的這種shuffle行爲,對性能有什麼樣的惡劣影響呢?
      • 實際生產環境的條件:
        • 100個節點(每一個節點一個executor):100個executor,每一個executor:2個cpu core,總共1000個task:每一個executor平均10個task,上游1000個task,下游1000個task,每一個節點,10個task,每一個節點或者說每個executor會輸出多少份map端文件?10 * 1000=1萬個文件(M*R)
        • 總共有多少份map端輸出文件?100 * 10000 = 100萬。
    • shuffle中的寫磁盤的操做,基本上就是shuffle中性能消耗最爲嚴重的部分。
    • 經過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁盤100萬個文件。
    • 磁盤IO對性能和spark做業執行速度的影響,是極其驚人和嚇人的。
    • 基本上,spark做業的性能,都消耗在shuffle中了,雖然不僅是shuffle的map端輸出文件這一個部分,可是這裏也是很是大的一個性能消耗點。
      new SparkConf().set("spark.shuffle.consolidateFiles", "true")

       

  2. 開啓shuffle map端輸出文件合併的機制;默認狀況下,是不開啓的,就是會發生如上所述的大量map端輸出文件的操做,嚴重影響性能。jvm

  • 開啓了map端輸出文件的合併機制以後:
    • 第一個stage,同時就運行cpu core個task,好比cpu core是2個,並行運行2個task;
    • 每一個task都建立下一個stage的task數量個文件;
    • 第一個stage,並行運行的2個task執行完之後,就會執行另外兩個task;
    • 另外2個task不會再從新建立輸出文件;而是複用以前的task建立的map端輸出文件,將數據寫入上一批task的輸出文件中;
    • 第二個stage,task在拉取數據的時候,就不會去拉取上一個stage每個task爲本身建立的那份輸出文件了;


提醒一下(map端輸出文件合併):函數

  • 只有並行執行的task會去建立新的輸出文件;
  • 下一批並行執行的task,就會去複用以前已有的輸出文件;
  • 可是有一個例外,好比2個task並行在執行,可是此時又啓動要執行2個task(不是同一批次);
  • 那麼這個時候的話,就沒法去複用剛纔的2個task建立的輸出文件了;
  • 而是仍是隻能去建立新的輸出文件。

要實現輸出文件的合併的效果,必須是一批task先執行,而後下一批task再執行,
才能複用以前的輸出文件;負責多批task同時起來執行,仍是作不到複用的。
性能

開啓了map端輸出文件合併機制以後,生產環境上的例子,會有什麼樣的變化?spa

實際生產環境的條件:
100個節點(每一個節點一個executor):100個executor
每一個executor:2個cpu core
總共1000個task:每一個executor平均10個task
上游1000個task,下游1000個taskscala


每一個節點,2個cpu core,有多少份輸出文件呢?2 * 1000 = 2000個(C*R)
總共100個節點,總共建立多少份輸出文件呢?100 * 2000 = 20萬個文件code

相比較開啓合併機制以前的狀況,100萬個orm

map端輸出文件,在生產環境中,立減5倍!進程

合併map端輸出文件,對我們的spark的性能有哪些方面的影響呢?

  1. map task寫入磁盤文件的IO,減小:100萬文件 -> 20萬文件
  2. 第二個stage,本來要拉取第一個stage的task數量份文件,1000個task,第二個stage的每一個task,都要拉取1000份文件,走網絡傳輸;合併之後,100個節點,每一個節點2個cpu core,第二個stage的每一個task,主要拉取1000 * 2 = 2000個文件便可;網絡傳輸的性能消耗是否是也大大減小分享一下,實際在生產環境中,使用了spark.shuffle.consolidateFiles機制之後,實際的性能調優的效果:對於上述的這種生產環境的配置,性能的提高,仍是至關的客觀的。

spark做業,5個小時 -> 2~3個小時。

你們不要小看這個map端輸出文件合併機制。實際上,在數據量比較大,你本身自己作了前面的性能調優,
executor上去->cpu core上去->並行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了;
大量的map端輸出文件的產生。對性能有比較惡劣的影響。

這個時候,去開啓這個機制,能夠頗有效的提高性能。
spark.shuffle.manager hash M*R 個小文件
spark.shuffle.manager sort   C*R 個小文件  (默認的shuffle管理機制

spark.shuffle.file.buffer,默認32k
spark.shuffle.memoryFraction,0.2

        默認狀況下,shuffle的map task,輸出到磁盤文件的時候,統一都會先寫入每一個task本身關聯的一個內存緩衝區。這個緩衝區大小,默認是32kb。每一次,當內存緩衝區滿溢以後,纔會進行spill操做,溢寫操做,溢寫到磁盤文件中去reduce端task,在拉取到數據以後,會用hashmap的數據格式,來對各個key對應的values進行匯聚。針對每一個key對應的values,執行咱們自定義的聚合函數的代碼,好比_ + _(把全部values累加起來)reduce task,在進行匯聚、聚合等操做的時候,實際上,使用的就是本身對應的executor的內存,executor(jvm進程,堆),默認executor內存中劃分給reduce task進行聚合的比例,是0.2。問題來了,由於比例是0.2,因此,理論上,頗有可能會出現,拉取過來的數據不少,那麼在內存中,放不下;這個時候,默認的行爲,就是說,將在內存放不下的數據,都spill(溢寫)到磁盤文件中去。

原理說完以後,來看一下,默認狀況下,不調優,可能會出現什麼樣的問題?

默認,map端內存緩衝是每一個task,32kb。
默認,reduce端聚合內存比例,是0.2,也就是20%。

若是map端的task,處理的數據量比較大,可是呢,你的內存緩衝大小是固定的。
可能會出現什麼樣的狀況?

每一個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每一個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。

在map task處理的數據量比較大的狀況下,而你的task的內存緩衝默認是比較小的,32kb。可能會形成屢次的map端往磁盤文件的spill溢寫操做,發生大量的磁盤IO,從而下降性能。

reduce端聚合內存,佔比。默認是0.2。若是數據量比較大,reduce task拉取過來的數據不少,那麼就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操做,溢寫到磁盤上去。並且最要命的是,磁盤上溢寫的數據量越大,後面在進行聚合操做的時候,極可能會屢次讀取磁盤中的數據,進行聚合。

默認不調優,在數據量比較大的狀況下,可能頻繁地發生reduce端的磁盤文件的讀寫。

這兩個點之因此放在一塊兒講,是由於他們倆是有關聯的。數據量變大,map端確定會出點問題;
reduce端確定也會出點問題;出的問題是同樣的,都是磁盤IO頻繁,變多,影響性能。

調優:

調節map task內存緩衝:spark.shuffle.file.buffer,默認32k(spark 1.3.x不是這個參數,
後面還有一個後綴,kb;spark 1.5.x之後,變了,就是如今這個參數)
調節reduce端聚合內存佔比:spark.shuffle.memoryFraction,0.2

在實際生產環境中,咱們在何時來調節兩個參數?

看Spark UI,若是你的公司是決定採用standalone模式,那麼很簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進去看,依次點擊進去,能夠看到,你的每一個stage的詳情,有哪些executor,有哪些task,每一個task的shuffle write和shuffle read的量,shuffle的磁盤和內存,讀寫的數據量;若是是用的yarn模式來提交,課程最前面,從yarn的界面進去,點擊對應的application,進入Spark UI,查看詳情。

若是發現shuffle 磁盤的write和read,很大,能夠調節這兩個參數

調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,而後看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。不能調節的太大,太大了之後過猶不及,由於內存資源是有限的,你這裏調節的太大了,其餘環節的內存使用就會有問題了。

調節了之後,效果?map task內存緩衝變大了,減小spill到磁盤文件的次數;reduce端聚合內存變大了, 減小spill到磁盤的次數,並且減小了後面聚合讀取磁盤文件的數量。  

相關文章
相關標籤/搜索