spark-性能調優

問題:
一、分配哪些資源?
二、在哪裏分配這些資源?
三、爲何多分配了這些資源之後,性能會獲得提高?shell

  • 分配哪些資源?executor、cpu per executor、memory per executor、driver memory
  • 在哪裏分配這些資源?在咱們在生產環境中,提交spark做業時,用的spark-submit shell腳本,裏面調整對應的參數
    /usr/local/spark/bin/spark-submit \
    --class cn.spark.sparktest.core.WordCountCluster \
    --num-executors 3 \  配置executor的數量
    --driver-memory 100m \  配置driver的內存(影響很大)
    --executor-memory 100m \  配置每一個executor的內存大小
    --executor-cores 3 \  配置每一個executor的cpu core數量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

     

  • 調節到多大,算是最大呢?
    • 第一種,Spark Standalone,公司集羣上,搭建了一套Spark集羣,你內心應該清楚每臺機器還可以給你使用的,大概有多少內存,多少cpu core;那麼,設置的時候,就根據這個實際的狀況,去調節每一個spark做業的資源分配。好比說你的每臺機器可以給你使用4G內存,2個cpu core;20臺機器;executor,20;平均每一個executor:4G內存,2個cpu core。數組

    • 第二種,Yarn。資源隊列。資源調度。應該去查看,你的spark做業,要提交到的資源隊列,大概有多少資源?500G內存,100個cpu core;executor,50;平均每一個executor:10G內存,2個cpu core。緩存

    • 設置隊列名稱:spark.yarn.queue defaultbash

    • 一個原則,你能使用的資源有多大,就儘可能去調節到最大的大小(executor的數量,幾十個到上百個不等;executor內存;executor cpu core)架構

  • 爲何調節了資源之後,性能能夠提高?app

    • 增長executor:性能

      若是executor數量比較少,那麼,可以並行執行的task數量就比較少,就意味着,咱們的Application的並行執行的能力就很弱。好比有3個executor,每一個executor有2個cpu core,那麼同時可以並行執行的task,就是6個。6個執行完之後,再換下一批6個task。增長了executor數量之後,那麼,就意味着,可以並行執行的task數量,也就變多了。好比原先是6個,如今可能能夠並行執行10個,甚至20個,100個。那麼並行能力就比以前提高了數倍,數十倍。相應的,性能(執行的速度),也能提高數倍~數十倍。優化

      有時候數據量比較少,增長大量的task反而性能會下降,爲何?(想一想就明白了,你用多了,別人用的就少了。。。。)spa

    • 增長每一個executor的cpu core:scala

      也是增長了執行的並行能力。本來20個executor,每一個才2個cpu core。可以並行執行的task數量,就是40個task。如今每一個executor的cpu core,增長到了5個。可以並行執行的task數量,就是100個task。執行的速度,提高了2.5倍。

      SparkContext,DAGScheduler,TaskScheduler,會將咱們的算子,切割成大量的task,
      提交到Application的executor上面去執行。

    • 增長每一個executor的內存量:

      增長了內存量之後,對性能的提高,有三點:
      一、若是須要對RDD進行cache,那麼更多的內存,就能夠緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減小了磁盤IO。
      二、對於shuffle操做,reduce端,會須要內存來存放拉取的數據並進行聚合。若是內存不夠,也會寫入磁盤。若是給executor分配更多內存之後,就有更少的數據,須要寫入磁盤,
      甚至不須要寫入磁盤。減小了磁盤IO,提高了性能。
      三、對於task的執行,可能會建立不少對象。若是內存比較小,可能會頻繁致使JVM堆內存滿了,而後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內存加大之後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。

Spark並行度指的是什麼?

Spark做業,Application,Jobs,action(collect)觸發一個job,1個job;每一個job拆成多個stage,
發生shuffle的時候,會拆分出一個stage,reduceByKey。

stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)

stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()

reduceByKey,stage0的task,在最後,執行到reduceByKey的時候,會爲每一個stage1的task,都建立一份文件(也多是合併在少許的文件裏面);每一個stage1的task,會去各個節點上的各個task建立的屬於本身的那一份文件裏面,拉取數據;每一個stage1的task,拉取到的數據,必定是相同key對應的數據。對相同的key,對應的values,才能去執行咱們自定義的function操做(_ + _)

並行度:其實就是指的是,Spark做業中,各個stage的task數量,也就表明了Spark做業的在各個階段(stage)的並行度。

