Spark性能調優

1、分配資源
最大調節分配資源:
經常使用的資源調度模式有Spark Standalone和Spark On Yarn。好比說你的每臺機器可以給你使用60G內存,10個cpu core,20臺機器。那麼executor的數量是20。平均每一個executor所能分配60G內存和10個cpu core。
1.增長executor數量:提高了並行度
2.增長每一個executor的CPU Core:提升了並行度
3.增長每一個executor的內存量:
對於RDD的cache,減小了磁盤IO
對於shuffle操做的reduce端:減小了磁盤IO
對於task的執行,可能會建立不少對象,內存小的話會頻繁致使JVM堆內存滿了,就會頻繁的進行GC。
2、executor、task、cpu Core數量關係調優
1.並行度
task數量,至少設置與Spark Application的總CPU Core 數量相同。官方推薦,task數量,設置成spark application總CPU core數量的2-3倍。
假如集羣有50個executor,每一個executor10個G內存,每一個executor有3個CPU core。150個CPU Core最多並行執行150個task
3、RDD架構重構以及RDD持久化
一、RDD架構重構
複用RDD,抽取共同的RDD
二、公共RDD的持久化
將RDD的數據緩存到內存中/磁盤中(BlockManager
三、持久化的序列化
減小內存開銷
四、內存充足,雙副本持久化
4、廣播變量
一臺節點有一個或者多個Executor,一個Executor有多個Task(task數量由Executor裏的Cpu Core數量一對一決定)
廣播變量:
將固定的,只讀的數據變量提早廣播給各個Executor,該Executor上的各個Task再從所在節點的BlockManager獲取廣播變量,若是本地的Executor沒有就從Driver遠程拉取變量副本,並保存到本地的BlockManager中.解決了每次將固定的,只讀的數據用Driver廣播到各個Task上的繁重且效率低下的問題.
使用方法:
1.調用SparkContext.broadcast建立一個Broadcast[T]對象.任何序列化的類型均可以這麼實現
2.經過 .value 獲取對象的值
3.變量只會被髮送到各個節點一次,應該做爲只讀值處理(修改這個值不會影響到別的節點)
5、使用Kryo序列化
默認Spark內部的序列化機制:
Java的ObjectOutputStream/ObjectInputStream:
優勢: 使用方便,實現Serializable接口便可
缺點: 效率低,序列化速度慢,序列化後的數據相對大
Spark支持使用Kryo序列化機制:
比Java速度快
序列化後的數據小 Java的十分之一大小
因此網絡傳輸的數據變少,在集羣中耗費的內存資源大大減小
使用Kryo後影響的幾個地方
1.優化網絡傳輸的性能,優化集羣中內存佔用和消耗
2.持久化RDD,優化內存的佔用和消耗.持久化RDD的內存佔用的內存越小,task執行的時候建立的對象,就不至於頻繁的佔滿內存,頻繁發生GC
3.shuffle 優化網絡傳輸的性能
Kryo的使用:
Spark中Scala語法的Kryo使用:
//建立一個集合存儲輸入輸出,字典文件,停用詞庫
val Array(inputPath,outputPath,dicPath,stopwords,date) = args
val conf = new SparkConf()
.setAppName(s"${this.getClass.getName}").setMaster("local[*]")
//使用Kryo序列胡
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sQLContext = new SQLContext(sc)
//spark 1.6 版本時候默認的壓縮方式仍是snappy,2.0以後才默認snappy
sQLContext.setConf("spark.sql.parquet.compression.codec","snappy")
6、使用fastutil優化數據格式
Fastutil:(最新版本要求Java7及以上版本)
概念:
是擴展了Java標準的集合框架(Map,List,Set,HashMap,ArrayList,HashSet)的類庫,提供了特殊類型的map,set,list和queue
優點:
代替JDK的原生的集合框架,佔內存小,存取速度快
提供了64位的array,set,list的集合列表,實用的IO類,來處理二進制和文本類型的文件
每種集合類型都實現了對應的java中的標準接口
支持雙向迭代器
支持引用類型 可是對引用類型是使用等於號(=)進行比較的,而不是equals()方法
應用:
1.算子函數使用了外部變量:
使用fastutil改寫外部變量,首先從源頭上就減小內存的佔用,經過廣播變量進一步減小內存佔用,再經過Kryo序列化類庫進一步減小內存佔用
2.在task任務執行的計算邏輯裏(算子函數),若是邏輯中有要建立較大的Map,List集合,可使用fastutil類庫重寫,減小了task建立出來的集合類型的內存佔用,避免executor內存頻繁佔滿,頻繁喚起GC,致使性能降低.
fastutil調優的說明:
廣播變量、Kryo序列化類庫、fastutil,都是以前所說的,對於性能來講,相似於一種調味品,烤雞,原本就很好吃了,而後加了一點特質的孜然麻辣粉調料,就更加好吃了一點。分配資源、並行度、RDD架構與持久化,這三個就是烤雞。broadcast、kryo、fastutil,相似於調料。
好比:通過一些調優以後30分鐘的任務,通過broadcast,kryo,fastutil調優多是29,28或者25,通過shuffle調優15分,groupbykey用reducebykey改寫,執行本地聚合也許就只有10分鐘,利用資源更大的YARN隊列,1分鐘.
fastutil使用:
在pom.xml中引用fastutil的包
<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>
List<Integer> 至關於 IntList
基本都是相似於IntList的格式,前綴就是集合的元素類型。特殊的就是Map,Int2IntMap,表明了key-value映射的元素類型。除此以外,還支持object、reference
7、調節數據本地化等待時長
task的locality的五種方式:
PROCESS_LOCAL:進程本地化
NODE_LOCAL:節點本地化
NO_PREF:對於task來講,數據從哪裏獲取都同樣,沒有優點之分
RACK_LOCAL:機架本地化
ANY:數據和task可能在集羣中的任何地方,並且再也不一個機架中,性能最差.
Spark的任務調度:
Spark在Driver上,對Application的每個stage的task進行分配以前都會計算出每一個task要計算的是哪一個分片數據.
首先會優先將task正好分配到他要計算的數據所在的節點,這樣就避免了網絡傳輸數據.
可是可能會遇到那個節點的計算資源和計算能力都滿了.這種時候,Spark默認等待3秒(不是絕對,還有不少種狀況,對於不一樣的本地化級別,都會去等待),到最後實在等不下去,就會選擇一個差的本地化級別.
對於第二種差的狀況,確定會發生數據傳輸,task會經過其所在的節點的blockManager來獲取數據,BlockManager發現本身本地沒有數據,會經過一個getRemote()方法,經過TransferService(網絡傳輸組件)從數據所在節點的BlockManager中,獲取數據.
適當的調節本地化等待時長:
觀察Spark做業的運行日誌,看數據本地化級別的信息,若是大可能是PROCESS_LOCALL,就不用調節,若大可能是NODE_LOCAL,ANY就須要調節,反覆的看日誌,以及做業運行時間,不要一味地提高本地化級別形成了本末倒置.
調節方式:
spark.locality.wait ,默認是3s。6s,10s
默認狀況下,下面3個的等待時長,都是跟上面那個是同樣的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
new SparkConf().set("spark.locality.wait", "10")
相關文章
相關標籤/搜索