入門大數據---大數據調優彙總

前言

不進行優化的代碼就是耍流氓。html

整體來講大數據優化主要分爲三點,一是充分利用CPU,二是節省內存,三是減小網絡傳輸。java

1、Hive/MapReduce調優

1.1 本地模式

Hive默認採用集羣模式進行計算,若是對於小數據量,能夠設置爲單臺機器進行計算,這樣能夠大大縮減查詢觸發任務時間。node

用戶能夠經過設置hive.exec.mode.local.auto 的值爲true,來讓Hive在適當的時候自動啓動這個優化。web

set hive.exec.mode.local.auto=true; //開啓本地 mr
//設置 local mr 的最大輸入數據量,當輸入數據量小於這個值時採用 local mr 的方式,
默認爲 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//設置 local mr 的最大輸入文件個數,當輸入文件個數小於這個值時採用 local mr 的方
式,默認爲 4
set hive.exec.mode.local.auto.input.files.max=10;

1.2 null值過濾OR隨機分配null值

  • null值過濾算法

    對於key值傾斜,有的時候是無效的null致使的,這個時候能夠考慮過濾掉。sql

    hive (default)> insert overwrite table jointable 
    select n.* from (select * from nullidtable where id is not null ) n left join ori o on n.id = 
    o.id;
  • null值隨機分配shell

    若是null不是異常數據,那麼能夠採用隨機分配將null值分到不一樣分區,解決數據傾斜。數據庫

    insert overwrite table jointable
    select n.* from nullidtable n full join ori o on 
    case when n.id is null then concat('hive', rand()) else n.id end = o.id;

1.3 Count(distinct)去重統計優化

對於大數據量去重,能夠採用分組的方式進行優化。apache

hive (default)> select count(id) from (select id from bigtable group by id) a;

1.4 行列過濾

對關聯表進行過濾時,能夠考慮在關聯時就進行過濾,提升查詢時間。數組

hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;

1.5 數據傾斜

小文件合併

set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

複雜文件增長Map數

增長 map 的方法爲:根據computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,調整 maxSize 最大值。讓 maxSize 最大值低於 blocksize 就能夠增長 map 的個數。

設置最大切片值爲100個字節

hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
hive (default)> select count(*) from emp;

合理設置Reduce數

  • 調整 reduce 個數方法一

    (1)每一個 Reduce 處理的數據量默認是 256MB

    hive.exec.reducers.bytes.per.reducer=256000000

    (2)每一個任務最大的 reduce 數,默認爲 1009

    hive.exec.reducers.max=1009

    (3)計算 reducer 數的公式

  • 調整 reduce 個數方法二

    在 hadoop 的 mapred-default.xml 文件中修改,設置每一個 job 的 Reduce 個數

    set mapreduce.job.reduces = 15;

1.6 並行執行

在共享集羣中設置併發執行能夠提升運行速度。

set hive.exec.parallel=true; //打開任務並行執行
set hive.exec.parallel.thread.number=16; //同一個 sql 容許最大並行度,默認爲 8。

固然,得是在系統資源比較空閒的時候纔有優點,不然,沒資源,並行也起不來。

1.7 嚴格模式

Hive 提供了一個嚴格模式,能夠防止用戶執行那些可能意向不到的很差的影響的查詢。經過設置屬性 hive.mapred.mode 值爲默認是非嚴格模式 nonstrict 。開啓嚴格模式須要修改 hive.mapred.mode 值爲 strict,開啓嚴格模式能夠禁止 3 種類型的查詢

1).對於分區表,除非 where 語句中含有分區字段過濾條件來限制範圍,不然不容許執行。

2).對於使用了 order by 語句的查詢,要求必須使用 limit 語句。

3).限制笛卡爾積的查詢。對關係型數據庫很是瞭解的用戶可能指望在執行 JOIN 查詢的時候不使用 ON 語句而是使用 where 語句,這樣關係數據庫的執行優化器就能夠高效地將WHERE 語句轉化成那個 ON 語句。

1.8 JVM重用

