----本節內容-------html
1.遺留問題解答java
2.Spark調優初體驗node
2.1 利用WebUI分析程序瓶頸linux
2.2 設置合適的資源web
2.3 調整任務的併發度算法
2.4 修改存儲格式sql
3.Spark調優經驗shell
3.1 Spark原理及調優工具數據庫
3.2 運行環境優化apache
3.2.1 防止沒必要要的分發
3.2.2 提升數據本地性
3.2.3 存儲格式選擇
3.2.4 選擇高配機器
3.3 優化操做符
3.3.1 過濾操做致使多小任務
3.3.2 下降單條記錄開銷
3.3.3 處理數據傾斜或者任務傾斜
3.3.4 複用RDD進行緩存
3.3.5 慎用耗資源操做符
3.3.6 做業並行化
3.4 做業參數調優
3.4.1 設置合適的資源量
3.4.2 設置合理的JVM
3.4.3 啓用更高效的序列化方法
3.4.4 增大off head內存
3.4.5 shuffle參數調優
3.4.6 設置reduce task數目
3.4.7 使用spark sql實現
4.Spark 調優案例
5.參考資料
---------------------
1.遺留問題
1)BAT這樣的企業內部是如何開發和運行Spark程序的?
· 開發和測試
intellij進行開發,對scala支持的很是好,java也支持的很好,開發好了在local模式下運行進行測試,intellij是不能遠程提交到集羣的,itellij沒有分發jar包功能。
· spark生產環境運行
將全部依賴的包打成assembly.jar包,這樣再也不依賴環境裏的任何模式,通常用yarn cluster模式運行(driver能夠容錯),能夠用yarn client模式進行測試。天天要跑,可使用工做流調度器,按照你的設置定時定點跑程序,若是出錯了能夠報警,工做流調度引擎,能夠處理複雜的依賴,用的比較多的開源
airflow:輕量級,不少公司用
oozie:比較複雜
2)編譯spark程序很是慢
修改mavn pom倉庫,將國外的倉庫,改爲國內的倉庫,如阿里雲
3)Spark參數的設置問題
有三種設置方式, 在程序內設置(優先級高),提交參數設置(優先級中),配置文件配置(優先級低),park-default:經常使用的能夠在配置文件中配
2.Spark調優初體驗
調優程序,首先得知道程序慢在哪裏,要定位問題,找到優化的點。定位問題又涉及到不少層面,機器硬件、網絡、操做系統、JVM虛擬機、大數據軟件平臺層、軟件開發層,因此調優是一個綜合工程,Spark程序調優能夠分爲如下幾個層面:
1)基礎設施層
機器硬件(如磁盤的選擇,SATA盤仍是SAS盤,磁盤RAID方式等)、網絡(千兆網卡仍是萬兆網卡,網絡峯值期間的帶寬、吞吐、網絡延遲、網絡抖動,不少時候網絡問題致使各類莫名問題,舉個真實的例子,公司網線被老鼠咬了,致使網絡時而能夠,時而不行,鬼知道是什麼問題,讓人抓狂)、操做系統(操做系統的穩定性,內核版本的選擇,很是重要,還有一些配置策略得和hadoop生態吻合)。這些都很是底層了,就網絡、linux操做系統就夠花時間學習和了解的了,不少時候須要系統集成部的同事一塊兒配合。
2)大數據平臺層:HDFS、YARN、Hive、Spark、JVM等
JVM的調優,尚未什麼經驗,並且目前也沒有很是深刻,最多也就是GC策略挑挑,大數據平臺調優真是個技術活,須要不少經驗。
3)程序開發層
開發工程師程序開發技巧、對業務的理解等
2.1利用WebUI分析Spark程序瓶頸
選擇合適的資源,前提是瞭解集羣有多少資源 。1)集羣內存總量:單臺節點共8G,2G給datanode,6G個spark作計算(yarn共有18G,3個節點);2)集羣CPU總合數: 每一個機器有8個CPU,共24個core;3)集羣總存儲:總存儲空間有多少,通常保證20%左右的剩餘空間,要否則會影響集羣總體性能,HDFS剩餘空間不足也會致使任務執行很慢,甚至失敗。
如何觀察和評估程序的效率怎麼執行shell,跑spark任務就不講了,說明一下如何觀察Spark任務執行的瓶頸,從哪幾個指標觀察任務執行的效率。
指標一:觀察Spark任務解析和提交消耗的時間
主要涉及到的是jar包上傳的優化,這裏面分爲2種狀況:
1)程序依賴的jar,這種一般是spark lib目錄下的全部jar包,有好幾百兆,spark程序會上傳這些,爲了提高效率,能夠提早上傳好,
2)程序自身的jar包,若是程序不常常變更,也能夠提早上傳到HDFS上。
指標二:觀察WebUI產生的系統監控數據
這裏面有不少指標,羅列以下
1)觀察job監控參數,產生了多少個job,一個action對應一個job,若是action之間沒有依賴關係,資源有富餘,可讓job並行執行。
2)觀察stage監控參數,一個job分解成告終果stage,每一個stage執行了多少時間,輸入了多少數據量,shuffle read了多少數據,shuffle write 了多少數據。
3)觀察executor監控參數,driver在哪裏,executor在哪裏,每一個executor啓動了幾個CPU核數,起了多少內存,輸入多少數據(能夠查看數據是否有傾斜),shuffle了多少數據,內存放了多少數據,GC執行了多少時間。還能夠查看stderr,查看日誌。
4)觀察每一個task監控參數,task執行的時間,GC執行的時間,讀入的數據總大小和記錄數,shuffle的大小,task執行時的本地行,舉個執行的任務例子,以下所示
能夠看出來,一共有2個stage,1個stage包含8個task,一個包含2個task,先跑8個的,再跑2個的,一個14秒,一個0.1秒。再看看executor,發現只有2個executor,一個executor只有1個core,也就是一個executor只能處理一個task,集羣也就是最多跑2個,10個task要跑5輪
再點擊stage的連接進去,觀察每一個task跑多長時間
好了,按照前面羅列的監控指標點,能夠看出以下監控參數:
集羣資源:內存3個節點18G,VCORE:3個節點24VCORE
1)Job監控參數:只有一個Job
2)Stage監控參數:有2個Stage
· stage1:8個task,這個先跑,執行了14秒
· stage2:2個task,這個後跑,執行了0.1秒,是collect方法,數據匯聚到driver
3)executor:2個executor,1個executor 1G內存
4)task:本地性node_local,GC毫秒級,shuffle也是不足Kb
分析消耗的資源:
1)從stage的執行時間分析,stage1執行時間長,能夠考慮優化stage1
2)executor分析:只有2個executor,內存也只有1G,啓動的都是默認參數,和集羣資源相比,有很做資源沒有利用起來,有點浪費,能夠調整executor個數,增大併發度,或者若是數據量大,也有必要,增大內存。
3)task:本地行還能夠,node級別,gc也能夠,shuffle也少
總結可能優化的點:
優化stage1,Job中的stage1有8個task,2個executor,須要跑4輪才能跑完第一輪的全部task,調整爲8個executor(原來同時跑2個task,如今能夠同時跑8個task了),那麼只須要一輪就而已跑完全部的task。內存1G也夠用了,由於輸入數據就不足1G,若是輸入數據很大的話,也能夠增大內存。
2.2 設置合適的資源
設置合適的資源,要明確集羣的資源總量,而後觀察監控指標,檢查是否充分使用了集羣中的資源
資源量關聯度比較大的一個是內存一個是CPU的使用率,固然還有其餘,但這兩個指標是比較重點的,這兩個指標對應spark程序主要是Executor的內存和core。因此計算資源的設置單位是Executor
增長Executor個數:--num-executors 4
增長每一個Executor同時雲心的task數目:--executor-cores 2
除了Executor消耗資源,還有driver和shuffle等也都是消耗資源大戶,我把幾個經常使用的和資源使用相關的參數含義及參考值總結以下:
num-executors
參數說明:該參數用於設置Spark做業總共要用多少個Executor進程來執行。Driver在向YARN集羣管理器申請資源時,YARN集羣管理器會盡量按照你的設置來在集羣的各個工做節點上,啓動相應數量的Executor進程。這個參數很是之重要,若是不設置的話,默認只會給你啓動少許的Executor進程,此時你的Spark做業的運行速度是很是慢的。
參數調優建議:每一個Spark做業的運行通常設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都很差。設置的太少,沒法充分利用集羣資源;設置的太多的話,大部分隊列可能沒法給予充分的資源。
executor-memory
參數說明:該參數用於設置每一個Executor進程的內存。Executor內存的大小,不少時候直接決定了Spark做業的性能,並且跟常見的JVM OOM異常,也有直接的關聯。
參數調優建議:每一個Executor進程的內存設置4G~8G較爲合適。可是這只是一個參考值,具體的設置仍是得根據不一樣部門的資源隊列來定。能夠看看本身團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,就表明了你的Spark做業申請到的總內存量(也就是全部Executor進程的內存總和),這個量是不能超過隊列的最大內存量的。此外,若是你是跟團隊裏其餘人共享這個資源隊列,那麼申請的總內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你本身的Spark做業佔用了隊列全部的資源,致使別的同窗的做業沒法運行。
executor-cores
參數說明:該參數用於設置每一個Executor進程的CPU core數量。這個參數決定了每一個Executor進程並行執行task線程的能力。由於每一個CPU core同一時間只能執行一個task線程,所以每一個Executor進程的CPU core數量越多,越可以快速地執行完分配給本身的全部task線程。
參數調優建議:Executor的CPU core數量設置爲2~4個較爲合適。一樣得根據不一樣部門的資源隊列來定,能夠看看本身的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每一個Executor進程能夠分配到幾個CPU core。一樣建議,若是是跟他人共享這個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其餘同窗的做業運行。
driver-memory
參數說明:該參數用於設置Driver進程的內存。
參數調優建議:Driver的內存一般來講不設置,或者設置1G左右應該就夠了。惟一須要注意的一點是,若是須要使用collect算子將RDD的數據所有拉取到Driver上進行處理,那麼必須確保Driver的內存足夠大,不然會出現OOM內存溢出的問題。
spark.default.parallelism
參數說明:該參數用於設置每一個stage的默認task數量。這個參數極爲重要,若是不設置可能會直接影響你的Spark做業性能。
參數調優建議:Spark做業的默認task數量爲500~1000個較爲合適。不少同窗常犯的一個錯誤就是不去設置這個參數,那麼此時就會致使Spark本身根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。一般來講,Spark默認設置的數量是偏少的(好比就幾十個task),若是task數量偏少的話,就會致使你前面設置好的Executor的參數都前功盡棄。試想一下,不管你的Executor進程有多少個,內存和CPU有多大,可是task只有1個或者10個,那麼90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!所以Spark官網建議的設置原則是,設置該參數爲num-executors * executor-cores的2~3倍較爲合適,好比Executor的總CPU core數量爲300個,那麼設置1000個task是能夠的,此時能夠充分地利用Spark集羣的資源。
spark.storage.memoryFraction
參數說明:該參數用於設置RDD持久化數據在Executor內存中能佔的比例,默認是0.6。也就是說,默認Executor 60%的內存,能夠用來保存持久化的RDD數據。根據你選擇的不一樣的持久化策略,若是內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
參數調優建議:若是Spark做業中,有較多的RDD持久化操做,該參數的值能夠適當提升一些,保證持久化的數據可以容納在內存中。避免內存不夠緩存全部的數據,致使數據只能寫入磁盤中,下降了性能。可是若是Spark做業中的shuffle類操做比較多,而持久化操做比較少,那麼這個參數的值適當下降一些比較合適。此外,若是發現做業因爲頻繁的gc致使運行緩慢(經過spark web ui能夠觀察到做業的gc耗時),意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。
spark.shuffle.memoryFraction
參數說明:該參數用於設置shuffle過程當中一個task拉取到上個stage的task的輸出後,進行聚合操做時可以使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操做。shuffle操做在進行聚合時,若是發現使用的內存超出了這個20%的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地下降性能。
參數調優建議:若是Spark做業中的RDD持久化操做較少,shuffle操做較多時,建議下降持久化操做的內存佔比,提升shuffle操做的內存佔比比例,避免shuffle過程當中數據過多時內存不夠用,必須溢寫到磁盤上,下降了性能。此外,若是發現做業因爲頻繁的gc致使運行緩慢,意味着task執行用戶代碼的內存不夠用,那麼一樣建議調低這個參數的值。
資源參數的調優,沒有一個固定的值,須要同窗們根據本身的實際狀況(包括Spark做業中的shuffle操做數量、RDD持久化操做數量以及spark web ui中顯示的做業gc狀況),同時參考本篇文章中給出的原理以及調優建議,合理地設置上述參數。
參數設置demo
/bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
優化後的,一共執行了11秒
每一個task執行時間
再觀察每一個task花費時間,觀察發現,每一個task執行的時間比原來的4秒還長,緣由是在資源不變的狀況,任務數多了只有2個CPU,CPU變成瓶頸了,要頻繁切換若是CPU不少,優化效果會很是好,CPU是瓶頸
2.3 設置合適的併發度
任務的併發度有幾個級別:job級別的併發,stage級別的併發,task級別的併發。這裏談的併發優化是task級別的,主要就是map任務並行度和reduce任務的並行度,這方面的調優其實能夠參考MapReduce的調優,原理都是同樣同樣的。Spark的任務數須要注意幾點:
1)Map個數默認是和輸入文件的blok數是同樣的,如hdfs則和 blokc數目一致,hbase則和regio個數一致。
2)rdd之間的map個數若是不修改,後面的和前面個數同樣
3)reduce默認個數也是和map個數同樣
map設置方法:
單個設置: sc.textFile("/input/data",100); //指定100個blokc,那麼就100個map
批量設置:將每一個map處理數量調大,map數就少了,默認128M
2.4 修改存儲格式
不少人並不明白爲何文件存儲格式會影響文件的讀取效率,我打個最簡單的比方。咱們知道linux 系統是單機版的操做系統,裏面有ext3,ext4這樣的文件,ext的職責就是對linux系統文件進行管理,HDFS是分佈式的文件系統,也是對文件進行管理。好的文件系統就像一個勤快的媳婦,在你房子面積不變的狀況下,勤快的媳婦會將物品放的層次分明,利用到房子裏面的每一個空間,你要拿什麼東西,都能很快找到;而很差的文件系統就像是一個懶媳婦,房間裏面堆滿了東西,找起來很麻煩,房間利用率也很是糟糕,找東西困難,放東西進去也很難,東西越多越髒越亂。(媳婦沒有好壞之分,只有適合不適合,還有看你怎麼和媳婦相處了,互相了不瞭解,性格和脾氣對不對路,文件系統也是如此)
文件存儲格式和文件系統是同樣的原理,文件系統管理的是文件,而文件儲存格式管理的是文件內容(管理的是文件中每一行每一列的具體內容)。因此低效率的文件存儲格式就像是一個賴媳婦,家裏被管的一塌糊塗,東西越多越髒亂差,高效率的文件存儲格式就是勤快且聰明的媳婦,一切都管的井井有理,取東西方便,放東西也容易,還會根據不一樣的物品特徵進行擺放,完美,6666!!!
csv,txt,json等等都是懶媳婦,parquet,orc都是勤快媳婦,那爲何文本文件是懶媳婦,parquet是好媳婦,主要有如下幾個緣由:
文本文件爲何很差?
文本文件行存儲,存儲佔用空間,並且讀取數據的時候會讀出不少沒必要要的數據出來,這就好像你叫懶媳婦給你拿一頂帽子,結果她把衣服,鞋子,襪子通通拿出來,而後再從裏面挑出你要的帽子。
parquet爲何好,Spark使用parquet文件存儲格式意義在哪裏?
1) 若是說HDFS 是大數據時代分佈式文件系統首選標準,那麼parquet則是整個大數據時代文件存儲格式實時首選標準
2) 速度更快:從使用spark sql操做普通文件CSV和parquet文件速度對比上看,絕大多數狀況
會比使用csv等普通文件速度提高10倍左右,在一些普通文件系統沒法在spark上成功運行的狀況
下,使用parquet不少時候能夠成功運行
3) parquet的壓縮技術很是穩定出色,在spark sql中對壓縮技術的處理可能沒法正常的完成工做
(例如會致使lost task,lost executor)可是此時若是使用parquet就能夠正常的完成
4) 極大的減小磁盤I/o,一般狀況下可以減小75%的存儲空間,由此能夠極大的減小spark sql處理
數據的時候的數據輸入內容,尤爲是在spark1.6x中有個下推過濾器在一些狀況下能夠極大的
減小磁盤的IO和內存的佔用,(下推過濾器)
5) spark 1.6x parquet方式極大的提高了掃描的吞吐量,極大提升了數據的查找速度spark1.6和spark1.5x相比而言,提高了大約1倍的速度,在spark1.6X中,操做parquet時候cpu也進行了極大的優化,有效的下降了cpu
6) 採用parquet能夠極大的優化spark的調度和執行。咱們測試spark若是用parquet能夠有效的減小stage的執行消耗,同時能夠優化執行路徑
3.Spark調優經驗
3.1 Spark原理及調優工具
· Spark Web UI界面
· jstack、jstat、jprofile
· history server:當Spark應用退出後,仍能夠得到歷史Spark應用的stages和tasks執行信息,便於分析程序不明緣由掛掉的狀況,Spark的history server依賴mr的history server。
3.2 運行環境優化
3.2.1 防止沒必要要的分發
每一個Application都會上傳一個spark-assembly-x.x.x-SNAPSHOT-hadoopx.x.x-cdhx.x.x.jar的jar包,影響HDFS的性能以及佔用HDFS的空間.對於用戶的jar包,有時候體積也很是龐大,咱們一樣的方式上傳hdfs上,而後直接使用。
1. 依賴ja包重複上傳
執行spark任務有大量jar包上傳HDFS,將系統jar包上傳到hdfs上,直接使用hdfs上的文件,具體下:
1)修改conf/spark-default.conf添加如下配置
spark.yarn.jar hdfs://master:9000/system/spark/jars/spark-assembly-1.6.0-hadoop2.6.0.jar
2)再次執行SparkPi,提交腳本發生了變化,以下:
bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ lib/spark-examples*.jar 10
2. 用戶jar包重複上傳,避免重複分發
bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ hdfs://master:9000/user/spark/jars/spark-examples-1.6.0-hadoop2.6.0.jar 10
3.2.2 提升數據本地性
分佈式數據並行環境下,保持數據的本地性是很是重要的內容,事關分佈式系統性能高下,涉及到數據本地性的概念有block、partition、worker、rack。
Spark中的數據本地性有三種:
PROCESS_LOCAL是指讀取緩存在本地節點的數據
NODE_LOCAL是指讀取本地節點
RACK_LOCAL是指讀非本機架的節點數據
yarn和hfs儘量的在一個節點上不少rack local,說明本地性不好,能夠經過增長副本數來提高本地新。
3.2.3 存儲格式選擇
BAT等公司80%都是採用列式存儲結構 ,相同的列存儲在一塊兒,只讀取所需的列,io減小,相同的列存在一塊兒,壓縮比會很是高。大概是行存儲的1/22個apache頂級項目ORC:源自於hive,建表最好都搞成orc,hive經常使用parquet,rdd讀取效率低,spark sql高效率讀取
列式存儲和行式存儲相比有哪些優點呢?
·能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。
·壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
·只讀取須要的列,支持向量運算,可以獲取更好的掃描性能。
從上圖能夠很清楚地看到,行式存儲下一張表的數據都是放在一塊兒的,但列式存儲下都被分開保存了。因此它們就有了以下這些優缺點:
經過字典表壓縮數據。爲了方面後面的講解,這部分也順帶提一下了。
下面中才是那張表原本的樣子。通過字典表進行數據壓縮後,表中的字符串才都變成數字了。正由於每一個字符串在字典表裏只出現一次了,因此達到了壓縮的目的(有點像規範化和非規範化Normalize和Denomalize)
下面這個圖,經過一條查詢的執行過程說明列式存儲(以及數據壓縮)的優勢:
關鍵步驟以下:
1.去字典表裏找到字符串對應數字(只進行一次字符串比較)。
2. 用數字去列表裏匹配,匹配上的位置設爲1。
3. 把不一樣列的匹配結果進行位運算獲得符合全部條件的記錄下標。
4. 使用這個下標組裝出最終的結果集。
3.2.4 選擇高配機器
隨着硬件的不斷髮展和企業的需求的不斷變化, 大部分企業在集羣搭建初期和中期的集羣配置都不同。而Spark是一個很是消耗內存的,所以對於初期一些配置較低,尤爲內存較差的機器,是不適合跑spark任務的,更加適合硬件配置高一點的機器上跑。機器配置的良莠不齊,應該如何有區別的調度
,yarn提供了很好的解決方案。
yarn支持標籤,根據機器的配置,給機器打相應的標籤,標籤如何打不在討論範圍,目前只有capacity 調度算法支持標籤給隊列支持打標籤,將標籤和隊列綁定在一塊兒,將應用程序提交到指定標籤的隊列,執行的時候就會提交到到相應標籤節點 。
不少時候是基礎平臺的修改,運維負責優化 yarn基於標籤的調度,haodop從hadoop2.6.0開始提供基於標籤的調度策略。
3.3 優化操做符
3.3.1 過濾操做致使多小任務
filter操做使用不當,很容易引起麻煩。假如一個任務有3個parition,通過filger過濾以後,可能致使部分剩下不多,有些剩餘不少,剩餘不少的在下一步計算量很大,會拖後腿,其餘的做業很快就作完了,而剩餘不少的要執行很長時間,整個任務都要延誤,而其餘很快執行完的做業早就釋放資源了
形成資源還的浪費
對於這種場景有2種優化策略:
1)coalses:合併已有的partiion,性能很是高,可是頗有可能還不是很均與,
大的依舊很大,小的進行了合併
2)repartion:根據數據量燈亮劃分,每一個partion儘量均勻,會通過一次shuffle比較均勻
3.3.2 下降單條記錄開銷
作過Java鏈接數據庫操做的人都知道,要儘可能避免數據庫連接的頻繁創建和斷開,方法不少,好比數據庫鏈接池的發明。單機版本對數據庫的鏈接操做比較容易管理和控制,但在分佈式環境下,數據庫的鏈接管理和控制很麻煩,數據的鏈接是不可序列化的,所以分佈式環境下,統一管理數據庫鏈接顯然是不靠譜的。好比這段代碼,若是寫數據到數據庫,就會頻繁創建和斷開鏈接,顯然是低效率的。由於數據庫鏈接的不可序列化,你也不可能把conn拿出來。
解決方法是:使用mapPartitions或者mapWith操做符
緣由在於mapPartitions是map的調用的粒度不一樣,map的輸入變換函數是應用於RDD中每一個元素,而mapPartitions的輸入函數是應用於每一個分區。
假設一個rdd有10個元素,分紅3個分區。若是使用map方法,map中的輸入函數會被調用10次;而使用mapPartitions方法的話,其輸入函數會只會被調用3次,每一個分區調用1次。在大數據集狀況下的資源初始化開銷和批處理處理,尤爲數據庫連接操做,顯得特別好用。
3.3.3 處理數據傾斜或者任務傾斜
spark任務中的數據傾斜可能致使某一臺節點超負荷運轉、內存不足,其餘節點都處於空閒等待,加內存不能解決問題。應該找到出問題的shuffle操做,修正它。具體問題具體分析,有以下個方向,但不限於此。
1)改變數據結構,從源頭調整數據的分佈,例如分表,分區存放,橫向或者縱向分表等。
2)修改並行度
· 改變並行度能夠改善數據傾斜的緣由是由於若是某個task有100個key而且數據巨大,那麼有可能致使OOM或者任務運行緩慢; ·此時若是把並行度變大,那麼能夠分解每一個task的數據量,好比把該task分解給10個task, 那麼每一個task的數據量將變小,從而能夠解決OOM或者任務執行慢.對應reduceByKey而言能夠傳入並行度參數也能夠自定義partition. · 增長並行度:改變計算資源並無從根本上解決數據傾斜的問題,可是加快了任務運行的速度. · 這是加入有傾斜的key, 加隨機數前綴,reduceByKey聚合操做能夠分而治之,產生的結果是代前綴的,所以需
3)提取彙集,預操做join, 把傾斜數據在上游進行操做.
4)局部聚合+全局聚合
5)儘可能避免shuffle
6)啓用推測執行,避免慢節點任務拖後腿,慢磁盤問題在hadoop集羣運行了好幾年以後很是明顯,尤爲是磁盤,hadoop以及不少監控工具並無對慢磁盤進行監控,須要本身寫腳本監控。
下面的例子就是經過對key進行增長隨機數,而後進行局部聚合+全局聚合
3.3.4 複用RDD進行緩存
RDD是一系列的數據+計算,每一次計算利用上次計算結果都會從新計算,對於一些經常使用的計算結果能夠緩存起來,避免重複計算
cache和persist的區別:cache只有一個默認的緩存級別MEMORY_ONLY ,而persist能夠根據狀況設置其它的緩存級別。
3.3.5 慎用耗資源操做符
選擇 Operator 方案的主要目標是減小 shuffle 的次數以及被 shuffle 的文件的大小。由於 shuffle 是最耗資源的操做,因此有 shuffle 的數據都須要寫到磁盤而且經過網絡傳遞,repartition,join,cogroup,以及任何 *By 或者 *ByKey 的 transformation 都須要 shuffle 數據。不是全部這些 Operator 都是平等的,可是有些常見的性能陷阱是須要注意的。消耗資源的操做盡可能少用,能用小砍刀辦到的事情,何必屠龍刀,Spark中比較消耗資源的操做有
· 笛卡爾積操做
· 帶shuffle的各類算子
若是可能,用treeReduce代替reduce,儘可能用reduceByKey替代groupByKey,舉個栗子
儘可能避免差生shuffle,何時不發生 Shuffle 當前一個 transformation 已經用相同的 patitioner 把數據分 patition 了,Spark知道如何避免 shuffle
3.3.6 做業並行化
Job之間若是沒有依賴關係,在資源容許的狀況下,固然是能並行就更佳,能高併發的,高吞吐量的時候那還要等什麼,除非你不但願活早點幹完。Job之間並行注意2點
· 啓動FAIR調度器:spark.scheduler.mode=fait
· 將action相關操做放到單獨線程中
3.4 做業參數調優
3.4.1 設置合適的資源量
實際開發過程當中很差把控須要啓動多少個Executor,每一個Executor多少個內存要在測試環境不斷的嘗試,每一個應用程序都不同,要根據實際針對性的調整。須要把握幾點
· Executor數目並不是越多越好
· Task數目不該小於Executor的core總數,要否則有些Executor就白啓動了,跑空任務
假設有1G的數據,默認8個task,是啓動8個Executor,一個Executor1核,仍是啓動4個Executor,一個Executor2核,這個須要手動調試去觀察和比較。要一個個的去試試才知道了,沒有固定的方式方法。
3.4.2 設置合理的JVM
JVM調優用的比較多的是調整GC策略,如何調整垃圾的回收機制主要是Executor的垃圾回收機制。能夠爲spark定製與Hadoop不一樣的jdk版本,前提是各個節點要安裝了 ,經過Spark Web UI能夠查看GC消耗的時間。JVM參數設置主要有2個對象:driver和executor。而且提交的方式不同,參數設置和程序讀取的地方也不同。
1. Driver的JVM參數
yarn-client模式: (1)-Xmx,-Xms,默認讀取spark-env.sh,文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值同樣大小;(2)PermSize,默認讀取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;(3)GC方式,默認讀取的是spark-class文件中的JAVA_OPTS
yarn-cluster模式:(1) -Xmx,-Xms讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值;(2)PermSize,讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。(3)取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的參數值
上值最後都可被spark-submit工具中的--driver-java-options參數覆蓋。
2.Executor的JVM參數
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
PermSize,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。GC方式,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
3.JVM參數說明
-Xms:初始堆的大小,默認是物理內存的1/64
-Xmx:最大堆大小,默認物理內存的1/4
-Xmn:年輕代大小
-XX:PermSize:持久化初始值,默認是物理內存的1/64
-XX:MaxPermSize:設置持久代最大值,默認物理內存的1/4
-XX:+UseConcMarkSweepGC,使用CMS內存收集
3.4.3 啓用更高效的序列化方法
序列化經常使用於網絡傳輸和數據持久化以便於存儲和傳輸,分佈式集羣有大量的網絡傳輸和數據存儲,序列化的重要性不言而喻。
若是序列化格式序列化過程緩慢,或者須要佔用字節不少,都會大大拖慢總體的計算效率。一般,序列化都是Spark應用優化時首先須要關注的地方。Spark着眼於要達到便利性(容許你在計算過程當中使用任何Java類型)和性能的一個平衡。Spark主要提供了兩個序列化庫:
Spark主要提供了兩個序列化庫:
Java serialization: 默認狀況,Spark使用Java自帶的ObjectOutputStream 框架來序列化對象,這樣任何實現了 java.io.Serializable 接口的對象,都能被序列化。同時,你還能夠經過擴展 java.io.Externalizable 來控制序列化性能。Java序列化很靈活但性能較差,同時序列化後佔用的字節數也較多。
Kryo serialization: Spark還可使用Kryo 庫(版本2)提供更高效的序列化格式。Kryo的序列化速度和字節佔用都比Java序列化好不少(一般是10倍左右),但Kryo不支持全部實現了Serializable 接口的類型,它須要你在程序中 register 須要序列化的類型,以獲得最佳性能。
要切換到使用 Kryo,你能夠在 SparkConf 初始化的時候調用 conf.set(「spark.serializer」, 「org.apache.spark.serializer.KryoSerializer」)。這個設置不只控制各個worker節點之間的混洗數據序列化格式,同時還控制RDD存到磁盤上的序列化格式。目前,Kryo不是默認的序列化格式,由於它須要你在使用前註冊須要序列化的類型,不過咱們仍是建議在對網絡敏感的應用場景下使用Kryo。
3.4.4 增大off head內存
spark爲了引擎更高效,用了部分堆外內存,這種類型的內存用JVM的參數就很難控制住,內存不夠會被殺掉。可能不是jvm的內存不夠了,多是對外內存不足,設置overhead參數
memory*0.1。Spark On YARN,Executor常常被YARN殺掉報的錯以下所示:
· 解決方法:增大overhead內存大小,默認是內存的10%
driver:spark.yarn.driver.memeoryOverhead
executor:spark.yarn.executor.memoryOverhead
3.4.5 shuffle參數調優
1) shuffle實現的選擇
Hash-based Shuffle:每一個executor產生R個文件
Sorted-based Shuffle(默認實現):每一個Map Task產生一個文件,更省內存的實現
如何選擇?reduce task數目較多時,選擇sort-based實現,修改spark.shuffle.manager,選擇hash或者sort,shuffle的詳細能夠參考上一篇文章
2)默認狀況下
reduce task收到的數據會存到內存(HashTable)中,防止reduce task的OOM能夠將spark.shuffle.spill設爲true。
spill條件,可經過spark.shuffle.memeoryFraction設置,默認是0.3。
3.4.6 設置reduce task數目
Reduce Task數目太小,運行過慢,且可能致使OOM
Reduce Task數目過大,產生較多小任務,啓動和調度開銷增大
顯示設置reduce task數目,好比groupByKey,reduceByKey等均提供了設置參數
默認修改參數值spark.default.parallelism,模式是跟前一個階段一致。
3.4.7 使用spark sql實現
愈來愈多的程序會採用spark sql編寫,而不是spark core API,緣由是
1)更加簡單,DataFrame/Dataset是更高級的API,而RDD API是比較低級
2)更加高效,spark sql自帶了優化器,能夠自動優化你的程序
4.調優案例
略,後面實操了一遍補上
5.參考資料
1.http://www.csdn.net/article/2015-07-08/2825160 Spark性能調優
2.http://blog.csdn.net/xiaolang85/article/details/51705088
3.http://blog.csdn.net/wuxb_2000/article/details/52870198
4..http://www.cnblogs.com/redcreen/archive/2011/05/04/2037057.html,JVM系列三:JVM參數設置、分析
5.董西成ppt