1)避免建立重複RDDweb
2)儘量複用同一個RDD數據庫
3)對屢次使用的RDD進行持久化apache
4)儘可能避免使用shuffle類算子數組
5)使用map-side預聚合的shuffle操做緩存
6)使用高性能的算子性能優化
7)廣播大變量網絡
8)使用Kryo優化序列化性能數據結構
9)優化數據結構併發
10)資源參數調優ide
1)避免建立重複RDD
對於同一份數據,只應該建立一個RDD,不能建立多個RDD來表明同一份數據。
2)儘量複用同一個RDD
除了要避免在開發過程當中對一份徹底相同的數據建立多個RDD以外,在對不一樣的數據執行算子 操做時還要儘量地複用一個RDD。好比說,有一個RDD的數據格式是key-value類型的,另 一個是單value類型的,這兩個RDD的value數據是徹底同樣的。那麼此時咱們能夠只使用key-value類型的那個RDD,由於其中已經包含了另外一個的數據。對於相似這種多個RDD的數據有重疊或者包含的狀況,咱們應該儘可能複用一個RDD,這樣能夠儘量地減小RDD的數量,從而儘量減小算子執行的次數。
3)對屢次使用的RDD進行持久化
Spark中對於一個RDD執行屢次算子的默認原理是這樣的:每次你對一個RDD執行一個算子操做時,都會從新從源頭處計算一遍,計算出那個RDD來,而後再對這個RDD執行你的算子操做。所以對於這種狀況,建議是:對屢次使用的RDD進行持久化,此時Spark就會根據你的持久化策略,將RDD中的數據保存到內存或者磁盤中。之後每次對這個RDD進行算子操做時,都會直接從內存或磁盤中提取持久化的RDD數據,而後執行算子,而不會從源頭處從新計算一遍這個RDD,再執行算子操做。
persist():手動選擇持久化級別,並使用指定的方式進行持久化
持久化級別 | 含義 |
---|---|
MEMORY_ONLY | 使用未序列化的Java對象格式,將數據保存在內存中。若是內存不夠存放全部的數據,則數據可能就不會進行持久化。cache()使用的持久化策略 |
MEMORY_AND_DISK | 使用未序列化的Java對象格式,優先嚐試將數據保存在內存中,吐過內存不夠存放全部的數據,會將數據寫入磁盤文件中。 |
MEMORY_ONLY_SER | 含義同MEMORY_ONLY,但會將RDD中的數據進行序列化,RDD的每一個partition會被序列化成一個字節數組。 |
MEMORY_AND_DISK_SER | 含義同MEMORY_AND_DISK,但會將RDD中的數據進行序列化,RDD的每一個partition會被序列化成一個字節數組。 |
DISK_ONLY | 使用未序列化的Java對象格式,將數據所有寫入磁盤文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2...... | 對於以上任意一種持久化策略,若是加上後綴_2,表明的是將每一個持久化的數據,都複製一份副本,並將副本保存到其餘節點上。這種基於副本的持久化機制主要用於進行容錯,加入某個節點掛掉,節點的內存或磁盤中的持久化數據丟失了,那麼後續對RDD計算時還可使用該數據在其餘節點上的副本。 |
默認狀況下,性能最高的固然是MEMORY_ONLY,但前提是你的內存必須足夠大,能夠綽綽有餘地存放下整個RDD的全部數據。不進行序列化與反序列化的數據的操做,避免了部分性能開銷,對於這個RDD的後續算子操做,都是基於純內存的數據的操做,不須要從磁盤文件中讀取數據,性能很高。可是必需要注意的是,在實際的生產環境中,恐怕可以直接用這種策略的場景仍是有限的,若是RDD中數據比較多時,直接用這種持久化級別,會致使JVM的OOM內存溢出異常。
若是使用MEMORY_ONLY級別時發生了內存溢出,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化後再保存在內存中,此時每一個partition僅僅是一個字節數組而已,大大減小了對象數量,並下降了內存佔用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。可是後續算子能夠基於純內存進行操做,所以性能整體仍是比較高的。此外,可能發生的問題同上,若是RDD中的數據量過多的話,仍是可能會致使OOM內存溢出的異常。
若是純內存的級別都沒法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由於既然到了這一步,就說明RDD的數據量很大,內存沒法徹底放下。序列化後的數據比較少,能夠節省內存和磁盤的空間開銷。同時該策略會優先儘可能嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。
一般不建議使用DISK_ONLY和後綴爲_2的級別,由於徹底基於磁盤文件進行數據的讀寫,會致使性能急劇下降,有時還不如從新計算一次全部RDD。後綴爲_2的級別,必須將全部數據都複製一份副本,併發送到其餘節點上,數據複製以及網絡傳輸會致使較大的性能開銷,除非是要求做業的高可用性,不然不建議使用。
4)儘可能避免使用shuffle類算子
在Spark做業運行過程當中,最消耗性能的地方就是shuffle過程。Shuffle過程當中,各個節點上的相同key都會先寫入本地磁盤文件中,而後其餘節點須要經過網絡傳輸拉去各個節點上的磁盤文件中的相同key。並且相同key都拉取到同一個節點進行聚合操做時,還有可能會由於一個節點上處理的key過多,致使內存不夠存放,進而溢寫到磁盤文件中。所以在shuffle過程當中,可能會發生大量的磁盤文件讀寫的IO操做,以及數據的網絡傳輸操做。磁盤IO和網絡數據傳輸是shuffle性能較差的主要緣由。
所以在開發過程當中,要儘可能避免使用會致使shuffle的算子,儘可能使用map類的非shuffle算子,能夠大大減小性能開銷。
使用廣播變量與map替代join
val rddOld = rdd1.join(rdd2)//會致使Shuffle操做 val rdd2Data = rdd2.collect()//將rdd2的數據收集回來 val rdd2DataBroadcast = sc.broadcast(rdd2Data)//將rdd2做爲廣播變量 val rddNew = rdd.map(rdd2DataBroadcast......)//對相同的key進行拼接 //每一個Executor存放一份廣播變量 //建議將數據量比較少(幾百M到一兩個G)的rdd做爲廣播變量
5)使用map-side預聚合的shuffle操做
若是由於業務須要,必定要使用shuffle操做,沒法用map類的算子來替代,那麼儘可能使用能夠map-side預聚合的算子。所謂map-side預聚合,說的是在每一個節點本地對相同的key進行一次聚合操做,相似於MapReduce中的本地combiner。map-side預聚合以後,每一個節點本地就只會有一條相同的key,由於多條相同的key都被聚合起來了。其餘節點在拉取全部節點上的相同key時,就會大大減小須要拉取的數據數量,從而也就減小了磁盤IO以及網絡傳輸開銷。一般來講,在可能的狀況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。由於reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每一個節點本地的相同key進行預聚合。而groupByKey算子是不會進行預聚合的,全量的數據會在集羣的各個節點之間分發和傳輸,性能相對來講比較差。
6)使用高性能的算子
1)groupByKey ==> reduceByKey / aggregateByKey //具體看第五點
2)map ==> mapPartitions
mapPartitions類的算子,一次函數調用會處理一個partition全部的數據,而不是一次函數調用處理一條,性能相對來講會高一些。可是有的時候,使用mapPartitions會出現OOM的問題。由於單次函數調用就要處理掉一個partition全部的數據,若是內存不夠,垃圾回收是沒法回收掉大多對象的,極可能出現OOM異常,因此使用這類操做時要慎重。
3)foreach ==> foreachPartitions
原理相似於mapPartitions。在實踐中發現,foreachpartitions類的算子,對性能的提高仍是頗有幫助的。好比在foreach函數中,將RDD中全部數據寫MySQL,那麼若是是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會建立一個數據庫鏈接,此時就勢必會頻繁地建立和銷燬數據庫鏈接,性能是很是低下;可是若是用foreachPartitions算子一次性處理一個partition的數據,那麼對於每一個partition,只要建立一個數據庫鏈接便可,而後執行批量插入操做,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能能夠提高30%以上。
4)使用filter以後進行coalesce操做
一般對一個RDD執行filter算子過濾掉RDD中較多數據後(好比30%以上的數據),建議使用coalesce算子,手動減小RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。由於filter以後,RDD的每一個partition中都會有不少數據被過濾掉,此時若是照常進行後續的計算,其實每一個task處理的partition中的數據量並非不少,有一點資源浪費,並且此時處理的task越多,可能速度反而越慢。所以用coalesce減小partition數量,將RDD中的數據壓縮到更少的partition以後,只要使用更少的task便可處理完全部的partition。在某些場景下,對於性能的提高會有必定的幫助。
5)partition + sort ==> repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,若是須要在repartition重分區以後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。由於該算子能夠一邊進行重分區的shuffle操做,一邊進行排序。shuffle與sort兩個操做同時進行,比先shuffle再sort來講,性能是要高的。
7)廣播大變量
有時在開發過程當中,會遇到須要在算子函數中使用外部變量的場景(尤爲是大變量,100M以上的集合),那麼久應該使用廣播變量來提高性能。
在算子函數中使用到外部變量時,默認狀況下,Spark會將該變量複製多個副本,經過網絡傳輸到task中,此時每一個task都有一個變量副本。若是變量自己比較大的話,那麼大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中佔用過多內存致使的頻繁GC,都會極大地影響性能。所以對於上述狀況,若是使用的外部變量比較大,建議使用Spark的廣播變量,對該變量進行廣播。廣播後的變量,會保證每一個Executor的內存中,只駐留一份廣播變量,而Executor中的task執行時共享該Executor中的那份共享副本。這樣的話,能夠大大減小變量副本的數量,從而減小網絡傳輸的性能開銷,並減小對Executor內存的佔用開銷,下降GC的頻率。//代碼看第四點
8)使用Kryo優化序列化性能
在Spark中,主要有三個地方涉及了序列化:
1)在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸
2)將自定義的類型做爲RDD的泛型類型時。全部自定義類型對象,都會進行序列化,所以在這種狀況下,也要求自定義的類型必須實現Serializable接口
3)使用可序列化的持久化策略時(好比MEMORY_ONLY_SER),Spark會將RDD中的每一個partition都序列化成一個大的字節數組
對於這三種出現序列化的地方,咱們均可以經過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。可是Spark同時支持使用Kyro序列化庫,Kryo序列化庫的性能比Java序列化庫的性能要高不少。Spark之因此默認沒有使用Kryo做爲序列化庫,是由於Kyro要求最好要註冊全部須要進行序列化的自定義類型,所以對於開發者來講,這種方式比較麻煩。
如下是使用Kryo的代碼示例,咱們只要設置序列化類,再註冊要序列化的自定義類型便可
val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//設置序列化器爲KyroSerializer conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))//註冊要序列化的自定義類型
9)優化數據結構
Java中,有三種類型比較耗費內存:
1)對象,每一個Java對象都有對象頭、引用等額外的信息,所以比較佔用內存空間
2)字符串,每一個字符串內部都有一個字符數組以及長度等額外信息
3)集合類型,由於集合類型內部一般會使用一些內部類來封裝集合元素,好比Map.Entry
所以Spark官方建議,在Spark編碼實現中,特別是對於算子函數中的代碼,儘可能不要使用上述三種數據結構,儘可能使用字符串替代對象,使用原始類型替代字符串,使用數組替代集合類型,這樣能夠減小內存佔用,從而下降GC頻率,提高性能。
10)資源參數調優
num-executors 參數說明:該參數用於設置Spark做業總共要用多少個Executor進程來執行。Driver在向YARN集羣管理器申請資源時,YARN集羣管理器會盡量按照你的設置來在集羣的各個工做節點上,啓動相應數量的Executor進程。這個參數很是之重要,若是不設置的話,默認只會給你啓動少許的Executor進程,此時你的Spark做業的運行速度是很是慢的。 參數調優建議:每一個Sparkk做業的運行通常設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都很差。設置的太少,沒法充分利用集羣資源;設置的太多的話,大部分隊列可能沒法給予充分的資源。 -------------------------------------------------------------------------------- executor-memory 參數說明:該參數用於設置每一個Executor進程的內存。Executor內存的大小,不少時候直接決定了 Spark做業的性能,並且跟常見的JVM OOM異常,也有直接的關聯。 參數調優建議:每一個Executor進程的內存設置4G~8G較爲合適。可是這只是一個參考值,具體的設置仍是得根據不一樣部門的資源隊列來定。能夠看看本身團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內存量的。此外,若是你是跟團隊裏其餘人共享這個資源隊列,那麼申請的內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你本身的Spark做業佔用了隊列全部的資源,致使別的同窗的做業沒法運行。 -------------------------------------------------------------------------------- executor-cores 參數說明:該參數用於設置每一個Executor進程的CPU core數量。這個參數決定了每一個Executor進程並行執行task線程的能力。由於每一個CPU core同一時間只能執行一個task線程,所以每一個Executor進程的CPU core數量越多,越可以快速地執行完分配給本身的全部task線程。 參數調優建議:Executor的CPU core數量設置爲2~4個較爲合適。一樣得根據不一樣部門的資源隊列來定,能夠看看本身的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每一個Executor進程能夠分配到幾個CPU core。一樣建議,若是是跟他人共享這個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其餘同窗的做業運行。 -------------------------------------------------------------------------------- driver-memory 參數說明:該參數用於設置Driver進程的內存。 參數調優建議:Driver的內存一般來講不設置,或者設置1G左右應該就夠了。惟一須要注意的一點是,若是須要使用collect算子將RDD的數據所有拉取到Driver上進行處理,那麼必須確保Driver的內存足夠大,不然會出現OOM內存溢出的問題。 -------------------------------------------------------------------------------- spark.default.parallelism 參數說明:該參數用於設置每一個stage的默認task數量。這個參數極爲重要,若是不設置可能會直接影響你的Spark做業性能。 參數調優建議:Spark做業的默認task數量爲500~1000個較爲合適。不少同窗常犯的一個錯誤就是不去設置這個參數,那麼此時就會致使Spark本身根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。一般來講,Spark默認設置的數量是偏少的(好比就幾十個task),若是task數量偏少的話,就會致使你前面設置好的Executor的參數都前功盡棄。試想一下,不管你的Executor進程有多少個,內存和CPU有多大,可是task只有1個或者10個,那麼90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!所以Spark官網建議的設置原則是,設置該參數爲num-executors * executor-cores的2~3倍較爲合適,好比Executor的總CPU core數量爲300個,那麼設置1000個task是能夠的,此時能夠充分地利用Spark集羣的資源。 -------------------------------------------------------------------------------- spark.storage.memoryFraction 參數說明:該參數用於設置RDD持久化數據在Executor內存中能佔的比例,默認是0.6。也就是說,默認Executor 60%的內存,能夠用來保存持久化的RDD數據。根據你選擇的不一樣的持久化策略,若是內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。 參數調優建議:若是Spark做業中,有較多的RDD持久化操做,該參數的值能夠適當提升一些,保證持久化的數據可以容納在內存中。避免內存不夠緩存全部的數據,致使數據只能寫入磁盤中,下降了性能。可是若是Spark做業中的shuffle類操做比較多,而持久化操做比較少,那麼這個參數的值適當下降一些比較合適。此外,若是發現做業因爲頻繁的gc致使運行緩慢(經過spark web ui能夠觀察到做業的gc耗時),意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。 -------------------------------------------------------------------------------- spark.shuffle.memoryFraction 參數說明:該參數用於設置shuffle過程當中一個task拉取到上個stage的task的輸出後,進行聚合操做時可以使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操做。shuffle操做在進行聚合時,若是發現使用的內存超出了這個20%的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地下降性能。 參數調優建議:若是Spark做業中的RDD持久化操做較少,shuffle操做較多時,建議下降持久化操做的內存佔比,提升shuffle操做的內存佔比比例,避免shuffle過程當中數據過多時內存不夠用,必須溢寫到磁盤上,下降了性能。此外,若是發現做業因爲頻繁的gc致使運行緩慢,意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。
Spark最初採用的靜態內存管理機制。內存交給JVM去管理,存儲內存、執行內存和其餘內存的大小在Spark應用程序運行期間均爲固定的。用戶能夠應用程序啓動前進行配置。
1)Storage內存區域:由spark.storage.memoryFraction控制(默認爲0.6,佔系統內存的60%)
2)Execution內存區域:由spark.shuffle.memoryFraction控制(默認爲0.2,佔系統內存的20%)
3)其餘:取決於上面兩部分的大小(默認爲0.2,佔系統內存的20%)
1)Storage內存區域:
1)可用的Storage內存:用於緩存RDD數據和broadcast數據,由spark.storage.safetyFraction決定(默認爲0.9,佔Storage內存的90%)
2)用於unroll:緩存iterator形式的Block數據,由spark.storage.unrollFraction決定(默認爲0.2,佔Storage內存的20%)
3)預留:可預防OOM
2)Execution內存區域:
1)可用的Execution內存:用於緩存在shuffle過程當中的中間數據,由spark.shuffle.safetyFraction控制(默認爲0.8,佔Execution內存的80%)
2)預留:可預防OOM
3)其餘:
用戶定義的數據結構或Spark內部元數據
可用的堆內內存的大小須要按照下面的方式計算:
可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
其中systemMaxMemory取決於當前JVM堆內內存的大小,最後可用的存儲內存或者執行內存要在此基礎上與各自的memoryFraction參數和safetyFraction參數相乘得出。
堆外內存的空間分配較爲簡單,存儲內存,執行內存的大小一樣是固定的。
1)Storage內存:由spark.memory.storageFraction控制(默認爲0.5,佔堆外可用內存的50%)
2)Execution內存:(默認爲0.5,佔堆外可用內存的50%)
靜態內存管理機制實現起來較爲簡單,但若是開發人員不熟悉Spark的存儲機制,或沒有根據具體的數據規模和計算任務作相應的配置,很容易形成「一半海水,一半火焰」的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另外一方卻早早被佔滿,不得不淘汰或移除舊的內容以存儲新的內容。所以,出現了新的內存管理機制:統一內存管理。但出於兼容舊版本應用程序的目的,Spark仍然保留了它的實現。
spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3
1)Storage內存:由spark.memory.storageFraction控制(默認爲0.5,佔堆外可用內存的50%)
2)Execution內存:(默認爲0.5,佔堆外可用內存的50%)
動態佔用機制:與靜態內存管理同樣,當雙方空間都被佔滿後,如有新增內容雙方都須要將其存儲到磁盤。可是若己方空間不足對方空間空餘則可佔用對方空間。
Storage佔用對方的內存可被淘汰。
Execution佔用對方的內存不可被淘汰,只能等待釋放。
憑藉統一內存管理機制,Spark在必定程度上提升了對內和堆外內存資源的利用率,下降了開發者維護Spark內存的難度,但並不意味着開發者能夠高枕無憂。若是存儲內存的空間太大或者說緩存的數據過多,反而會致使頻繁的全量垃圾惠州,下降任務執行時的性能,由於緩存的RDD數據一般都是長期駐留內存的,因此要想充分發揮Spark的性能,須要開發者進一步瞭解存儲內存和執行內存各自的管理方式和實現原理。