在小文件場景或者task特別多的狀況下,執行時間都很短。JVM重用可使用同一個JVM在同一個Job裏面重複使用N次。N值在mapred-site.xml文件中進行配置。

<property>
 <name>mapreduce.job.jvm.numtasks</name>
 <value>10</value>
 <description>How many tasks to run per jvm. If set to -1, there is
 no limit. 
 </description>
</property>

缺點:JVM重用會一直佔用使用到的task插槽,以便進行重用,若是遇到了某個job裏面的reduce task分配不均勻,致使出現某幾個task佔用task時間很長,其它task空閒也無法被其它job使用,只有全部的task都執行完後纔會釋放。

1.9 推測執行

對於某些耗時的任務,能夠啓動推測執行,這樣就會把「拖後腿」的任務找出來,而後啓動個備份任務執行相同的數據。最後選出執行最快的爲最終結果。

設置開啓推測執行參數:Hadoop 的 mapred-site.xml 文件中進行配置:

<property>
 <name>mapreduce.map.speculative</name>
 <value>true</value>
 <description>If true, then multiple instances of some map tasks 
 may be executed in parallel.</description>
</property>
<property>
 <name>mapreduce.reduce.speculative</name>
 <value>true</value>
 <description>If true, then multiple instances of some reduce tasks 
 may be executed in parallel.</description>
</property>

不過 hive 自己也提供了配置項來控制 reduce-side 的推測執行:

<property>
 <name>hive.mapred.reduce.tasks.speculative.execution</name>
 <value>true</value>
 <description>Whether speculative execution for reducers should be turned on. 
</description>
 </property>

PS:對於時差要求很苛刻的建議關閉掉推測執行。對於執行很長的任務也不建議開啓,由於會浪費很大資源。

1.10 HDFS小文件解決方案

1)Hadoop Archive:

是一個高效地將小文件放入 HDFS 塊中的文件存檔工具,它可以將多個小文件打包成

一個 HAR 文件,這樣就減小了 namenode 的內存使用。

2)Sequence file:

sequence file 由一系列的二進制 key/value 組成,若是 key 爲文件名,value 爲文件內容,

則能夠將大批小文件合併成一個大文件。

3)CombineFileInputFormat:

CombineFileInputFormat 是一種新的 inputformat,用於將多個文件合併成一個單獨的

split,另外,它會考慮數據的存儲位置。

2、Spark調優

2.1 性能監控方式

Spark Web UI

經過 http://master:4040咱們能夠得到運行中的程序信息。

(1)stages和tasks調度狀況;

(2)RDD大小和內存使用狀況;

(3)系統環境信息;

(4)正在執行的executor信息;

設置歷史服務器記錄歷史信息:

(1)在$SPARK_HOME/conf/spark-env.sh中設置:

export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50 Dspark.history.fs.logDirectory=hdfs://master01:9000/directory"

說明:spark.history.retainedApplica-tions僅顯示最近50個應用。

spark.history.fs.logDirectory:Spark History Server頁面只顯示該路徑下的信息。

(2)$SPARK_HOME/conf/spark-defaults.conf

spark.eventLog.enabled true

spark.eventLog.dir hdfs://hadoop000:8020/directory #應用在運行過程當中全部的信息均記錄在該屬性指定的路徑下

spark.eventLog.compress true

(3)HistoryServer 啓動

$SPARK_HOMR/bin/start-histrory-server.sh

(4)HistoryServer 中止

$SPARK_HOMR/bin/stop-histrory-server.sh

--一樣executor的logs也是查看的一個出處:

Standalone 模式:$SPARK_HOME/logs

YARN 模式:在 yarn-site.xml 文件中配置了 YARN 日誌的存放位置:yarn.nodemanager.log-dirs,或使用命令獲取 yarn logs -applicationId。

其它監控工具

Nmon

Jmeter

Jprofiler

2.2 調優要點

內存調優要點

1.對象佔內存,優化數據結構

