什麼狀況下會發生shuffle,而後shuffle的原理是什麼?網絡
- 在spark中,主要是如下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。
- groupByKey,要把分佈在集羣各個節點上的數據中的同一個key,對應的values,都給集中到一起,集中到集羣中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。而後呢,集中一個key對應的values以後,才能交給咱們來進行處理,<key, Iterable<value>>;reduceByKey,算子函數去對values集合進行reduce操做,最後變成一個value;countByKey,須要在一個task中,獲取到一個key對應的全部的value,而後進行計數,統計總共有多少個value;join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value,都能到一個節點的executor的task中,給咱們進行處理。
若是不合並map端輸出文件的話,會怎麼樣?app
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
開啓shuffle map端輸出文件合併的機制;默認狀況下,是不開啓的,就是會發生如上所述的大量map端輸出文件的操做,嚴重影響性能。jvm
提醒一下(map端輸出文件合併):函數
要實現輸出文件的合併的效果,必須是一批task先執行,而後下一批task再執行,
才能複用以前的輸出文件;負責多批task同時起來執行,仍是作不到複用的。性能
開啓了map端輸出文件合併機制以後,生產環境上的例子,會有什麼樣的變化?spa
實際生產環境的條件:
100個節點(每一個節點一個executor):100個executor
每一個executor:2個cpu core
總共1000個task:每一個executor平均10個task
上游1000個task,下游1000個taskscala
每一個節點,2個cpu core,有多少份輸出文件呢?2 * 1000 = 2000個(C*R)
總共100個節點,總共建立多少份輸出文件呢?100 * 2000 = 20萬個文件code
相比較開啓合併機制以前的狀況,100萬個orm
map端輸出文件,在生產環境中,立減5倍!進程
合併map端輸出文件,對我們的spark的性能有哪些方面的影響呢?
spark做業,5個小時 -> 2~3個小時。
你們不要小看這個map端輸出文件合併機制。實際上,在數據量比較大,你本身自己作了前面的性能調優,
executor上去->cpu core上去->並行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了;
大量的map端輸出文件的產生。對性能有比較惡劣的影響。
這個時候,去開啓這個機制,能夠頗有效的提高性能。
spark.shuffle.manager hash M*R 個小文件
spark.shuffle.manager sort C*R 個小文件 (默認的shuffle管理機制)
spark.shuffle.file.buffer,默認32k
spark.shuffle.memoryFraction,0.2
默認狀況下,shuffle的map task,輸出到磁盤文件的時候,統一都會先寫入每一個task本身關聯的一個內存緩衝區。這個緩衝區大小,默認是32kb。每一次,當內存緩衝區滿溢以後,纔會進行spill操做,溢寫操做,溢寫到磁盤文件中去reduce端task,在拉取到數據以後,會用hashmap的數據格式,來對各個key對應的values進行匯聚。針對每一個key對應的values,執行咱們自定義的聚合函數的代碼,好比_ + _(把全部values累加起來)reduce task,在進行匯聚、聚合等操做的時候,實際上,使用的就是本身對應的executor的內存,executor(jvm進程,堆),默認executor內存中劃分給reduce task進行聚合的比例,是0.2。問題來了,由於比例是0.2,因此,理論上,頗有可能會出現,拉取過來的數據不少,那麼在內存中,放不下;這個時候,默認的行爲,就是說,將在內存放不下的數據,都spill(溢寫)到磁盤文件中去。
原理說完以後,來看一下,默認狀況下,不調優,可能會出現什麼樣的問題?
默認,map端內存緩衝是每一個task,32kb。
默認,reduce端聚合內存比例,是0.2,也就是20%。
若是map端的task,處理的數據量比較大,可是呢,你的內存緩衝大小是固定的。
可能會出現什麼樣的狀況?
每一個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每一個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。
在map task處理的數據量比較大的狀況下,而你的task的內存緩衝默認是比較小的,32kb。可能會形成屢次的map端往磁盤文件的spill溢寫操做,發生大量的磁盤IO,從而下降性能。
reduce端聚合內存,佔比。默認是0.2。若是數據量比較大,reduce task拉取過來的數據不少,那麼就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操做,溢寫到磁盤上去。並且最要命的是,磁盤上溢寫的數據量越大,後面在進行聚合操做的時候,極可能會屢次讀取磁盤中的數據,進行聚合。
默認不調優,在數據量比較大的狀況下,可能頻繁地發生reduce端的磁盤文件的讀寫。
這兩個點之因此放在一塊兒講,是由於他們倆是有關聯的。數據量變大,map端確定會出點問題;
reduce端確定也會出點問題;出的問題是同樣的,都是磁盤IO頻繁,變多,影響性能。
調優:
調節map task內存緩衝:spark.shuffle.file.buffer,默認32k(spark 1.3.x不是這個參數,
後面還有一個後綴,kb;spark 1.5.x之後,變了,就是如今這個參數)
調節reduce端聚合內存佔比:spark.shuffle.memoryFraction,0.2
在實際生產環境中,咱們在何時來調節兩個參數?
看Spark UI,若是你的公司是決定採用standalone模式,那麼很簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進去看,依次點擊進去,能夠看到,你的每一個stage的詳情,有哪些executor,有哪些task,每一個task的shuffle write和shuffle read的量,shuffle的磁盤和內存,讀寫的數據量;若是是用的yarn模式來提交,課程最前面,從yarn的界面進去,點擊對應的application,進入Spark UI,查看詳情。
若是發現shuffle 磁盤的write和read,很大,能夠調節這兩個參數
調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,而後看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。不能調節的太大,太大了之後過猶不及,由於內存資源是有限的,你這裏調節的太大了,其餘環節的內存使用就會有問題了。
調節了之後,效果?map task內存緩衝變大了,減小spill到磁盤文件的次數;reduce端聚合內存變大了, 減小spill到磁盤的次數,並且減小了後面聚合讀取磁盤文件的數量。