Spark性能優化總結

近期優化了一個spark流量統計的程序,此程序跑5分鐘小數據量日誌不到5分鐘,但相同的程序跑一天大數據量日誌各類失敗。經優化,使用160 vcores + 480G memory,一天的日誌可在2.5小時內跑完,下面對一些優化的思路方法進行梳理。 java

優化的目標

  1. 保證大數據量下任務運行成功
  2. 下降資源消耗
  3. 提升計算性能

三個目標優先級依次遞減,首要解決的是程序可以跑通大數據量,資源性能儘可能進行優化。 node

基礎優化

這部分主要對程序進行優化,主要考慮stage、cache、partition等方面。 web

Stage

在進行shuffle操做時,如reduceByKey、groupByKey,會劃分新的stage。同一個stage內部使用pipe line進行執行,效率較高;stage之間進行shuffle,效率較低。故大數據量下,應進行代碼結構優化,儘可能減小shuffle操做。 網絡

Cache

本例中,首先計算出一個baseRDD,而後對其進行cache,後續啓動三個子任務基於cache進行後續計算。 併發

對於5分鐘小數據量,採用StorageLevel.MEMORY_ONLY,而對於大數據下咱們直接採用了StorageLevel.DISK_ONLY。DISK_ONLY_2相較DISK_ONLY具備2備份,cache的穩定性更高,但同時開銷更大,cache除了在executor本地進行存儲外,還需走網絡傳輸至其餘節點。後續咱們的優化,會保證executor的穩定性,故沒有必要採用DISK_ONLY_2。實時上,若是優化的很差,咱們發現executor也會大面積掛掉,這時候即使DISK_ONLY_2,也是然並卵,因此保證executor的穩定性纔是保證cache穩定性的關鍵。 app

cache是lazy執行的,這點很容易犯錯,例如: jvm

val raw = sc.textFile(file)
val baseRDD = raw.map(...).filter(...)
baseRDD.cache()
val threadList = new Array(
  new Thread(new SubTaskThead1(baseRDD)),
  new Thread(new SubTaskThead2(baseRDD)),
  new Thread(new SubTaskThead3(baseRDD))
)
threadList.map(_.start())
threadList.map(_.join())

這個例子在三個子線程開始並行執行的時候,baseRDD因爲lazy執行,還沒被cache,這時候三個線程會同時進行baseRDD的計算,cache的功能形同虛設。能夠在baseRDD.cache()後增長baseRDD.count(),顯式的觸發cache,固然count()是一個action,自己會觸發一個job。 post

再舉一個錯誤的例子: 性能

val raw = sc.textFile(file)
val pvLog = raw.filter(isPV(_))
val clLog = raw.filter(isCL(_))
val baseRDD = pvLog.union(clLog)
val baseRDD.count()

因爲textFile()也是lazy執行的,故本例會進行兩次相同的hdfs文件的讀取,效率較差。解決辦法,是對pvLog和clLog共同的父RDD進行cache。 測試

Partition

一個stage由若干partition並行執行,partition數是一個很重要的優化點。

本例中,一天的日誌由6000個小文件組成,加上後續複雜的統計操做,某個stage的parition數達到了100w。parition過多會有不少問題,好比全部task返回給driver的MapStatus都已經很大了,超過spark.driver.maxResultSize(默認1G),致使driver掛掉。雖然spark啓動task的速度很快,可是每一個task執行的計算量太少,有一半多的時間都在進行task序列化,形成了浪費,另外shuffle過程的網絡消耗也會增長。

對於reduceByKey(),若是不加參數,生成的rdd與父rdd的parition數相同,不然與參數相同。還可使用coalesce()和repartition()下降parition數。例如,本例中因爲有6000個小文件,致使baseRDD有6000個parition,可使用coalesce()下降parition數,這樣parition數會減小,每一個task會讀取多個小文件。

val raw = sc.textFile(file).coalesce(300)
val baseRDD = raw.map(...).filter(...)
baseRDD.cache()

那麼對於每一個stage設置多大的partition數合適那?固然不一樣的程度的複雜度不一樣,這個數值須要不斷進行調試,本例中經測試保證每一個parition的輸入數據量在1G之內便可,若是parition數過少,每一個parition讀入的數據量變大,會增長內存的壓力。例如,咱們的某一個stage的ShuffleRead達到了3T,我設置parition數爲6000,平均每一個parition讀取500M數據。

val bigRDD = ...
bigRDD.coalesce(6000).reduceBy(...)

最後,通常咱們的原始日誌很大,可是計算結果很小,在saveAsTextFile前,能夠減小結果rdd的parition數目,這樣會計算hdfs上的結果文件數,下降小文件數會下降hdfs namenode的壓力,也會減小最後咱們收集結果文件的時間。

