Spark學習——性能調優(一)

其餘更多java基礎文章:
java基礎學習(目錄)java


Spark的性能調優主要有如下幾個方向:node

  • 常規性能調優:分配資源、並行度、RDD架構與緩存等
  • JVM調優(Java虛擬機):JVM相關的參數,一般狀況下,若是你的硬件配置、基礎的JVM的配置,都ok的話,JVM一般不會形成太嚴重的性能問題;反而更多的是,在troubleshooting中,JVM佔了很重要的地位;JVM形成線上的spark做業的運行報錯,甚至失敗(好比OOM)。
  • shuffle調優(至關重要):spark在執行groupByKey、reduceByKey等操做時的,shuffle環節的調優。這個很重要。shuffle調優,其實對spark做業的性能的影響,是至關之高!!!經驗:在spark做業的運行過程當中,只要一牽扯到有shuffle的操做,基本上shuffle操做的性能消耗,要佔到整個spark做業的50%~90%。10%用來運行map等操做,90%耗費在兩個shuffle操做。groupByKey、countByKey。
  • spark操做調優(spark算子調優,比較重要):groupByKey,countByKey或aggregateByKey來重構實現。有些算子的性能,是比其餘一些算子的性能要高的。foreachPartition替代foreach。若是一旦遇到合適的狀況,效果仍是不錯的。

按照優化效果簡單排序:算法

  1. 分配資源、並行度、RDD架構與緩存
  2. shuffle調優
  3. spark算子調優
  4. JVM調優、廣播大變量、Kryo、fastUtil

本系列主要講解:shell

  • 性能調優
    • 分配更多資源
    • 調節並行度
    • 重構RDD架構及持久化RDD
    • 廣播大變量
    • Kryo序列化
    • fastUtil優化
    • 調節數據本地化等待時長
  • JVM調優
    • 下降cache操做的內存佔比
    • 調節executor堆外內存
    • 調節鏈接等待時長
  • Shuffle調優
    • 合併map端輸出文件
    • 調節map端內存緩存和reduce端內存佔比
    • SortShuffleManager調優
  • 算子調優
    • 使用MapPartition提高性能
    • filter事後使用coalesce減小分區數量
    • 使用foreachPartition優化
    • 使用repatition解決Spark SQL低並行度
    • 使用reduceByKey本地聚合

分配更多資源

性能調優的王道,就是增長和分配更多的資源,性能和速度上的提高,是顯而易見的;基本上,在必定範圍以內,增長資源與性能的提高,是成正比的;寫完了一個複雜的spark做業以後,進行性能調優的時候,首先第一步,我以爲,就是要來調節最優的資源配置;在這個基礎之上,若是說你的spark做業,可以分配的資源達到了你的能力範圍的頂端以後,沒法再分配更多的資源了,公司資源有限;那麼纔是考慮去作後面的這些性能調優的點。apache

分配哪些資源?

  • executor數量
  • 每一個executor的cpu core數量
  • 每一個executor的內存大小
  • driver內存大小

在哪裏分配這些資源?

在咱們在生產環境中,提交spark做業時,用的spark-submit shell腳本,裏面調整對應的參數數組

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置executor的數量
--driver-memory 100m \  配置driver的內存(影響不大)
--executor-memory 100m \  配置每一個executor的內存大小
--executor-cores 3 \  配置每一個executor的cpu core數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
複製代碼

爲何多分配了這些資源之後,性能會獲得提高?

如上圖所示,Driver中的SparkContext,DAGScheduler,TaskScheduler,會將咱們的算子,切割成大量的task,提交到Application的executor上面去執行。假設咱們最後切割出了100個task任務。

增長executor

若是executor數量比較少,那麼,可以並行執行的task數量就比較少,就意味着,咱們的Application的並行執行的能力就很弱。
好比有3個executor,每一個executor有2個cpu core,那麼同時可以並行執行的task,就是6個。6個執行完之後,再換下一批6個task。 增長了executor數量之後,那麼,就意味着,可以並行執行的task數量,也就變多了。好比原先是6個,如今可能能夠並行執行10個,甚至20個,100個。那麼並行能力就比以前提高了數倍,數十倍。 相應的,性能(執行的速度),也能提高數倍~數十倍。緩存

