其餘更多java基礎文章:
java基礎學習(目錄)java
繼續上一篇Spark學習——性能調優(一)的講解緩存
關於RDD內存的使用,後面會專門寫一篇,能夠配合學習閱讀bash
spark中,堆內存又被劃分紅了兩塊兒,一起是專門用來給RDD的cache、persist操做進行RDD數據緩存用的;另一塊兒,就是咱們剛纔所說的,用來給spark算子函數的運行使用的,存放函數中本身建立的對象。網絡
默認狀況下,給RDD cache操做的內存佔比,是0.6,60%的內存都給了cache操做了。可是問題是,若是某些狀況下,cache不是那麼的緊張,問題在於task算子函數中建立的對象過多,而後內存又不太大,致使了頻繁的minor gc,甚至頻繁full gc,致使spark頻繁的中止工做。性能影響會很大。app
針對上述這種狀況,你們能夠在spark ui界面,若是經過yarn去運行的話,那麼就經過yarn的界面,去查看你的spark做業的運行統計。能夠看到每一個stage的運行狀況,包括每一個task的運行時間、gc時間等等。若是發現gc太頻繁,時間太長。此時就能夠適當調價這個比例。jvm
下降cache操做的內存佔比,大不了用persist操做,選擇將一部分緩存的RDD數據寫入磁盤,或者序列化方式,配合Kryo序列化類,減小RDD緩存的內存佔用;下降cache操做內存佔比;對應的,算子函數的內存佔比就提高了。這個時候,可能,就能夠減小minor gc的頻率,同時減小full gc的頻率。對性能的提高是有必定的幫助的。函數
一句話,讓task執行算子函數時,有更多的內存可使用。post
SparkConf conf = new SparkConf()
.set("spark.storage.memoryFraction", "0.5")
複製代碼
多是說executor的堆外內存不太夠用,致使executor在運行的過程當中,可能會內存溢出;而後可能致使後續的stage的task在運行的時候,可能要從一些executor中去拉取shuffle map output文件,可是executor可能已經掛掉了,關聯的block manager也沒有了;因此可能會報shuffle output file not found;resubmitting task;executor lost;spark做業完全崩潰。性能
上述狀況下,就能夠去考慮調節一下executor的堆外內存。也許就能夠避免報錯;此外,有時,堆外內存調節的比較大的時候,對於性能來講,也會帶來必定的提高。學習
--conf spark.yarn.executor.memoryOverhead=2048
複製代碼
spark-submit腳本里面,去用--conf的方式,去添加配置;必定要注意!!!切記,不是在你的spark做業代碼中,用new SparkConf().set()這種方式去設置,不要這樣去設置,是沒有用的!必定要在spark-submit腳本中去設置。
spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基於yarn的提交模式)
默認狀況下,這個堆外內存上限大概是300多M;咱們一般項目中,真正處理大數據的時候,這裏都會出現問題,致使spark做業反覆崩潰,沒法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G
一般這個參數調節上去之後,就會避免掉某些JVM OOM的異常問題,同時呢,會讓總體spark做業的性能,獲得較大的提高。
碰到一種狀況,偶爾,偶爾,偶爾!!!沒有規律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。
這種狀況下,頗有多是有那份數據的executor在jvm gc。因此拉取數據的時候,創建不了鏈接。而後超過默認60s之後,直接宣告失敗。
報錯幾回,幾回都拉取不到數據的話,可能會致使spark做業的崩潰。也可能會致使DAGScheduler,反覆提交幾回stage。TaskScheduler,反覆提交幾回task。大大延長咱們的spark做業的運行時間。
能夠考慮調節鏈接的超時時長。
--conf spark.core.connection.ack.wait.timeout=300
複製代碼
spark-submit腳本,切記,不是在new SparkConf().set()這種方式來設置的。
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
複製代碼
如上圖:
合併map端輸出文件後,上面的例子會有什麼改變呢?
提醒一下(map端輸出文件合併):
只有並行執行的task會去建立新的輸出文件;下一批並行執行的task,就會去複用以前已有的輸出文件;可是有一個例外,好比2個task並行在執行,可是此時又啓動要執行2個task;那麼這個時候的話,就沒法去複用剛纔的2個task建立的輸出文件了;而是仍是隻能去建立新的輸出文件。
要實現輸出文件的合併的效果,必須是一批task先執行,而後下一批task再執行,才能複用以前的輸出文件;負責多批task同時起來執行,仍是作不到複用的。
若是map端的task,處理的數據量比較大,可是呢,你的內存緩衝大小是固定的。可能會出現什麼樣的狀況?
每一個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每一個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。
在map task處理的數據量比較大的狀況下,而你的task的內存緩衝默認是比較小的,32kb。可能會形成屢次的map端往磁盤文件的spill溢寫操做,發生大量的磁盤IO,從而下降性能。
reduce端聚合內存,佔比。默認是0.2。若是數據量比較大,reduce task拉取過來的數據不少,那麼就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操做,溢寫到磁盤上去。並且最要命的是,磁盤上溢寫的數據量越大,後面在進行聚合操做的時候,極可能會屢次讀取磁盤中的數據,進行聚合。默認不調優,在數據量比較大的狀況下,可能頻繁地發生reduce端的磁盤文件的讀寫。
這兩個點之因此放在一塊兒講,是由於他們倆是有關聯的。數據量變大,map端確定會出點問題;reduce端確定也會出點問題;出的問題是同樣的,都是磁盤IO頻繁,變多,影響性能。
調節map task內存緩衝:
new SparkConf().set("spark.shuffle.file.buffer", "64")
默認32k(spark 1.3.x不是這個參數,後面還有一個後綴,kb;spark 1.5.x之後,變了,就是如今這個參數)
調節reduce端聚合內存佔比:
new SparkConf().set("spark.shuffle.memoryFraction", "0.3")
默認0.2
複製代碼
在實際生產環境中,咱們在何時來調節兩個參數?
看Spark UI,若是你的公司是決定採用standalone模式,那麼很簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進去看,依次點擊進去,能夠看到,你的每一個stage的詳情,有哪些executor,有哪些task,每一個task的shuffle write和shuffle read的量,shuffle的磁盤和內存,讀寫的數據量;若是是用的yarn模式來提交,從yarn的界面進去,點擊對應的application,進入Spark UI,查看詳情。
若是發現shuffle 磁盤的write和read很大。這個時候,就意味着最好調節一些shuffle的參數。進行調優。首先固然是考慮開啓map端輸出文件合併機制。
調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,而後看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。
不能調節的太大,太大了之後過猶不及,由於內存資源是有限的,你這裏調節的太大了,其餘環節的內存使用就會有問題了。
//閾值設置
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")
複製代碼
來一個總結,如今至關於把spark的shuffle的東西又多講了一些。你們理解的更加深刻了。hash、sort、tungsten-sort。如何來選擇?
需不須要數據默認就讓spark給你進行排序?就好像mapreduce,默認就是有按照key的排序。若是不須要的話,其實仍是建議搭建就使用最基本的HashShuffleManager,由於最開始就是考慮的是不排序,換取高性能;
何時須要用sort shuffle manager?若是你須要你的那些數據按key排序了,那麼就選擇這種吧,並且要注意,reduce task的數量應該是超過200的,這樣sort、merge(多個文件合併成一個)的機制,才能生效把。可是這裏要注意,你必定要本身考量一下,有沒有必要在shuffle的過程當中,就作這個事情,畢竟對性能是有影響的。
若是你不須要排序,並且你但願你的每一個task輸出的文件最終是會合併成一份的,你本身認爲能夠減小性能開銷;能夠去調節·bypassMergeThreshold·這個閾值,好比你的reduce task數量是500,默認閾值是200,因此默認仍是會進行sort和直接merge的;能夠將閾值調節成550,不會進行sort,按照hash的作法,每一個reduce task建立一份輸出文件,最後合併成一份文件。(必定要提醒你們,這個參數,其實咱們一般不會在生產環境裏去使用,也沒有通過驗證說,這樣的方式,到底有多少性能的提高)
若是你想選用sort based shuffle manager,並且大家公司的spark版本比較高,是1.5.x版本的,那麼能夠考慮去嘗試使用tungsten-sort shuffle manager。看看性能的提高與穩定性怎麼樣。
總結:
spark.shuffle.manager:hash、sort、tungsten-sort
new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
// 默認就是,new SparkConf().set("spark.shuffle.manager", "sort")
複製代碼