Spark學習——性能調優(二)

其餘更多java基礎文章:
java基礎學習(目錄)java


繼續上一篇Spark學習——性能調優(一)的講解緩存

下降cache操做的內存佔比

關於RDD內存的使用,後面會專門寫一篇,能夠配合學習閱讀bash

spark中,堆內存又被劃分紅了兩塊兒,一起是專門用來給RDD的cache、persist操做進行RDD數據緩存用的;另一塊兒,就是咱們剛纔所說的,用來給spark算子函數的運行使用的,存放函數中本身建立的對象。網絡

默認狀況下,給RDD cache操做的內存佔比,是0.6,60%的內存都給了cache操做了。可是問題是,若是某些狀況下,cache不是那麼的緊張,問題在於task算子函數中建立的對象過多,而後內存又不太大,致使了頻繁的minor gc,甚至頻繁full gc,致使spark頻繁的中止工做。性能影響會很大。app

針對上述這種狀況,你們能夠在spark ui界面,若是經過yarn去運行的話,那麼就經過yarn的界面,去查看你的spark做業的運行統計。能夠看到每一個stage的運行狀況,包括每一個task的運行時間、gc時間等等。若是發現gc太頻繁,時間太長。此時就能夠適當調價這個比例。jvm

下降cache操做的內存佔比,大不了用persist操做,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式,配合Kryo序列化類,減小RDD緩存的內存佔用;下降cache操做內存佔比;對應的,算子函數的內存佔比就提高了。這個時候,可能,就能夠減小minor gc的頻率,同時減小full gc的頻率。對性能的提高是有必定的幫助的。函數

一句話,讓task執行算子函數時,有更多的內存可使用。post

SparkConf conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")
複製代碼

調節executor堆外內存

有時候,若是你的spark做業處理的數據量特別特別大,幾億數據量;而後spark做業一運行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(內存溢出);

多是說executor的堆外內存不太夠用,致使executor在運行的過程當中,可能會內存溢出;而後可能致使後續的stage的task在運行的時候,可能要從一些executor中去拉取shuffle map output文件,可是executor可能已經掛掉了,關聯的block manager也沒有了;因此可能會報shuffle output file not found;resubmitting task;executor lost;spark做業完全崩潰。性能

上述狀況下,就能夠去考慮調節一下executor的堆外內存。也許就能夠避免報錯;此外,有時,堆外內存調節的比較大的時候,對於性能來講,也會帶來必定的提高。學習

如何調節executor堆外內存

--conf spark.yarn.executor.memoryOverhead=2048
複製代碼

spark-submit腳本里面,去用--conf的方式,去添加配置;必定要注意!!!切記,不是在你的spark做業代碼中,用new SparkConf().set()這種方式去設置,不要這樣去設置,是沒有用的!必定要在spark-submit腳本中去設置。

spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基於yarn的提交模式)

默認狀況下,這個堆外內存上限大概是300多M;咱們一般項目中,真正處理大數據的時候,這裏都會出現問題,致使spark做業反覆崩潰,沒法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G

一般這個參數調節上去之後,就會避免掉某些JVM OOM的異常問題,同時呢,會讓總體spark做業的性能,獲得較大的提高。

調節鏈接等待時長

若是本地block manager沒有的話,那麼會經過TransferService,去遠程鏈接其餘節點上executor的block manager去獲取。若是正好其餘節點上的executor正在GC,此時呢,就會沒有響應,沒法創建網絡鏈接;會卡住;ok,spark默認的網絡鏈接的超時時長,是60s;若是卡住60s都沒法創建鏈接的話,那麼就宣告失敗了。

碰到一種狀況,偶爾,偶爾,偶爾!!!沒有規律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。

這種狀況下,頗有多是有那份數據的executor在jvm gc。因此拉取數據的時候,創建不了鏈接。而後超過默認60s之後,直接宣告失敗。

報錯幾回,幾回都拉取不到數據的話,可能會致使spark做業的崩潰。也可能會致使DAGScheduler,反覆提交幾回stage。TaskScheduler,反覆提交幾回task。大大延長咱們的spark做業的運行時間。