增長每一個executor的cpu core

增長每一個executor的cpu core,也是增長了執行的並行能力。本來20個executor,每一個才2個cpu core。可以並行執行的task數量,就是40個task。
如今每一個executor的cpu core,增長到了5個。可以並行執行的task數量,就是100個task。 執行的速度,提高了2.5倍。bash

增長每一個executor的內存量。

增長了內存量之後,對性能的提高,有三點:網絡

  1. 若是須要對RDD進行cache,那麼更多的內存,就能夠緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減小了磁盤IO。
  2. 對於shuffle操做,reduce端,會須要內存來存放拉取的數據並進行聚合。若是內存不夠,也會寫入磁盤。若是給executor分配更多內存之後,就有更少的數據,須要寫入磁盤,甚至不須要寫入磁盤。減小了磁盤IO,提高了性能。
  3. 對於task的執行,可能會建立不少對象。若是內存比較小,可能會頻繁致使JVM堆內存滿了,而後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內存加大之後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。

調節並行度

並行度:其實就是指的是,Spark做業中,各個stage的task數量,也就表明了Spark做業的在各個階段(stage)的並行度。架構

若是並行度太低,會怎麼樣?

假設,如今已經在spark-submit腳本里面,給咱們的spark做業分配了足夠多的資源,好比50個executor,每一個executor有10G內存,每一個executor有3個cpu core。基本已經達到了集羣或者yarn隊列的資源上限。

task沒有設置,或者設置的不多,好比就設置了,100個task。50個executor,每一個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,能夠並行運行。可是你如今,只有100個task,平均分配一下,每一個executor分配到2個task,ok,那麼同時在運行的task,只有100個,每一個executor只會並行運行2個task。每一個executor剩下的一個cpu core,就浪費掉了。

你的資源雖然分配足夠了,可是問題是,並行度沒有與資源相匹配,致使你分配下去的資源都浪費掉了。

合理的並行度的設置

應該是要設置的足夠大,大到能夠徹底合理的利用你的集羣資源;好比上面的例子,總共集羣有150個cpu core,能夠並行運行150個task。那麼就應該將你的Application的並行度,至少設置成150,才能徹底有效的利用你的集羣資源,讓150個task,並行執行;並且task增長到150個之後,便可以同時並行運行,還可讓每一個task要處理的數據量變少;好比總共150G的數據要處理,若是是100個task,每一個task計算1.5G的數據;如今增長到150個task,能夠並行運行,並且每一個task主要處理1G的數據就能夠。

很簡單的道理,只要合理設置並行度,就能夠徹底充分利用你的集羣計算資源,而且減小每一個task要處理的數據量,最終,就是提高你的整個Spark做業的性能和運行速度。

  1. task數量,至少設置成與Spark application的總cpu core數量相同(最理想狀況,好比總共150個cpu core,分配了150個task,一塊兒運行,差很少同一時間運行完畢)
  2. 官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,好比150個cpu core,基本要設置task數量爲300~500;

實際狀況,與理想狀況不一樣的,有些task會運行的快一點,好比50s就完了,有些task,可能會慢一點,要1分半才運行完,因此若是你的task數量,恰好設置的跟cpu core數量相同,可能仍是會致使資源的浪費,由於,好比150個task,10個先運行完了,剩餘140個還在運行,可是這個時候,有10個cpu core就空閒出來了,就致使了浪費。那若是task數量設置成cpu core總數的2~3倍,那麼一個task運行完了之後,另外一個task立刻能夠補上來,就儘可能讓cpu core不要空閒,同時也是儘可能提高spark做業運行的效率和速度,提高性能。

如何設置一個Spark Application的並行度?

spark.default.parallelism 
SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")
複製代碼