(1)使用對象數組以及原始類型(primitive type)數組以替代 Java 或 者 Scala 集合類(collection class)。fastutil 庫爲原始數據類型提供了很是方便的集合類,且兼容 Java 標準類庫。

(2)儘量地避免採用含有指針的嵌套數據結構來保存小對象。

(3)考慮採用數字 ID 或者枚舉類型以便替代 String 類型的主鍵。

(4)若是內存少於 32GB,設置 JVM 參數-XX:+UseCom-pressedOops以便將 8 字節指針修改爲 4 字節。與此同時,在 Java 7 或者更高版本,設置 JVM 參數-XX:+UseC-----ompressedStrings 以便採用 8 比特來編碼每個 ASCII 字符。

2.頻繁 GC 或者 OOM

針對這種狀況,首先要肯定現象是發生在 Driver 端仍是在 Executor 端,而後在分別處理。

Driver 端:一般因爲計算過大的結果集被回收到 Driver 端致使,須要調大 Driver 端的內存解決,或者進一步減小結果集的數量。

Executor 端:

(1)之外部數據做爲輸入的 Stage:能夠增長 partition 的數量(即 task 的數量)來減小每一個 task 要處理的數據,來減小 GC 的可能性。

(2)以 shuffle 做爲輸入的 Stage:解決數據傾斜問題。

開啓推測機制

在 spark-default.conf 中添加:spark.speculation true

推測機制與如下幾個參數有關:

  1. spark.speculation.interval 100:檢測週期,單位毫秒;
  2. spark.speculation.quantile 0.75:完成 task 的百分比時啓動推測;
  3. spark.speculation.multiplier 1.5:比其餘的慢多少倍時啓動推測。

數據傾斜優化

  • 查找數據傾斜代碼

    根據shuffler肯定數據傾斜代碼,而後經過隨機取樣找到傾斜數據。

    val sampledPairs = pairs.sample(false, 0.1)
    val sampledWordCounts = sampledPairs.countByKey()
    sampledWordCounts.foreach(println(_))

緩解/消除數據傾斜

避免數據源傾斜

好比數據源是Kafka,一般一個分區對應一個Task,因此若是分區數據不均衡,則致使spark處理不均衡。

好比數據源是Hive,若是Hive數據不均衡,也會致使Spark數據傾斜。

解決方案是預處理或者其它。

調整並行度

好比reduceByKey(1000)。若是是group by,join須要設置參數即spark.sql.shuffle.partitions,該參數表明了shuffle read task的並行度,該值默認是200,對於不少場景來講有點太小。設置完後不一樣的key就能分到不一樣的task去處理。

將join中的shuffler避免掉

針對一個大表一個小表的join操做,使用廣播變量將較小的數據進行廣播,這樣就能夠把join改成map操做。

兩階段聚合

針對RDD執行ReduceByKey等聚合shuffler算子,以及Spark Sql執行GroupByKey等聚合算子,針對數據傾斜,能夠先在key前面打上隨機前綴,進行聚合,而後再把前綴去掉進行聚合,有效解決值分配不均勻問題。

示例以下:

// 第一步,給 RDD 中的每一個 key 都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
 new PairFunction<Tuple2<Long,Long>, String, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
 throws Exception {
 Random random = new Random();
 int prefix = random.nextInt(10);
 return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
 }
 });
// 第二步,對打上隨機前綴的 key 進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
 new Function2<Long, Long, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Long call(Long v1, Long v2) throws Exception {
 return v1 + v2;
 }
 });
// 第三步,去除 RDD 中每一個 key 的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
 new PairFunction<Tuple2<String,Long>, Long, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
 throws Exception {
 long originalKey = Long.valueOf(tuple._1.split("_")[1]);
 return new Tuple2<Long, Long>(originalKey, tuple._2);
 }
 });
// 第四步,對去除了隨機前綴的 RDD 進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
 new Function2<Long, Long, Long>() {
 private static final long serialVersionUID = 1L;
 @Override
 public Long call(Long v1, Long v2) throws Exception {
 return v1 + v2;
 }
 });