若是不調節並行度,致使並行度太低,會怎麼樣?

  • task沒有設置,或者設置的不多,好比就設置了,100個task。50個executor,每一個executor有3個cpu core,也就是說,你的Application任何一個stage運行的時候,都有總數在150個cpu core,能夠並行運行。可是你如今,只有100個task,平均分配一下,每一個executor分配到2個task,ok,那麼同時在運行的task,只有100個,每一個executor只會並行運行2個task。每一個executor剩下的一個cpu core,就浪費掉了。
  • 你的資源雖然分配足夠了,可是問題是,並行度沒有與資源相匹配,致使你分配下去的資源都浪費掉了。合理的並行度的設置,應該是要設置的足夠大,大到能夠徹底合理的利用你的集羣資源;好比上面的例子,總共集羣有150個cpu core,能夠並行運行150個task。那麼就應該將你的Application的並行度,至少設置成150,才能徹底有效的利用你的集羣資源,讓150個task,並行執行;並且task增長到150個之後,便可以同時並行運行,還可讓每一個task要處理的數據量變少;好比總共150G的數據要處理,若是是100個task,每一個task計算1.5G的數據;如今增長到150個task,能夠並行運行,並且每一個task主要處理1G的數據就能夠。
  • 很簡單的道理,只要合理設置並行度,就能夠徹底充分利用你的集羣計算資源,而且減小每一個task要處理的數據量,最終,就是提高你的整個Spark做業的性能和運行速度。
    1. task數量,至少設置成與Spark application的總cpu core數量相同(最理想狀況,好比總共150個cpu core,分配了150個task,一塊兒運行,差很少同一時間運行完畢)

    2. 官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,好比150個cpu core,基本要設置task數量爲300~500;實際狀況,與理想狀況不一樣的,有些task會運行的快一點,好比50s就完了,有些task,可能會慢一點,要1分半才運行完,因此若是你的task數量,恰好設置的跟cpu core數量相同,可能仍是會致使資源的浪費,由於,好比150個task,10個先運行完了,剩餘140個還在運行,可是這個時候,有10個cpu core就空閒出來了,就致使了浪費。那若是task數量設置成cpu core總數的2~3倍,那麼一個task運行完了之後,另外一個task立刻能夠補上來,就儘可能讓cpu core不要空閒,同時也是儘可能提高spark做業運行的效率和速度,提高性能。

    3. 如何設置一個Spark Application的並行度?

      spark.default.parallelism 
      SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

       

默認狀況下,屢次對一個RDD執行算子,去獲取不一樣的RDD;都會對這個RDD以及以前的父RDD,所有從新計算一次;讀取HDFS->RDD1->RDD2-RDD4這種狀況,是絕對絕對,必定要避免的,一旦出現一個RDD重複計算的狀況,就會致使性能急劇下降。好比,HDFS->RDD1-RDD2的時間是15分鐘,那麼此時就要走兩遍,變成30分鐘

  1. RDD架構重構與優化儘可能去複用RDD,差很少的RDD,能夠抽取稱爲一個共同的RDD,供後面的RDD計算時,反覆使用。
  2. 公共RDD必定要實現持久化。就比如北方吃餃子,現包現煮。你人來了,要點一盤餃子。餡料+餃子皮+水->包好的餃子,對包好的餃子去煮,煮開了之後,纔有你須要的熟的,熱騰騰的餃子。現實生活中,餃子現包現煮,固然是最好的了。可是Spark中,RDD要去「現包現煮」,那就是一場致命的災難。對於要屢次計算和使用的公共RDD,必定要進行持久化。持久化,也就是說,將RDD的數據緩存到內存中/磁盤中,(BlockManager),之後不管對這個RDD作多少次計算,那麼都是直接取這個RDD的持久化的數據,好比從內存中或者磁盤中,直接提取一份數據。
  3. 持久化,是能夠進行序列化的若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許,會致使OOM內存溢出。當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮,使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個大的字節數組,就一個對象;序列化後,大大減小內存的空間佔用。序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。內存+磁盤,序列化。
  4. 爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;持久化的每一個數據單元,存儲一份副本,放在其餘節點上面;從而進行容錯;一個副本丟了,不用從新計算,還可使用另一份副本。這種方式,僅僅針對你的內存資源極度充足.

持久化,很簡單,就是對RDD調用persist()方法,並傳入一個持久化級別

  • 若是是persist(StorageLevel.MEMORY_ONLY()),純內存,無序列化,那麼就能夠用cache()方法來替代
    • StorageLevel.MEMORY_ONLY_SER(),第二選擇
    • StorageLevel.MEMORY_AND_DISK(),第三選擇
    • StorageLevel.MEMORY_AND_DISK_SER(),第四選擇
    • StorageLevel.DISK_ONLY(),第五選擇
  • 若是內存充足,要使用雙副本高可靠機制,選擇後綴帶_2的策略
    • StorageLevel.MEMORY_ONLY_2()
相關文章
相關標籤/搜索