重構RDD架構和持久化RDD

  • 如上圖第一條DAG,默認狀況下,屢次對一個RDD執行算子,去獲取不一樣的RDD;都會對這個RDD以及以前的父RDD,所有從新計算一次;因此在計算RDD3和RDD4的時候,前面的讀取HDFS文件,而後對RDD1執行算子,獲取 到RDD2會計算兩遍。
    這種狀況,是絕對絕對,必定要避免的,一旦出現一個RDD重複計算的狀況,就會致使性能急劇下降。好比,HDFS->RDD1-RDD2的時間是15分鐘,那麼此時就要走兩遍,變成30分鐘。

  • 另一種狀況,在上圖第二條DAG中國,從一個RDD到幾個不一樣的RDD,算子和計算邏輯實際上是徹底同樣的,結果由於人爲的疏忽,計算了屢次,獲取到了多個RDD。這個也是儘可能要避免的。

如何重構RDD架構

  • 1. RDD架構重構與優化
    儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。

  • 2. 公共RDD必定要實現持久化
    對於要屢次計算和使用的公共RDD,必定要進行持久化。
    持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),之後不管對這個RDD作多少次計算,那麼都是直接取這個RDD的持久化的數據,好比從內存中或者磁盤中,直接提取一份數據。

  • 3. 持久化,是能夠進行序列化的
    若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。
    當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。
    序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。

  • 4. 爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化
    持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;持久化的每一個數據單元,存儲一份副本,放在其餘節點上面;從而進行容錯;一個副本丟了,不用從新計算,還可使用另一份副本。
    這種方式,僅僅針對你的內存資源極度充足

廣播大變量

關於廣播能夠閱讀 Spark學習(二)——RDD基礎 中的共享變量。

爲何要使用廣播大變量

task執行的算子中,使用了外部的變量,而後driver會把變量以task的形式發送到excutor端,每一個task都會獲取一份變量的副本。若是有不少個task,就會有不少給excutor端攜帶不少個變量,若是這個變量很是大的時候,就可能會形成內存溢出。

好比,外部變量map是1M。總共,你前面調優都調的特好,資源給的到位,配合着資源,並行度調節的絕對到位,1000個task。大量task的確都在並行運行。
這些task裏面都用到了佔用1M內存的map,那麼首先,map會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會經過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark做業運行的總時間的一小部分。
map副本,傳輸到了各個task上以後,是要佔用內存的。1個map的確不大,1M;1000個map分佈在你的集羣中,一會兒就耗費掉1G的內存。對性能會有什麼影響呢?
沒必要要的內存的消耗和佔用,就致使了,你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;
你的task在建立對象的時候,也許會發現堆內存放不下全部對象,也許就會致使頻繁的垃圾回收器的回收,GC。GC的時候,必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。

廣播大變量原理

  • 廣播變量,初始的時候,就在Drvier上有一份副本。
  • task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中,嘗試獲取變量副本;若是本地沒有,那麼就從Driver遠程拉取變量副本,並保存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。
  • executor的BlockManager除了從driver上拉取,也可能從其餘節點的BlockManager上拉取變量副本,距離越近越好。

使用Kryo序列化

  • 默認狀況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化。這種默認序列化機制的好處在於,處理起來比較方便;也不須要咱們手動去作什麼事情,只是,你在算子裏面使用的變量,必須是實現Serializable接口的,可序列化便可。
  • 可是缺點在於,默認的序列化機制的效率不高,序列化的速度比較慢;序列化之後的數據,佔用的內存空間相對仍是比較大。 能夠手動進行序列化格式的優化,Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。

Kryo序列化機制,一旦啓用之後,會生效的幾個地方:

  1. 算子函數中使用到的外部變量
    算子函數中使用到的外部變量,使用Kryo之後:優化網絡傳輸的性能,能夠優化集羣中內存的佔用和消耗
  2. 持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER等
    持久化RDD,優化內存的佔用和消耗;持久化RDD佔用的內存越少,task執行的時候,建立的對象,就不至於頻繁的佔滿內存,頻繁發生GC。
  3. shuffle
    能夠優化網絡傳輸的性能

如何使用Kryo

  • 首先第一步,在SparkConf中設置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類;
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
複製代碼