兩階段聚合案例

  1. 經過以下 SQL,將 id 爲 9 億到 9.08 億共 800 萬條數據的 id 轉爲9500048 或者 9500096,其它數據的 id 除以 100 取整。從而該數據集中,id 爲 9500048 和 9500096 的數據各 400 萬,其它 id 對應的數據記錄數均爲 100 條。這些數據存於名爲 test 的表中。
  2. 對於另一張小表 test_new,取出 50 萬條數據,並將 id(遞增且惟一)除以 100 取整,使得全部 id 都對應 100 條數據。
  3. 經過以下操做,實現傾斜 Key 的分散處理:
  4. 將 leftRDD 中傾斜的 key(即 9500048 與 9500096)對應的數據單獨過濾出來,且加上 1 到 24 的隨機前綴,並將前綴與原數據用逗號分隔(以方便以後去掉前綴)造成單獨的 leftSkewRDD。
  5. 將 rightRDD 中傾斜 key 對應的數據抽取出來,並經過 flatMap 操做將該數據集中每條數據均轉換爲 24 條數據(每條分別加上 1 到 24 的隨機前綴),造成單獨的 rightSkewRDD。
  6. 將 leftSkewRDD 與 rightSkewRDD 進行 Join,並將並行度設置爲 48,且 在 Join 過 程 中 將 隨 機 前 綴 去 掉 , 得 到 傾 斜 數 據集的 Join 結 果skewedJoinRDD。
  7. 將 leftRDD 中不包含傾斜 Key 的 數 據 抽 取 出 來 做 爲 單 獨 的leftUnSkewRDD。
  8. 對 leftUnSkewRDD 與原始的 rightRDD 進行 Join,並行度也設置爲 48,獲得 Join 結果 unskewedJoinRDD。
  9. 經過 union 算子將 skewedJoinRDD 與 unskewedJoinRDD 進行合併,從而獲得完整的 Join 結果集。

具體實現代碼以下:

public class SparkDataSkew{
 public static void main(String[] args) {
 int parallelism = 48;
 SparkConf sparkConf = new SparkConf();
 sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
 sparkConf.set("spark.default.parallelism", parallelism + "");
 JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
 JavaPairRDD<String, String> leftRDD = 
javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
 .mapToPair((String row) -> {
 String[] str = row.split(",");
 return new Tuple2<String, String>(str[0], str[1]);
 });
 JavaPairRDD<String, String> rightRDD = 
javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
 .mapToPair((String row) -> {
 String[] str = row.split(",");
 return new Tuple2<String, String>(str[0], str[1]);
 });
 String[] skewedKeyArray = new String[]{"9500048", "9500096"};
 Set<String> skewedKeySet = new HashSet<String>();
 List<String> addList = new ArrayList<String>();
 for(int i = 1; i <=24; i++) {
 addList.add(i + "");
 }
 for(String key : skewedKeyArray) {
 skewedKeySet.add(key);
 }
 Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
 Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);
 JavaPairRDD<String, String> leftSkewRDD = leftRDD
 .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
 .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new 
Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));
 JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> 
tuple) -> skewedKeys.value().contains(tuple._1()))
 .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
 .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
 .collect(Collectors.toList())
 .iterator()
 );
 JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
 .join(rightSkewRDD, parallelism)
 .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, 
String>(tuple._1().split(",")[1], tuple._2()._2()));
 JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> 
tuple) -> !skewedKeys.value().contains(tuple._1()));
 JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, 
parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, 
String>(tuple._1(), tuple._2()._2()));
 skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, 
String>> iterator) -> {
 AtomicInteger atomicInteger = new AtomicInteger();
 iterator.forEachRemaining((Tuple2<String, String> tuple) -> 
atomicInteger.incrementAndGet());
 });
 javaSparkContext.stop();
 javaSparkContext.close();
 } 
}

大表隨機添加 N 種隨機前綴,小表擴大 N 倍

過濾少數致使傾斜的 key

2.3 Shuffle調優

調優概述

