參數配置java
一、spark-env.sh數據庫
二、程序經過SparkConf或System.setPropertyapache
性能觀察與日誌數組
1)Web UI。
2)Driver程序控制臺日志。
3)logs文件夾下日誌。
4)work文件夾下日誌。
5)Profiler工具。緩存
調度與分區優化安全
1.小分區合併性能優化
頻繁的過濾或者過濾掉的數據量過大就會產生問題,形成大量小分區的產生。Spark是每一個數據分區都會分配一個任務執行,若是任務過多,則每一個任務處理的數據量很小,會形成線程切換開銷大,不少任務等待執行,並行度不高;服務器
解決方式:能夠採用RDD中重分區的函數進行數據緊縮,減小分區數,將小分區合併變爲大分區。
經過coalesce函數來減小分區。這個函數會返回一個含有numPartitions數量個分區的新RDD,即將整個RDD重分區。
當分區由10000重分區到100時,因爲先後兩個階段的分區是窄依賴的,因此不會產生Shuffle的操做。
可是若是分區數量急劇減小,如極端情況從10000重分區爲一個分區時,就會形成一個問題:數據會分佈到一個節點上進行計算,徹底沒法開掘集羣並行計算的能力。爲了規避這個問題,能夠設置shuffle=true網絡
因爲Shuffle能夠分隔Stage,這就保證了上一階段Stage中的上游任務還是10000個分區在並行計算。若是不加Shuffle,則兩個上下游的任務合併爲一個Stage計算,這個Stage便會在1個分區情況下進行並行計算。數據結構
另外一個需求,即當前的每一個分區數據量過大,須要將分區數量增長,以利用並行計算能力,這就須要把Shuffle設置爲true,而後執行coalesce函數,將分區數增大,在這個過程當中,默認使用Hash分區器將數據進行重分區。
reparition本質上就是調用的coalesce方法。所以若是用戶不想進行Shuffle,就需用coalese配置重分區,爲了方便起見,能夠直接用repartition進行重分區。
2.傾斜問題
傾斜(skew)問題是分佈式大數據計算中的重要問題,傾斜有數據傾斜和任務傾斜兩種狀況,數據傾斜致使的結果即爲任務傾斜,在個別分區上,任務執行時間過長。當少許任務處理的數據量和其餘任務差別過大時,任務進度長時間維持在99%(或100%),此時,任務監控頁面中有少許(1個或幾個)reduce子任務
未完成。單一reduce的記錄數與平均記錄數差別過大,最長時長遠大於平均時長,常可能達到3倍甚至更多
數據傾斜
產生數據傾斜的緣由大體有如下幾種。
1)key的數據分佈不均勻(通常是分區key取得很差或者分區函數設計得很差)。
2)業務數據自己就會產生數據傾斜(像TPC-DS爲了模擬真實環境負載特地用有傾斜的數據進行測試)。
3)結構化數據表設計問題。
4)某些SQL語句會產生數據傾斜
任務傾斜
任務傾斜的緣由較爲隱蔽,通常就是那臺機器的正在執行的Executor執行時間過長,由於服務器架構,或JVM,也多是來自線程池的問題,等等。
解決方式:能夠經過考慮在其餘並行處理方式中間加入彙集運算,以減小傾斜數據量。
數據傾斜通常能夠經過在業務上將極度不均勻的數據剔除解決。這裏其實還有Skew Join的一種處理方式,將數據分兩個階段處理,傾斜的key數據做爲數據源處理,剩下的key的數據再作一樣的處理。兩者分開作一樣的處理
任務執行速度傾斜
產生緣由多是數據傾斜,也多是執行任務的機器在架構,OS、JVM各節點配置不一樣或其餘緣由。
解決方式:設置spark.speculation=true把那些執行時間過長的節點去掉,從新調度分配任務,這個方式和Hadoop MapReduce的speculation是相通的。同時能夠配置多長時間來推測執行,spark.speculation.interval用來設置執行間隔進行配置。在源碼中默認是配置的100
解決方案
1)增大任務數,減小每一個分區數據量:增大任務數,也就是擴大分區量,同時減小單個分區的數據量。
2)對特殊key處理:空值映射爲特定Key,而後分發到不一樣節點,對空值不作處理。
3)廣播。
①小數據量表直接廣播。
②數據量較大的表能夠考慮切分爲多個小表,多階段進行Map Side Join。
4)彙集操做能夠Map端彙集部分結果,而後Reduce端合併,減小Reduce端壓力。
5)拆分RDD:將傾斜數據與原數據分離,分兩個Job進行計算。
3.並行度
Spark會根據文件的大小,默認配置Map階段任務數量,也就是分區數量(也能夠經過SparkContext.textFile等方法進行配置)。而Reduce的階段任務數量配置能夠有兩種方式
第一種方式:寫函數的過程當中經過函數的第二個參數進行配置
第二種方式:經過配置spark.default.parallelism來進行配置。它們的本質原理一致,均是控制Shuffle過程的默認任務數量
Spark官方推薦選擇每一個CPU Core分配2~3個任務,即cpu corenum*2(或3)數量的並行度。
若是並行度過高,任務數太多,就會產生大量的任務啓動和切換開銷。
若是並行度過低,任務數過小,就會沒法發揮集羣的並行計算能力,任務執行過慢,同時可能會形成內存combine數據過多佔用內存,而出現內存溢出(out of memory)的異常。
3. DAG調度執行優化
1)同一個Stage中儘可能容納更多的算子,以減小Shuffle的發生。
因爲Stage中的算子是按照流水線方式執行的,因此更多的Transformation放在一塊兒執行可以減小Shuffle的開銷和任務啓動和切換的開銷
2)複用已經cache過的數據。可使用cache和persist函數將數據緩存在內存,其實能夠按單機的方式理解,存儲仍然是多級存儲,數據存儲在訪問快的存儲設備中,提升快速存儲命中率會提高整個應用程序的性能
內存存儲優化
1.JVM調優
內存調優過程的大方向上有三個方向是值得考慮的。
1)應用程序中對象所佔用的內存空間。
2)訪問這些內存對象的代價。
3)垃圾回收的開銷。
一般情況下,Java的對象訪問速度是很快的,可是相對於對象中存儲的原始數據,Java對象總體會耗費2~5倍的內存空間。
1)不一樣的Java對象都會有一個對象頭(object header),這個對象頭大約爲16byte,包含指向這個對象的類的指針等信息,對一些只有少許數據的對象,這是極爲不經濟的。例如,只有一個Int屬性的對象,這個頭的信息所佔空間會大於對象的數據空間。
2)Java中的字符串(String)佔用40byte空間。String的內存是將真正字符串的信息存儲在一個char數組中,而且還會存儲其餘的信息,如字符串長度,同時若是採用UTF-16編碼,一個字符就佔用2byte的空間。綜合以上,一個10字符的字符串會佔用超過60byte的內存空間。
3)經常使用的一些集合類,如LinkedList等是採用鏈式數據結構存儲的,對底層的每一個數據項進行了包裝,這個對象不僅存儲數據,還會存儲指向其餘數據項的指針,這些指針也會產生數據空間的佔用和開銷。
4)集合類中的基本數據類型經常採用一些裝箱的對象存儲,如java.lang.Ingeger。裝箱與拆箱的機制在不少程序設計語言中都有,Java中裝箱意味着將這些基本數據類型包裝爲對象存儲在內存的Java堆中,而拆箱意味着將堆中對象轉換爲棧中存儲的數據。
計算數據在集羣內存佔用的空間的大小的最好方法是建立一個RDD,讀取這些數據,將數據加載到cache,在驅動程序的控制檯查看SparkContext的日誌。這些日誌信息會顯示每一個分區佔用多少空間
調整數據結構
減小對象嵌套;使用數字的ID或者枚舉對象;序列化存儲RDD;
當內存小於32GB時,官方推薦配置JVM參數-XX:+UseCompressedOops,進而將指針由8byte壓縮爲4byte。OOP的全稱是ordinary object pointer,即普通對象指針。在64位HotSpot中,OOP使用32位指針,默認64位指針會比32位指針使用的內存多1.5倍,啓用CompressOops後,會壓縮的對象以下。
①每一個Class的屬性指針(靜態成員變量)。
②每一個對象的屬性指針。
③普通對象數組每一個元素的指針。
可是,指向PermGen的Class對象指針、本地變量、堆棧元素、入參、返回值、NULL指針不會被壓縮。能夠經過配置文件spark-env.sh配置這個參數,從而在Spark中啓用JVM指針壓縮。
JVM垃圾回收(GC)調優
當Spark程序產生大數據量的RDD時,JVM的垃圾回收就會成爲一個問題。當Spark任務的工做內存空間和RDD的緩存數據空間產生干擾時,垃圾回收一樣會成爲一個問題,能夠經過控制分給RDD的緩存來緩解這個問題。GC來講,一個重要的配置參數就是內存給RDD用於緩存的空間大小。默認狀況下,Spark用配置好的Executor 60%的內存(spark.executor.memory)緩存RDD。這就意味着40%的剩餘內存空間可讓Task在執行過程當中緩存新建立的對象。在有些狀況下,用戶的任務變慢,並且JVM頻繁地進行垃圾回收或者出現內存溢出(out of memory異常),這時能夠調整這個百分比參數爲50%。這個百分比參數能夠經過配置spark-env.sh中的變量spark.storage.memoryFraction=0.5進行配置。同時結合序列化的緩存存儲對象減小內存空間佔用,將會更加有效地緩解垃圾回收問題
度量GC的影響:-verbose:gc-XX:+PrintGCDetails-XX:+PrintGCTime-Stamps
若是任務是從HDFS讀取數據,內存空間的佔用能夠經過從HDFS讀取的數據塊大小和數量估計。須要注意的是,通常狀況下,壓縮的數據壓縮以後一般爲原來數據塊大小的2~3倍。所以若是一個JVM中要執行3~4個任務,同時HDFS的數據塊大小是64MB,就能夠估計須要的Eden代大小是4×3×64MB大小的空間。
OOM的緣由還極可能是Shuffle類操做符在任務執行過程當中在內存創建的Hash表過大。在這種狀況下,能夠經過增長任務數,即分區數來提高並行性度,減少每一個任務的輸入數據,減小內存佔用來解決
2.磁盤臨時目錄空間優化
配置參數spark.local.dir可以配置Spark在磁盤的臨時目錄,默認是/tmp目錄。在Spark進行Shuffle的過程當中,中間結果會寫入Spark在磁盤的臨時目錄中,或者當內存不可以徹底存儲RDD時,內存放不下的數據會寫到配置的磁盤臨時目錄中。這個臨時目錄設置太小會形成No space left on device異常。也能夠配置多個盤塊spark.local.dir=/mn1/spark,/mnt2/spar,/mnt3/spark來擴展Spark的磁盤臨時目錄,讓更多的數據能夠寫到磁盤,加快I/O速度
網絡傳輸優化
1.大任務分發優化
在任務的分發過程當中會序列化任務的元數據信息,以及任務須要的jar和文件。任務的分發是經過AKKA庫中的Actor模型之間的消息傳送的。由於Spark採用了Scala的函數式風格,傳遞函數的變量引用採用閉包方式傳遞,因此當須要傳輸的數據經過Task進行分發時,會拖慢總體的執行速度。配置參數spark.akka.frameSize(默認buffer的大小爲10MB)能夠緩解過大的任務形成AKKA緩衝區溢出的問題,可是這個方式並不能解決本質的問題。
spark.akka.frameSize控制Spark框架內使用的AKKA框架中,Actor通訊消息的最大容量(如任務(Task)的輸出結果),由於整個Spark集羣的消息傳遞都是經過Actor進行的,默認爲10MB。當處理大規模數據時,任務的輸出可能會大於這個值,須要根據實際數據設置一個更高的值。若是是這個值不夠大而產生的錯誤,則能夠從Worker節點的日誌中排查。一般Worker上的任務失敗後,主節點Master的運行日誌上提示「Lost TID:」,可經過查看失敗的Worker日誌文件$SPARK_HOME/work/目錄下面的日誌文件中記錄的任務的Serializedsize of result是否超過10MB來肯定通訊數據超過AKKA的Buffer異常
2.Broadcast在調優場景的使用
Spark的Broadcast(廣播)變量對數據傳輸進行優化,經過Broadcast變量將用到的大數據量數據進行廣播發送,能夠提高總體速度。Broadcast主要用於共享Spark在計算過程當中各個task都會用到的只讀變量,Broadcast變量只會在每臺計算機器上保存一份,而不會每一個task都傳遞一份,這樣就大大節省了空間,節省空間的同時意味着傳輸時間的減小,效率也高。在Spark的HadoopRDD實現中,就採用Broadcast進行Hadoop JobConf的傳輸。官方文檔的說法是,當task大於20KB時,能夠考慮使用Broadcast進行優化,還能夠在控制檯日誌看到任務是多大,進而決定是否優化。還須要注意,每次迭代所傳輸的Broadcast變量都
會保存在從節點Worker的內存中,直至內存不夠用,Spark纔會把舊的Broadcast變量釋放掉,不能提早進行釋放。BroadCast變量有一些應用場景,如MapSideJoin中的小表進行廣播、機器學習中須要共享的矩陣的廣播等
3.Collect結果過大優化
當收集的最終結果數據過大時,能夠將數據存儲在分佈式的HDFS或其餘分佈式持久化層上。將數據分佈式地存儲,能夠減少單機數據的I/O開銷和單機內存存儲壓力。或者當數據不太大,但會超出AKKA傳輸的Buffer大小時,須要增長AKKA Actor的buffer,能夠經過配置參數spark.akka.frameSize(默認大小爲10MB)進行調整。
序列化與壓縮
1.經過序列化優化
序列化的本質做用是將鏈式存儲的對象數據,轉化爲連續空間的字節數組存儲的數據
1)對象能夠以數據流方式進行進程間傳輸(包含網絡傳輸),一樣能夠以連續空間方式存儲到文件或者其餘持久化層中。
2)連續空間的存儲意味着能夠進行壓縮。這樣減小數據存儲空間和傳輸時間。
3)減小了對象自己的元數據信息和基本數據類型的元數據信息的開銷。
4)對象數減小也會減小GC的開銷和壓力。
經過spark.serializer="org.apache.spark.serializer.KryoSerializer"來配置是否使用Kyro進行序列化
Kyro相對於Java序列化庫可以更加快速和緊湊地進行序列化(一般有10倍的性能優點),可是Kyro並不能支持全部可序列化的類型,若是對程序有較高的性能優化要求,就須要自定義註冊類。官方推薦對於網絡傳輸密集型(network-intensive)計算,採用Kyro序列化性能更好。
若是對象佔用空間很大,須要增長Kryo的緩衝區容量,就須要增長配置項spark.kryoserializer.buffer.mb的數值,默認是2MB,但參數值應該足夠大,以便容納最大的序列化後對象的傳輸。若是用戶不註冊自定義的類,Kyro仍能夠運行,可是它會針對每一個對象存儲一次整個類名,這樣會形成很大的空間浪費。
2.經過壓縮方式優化
在Spark應用中,有很大一部分做業是I/O密集型的。數據壓縮對I/O密集型的做業帶來性能的大大提高,可是若是用戶的jobs做業是CPU密集型的,那麼再壓縮就會下降性能,這就要判斷做業的類型,權衡是否要壓縮數據。
Spark目前支持LZF和Snappy兩種解壓縮方式。Snappy提供了更高的壓縮速度,LZF提供了更高的壓縮比,用戶能夠根據具體的需求選擇壓縮方式
批處理優化
調用外部資源,如數據庫鏈接等,這些鏈接經過JDBC或者ODBC與外部數據源進行交互。將單條記錄寫轉化爲數據庫的批量寫,每一個分區的數據寫一次,這樣能夠利用數據庫的批量寫優化減小開銷和減輕數據庫壓力
不然,由於整個RDD的數據項很大,整個集羣會在短期內產生高併發寫入數據庫的操做,對數據庫壓力很大,將產生很大的寫入開銷
reduce和reduceByKey的優化
reduce是Action操做,reduceByKey是Transformation操做
reduce是一種聚合函數,能夠把各個任務的執行結果聚集到一個節點,還能夠指定自定義的函數傳入reduce執行。Spark也對reduce的實現進行了優化,能夠把同一個任務內的結果先在本地Worker節點執行聚合函數,再把結果傳給Driver執行聚合。但最終數據仍是要彙總到主節點,並且reduce會把接收到的數據保存到內存中,直到全部任務都完成爲止。所以,當任務不少,任務的結果數據又比較大時Driver容易形成性能瓶頸,這樣就應該考慮儘可能避免reduce的使用,而將數據轉化爲Key-Value對,並使用reduceByKey實現邏輯,使計算變爲分佈式計算。
reduceByKey也是聚合操做,是根據key聚合對應的value。一樣的,在每個mapper把數據發送給reducer前,會在Map端本地先合併(相似於MapReduce中的Combiner)。與reduce不一樣的是,reduceByKey不是把數據聚集到Driver節點,是分佈式進行的,所以不會存在reduce那樣的性能瓶頸。
Shuffle操做符的內存使用
有時候OOM並非由於內存大小不可以容納RDD,而是由於執行任務中使用的數據集合太大(如groupByKey)。Spark的Shuffle操做符(sortByKey、groupByKey、reduceByKey、join等均可以算是Shuffle操做符,由於這些操做會引起Shuffle)在執行分組操做的過程當中,會在每一個任務執行過程當中,在內存建立Hash表來對數據進行分組,而這個Hash表在不少狀況下一般變得很大。最簡單的一種解決 方案就是增長並行度,即增長任務數量和分區數量。這樣每輪次每一個Executor執行的任務數是固定的,每一個任務接收的輸入數據變少會減小Hash表的大小,佔用的內存就會減小,從而避免內存溢出OOM的發生。 Spark經過多任務複用Worker的JVM,每一個節點全部任務的執行是在同一個JVM上的線程池中執行的,這樣就減小了線程的啓動開銷,能夠高效地支持單個任務200ms的執行時間。經過這個機制,能夠安全地將任務數量的配置擴展到超過集羣的總體的CPU core數,而不會出現問題。