1.儘量複用同一個RDDmysql
2.對重複使用的RDD進行持久化sql
3.對DAG過長的計算增長chekpoint(檢查點)機制,將文件最好保存在HDFS中(多副本)shell
4.選擇一種合適的持久化策略數據庫
默認狀況下,性能最高的固然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,能夠綽綽有餘地存放下整個RDD的全部數據。由於不進行序列化與反序列化操做,就避免了這部分的性能開銷;對這個RDD的後續算子操做,都是基於純內存中的數據的操做,不須要從磁盤文件中讀取數據,性能也很高;並且不須要複製一份數據副本,並遠程傳送到其餘節點上。可是這裏必需要注意的是,在實際的生產環境中,恐怕可以直接用這種策略的場景仍是有限的,若是RDD中數據比較多時(好比幾十億),直接用這種持久化級別,會致使JVM的OOM內存溢出異常。apache
若是使用MEMORY_ONLY級別時發生了內存溢出,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化後再保存在內存中,此時每一個partition僅僅是一個字節數組而已,大大減小了對象數量,並下降了內存佔用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。可是後續算子能夠基於純內存進行操做,所以性能整體仍是比較高的。此外,可能發生的問題同上,若是RDD中的數據量過多的話,仍是可能會致使OOM內存溢出的異常。數組
若是純內存的級別都沒法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由於既然到了這一步,就說明RDD的數據量很大,內存沒法徹底放下。序列化後的數據比較少,能夠節省內存和磁盤的空間開銷。同時該策略會優先儘可能嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。緩存
一般不建議使用DISK_ONLY和後綴爲_2的級別:由於徹底基於磁盤文件進行數據的讀寫,會致使性能急劇下降,有時還不如從新計算一次全部RDD。後綴爲_2的級別,必須將全部數據都複製一份副本,併發送到其餘節點上,數據複製以及網絡傳輸會致使較大的性能開銷,除非是要求做業的高可用性,不然不建議使用。網絡
5.更改程序shufflle的序列化方式爲Kryo併發
方法一:修改spark-defaults.conf配置文件 設置: spark.serializer org.apache.spark.serializer.KryoSerializer 注意:用空格隔開 方法二:啓動spark-shell或者spark-submit時配置 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 方法三:在代碼中 val conf = new SparkConf() conf.set(「spark.serializer」,「org.apache.spark.serializer.KryoSerializer」)
6.測試的時候可使用collect,可是生產環境上不可使用。Driver端會爆炸ide
7.開啓推遲執行機制
能夠設置spark.speculation true
開啓後,spark會檢測執行較慢的Task,並複製這個Task在其餘節點運行,最後哪一個節點先運行完,就用其結果,而後將慢Task 殺死
8.配置多臨時文件目錄
spark.local.dir參數。當shuffle、歸併排序(sort、merge)時都會產生臨時文件。這些臨時文件都在這個指定的目錄下。那這個文件夾有不少臨時文件,若是都發生讀寫操做,有的線程在讀這個文件,有的線程在往這個文件裏寫,磁盤I/O性能就很是低。
能夠建立多個文件夾,每一個文件夾都對應一個真實的硬盤。假如原來是3個程序同時讀寫一個硬盤,效率確定低,如今讓三個程序分別讀取3個磁盤,這樣衝突減小,效率就提升了。這樣就有效提升外部文件讀和寫的效率。怎麼配置呢?只須要在這個配置時配置多個路徑就能夠。中間用逗號分隔。
spark.local.dir=/home/tmp,/home/tmp2
而後須要把每一個目錄掛載到不一樣的磁盤上
9.進行join時,能夠將小表廣播出去
10.有些狀況下,RDD操做使用MapPartitions替代map
好比JdisPool,mysql鏈接池的建立和銷燬操做
map方法對RDD的每一條記錄逐一操做。mapPartitions是對RDD裏的每一個分區操做
rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
這樣頻繁的連接、斷開數據庫,效率差。
rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
這樣就一次連接一次斷開,中間批量操做,效率提高。
參數配置調優
spark.reducer.maxSizeInFlight:默認值:48m 該參數用於設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。
spark.shuffle.io.maxRetries:默認值:3 shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。
調優建議:對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。
spark.shuffle.io.retryWait :默認值:5s參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是5s。建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。
spark.shuffle.manager:默認值:sort 該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。
調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug
spark.shuffle.sort.bypassMergeThreshold
默認值:200
參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。
spark.shuffle.consolidateFiles:默認值:false 參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。