代碼開發,資源分配和數據傾斜是重中之重,除此以外,Shuffler做爲一個補充,也須要學習下。

shuffler相關參數調優

  • spark.shuffle.file.buffer

    默認值:32K

    參數說明:緩衝大小,超過緩衝大小纔會寫入磁盤。

    調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(),從而減小 shuffle write 過程當中溢寫磁盤文件的次數,也就能夠減小磁盤 IO 次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有 1%~5%的提高。

  • spark.reducer.maxSizeInFlight

    默認值:48m

    參數說明:這個 buffer 緩衝決定了每次可以拉取多少數據。

    調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比 96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。

  • spark.shuffle.io.maxRetries

    默認值:3

    參數說明:拉去失敗重試次數。

    調優建議:對於那些包含了特別耗時的 shuffle 操做的做業,建議增長重試最大次數(好比 60 次),以免因爲 JVM 的 full gc 或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的 shuffle 過程,調節該參數能夠大幅度提高穩定性。

  • spark.shuffle.io.retryWait

    默認值:5s

    參數說明:重試拉取數據的等待時間,默認是5s。

    調優建議:建議加大間隔時長(好比 60s),以增長 shuffle 操做的穩定性。

  • spark.shuffle.memoryFraction

    默認值:0.2

    參數說明:分配給聚合操做的內存比例,默認是20%。

  • spark.shuffle.manager

默認值:sort

2.4 程序開發調優

原則一:避免建立重複的 RDD

對同一個數據源不要創建多個RDD。

原則二:儘量複用同一個 RDD

數據有包含關係的RDD能重用的就重用。

原則三:對屢次使用的 RDD 進行持久化

每次你對RDD執行算子操做時,都會從源頭處從新計算一遍,因此通常會採起持久化方式,這樣就直接從內存取了。

對屢次使用的RDD進行持久化示例:

// 若是要對一個 RDD 進行持久化,只要對這個 RDD 調用 cache()和 persist()便可。
// 正確的作法。
// cache()方法表示:使用非序列化的方式將 RDD 中的數據所有嘗試持久化到內存中。
// 此時再對 rdd1 執行兩次算子操做時,只有在第一次執行 map 算子時,纔會將這個 rdd1 從源頭處計
算一次。
// 第二次執行 reduce 算子時,就會直接從內存中提取數據進行計算,不會重複計算一個 rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手動選擇持久化級別,並使用指定的方式進行持久化。
// 好比說,StorageLevel.MEMORY_AND_DISK_SER 表示,內存充足時優先持久化到內存中,
//內存不充足時持久化到磁盤文件中。
// 並且其中的_SER 後綴表示,使用序列化的方式來保存 RDD 數據,此時 RDD 中的每一個 partition
//都會序列化成一個大的字節數組,而後再持久化到內存或磁盤中
// 序列化的方式能夠減小持久化的數據對內存/磁盤的佔用量,進而避免內存被持久化數據佔用過多,
//從而發生頻繁 GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
 .persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

原則四:儘可能避免使用 shuffle 類算子

開發過程當中,能避免則儘量避免使用 reduceByKey、join、distinct、repartition 等會進行 shuffle 的算子,儘可能使用 map 類的非shuffle 算子。這樣的話,沒有 shuffle 操做或者僅有較少 shuffle 操做的Spark 做業,能夠大大減小性能開銷。

Broadcast 與 map 進行 join 代碼示例:

// 傳統的 join 操做會致使 shuffle 操做。
// 由於兩個 RDD 中,相同的 key 都須要經過網絡拉取到一個節點上,由一個 task 進行 join 操做。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map 的 join 操做,不會致使 shuffle 操做。
// 使用 Broadcast 將一個數據量較小的 RDD 做爲廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在 rdd1.map 算子中,能夠從 rdd2DataBroadcast 中,獲取 rdd2 的全部數據。
// 而後進行遍歷,若是發現 rdd2 中某條數據的 key 與 rdd1 的當前數據的 key 是相同的,
//那麼就斷定能夠進行 join。
// 此時就能夠根據本身須要的方式,將 rdd1 當前數據與 rdd2 中能夠鏈接的數據,
//拼接在一塊兒(String 或 Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操做,建議僅僅在 rdd2 的數據量比較少(好比幾百 M,或者一兩 G)的狀況下使用。
// 由於每一個 Executor 的內存中,都會駐留一份 rdd2 的全量數據。

