spark優化參數調節和故障參數調節

1:「物盡其用」,但給spark分配多個機器後,先需配置spark-submit shell以下:

/usr/local/spark/bin/spark-submit \
--class com.spark.test.Top3UV \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /usr/local/hive/conf/hive-site.xml \
--driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar \
/usr/local/jars/spTest-1.0-SNAPSHOT-jar-with-dependencies.jar \

executor的cpu核心數爲3,而且executor數量爲3,那麼總cpu核心數就是9,task並行度推薦設置2~3倍的cpu-core才能「物盡其用」,由於難以保證全部task都在同一時間執行完成!

  SparkConf conf = new SparkConf()java

  conf.set("spark.default.parallelism", "500")mysql

2:重複使用的Rdd,須要緩存:StorageLevel.MEMORY_AND_DISK_SER_2()

可選擇:
(1)內存緩存(2)內存磁盤緩存(3)帶有序列化的緩存(4)帶有副本的緩存-》以防數據丟失,形如 _2。
***複用的rdd還能夠持久化到hdfs,使用checkpoint機制,以下

javaSparkContext.setCheckpointDir("hdfs://xxx:9000/checkPointPath");//設置checkpoint的存儲路徑
rdd_date_range.checkpoint();//對rdd_date_range的Rdd進行checkpoint存儲(若是rdd使用cache則checkpoint數據從緩存中獲取)
3:shuffle操做時優化
 (1)開啓 consolidateFile,這樣map端產生的file數量會和下游stage的task數量一致,不會由於重複建立文件致使性能降低。
 (2)"spark.shuffle.file.buffer", "128k",設置以後可減小map端數據輸出到文件的次數減小,提高性能。
 (3)"spark.reducer.maxSizeInFlight", "96m",下游stage拉取map造成file中的內容,每次拉取的數據量,值太小會致使屢次網絡通訊。
 (4)"spark.shuffle.io.maxRetries", "6",若是上游jvm出現stw的話,有可能下游獲取file時會出現,沒法獲取的狀況,這個參數表明能夠重試的次數,"spark.shuffle.io.retryWait", "10s"而這個參數設定每次重試的間隔時間。
4:使用  fastUtil工具  代替jdk中帶有的基礎數據類型,減少內存開銷;如:ArrayList或者HashMap等。
5:使用kryo序列化工具,這樣序列化數據的速度可以提高,並且可以減少內存的開銷;***但要注意的是
使用kryo序列化,若是涉及到自定義類型必需要註冊,這樣才能被kryo序列化***
6:spark1.2.x 之後默認使用sort shuffle manager,但若是沒有必要使用排序功能能夠在SparkConf中設置便可;
以下:
   new SparkConf().set("spark.shuffle.manager","hash");

***固然spark1.5.x之後又出現了tungsten-sort shuffle manager,要比sort shuffle manager性能更好***
(若是使用sort shuffle manager能夠經過new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold","350");若是實際task的數量大於350纔開啓sort shuffle manager)
7:若是在某處使用了spark sql,那麼這個階段的partition數量是不受控制的spark.default.parallelism設置控制的,若是想要設置須要使用repartition方法來設置實際的partition數量;
以下:
actionDF.javaRDD().repartition(1000);//在使用DataFrame轉化成javaRDD的時候,使用repartition來提升實際的數據分塊數量,從而提升並行度。
8:若是可用內存比較大的話,那麼map操做可使用mapPartitions來代替;
如:
rdd.mapToPair使用rdd.mapPartitionsToPair來代替,這樣會一次性得到rdd中的某個partition,方法變成迭代的方式僅僅執行一次(可是這樣很是大的可能致使oom直接掛掉);
 rdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, Object, Object>() {
            @Override
            public Iterable<Tuple2<Object, Object>> call(Iterator<Row> rowIterator) throws Exception {
                return null;
            }
        });
        return rdd.mapToPair(new PairFunction<Row, String, Row>() {
            @Override
            public Tuple2<String, Row> call(Row row) throws Exception {
                return new Tuple2<String, Row>(row.getString(2), row);
            }
        });
9:shuffle階段,在reduce中有時須要調節拉取數據時內存緩衝區(從map端輸出的action觸發job計算的文件),默認是48MB,若是當數據量特別大的時候頗有可能出現OOM的問題,這個問題除了增長硬件條件外,必須經過犧牲性能來換取執行能力了;
調整參數:
 將默認48MB下調到10MB,增長數據拉取(增長了網絡通訊次數)的次數,來避免OOM
 new SparkConf().set("spark.reducer.maxSizeInFlight","10");
10:若是在日誌中出現了shuffle file not found 錯誤! 極可能是因爲reduce時executor的jvm發生gc致使了reduce階段沒法得到文件,
解決問題能夠經過,增長重試次數,並調節重試的週期:
.set("spark.shuffle.io.maxRetries",3);//默認重試次數是3次,能夠調成60
.set("spark.shuffle.io.retryWait",5);//默認每隔5s重試一次,能夠調成60
11:若是報錯 Scala.Math(NULL) 相似於這種異常,那就是說明在算子中出現了null值的直接返回。
12:spark默認狀況下cache緩存配置佔比爲spark.storage.memoryFraction:0.6,咱們能夠調整的小些如0.3,必要的時候可使用persist進行內存+磁盤的緩存方式(StorageLevel.MEMORY_AND_DISK())進行緩存;這樣可以保證運行spark核心業務的各類算子可以有足夠的運行空間,防止因爲內存不足而且頻繁的GC而形成spark做業執行的卡頓。
相關文章
相關標籤/搜索