能夠考慮調節鏈接的超時時長。

--conf spark.core.connection.ack.wait.timeout=300
複製代碼

spark-submit腳本,切記,不是在new SparkConf().set()這種方式來設置的。

合併map端輸出文件

實際生產環境的條件:
100個節點(每一個節點一個executor):100個executor
每一個executor:2個cpu core
總共1000個task:每一個executor平均10個task
每一個節點,10個task,每一個節點會輸出多少份map端文件?10 * 1000=1萬個文件
總共有多少份map端輸出文件?100 * 10000 = 100萬。

  • shuffle中的寫磁盤的操做,基本上就是shuffle中性能消耗最爲嚴重的部分。 經過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁盤100萬個文件。 磁盤IO對性能和spark做業執行速度的影響,是極其驚人和嚇人的。 基本上,spark做業的性能,都消耗在shuffle中了,雖然不僅是shuffle的map端輸出文件這一個部分,可是這裏也是很是大的一個性能消耗點。

開啓map端輸出文件的合併機制

經過一下命令能夠開啓map端輸出文件的合併機制

new SparkConf().set("spark.shuffle.consolidateFiles", "true")
複製代碼

如上圖:

  • 第一個stage,同時就運行cpu core個task,好比cpu core是2個,並行運行2個task;每一個task都建立下一個stage的task數量個文件;
  • 第一個stage,並行運行的2個task執行完之後;就會執行另外兩個task;另外2個task不會再從新建立輸出文件;而是複用以前的task建立的map端輸出文件,將數據寫入上一批task的輸出文件中。
  • 第二個stage,task在拉取數據的時候,就不會去拉取上一個stage每個task爲本身建立的那份輸出文件了;而是拉取少許的輸出文件,每一個輸出文件中,可能包含了多個task給本身的map端輸出。

合併map端輸出文件後,上面的例子會有什麼改變呢?

  1. map task寫入磁盤文件的IO,減小:100萬文件 -> 20萬文件
  2. 第二個stage,本來要拉取第一個stage的task數量份文件,1000個task,第二個stage的每一個task,都要拉取1000份文件,走網絡傳輸;合併之後,100個節點,每一個節點2個cpu core,第二個stage的每一個task,主要拉取100 * 2 = 200個文件便可;網絡傳輸的性能消耗是否是也大大減小

提醒一下(map端輸出文件合併):
只有並行執行的task會去建立新的輸出文件;下一批並行執行的task,就會去複用以前已有的輸出文件;可是有一個例外,好比2個task並行在執行,可是此時又啓動要執行2個task;那麼這個時候的話,就沒法去複用剛纔的2個task建立的輸出文件了;而是仍是隻能去建立新的輸出文件。

要實現輸出文件的合併的效果,必須是一批task先執行,而後下一批task再執行,才能複用以前的輸出文件;負責多批task同時起來執行,仍是作不到複用的。

調節map端內存緩存和reduce端內存佔比

默認map端內存緩衝是每一個task,32kb。reduce端聚合內存比例,是0.2,也就是20%。

若是map端的task,處理的數據量比較大,可是呢,你的內存緩衝大小是固定的。可能會出現什麼樣的狀況?

每一個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每一個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。

在map task處理的數據量比較大的狀況下,而你的task的內存緩衝默認是比較小的,32kb。可能會形成屢次的map端往磁盤文件的spill溢寫操做,發生大量的磁盤IO,從而下降性能。

reduce端聚合內存,佔比。默認是0.2。若是數據量比較大,reduce task拉取過來的數據不少,那麼就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操做,溢寫到磁盤上去。並且最要命的是,磁盤上溢寫的數據量越大,後面在進行聚合操做的時候,極可能會屢次讀取磁盤中的數據,進行聚合。默認不調優,在數據量比較大的狀況下,可能頻繁地發生reduce端的磁盤文件的讀寫。

這兩個點之因此放在一塊兒講,是由於他們倆是有關聯的。數據量變大,map端確定會出點問題;reduce端確定也會出點問題;出的問題是同樣的,都是磁盤IO頻繁,變多,影響性能。

如何調優