原則五:使用 map-side 預聚合的 shuffle 操做

若是由於業務須要,必定要使用 shuffle 操做,沒法用 map 類的算子來替代,那麼儘可能使用能夠 map-side 預聚合的算子。

使用reduceByKey,aggregateByKey代替groupByKey,由於reduceByKey和aggregateByKey會進行預聚合,groupByKey不會。

原則六:使用高性能的算子

使用 reduceByKey/aggregateByKey 替代 groupByKey,詳情見「原則五:使用 map-side 預聚合的 shuffle 操做」。

使用 mapPartitions 替代普通 map。

使用 filter 以後進行 coalesce 操做。

使用 repartitionAndSortWithinPartitions 替代 repartition 與 sort 類操做。

原則七:廣播大變量

有時在開發過程當中,會遇到須要在算子函數中使用外部變量的場景,那麼此時就應該使用 Spark的廣播(Broadcast)功能來提高性能。由於若是不使用廣播變量,那麼每一個任務會拉取數據並建立一個副本,這樣會大大增長網絡開銷,並佔用系統內存。若是使用廣播變量的話,數據就會保留一份。

廣播大變量代碼示例:

// 如下代碼在算子函數中,使用了外部的變量。
// 此時沒有作任何特殊操做,每一個 task 都會有一份 list1 的副本。
val list1 = ...
rdd1.map(list1...)
// 如下代碼將 list1 封裝成了 Broadcast 類型的廣播變量。
// 在算子函數中,使用廣播變量時,首先會判斷當前 task 所在 Executor 內存中,是否有變量副本。
// 若是有則直接使用;若是沒有則從 Driver 或者其餘 Executor 節點上遠程拉取一份放到本地 Executor
內存中。
// 每一個 Executor 內存中,就只會駐留一份廣播變量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原則八:使用 Kryo 優化序列化性能

代碼示例:

// 建立 SparkConf 對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器爲 KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 註冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原則九:優化數據結構

Java 中,有三種類型比較耗費內存:

1.對象。 2.集合類型,好比HashMap,LinedList等。3.字符串,每一個字符串內部都有一個字符數組以及長度等額外信息。

2.5 運行資源調優

