問題:
一、分配哪些資源?
二、在哪裏分配這些資源?
三、爲何多分配了這些資源之後,性能會獲得提高?shell
/usr/local/spark/bin/spark-submit \ --class cn.spark.sparktest.core.WordCountCluster \ --num-executors 3 \ 配置executor的數量 --driver-memory 100m \ 配置driver的內存(影響很大) --executor-memory 100m \ 配置每一個executor的內存大小 --executor-cores 3 \ 配置每一個executor的cpu core數量 /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
第一種,Spark Standalone,公司集羣上,搭建了一套Spark集羣,你內心應該清楚每臺機器還可以給你使用的,大概有多少內存,多少cpu core;那麼,設置的時候,就根據這個實際的狀況,去調節每一個spark做業的資源分配。好比說你的每臺機器可以給你使用4G內存,2個cpu core;20臺機器;executor,20;平均每一個executor:4G內存,2個cpu core。數組
第二種,Yarn。資源隊列。資源調度。應該去查看,你的spark做業,要提交到的資源隊列,大概有多少資源?500G內存,100個cpu core;executor,50;平均每一個executor:10G內存,2個cpu core。緩存
設置隊列名稱:spark.yarn.queue defaultbash
一個原則,你能使用的資源有多大,就儘可能去調節到最大的大小(executor的數量,幾十個到上百個不等;executor內存;executor cpu core)架構
爲何調節了資源之後,性能能夠提高?app
增長executor:性能
若是executor數量比較少,那麼,可以並行執行的task數量就比較少,就意味着,咱們的Application的並行執行的能力就很弱。好比有3個executor,每一個executor有2個cpu core,那麼同時可以並行執行的task,就是6個。6個執行完之後,再換下一批6個task。增長了executor數量之後,那麼,就意味着,可以並行執行的task數量,也就變多了。好比原先是6個,如今可能能夠並行執行10個,甚至20個,100個。那麼並行能力就比以前提高了數倍,數十倍。相應的,性能(執行的速度),也能提高數倍~數十倍。優化
有時候數據量比較少,增長大量的task反而性能會下降,爲何?(想一想就明白了,你用多了,別人用的就少了。。。。)spa
增長每一個executor的cpu core:scala
也是增長了執行的並行能力。本來20個executor,每一個才2個cpu core。可以並行執行的task數量,就是40個task。如今每一個executor的cpu core,增長到了5個。可以並行執行的task數量,就是100個task。執行的速度,提高了2.5倍。
SparkContext,DAGScheduler,TaskScheduler,會將咱們的算子,切割成大量的task,
提交到Application的executor上面去執行。
增長每一個executor的內存量:
增長了內存量之後,對性能的提高,有三點:
一、若是須要對RDD進行cache,那麼更多的內存,就能夠緩存更多的數據,將更少的數據寫入磁盤,甚至不寫入磁盤。減小了磁盤IO。
二、對於shuffle操做,reduce端,會須要內存來存放拉取的數據並進行聚合。若是內存不夠,也會寫入磁盤。若是給executor分配更多內存之後,就有更少的數據,須要寫入磁盤,
甚至不須要寫入磁盤。減小了磁盤IO,提高了性能。
三、對於task的執行,可能會建立不少對象。若是內存比較小,可能會頻繁致使JVM堆內存滿了,而後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。內存加大之後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。
Spark並行度指的是什麼?
Spark做業,Application,Jobs,action(collect)觸發一個job,1個job;每一個job拆成多個stage,
發生shuffle的時候,會拆分出一個stage,reduceByKey。
stage0 val lines = sc.textFile("hdfs://") val words = lines.flatMap(_.split(" ")) val pairs = words.map((_,1)) val wordCount = pairs.reduceByKey(_ + _) stage1 val wordCount = pairs.reduceByKey(_ + _) wordCount.collect()
reduceByKey,stage0的task,在最後,執行到reduceByKey的時候,會爲每一個stage1的task,都建立一份文件(也多是合併在少許的文件裏面);每一個stage1的task,會去各個節點上的各個task建立的屬於本身的那一份文件裏面,拉取數據;每一個stage1的task,拉取到的數據,必定是相同key對應的數據。對相同的key,對應的values,才能去執行咱們自定義的function操做(_ + _)
並行度:其實就是指的是,Spark做業中,各個stage的task數量,也就表明了Spark做業的在各個階段(stage)的並行度。
若是不調節並行度,致使並行度太低,會怎麼樣?
task數量,至少設置成與Spark application的總cpu core數量相同(最理想狀況,好比總共150個cpu core,分配了150個task,一塊兒運行,差很少同一時間運行完畢)
官方是推薦,task數量,設置成spark application總cpu core數量的2~3倍,好比150個cpu core,基本要設置task數量爲300~500;實際狀況,與理想狀況不一樣的,有些task會運行的快一點,好比50s就完了,有些task,可能會慢一點,要1分半才運行完,因此若是你的task數量,恰好設置的跟cpu core數量相同,可能仍是會致使資源的浪費,由於,好比150個task,10個先運行完了,剩餘140個還在運行,可是這個時候,有10個cpu core就空閒出來了,就致使了浪費。那若是task數量設置成cpu core總數的2~3倍,那麼一個task運行完了之後,另外一個task立刻能夠補上來,就儘可能讓cpu core不要空閒,同時也是儘可能提高spark做業運行的效率和速度,提高性能。
如何設置一個Spark Application的並行度?
spark.default.parallelism SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")
默認狀況下,屢次對一個RDD執行算子,去獲取不一樣的RDD;都會對這個RDD以及以前的父RDD,所有從新計算一次;讀取HDFS->RDD1->RDD2-RDD4這種狀況,是絕對絕對,必定要避免的,一旦出現一個RDD重複計算的狀況,就會致使性能急劇下降。好比,HDFS->RDD1-RDD2的時間是15分鐘,那麼此時就要走兩遍,變成30分鐘
持久化,很簡單,就是對RDD調用persist()方法,並傳入一個持久化級別