val resultRDD = ...
resultRDD.repartition(1).saveAsTextFile(output)

這裏使用repartition()不使用coalesce(),是爲了避免下降resultRDD計算的併發量,經過再作一次shuffle將結果進行彙總。

資源優化

在搜狗咱們的spark程序跑在yarn集羣上,咱們應保證咱們的程序有一個穩定高效的集羣環境。

設置合適的資源參數

一些經常使用的參數設置以下:

--queue:集羣隊列
--num-executors:executor數量,默認2
--executor-memory:executor內存,默認512M
--executor-cores:每一個executor的併發數,默認1

executor的數量能夠根據任務的併發量進行估算,例如我有1000個任務,每一個任務耗時1分鐘,若10個併發則耗時100分鐘,100個併發耗時10分鐘,根據本身對併發需求進行調整便可。默認每一個executor內有一個併發執行任務,通常夠用,也可適當增長,固然內存的使用也會有所增長。

對於yarn-client模式,整個application所申請的資源爲:

total vores = executor-cores * num-executors + spark.yarn.am.cores
total memory= (executor-memory + spark.yarn.executor.memoryOverhead) * num-executors + (spark.yarn.am.memory + spark.yarn.am.memoryOverhead)

當申請的資源超出所指定的隊列的min cores和min memory時,executor就有被yarn kill掉的風險。而spark的每一個stage是有狀態的,若是被kill掉,對性能影響比較大。例如,本例中的baseRDD被cache,若是某個executor被kill掉,會致使其上的cache的parition失效,須要從新計算,對性能影響極大。

這裏還有一點須要注意,executor-memory設置的是executor jvm啓動的最大堆內存,java內存除了堆內存外,還有棧內存、堆外內存等,因此spark使用spark.yarn.executor.memoryOverhead對非堆內存進行限制,也就是說executor-memory + spark.yarn.executor.memoryOverhead是所能使用的內存的上線,若是超過此上線,就會被yarn kill掉。本次優化,堆外內存的優化起到了相當重要的做用,咱們後續會看到。

spark.yarn.executor.memoryOverhead默認是executor-memory * 0.1,最小是384M。好比,咱們的executor-memory設置爲1G,spark.yarn.executor.memoryOverhead是默認的384M,則咱們向yarn申請使用的最大內存爲1408M,但因爲yarn的限制爲倍數(不知道是否是隻是咱們的集羣是這樣),實際上yarn運行咱們運行的最大內存爲2G。這樣感受浪費申請的內存,申請的堆內存爲1G,實際上卻給咱們分配了2G,若是對spark.yarn.executor.memoryOverhead要求不高的話,能夠對executor-memory再精細化,好比申請executor-memory爲640M,加上最小384M的spark.yarn.executor.memoryOverhead,正好一共是1G。

除了啓動executor外,spark還會啓動一個am,可使用spark.yarn.am.memory設置am的內存大小,默認是512M,spark.yarn.am.memoryOverhead默認也是最小384M。有時am會出現OOM的狀況,能夠適當調大spark.yarn.am.memory。

executor默認的永久代內存是64K,能夠看到永久代使用率長時間爲99%,經過設置spark.executor.extraJavaOptions適當增大永久代內存,例如:–conf spark.executor.extraJavaOptions=」-XX:MaxPermSize=64m」

driver端在yarn-client模式下運行在本地,也能夠對相關參數進行配置,如–driver-memory等。

查看日誌

executor的stdout、stderr日誌在集羣本地,當出問題時,能夠到相應的節點查詢,固然從web ui上也能夠直接看到。

executor除了stdout、stderr日誌,咱們能夠把gc日誌打印出來,便於咱們對jvm的內存和gc進行調試。

--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"

除了executor的日誌,nodemanager的日誌也會給咱們一些幫助,好比由於超出內存上限被kill、資源搶佔被kill等緣由都能看到。

除此以外,spark am的日誌也會給咱們一些幫助,從yarn的application頁面能夠直接看到am所在節點和log連接。

複雜的集羣環境

咱們的yarn集羣節點上上跑着mapreduce、hive、pig、tez、spark等各種任務,除了內存有所限制外,CPU、帶寬、磁盤IO等都沒有限制(固然,這麼作也是爲了提升集羣的硬件利用率),加上集羣總體業務較多負載較高,使得spark的執行環境十分惡劣。常見的一些因爲集羣環境,致使spark程序失敗或者性能降低的狀況有:

  • 節點掛掉,致使此節點上的spark executor掛掉
  • 節點OOM,把節點上的spark executor kill掉
  • CPU使用太高,致使spark程序執行過慢
  • 磁盤目錄滿,致使spark寫本地磁盤失敗
  • 磁盤IO太高,致使spark寫本地磁盤失敗
  • HDFS掛掉,hdfs相關操做失敗

內存/GC優化