在spark-submit調節資源參數來提升資源利用率。

  • num-executors

    參數說明:設置spark做業總共用多少個executor來執行。

    參數調優建議:每一個spark做業通常設置50~100個左右的Executor進程比較合適。過小不能充分利用資源,太大隊列沒法提供足夠的資源。

  • executor-memory

    參數說明:設置每一個Executor進程的內存。

    參數調優建議:每一個 Executor 進程的內存設置 4G~8G 較爲合適。可是這只是一個參考值,具體的設置仍是得根據不一樣部門的資源隊列來定。能夠看看本身團隊的資源隊列的最大內存限制是多少,num-executors 乘 以 executor-memory,是不能超過隊列的最大內存量的。此外,若是你是跟團隊裏其餘人共享這個資源隊列,那麼申請的內存量最好不要超過資源隊列最大總內存的 1/3~1/2,避免你本身的 Spark 做業佔用了隊列全部的資源,致使別的同窗的做業沒法運行。

  • executor-cores

    參數說明:設置每一個Executor進程CUP core數量。由於每一個cpu core一個時間只能執行一個task,因此cpu core數量越多,執行速度越快。

    參數調優建議:Executor 的 CPU core 數量設置爲 2~4 個較爲合適。一樣得根據不一樣部門的資源隊列來定,能夠看看本身的資源隊列的最大CPU core 限制是多少,再依據設置的 Executor 數量,來決定每一個 Executor進程能夠分配到幾個 CPU core。一樣建議,若是是跟他人共享這個隊列,那 麼 num-executors * executor-cores 不 要 超 過 隊 列 總 CPU core 的1/3~1/2 左右比較合適,也是避免影響其餘同窗的做業運行。

  • driver-memory

    參數說明:設置Driver進程的內存。

    參數調優建議:Driver 的內存一般來講不設置,或者設置 1G 左右應該就夠了。

  • spark.default.parallelism

    參數說明:該參數用於設置每一個 stage 的默認 task 數量。這個參數極爲重要,若是不設置可能會直接影響你的 Spark 做業性能。

    參數調優建議:Spark 做業的默認 task 數量爲 500~1000 個較爲合適。很 多 同 學 常 犯 的 一 個 錯 誤 就 是 不 去 設 置 這 個 參 數 , 那 麼 此 時 就 會 導 致Spark 本身根據底層 HDFS 的 block 數量來設置 task 的數量,默認是一個HDFS block 對應一個 task。一般來講,Spark 默認設置的數量是偏少的(好比就幾十個 task),若是 task 數量偏少的話,就會致使你前面設置好的Executor 的參數都前功盡棄。試想一下,不管你的 Executor 進程有多少個,內存和 CPU 有多大,可是 task 只有 1 個或者 10 個,那麼 90%的 Executor進程可能根本就沒有 task 執行,也就是白白浪費了資源!所以 Spark 官網建議的設置原則是,設置該參數爲 num-executors * executor-cores 的 2~3倍較爲合適,好比 Executor 的總 CPU core 數量爲 300 個,那麼設置 1000個 task 是能夠的,此時能夠充分地利用 Spark 集羣的資源。

  • spark.storage.memoryFraction

    參數說明:設置持久化數據在Executor佔比,默認是0.6。

    根據你選擇的不一樣的持久化策略,若是內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。

    參數調優建議:根據實際,能夠適當提升,讓數據寫入內存。

  • spark.shuffle.memoryFraction

    參數說明:該參數用於設置 shuffle 過程當中一個 task 拉取到上個 stage的 task 的輸出後,進行聚合操做時可以使用的 Executor 內存的比例,默認是 0.2。

    參數調優建議:若是 Spark 做業中的 RDD 持久化操做較少,shuffle 操做較多時,建議下降持久化操做的內存佔比,提升 shuffle 操做的內存佔比比例,避免 shuffle 過程當中數據過多時內存不夠用,必須溢寫到磁盤上,下降了性能。此外,若是發現做業因爲頻繁的 gc 致使運行緩慢,意味着 task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。

    資源參數的調優,沒有一個固定的值,須要同窗們根據本身的實際狀況(包括 Spark 做業中的 shuffle 操做數量、RDD 持久化操做數量以及 spark web ui 中顯示的做業 gc 狀況),同時參考給出的原理以及調優建議,合理地設置上述參數。

    資源參數參考示例:

    如下是一份 spark-submit 命令的示例,你們能夠參考一下,並根據本身的實際狀況進行調節。

    ./bin/spark-submit \
    --master yarn-cluster \
    --num-executors 100 \
    --executor-memory 6G \
    --executor-cores 4 \
    --driver-memory 1G \
    --conf spark.default.parallelism=1000 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3 \

3、Flink調優

3.1 Backpressure調優

  • web.backpressure.cleanup-interval

    說明:當啓動反壓數據採集後,獲取反壓前等待時間,默認是60s。

  • web.backpressure.delay-between-samples:Stack Trace

    說明:抽樣到確認反壓狀態之間的時延,默認爲50ms。

  • web.backpressure.num-samples

    說明:設定Stack Trace抽樣數以肯定反壓狀態,默認爲100。

3.2 Checkpointing優化

經過調整Checkpointing之間的時間間隔進行優化。

val env=StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(milliseconds)

3.3 狀態數據壓縮

目前可用的壓縮算法是Snappy,設置以下:

val env=StreamExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig
config.setUseSnapshotCompression(true)

3.4 Flink內存優化