調節map task內存緩衝:
new SparkConf().set("spark.shuffle.file.buffer", "64")
默認32k(spark 1.3.x不是這個參數,後面還有一個後綴,kb;spark 1.5.x之後,變了,就是如今這個參數)

調節reduce端聚合內存佔比:
new SparkConf().set("spark.shuffle.memoryFraction", "0.3")
默認0.2
複製代碼

在實際生產環境中,咱們在何時來調節兩個參數?

看Spark UI,若是你的公司是決定採用standalone模式,那麼很簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進去看,依次點擊進去,能夠看到,你的每一個stage的詳情,有哪些executor,有哪些task,每一個task的shuffle write和shuffle read的量,shuffle的磁盤和內存,讀寫的數據量;若是是用的yarn模式來提交,從yarn的界面進去,點擊對應的application,進入Spark UI,查看詳情。

若是發現shuffle 磁盤的write和read很大。這個時候,就意味着最好調節一些shuffle的參數。進行調優。首先固然是考慮開啓map端輸出文件合併機制。

調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,而後看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。

不能調節的太大,太大了之後過猶不及,由於內存資源是有限的,你這裏調節的太大了,其餘環節的內存使用就會有問題了。

SortShuffleManager調優

//閾值設置
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")
複製代碼
  • 在spark 1.5.x之後,對於shuffle manager又出來了一種新的manager,tungsten-sort(鎢絲),鎢絲sort shuffle manager。官網上通常說,鎢絲sort shuffle manager,效果跟sort shuffle manager是差很少的。 可是,惟一的不一樣之處在於,鎢絲manager,是使用了本身實現的一套內存管理機制,性能上有很大的提高, 並且能夠避免shuffle過程當中產生的大量的OOM,GC,等等內存相關的異常。

來一個總結,如今至關於把spark的shuffle的東西又多講了一些。你們理解的更加深刻了。hash、sort、tungsten-sort。如何來選擇?

  1. 需不須要數據默認就讓spark給你進行排序?就好像mapreduce,默認就是有按照key的排序。若是不須要的話,其實仍是建議搭建就使用最基本的HashShuffleManager,由於最開始就是考慮的是不排序,換取高性能;

  2. 何時須要用sort shuffle manager?若是你須要你的那些數據按key排序了,那麼就選擇這種吧,並且要注意,reduce task的數量應該是超過200的,這樣sort、merge(多個文件合併成一個)的機制,才能生效把。可是這裏要注意,你必定要本身考量一下,有沒有必要在shuffle的過程當中,就作這個事情,畢竟對性能是有影響的。

  3. 若是你不須要排序,並且你但願你的每一個task輸出的文件最終是會合併成一份的,你本身認爲能夠減小性能開銷;能夠去調節·bypassMergeThreshold·這個閾值,好比你的reduce task數量是500,默認閾值是200,因此默認仍是會進行sort和直接merge的;能夠將閾值調節成550,不會進行sort,按照hash的作法,每一個reduce task建立一份輸出文件,最後合併成一份文件。(必定要提醒你們,這個參數,其實咱們一般不會在生產環境裏去使用,也沒有通過驗證說,這樣的方式,到底有多少性能的提高)

  4. 若是你想選用sort based shuffle manager,並且大家公司的spark版本比較高,是1.5.x版本的,那麼能夠考慮去嘗試使用tungsten-sort shuffle manager。看看性能的提高與穩定性怎麼樣。

總結:

  1. 在生產環境中,不建議你們貿然使用第三點和第四點:
  2. 若是你不想要你的數據在shuffle時排序,那麼就本身設置一下,用hash shuffle manager。
  3. 若是你的確是須要你的數據在shuffle時進行排序的,那麼就默認不用動,默認就是sort shuffle manager;或者是什麼?若是你壓根兒不care是否排序這個事兒,那麼就默認讓他就是sort的。調節一些其餘的參數(consolidation機制)。(80%,都是用這種)
spark.shuffle.manager:hash、sort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
// 默認就是,new SparkConf().set("spark.shuffle.manager", "sort")

複製代碼
相關文章
相關標籤/搜索