性能調優-shuffle調優

shuffle調優 什麼狀況下會發生shuffle,而後shuffle的原理是什麼? 在spark中,主要是如下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。 什麼是shuffle? groupByKey,要把分佈在集羣各個節點上的數據中的同一個key,對應的values,都給集中到一起, 集中到集羣中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。 而後呢,集中一個key對應的values以後,才能交給咱們來進行處理,<key, Iterable<value>>; reduceByKey,算子函數去對values集合進行reduce操做,最後變成一個value;countByKey, 須要在一個task中,獲取到一個key對應的全部的value,而後進行計數,統計總共有多少個value; join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value, 都能到一個節點的executor的task中,給咱們進行處理。 問題在於,同一個單詞,好比說(hello, 1),可能散落在不一樣的節點上;對每一個單詞進行累加計數, 就必須讓全部單詞都跑到同一個節點的一個task中,給一個task來進行處理。 每個shuffle的前半部分stage的task,每一個task都會建立下一個stage的task數量相同的文件, 好比下一個stage會有100個task,那麼當前stage每一個task都會建立100份文件;會將同一個key對應的values,必定是寫入同一個文件中的; shuffle的後半部分stage的task,每一個task都會從各個節點上的task寫的屬於本身的那一份文件中, 拉取key, value對;而後task會有一個內存緩衝區,而後會用HashMap,進行key, values的匯聚;(key ,values); task會用咱們本身定義的聚合函數,好比reduceByKey(_+_),把全部values進行一對一的累加; 聚合出來最終的值。就完成了shuffle。 shuffle,必定是分爲兩個stage來完成的。由於這實際上是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。 reduceByKey(_+_),在某個action觸發job的時候,DAGScheduler,會負責劃分job爲多個stage。 劃分的依據,就是,若是發現有會觸發shuffle操做的算子,好比reduceByKey,就將這個操做的前半部分, 以及以前全部的RDD和transformation操做,劃分爲一個stage;shuffle操做的後半部分,以及後面的, 直到action爲止的RDD和transformation操做,劃分爲另一個stage。 shuffle前半部分的task在寫入數據到磁盤文件以前,都會先寫入一個一個的內存緩衝,內存緩衝滿溢以後,再spill溢寫到磁盤文件中。 合併map端輸出文件,這個是很是有用的!!! 若是不合並map端輸出文件的話,會怎麼樣? 一、減小網絡傳輸、disk io、減小reduce端內存緩衝 問題來了:默認的這種shuffle行爲,對性能有什麼樣的惡劣影響呢? 實際生產環境的條件: 100個節點(每一個節點一個executor):100個executor 每一個executor:2個cpu core 總共1000個task:每一個executor平均10個task 上游1000個task,下游1000個task 每一個節點,10個task,每一個節點或者說每個executor會輸出多少份map端文件?10 * 1000=1萬個文件 總共有多少份map端輸出文件?100 * 10000 = 100萬。 shuffle中的寫磁盤的操做,基本上就是shuffle中性能消耗最爲嚴重的部分。 經過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁盤100萬個文件。 磁盤IO對性能和spark做業執行速度的影響,是極其驚人和嚇人的。 基本上,spark做業的性能,都消耗在shuffle中了,雖然不僅是shuffle的map端輸出文件這一個部分, 可是這裏也是很是大的一個性能消耗點。 new SparkConf().set("spark.shuffle.consolidateFiles", "true") 開啓shuffle map端輸出文件合併的機制;默認狀況下,是不開啓的, 就是會發生如上所述的大量map端輸出文件的操做,嚴重影響性能。 開啓了map端輸出文件的合併機制以後: 第一個stage,同時就運行cpu core個task,好比cpu core是2個,並行運行2個task; 每一個task都建立下一個stage的task數量個文件; 第一個stage,並行運行的2個task執行完之後;就會執行另外兩個task; 另外2個task不會再從新建立輸出文件;而是複用以前的task建立的map端輸出文件, 將數據寫入上一批task的輸出文件中。 第二個stage,task在拉取數據的時候,就不會去拉取上一個stage每個task爲本身建立的那份輸出文件了; 提醒一下(map端輸出文件合併): 只有並行執行的task會去建立新的輸出文件;下一批並行執行的task,就會去複用以前已有的輸出文件; 可是有一個例外,好比2個task並行在執行,可是此時又啓動要執行2個task(不是同一批次);那麼這個時候的話, 就沒法去複用剛纔的2個task建立的輸出文件了;而是仍是隻能去建立新的輸出文件。 要實現輸出文件的合併的效果,必須是一批task先執行,而後下一批task再執行, 才能複用以前的輸出文件;負責多批task同時起來執行,仍是作不到複用的。 開啓了map端輸出文件合併機制以後,生產環境上的例子,會有什麼樣的變化? 實際生產環境的條件: 100個節點(每一個節點一個executor):100個executor 每一個executor:2個cpu core 總共1000個task:每一個executor平均10個task 上游1000個task,下游1000個task 每一個節點,2個cpu core,有多少份輸出文件呢?2 * 1000 = 2000個 總共100個節點,總共建立多少份輸出文件呢?100 * 2000 = 20萬個文件 相比較開啓合併機制以前的狀況,100萬個 map端輸出文件,在生產環境中,立減5倍! 合併map端輸出文件,對我們的spark的性能有哪些方面的影響呢? 一、map task寫入磁盤文件的IO,減小:100萬文件 -> 20萬文件 二、第二個stage,本來要拉取第一個stage的task數量份文件,1000個task,第二個stage的每一個task, 都要拉取1000份文件,走網絡傳輸;合併之後,100個節點,每一個節點2個cpu core, 第二個stage的每一個task,主要拉取100 * 2 = 200個文件便可;網絡傳輸的性能消耗是否是也大大減小 分享一下,實際在生產環境中,使用了spark.shuffle.consolidateFiles機制之後, 實際的性能調優的效果:對於上述的這種生產環境的配置,性能的提高,仍是至關的客觀的。 spark做業,5個小時 -> 2~3個小時。 你們不要小看這個map端輸出文件合併機制。實際上,在數據量比較大,你本身自己作了前面的性能調優, executor上去->cpu core上去->並行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了; 大量的map端輸出文件的產生。對性能有比較惡劣的影響。 這個時候,去開啓這個機制,能夠頗有效的提高性能。 spark.shuffle.manager hash M*R 個小文件 spark.shuffle.manager sort 默認