Flink,Spark等大數據引擎都實現了本身的內存管理,有效解決JVM內存溢出問題。

  • JobManager配置

    jobmanager.heap.size:設定JobManager堆內存大小,默認爲1024MB。
  • TaskManager配置

    TaskManager做爲Flink集羣中的工做節點,全部任務的計算邏輯均執行在TaskManager之上,所以對TaskManager內存配置顯得尤其重要,能夠經過如下參數配置對TaskManager進行優化和調整。

    taskmanager.heap.size

    說明:設定TaskManager堆內存大小,默認值爲1024M,若是在Yarn的集羣中,TaskManager取決於Yarn分配給TaskManager Container的內存大小,且Yarn環境下通常會減掉一部份內存用於Container的容錯。

    taskmanager.jvm-exit-on-oom

    說明:設定TaskManager是否會由於JVM發生內存溢出而中止,默認爲false,當TaskManager發生內存溢出時,也不會致使TaskManager中止。

    taskmanager.memory.size

    說明:設定TaskManager內存大小,默認爲0,若是不設定該值將會使用taskmanager.memory.fraction做爲內存分配依據。

    taskmanager.memory.fraction

    說明:設定TaskManager堆中去除Network Buffers內存後的內存分配比例。該內存主要用於TaskManager任務排序、緩存中間結果等操做。例如,若是設定爲0.8,則表明TaskManager保留80%內存用於中間結果數據的緩存,剩下20%的內存用於建立用戶定義函數中的數據對象存儲。注意,該參數只有在taskmanager.memory.size不設定的狀況下才生效。

    taskmanager.memory.off-heap

    說明:設置是否開啓堆外內存供Managed Memory或者Network Buffers使用。

    taskmanager.memory.preallocate

    說明:設置是否在啓動TaskManager過程當中直接分配TaskManager管理內存。

    taskmanager.numberOfTaskSlots

    說明:每一個TaskManager分配的slot數量。

3.5 設定Network內存比例

taskmanager.network.memory.fraction

說明:JVM中用於Network Buffers的內存比例。

taskmanager.network.memory.min

說明:最小的Network Buffers內存大小,默認爲64MB。

taskmanager.network.memory.max

說明:最大的Network Buffers內存大小,默認1GB。

taskmanager.memory.segment-size

說明:內存管理器和Network棧使用的Buffer大小,默認爲32KB。

3.6 堆內存調優

默認Flink使用的Parallel Scavenge的垃圾回收器,能夠改用G1垃圾回收器。

啓動參數:

env.java.opts= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=300 -XX:+PrintGCDetails

  • -XX:MaxGCPauseMillis:設置容許的最大GC停頓時間,默認是200ms。

  • -XX:G1HeapRegionSize:每一個分區的大小,默認值會根據整個堆區的大小計算出來,範圍是1M~32M,取值是2的冪,計算的傾向是儘可能有2048個分區數。

  • -XX:MaxTenuringThreshold=n:晉升到老年代的「年齡」閾值,默認值爲15。

  • -XX:InitiatingHeapOccupancyPercent:通常會簡寫IHOP,默認是45%,這個佔比跟併發週期的啓動相關,當空間佔比達到這個值時,會啓動併發週期。若是常常出現FullGC,能夠調低該值,今早的回收能夠減小FullGC的觸發,但若是太低,則併發階段會更加頻繁,下降應用的吞吐。

  • -XX:G1NewSizePercent:年輕代最小的堆空間佔比,默認是5%。

  • -XX:G1MaxNewSizePercent:年輕代最大的堆空間佔比,默認是60%。

  • -XX:ConcGCThreads:併發執行的線程數,默認值接近整個應用程序數的1/4。

  • -XX:-XX:G1HeapWastePercent:容許的浪費空間的佔比,默認是5%。若是併發標記可回收的空間小於5%,則不會拋出MixedGC。

  • -XX:G1MixedGCCountTarget:一次全局併發標記以後,後續最多執行的MixedGC次數。默認值是8。

    系列傳送門

相關文章
相關標籤/搜索