Kryo之因此沒有被做爲默認的序列化類庫的緣由,就要出現了:主要是由於Kryo要求,若是要達到它的最佳性能的話,那麼就必定要註冊你自定義的類(好比,你的算子函數中使用到了外部自定義類型的對象變量,這時,就要求必須註冊你的類,不然Kryo達不到最佳性能)。

  • 第二步,註冊你使用到的,須要經過Kryo序列化的一些自定義
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
複製代碼

使用fastUtil優化數據格式

fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;
fastutil可以提供更小的內存佔用,更快的存取速度;咱們使用fastutil提供的集合類,來替代本身平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,能夠減少內存的佔用,而且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及實用的IO類,來處理二進制和文本類型的文件; fastutil最新版本要求Java 7以及以上版本;

使用場景

Spark中應用fastutil的場景:

  1. 若是算子函數使用了外部變量;那麼第一,你可使用Broadcast廣播變量優化;第二,可使用Kryo序列化類庫,提高序列化性能和效率;第三,若是外部變量是某種比較大的集合,那麼能夠考慮使用fastutil改寫外部變量,首先從源頭上就減小內存的佔用,經過廣播變量進一步減小內存佔用,再經過Kryo序列化類庫進一步減小內存佔用。
  2. 在你的算子函數裏,也就是task要執行的計算邏輯裏面,若是有邏輯中,出現,要建立比較大的Map、List等集合,可能會佔用較大的內存空間,並且可能涉及到消耗性能的遍歷、存取等集合操做;那麼此時,能夠考慮將這些集合類型使用fastutil類庫重寫,使用了fastutil集合類之後,就能夠在必定程度上,減小task建立出來的集合類型的內存佔用。避免executor內存頻繁佔滿,頻繁喚起GC,致使性能降低。

fastutil的使用

第一步:在pom.xml中引用fastutil的包

<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>
複製代碼

第二步:List => IntList

IntList fastutilExtractList = new IntArrayList();
複製代碼

調節數據本地化時長

Spark在Driver上,對Application的每個stage的task,進行分配以前,都會計算出每一個task要計算的是哪一個分片數據,RDD的某個partition;Spark的task分配算法,優先,會但願每一個task正好分配到它要計算的數據所在的節點,這樣的話,就不用在網絡間傳輸數據;

可是呢,一般來講,有時,事與願違,可能task沒有機會分配到它的數據所在的節點,爲何呢,可能那個節點的計算資源和計算能力都滿了;因此呢,這種時候,一般來講,Spark會等待一段時間,默認狀況下是3s鍾(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,好比說,將task分配到靠它要計算的數據所在節點,比較近的一個節點,而後進行計算。

可是對於第二種狀況,一般來講,確定是要發生數據傳輸,task會經過其所在節點的BlockManager來獲取數據,BlockManager發現本身本地沒有數據,會經過一個getRemote()方法,經過TransferService(網絡數據傳輸組件)從數據所在節點的BlockManager中,獲取數據,經過網絡傳輸回task所在節點。

對於咱們來講,固然不但願是相似於第二種狀況的了。最好的,固然是task和數據在一個節點上,直接從本地executor的BlockManager中獲取數據,純內存,或者帶一點磁盤IO;若是要經過網絡傳輸數據的話,那麼實在是,性能確定會降低的,大量網絡傳輸,以及磁盤IO,都是性能的殺手。

何時要調節這個參數?

觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。 日誌裏面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL 觀察大部分task的數據本地化級別

若是大多都是PROCESS_LOCAL,那就不用調節了 若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長 調節完,應該是要反覆調節,每次調節完之後,再來運行,觀察日誌 看看大部分的task的本地化級別有沒有提高;看看,整個spark做業的運行時間有沒有縮短

你別本末倒置,本地化級別卻是提高了,可是由於大量的等待時長,spark做業的運行時間反而增長了,那就仍是不要調節了

怎麼調節?

new SparkConf()
  .set("spark.locality.wait", "10")
複製代碼
  • spark.locality.wait,默認是3s;6s,10s

默認狀況下,下面3個的等待時長,都是跟上面那個是同樣的,都是3s

  • spark.locality.wait.process
  • spark.locality.wait.node
  • spark.locality.wait.rack
相關文章
相關標籤/搜索