通過上述優化,咱們的程序的穩定性有所提高,可是讓咱們徹底跑通的最後一根救命稻草是內存、GC相關的優化。

Direct Memory

咱們使用的spark版本是1.5.2(更準確的說是1.5.3-shapshot),shuffle過程當中block的傳輸使用netty(spark.shuffle.blockTransferService)。基於netty的shuffle,使用direct memory存進行buffer(spark.shuffle.io.preferDirectBufs),因此在大數據量shuffle時,堆外內存使用較多。固然,也可使用傳統的nio方式處理shuffle,可是此方式在spark 1.5版本設置爲deprecated,並將會在1.6版本完全移除,因此我最終仍是採用了netty的shuffle。

jvm關於堆外內存的配置相對較少,經過-XX:MaxDirectMemorySize能夠指定最大的direct memory。默認若是不設置,則與最大堆內存相同。

Direct Memory是受GC控制的,例如ByteBuffer bb = ByteBuffer.allocateDirect(1024),這段代碼的執行會在堆外佔用1k的內存,Java堆內只會佔用一個對象的指針引用的大小,堆外的這1k的空間只有當bb對象被回收時,纔會被回收,這裏會發現一個明顯的不對稱現象,就是堆外可能佔用了不少,而堆內沒佔用多少,致使還沒觸發GC。加上-XX:MaxDirectMemorySize這個大小限制後,那麼只要Direct Memory使用到達了這個大小,就會強制觸發GC,這個大小若是設置的不夠用,那麼在日誌中會看到java.lang.OutOfMemoryError: Direct buffer memory。

例如,在咱們的例子中,發現堆外內存飆升的比較快,很容易被yarn kill掉,因此應適當調小-XX:MaxDirectMemorySize(也不能太小,不然會報Direct buffer memory異常)。固然你也能夠調大spark.yarn.executor.memoryOverhead,加大yarn對咱們使用內存的寬容度,可是這樣比較浪費資源了。

GC優化

GC優化前,最好是把gc日誌打出來,便於咱們進行調試。

--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"

經過看gc日誌,咱們發現一個case,特定時間段內,堆內存其實很閒,堆內存使用率也就5%左右,長時間不進行父gc,致使Direct Memory一直不進行回收,一直在飆升。因此,咱們的目標是讓父gc更頻繁些,多觸發一些Direct Memory回收。

第一,能夠減小整個堆內存的大小,固然也不能過小,不然堆內存也會報OOM。這裏,我配置了1G的最大堆內存。

第二,可讓年輕代的對象儘快進入年老代,增長年老代的內存。這裏我使用了-Xmn100m,將年輕代大小設置爲100M。另外,年輕代的對象默認會在young gc 15次後進入年老代,這會形成年輕代使用率比較大,young gc比較多,可是年老代使用率低,父gc比較少,經過配置-XX:MaxTenuringThreshold=1,年輕代的對象通過一次young gc後就進入年老代,加快年老代父gc的頻率。

第三,可讓年老代更頻繁的進行父gc。通常年老代gc策略咱們主要有-XX:+UseParallelOldGC和-XX:+UseConcMarkSweepGC這兩種,ParallelOldGC吞吐率較大,ConcMarkSweepGC延遲較低。咱們但願父gc頻繁些,對吞吐率要求較低,並且ConcMarkSweepGC能夠設置-XX:CMSInitiatingOccupancyFraction,即年老代內存使用率達到什麼比例時觸發CMS。咱們決定使用CMS,並設置-XX:CMSInitiatingOccupancyFraction=10,即年老代使用率10%時觸發父gc。

經過對GC策略的配置,咱們發現父gc進行的頻率加快了,帶來好處就是Direct Memory可以儘快進行回收,固然也有壞處,就是gc時間增長了,cpu使用率也有所增長。

最終咱們對executor的配置以下:

--executor-memory 1G --num-executors 160 --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=64m -XX:+CMSClassUnloadingEnabled -XX:MaxDirectMemorySize=1536m -Xmn100m -XX:MaxTenuringThreshold=1 -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=10 -XX:+UseCompressedOops -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError"

總結

經過對Stage/Cache/Partition、資源、內存/GC的優化,咱們的spark程序最終可以在160 vcores + 480G memory資源下,使用2.5小時跑通一天的日誌。

對於程序優化,我認爲應本着以下幾點進行:

  1. 經過監控CPU、內存、網絡、IO、GC、應用指標等數據,切實找到系統的瓶頸點。
  2. 統籌全局,制定相應的解決方案,解決問題的思路是否清晰準確很重要,另外切勿『頭疼醫頭,腳疼醫腳』,應整體考慮把握。
  3. 瞭解一些技術的背景知識,對於每次優化儘可能作得完全些,多進行總結。
相關文章
相關標籤/搜索