本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark商業應用實戰指導,請持續關注本套博客。版權聲明:本套Spark商業應用實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。apache
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
複製代碼
程序開發調優 :避免建立重複的RDD數組
val rdd1 = sc.textFile("hdfs://master01:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://master01:9000/hello.txt")
rdd2.reduce(...)
複製代碼
須要對名爲「hello.txt」的HDFS文件進行一次map操做,再進行一次reduce操做。 也就是說,須要對一份數據執行兩次算子操做。 錯誤的作法:對於同一份數據執行屢次算子操做時,建立多個RDD。 這裏執行了兩次textFile方法,針對同一個HDFS文件,建立了兩個RDD出來 ,而後分別對每一個RDD都執行了一個算子操做。 這種狀況下,Spark須要從HDFS上兩次加載hello.txt文件的內容,並建立兩個單獨的RDD; 第二次加載HDFS文件以及建立RDD的性能開銷,很明顯是白白浪費掉的。網絡
程序開發調優 :儘量複用同一個RDD數據結構
錯誤的作法: 有一個<long , String>格式的RDD,即rdd1。 接着因爲業務須要,對rdd1執行了一個map操做,建立了一個rdd2, 而rdd2中的數據僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。ide
JavaPairRDD<long , String> rdd1 = ... JavaRDD rdd2 = rdd1.map(...)函數
分別對rdd1和rdd2執行了不一樣的算子操做。post
rdd1.reduceByKey(...)
rdd2.map(...)
複製代碼
rdd2的數據徹底就是rdd1的子集而已,卻建立了兩個rdd,並對兩個rdd都執行了一次算子操做。 此時會由於對rdd1執行map算子來建立rdd2,而多執行一次算子操做,進而增長性能開銷。 其實在這種狀況下徹底能夠複用同一個RDD。 咱們可使用rdd1,既作reduceByKey操做,也作map操做。性能
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
複製代碼
程序開發調優 :對屢次使用的RDD進行持久化學習
// 正確的作法。
// cache()方法表示:使用非序列化的方式將RDD中的數據所有嘗試持久化到內存中。
// 此時再對rdd1執行兩次算子操做時,只有在第一次執行map算子時,纔會將這個rdd1從源頭處計算一次。
// 第二次執行reduce算子時,就會直接從內存中提取數據進行計算,不會重複計算一個rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
正確的作法:
// 序列化的方式能夠減小持久化的數據對內存/磁盤的佔用量,進而避免內存被持久化數據佔用過多,
//從而發生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
複製代碼
一般不建議使用DISK_ONLY和後綴爲_2的級別:由於徹底基於磁盤文件進行數據的讀寫,會致使性能急劇下降,已經網絡較大開銷測試
若是有可能的話,要儘可能避免使用shuffle類算子,最消耗性能的地方就是shuffle過程。
shuffle過程當中,各個節點上的相同key都會先寫入本地磁盤文件中,而後其餘節點須要經過網絡傳輸拉取各個節點上的磁盤文件中的相同key。並且相同key都拉取到同一個節點進行聚合操做時,還有可能會由於一個節點上處理的key過多,致使內存不夠存放,進而溢寫到磁盤文件中。所以在shuffle過程當中,可能會發生大量的磁盤文件讀寫的IO操做,以及數據的網絡傳輸操做。磁盤IO和網絡數據傳輸也是shuffle性能較差的主要緣由。
儘量避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,儘可能使用map類的非shuffle算子。
// 傳統的join操做會致使shuffle操做。
// 由於兩個RDD中,相同的key都須要經過網絡拉取到一個節點上,由一個task進行join操做。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操做,不會致使shuffle操做。
// 使用Broadcast將一個數據量較小的RDD做爲廣播變量。
// 注意,以上操做,建議僅僅在rdd2的數據量比較少(好比幾百M,或者一兩G)的狀況下使用。
// 由於每一個Executor的內存中,都會駐留一份rdd2的全量數據。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)
複製代碼
若是由於業務須要,必定要使用shuffle操做,沒法用map類的算子來替代,那麼儘可能使用能夠map-side預聚合的算子 相似於MapReduce中的本地combiner。map-side預聚合以後,每一個節點本地就只會有一條相同的key,由於多條相同的key都被聚合起來了。其餘節點在拉取全部節點上的相同key時,就會大大減小須要拉取的數據數量,從而也就減小了磁盤IO以及網絡傳輸開銷。
建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,若是須要在repartition重分區以後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子
有時在開發過程當中,會遇到須要在算子函數中使用外部變量的場景(尤爲是大變量,好比100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提高性能。 默認狀況下,Spark會將該變量複製多個副本,經過網絡傳輸到task中,此時每一個task都有一個變量副本。若是變量自己比較大的話(好比100M,甚至1G),那麼大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中佔用過多內存致使的頻繁GC,都會極大地影響性能。 廣播後的變量,會保證每一個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。
Spark默認使用的是Java的序列化機制,你可使用Kryo做爲序列化類庫,效率要比 Java的序列化機制要高:
// 建立SparkConf對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器爲KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 註冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
複製代碼
Java中,有三種類型比較耗費內存:
一、對象,每一個Java對象都有對象頭、引用等額外的信息,所以比較佔用內存空間。
二、字符串,每一個字符串內部都有一個字符數組以及長度等額外信息。
三、集合類型,好比HashMap、LinkedList等,由於集合類型內部一般會使用一些內部類來封裝集合元素,好比Map.Entry
Spark官方建議,在Spark編碼實現中,特別是對於算子函數中的代碼,儘可能不要使用上述三種數據結構,儘可能使用字符串替代對象,使用原始類型(好比Int、Long)替代字符串,使用數組替代集合類型,這樣儘量地減小內存佔用,從而下降GC頻率,提高性能。
由於開發程序調優相對成熟,因此在此參考大牛的筆記,加上本身的總結,一鼓作氣。
秦凱新 於深圳