不廢話,直接進入正題!java
在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重複的計算,以下圖所示:面試
對上圖中的RDD計算架構進行修改,獲得以下圖所示的優化結果:算法
獲取到初始RDD後,應該考慮儘早地過濾掉不須要的數據,進而減小對內存的佔用,從而提高Spark做業的運行效率。數據庫
本文首發於公衆號:五分鐘學大數據,歡迎圍觀apache
當咱們將一個文本文件讀取爲 RDD 時,輸入的每一行都會成爲RDD的一個元素。數組
也能夠將多個完整的文本文件一次性讀取爲一個pairRDD,其中鍵是文件名,值是文件內容。緩存
val input:RDD[String] = sc.textFile("dir/*.log")
若是傳遞目錄,則將目錄下的全部文件讀取做爲RDD。文件路徑支持通配符。網絡
可是這樣對於大量的小文件讀取效率並不高,應該使用 wholeTextFiles
返回值爲RDD[(String, String)],其中Key是文件的名稱,Value是文件的內容。架構
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])
wholeTextFiles讀取小文件:app
val filesRDD: RDD[(String, String)] = sc.wholeTextFiles("D:\\data\\files", minPartitions = 3) val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n")) val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" ")) wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
map(_....) 表示每個元素
mapPartitions(_....) 表示每一個分區的數據組成的迭代器
普通的map算子對RDD中的每個元素進行操做,而mapPartitions算子對RDD中每個分區進行操做。
若是是普通的map算子,假設一個partition有1萬條數據,那麼map算子中的function要執行1萬次,也就是對每一個元素進行操做。
若是是mapPartition算子,因爲一個task處理一個RDD的partition,那麼一個task只會執行一次function,function一次接收全部的partition數據,效率比較高。
好比,當要把RDD中的全部數據經過JDBC寫入數據,若是使用map算子,那麼須要對RDD中的每個元素都建立一個數據庫鏈接,這樣對資源的消耗很大,若是使用mapPartitions算子,那麼針對一個分區的數據,只須要創建一個數據庫鏈接。
mapPartitions算子也存在一些缺點:對於普通的map操做,一次處理一條數據,若是在處理了2000條數據後內存不足,那麼能夠將已經處理完的2000條數據從內存中垃圾回收掉;可是若是使用mapPartitions算子,但數據量很是大時,function一次處理一個分區的數據,若是一旦內存不足,此時沒法回收內存,就可能會OOM,即內存溢出。
所以,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提高效果仍是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)
在項目中,應該首先估算一下RDD的數據量、每一個partition的數據量,以及分配給每一個Executor的內存資源,若是資源容許,能夠考慮使用mapPartitions算子代替map。
rrd.foreache(_....) 表示每個元素
rrd.forPartitions(_....) 表示每一個分區的數據組成的迭代器
在生產環境中,一般使用foreachPartition算子來完成數據庫的寫入,經過foreachPartition算子的特性,能夠優化寫數據庫的性能。
若是使用foreach算子完成數據庫的操做,因爲foreach算子是遍歷RDD的每條數據,所以,每條數據都會創建一個數據庫鏈接,這是對資源的極大浪費,所以,對於寫數據庫操做,咱們應當使用foreachPartition算子。
與mapPartitions算子很是類似,foreachPartition是將RDD的每一個分區做爲遍歷對象,一次處理一個分區的數據,也就是說,若是涉及數據庫的相關操做,一個分區的數據只須要建立一次數據庫鏈接,以下圖所示:
使用了foreachPartition 算子後,能夠得到如下的性能提高:
在生產環境中,所有都會使用foreachPartition算子完成數據庫操做。foreachPartition算子存在一個問題,與mapPartitions算子相似,若是一個分區的數據量特別大,可能會形成OOM,即內存溢出。
在Spark任務中咱們常常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,可是一旦進過filter過濾後,每一個分區的數據量有可能會存在較大差別,以下圖所示:
根據上圖咱們能夠發現兩個問題:
每一個partition的數據量變小了,若是還按照以前與partition相等的task個數去處理當前數據,有點浪費task的計算資源;
每一個partition的數據量不同,會致使後面的每一個task處理每一個partition數據的時候,每一個task要處理的數據量不一樣,這頗有可能致使數據傾斜問題。
如上圖所示,第二個分區的數據過濾後只剩100條,而第三個分區的數據過濾後剩下800條,在相同的處理邏輯下,第二個分區對應的task處理的數據量與第三個分區對應的task處理的數據量差距達到了8倍,這也會致使運行速度可能存在數倍的差距,這也就是數據傾斜問題。
針對上述的兩個問題,咱們分別進行分析:
針對第一個問題,既然分區的數據量變小了,咱們但願能夠對分區數據進行從新分配,好比將原來4個分區的數據轉化到2個分區中,這樣只須要用後面的兩個task進行處理便可,避免了資源的浪費。
針對第二個問題,解決方法和第一個問題的解決方法很是類似,對分區數據從新分配,讓每一個partition中的數據量差很少,這就避免了數據傾斜問題。
那麼具體應該如何實現上面的解決思路?咱們須要coalesce算子。
repartition與coalesce均可以用來進行重分區,其中repartition只是coalesce接口中shuffle爲true的簡易實現,coalesce默認狀況下不進行shuffle,可是能夠經過參數進行設置。
假設咱們但願將本來的分區個數A經過從新分區變爲B,那麼有如下幾種狀況:
A > B(多數分區合併爲少數分區)
A與B相差值不大
此時使用coalesce便可,無需shuffle過程。
A與B相差值很大
此時可使用coalesce而且不啓用shuffle過程,可是會致使合併過程性能低下,因此推薦設置coalesce的第二個參數爲true,即啓動shuffle過程。
A < B(少數分區分解爲多數分區)
此時使用repartition便可,若是使用coalesce須要將shuffle設置爲true,不然coalesce無效。
咱們能夠在filter操做以後,使用coalesce算子針對每一個partition的數據量各不相同的狀況,壓縮partition的數量,並且讓每一個partition的數據量儘可能均勻緊湊,以便於後面的task進行計算操做,在某種程度上可以在必定程度上提高性能。
注意:local模式是進程內模擬集羣運行,已經對並行度和分區數量有了必定的內部優化,所以不用去設置並行度和分區數量。
Spark做業中的並行度指各個stage的task的數量。
若是並行度設置不合理而致使並行度太低,會致使資源的極大浪費,例如,20個Executor,每一個Executor分配3個CPU core,而Spark做業有40個task,這樣每一個Executor分配到的task個數是2個,這就使得每一個Executor有一個CPU core空閒,致使資源的浪費。
理想的並行度設置,應該是讓並行度與資源相匹配,簡單來講就是在資源容許的前提下,並行度要設置的儘量大,達到能夠充分利用集羣資源。合理的設置並行度,能夠提高整個Spark做業的性能和運行速度。
Spark官方推薦,task數量應該設置爲Spark做業總CPU core數量的2~3倍。之因此沒有推薦task數量與CPU core總數相等,是由於task的執行時間不一樣,有的task執行速度快而有的task執行速度慢,若是task數量與CPU core總數相等,那麼執行快的task執行完成後,會出現CPU core空閒的狀況。若是task數量設置爲CPU core總數的2~3倍,那麼一個task執行完畢後,CPU core會馬上執行下一個task,下降了資源的浪費,同時提高了Spark做業運行的效率。
Spark做業並行度的設置以下:
val conf = new SparkConf().set("spark.default.parallelism", "500")
原則:讓 cpu 的 Core(cpu 核心數) 充分利用起來,
若有100個 Core,那麼並行度能夠設置爲200~300。
咱們知道 Spark 中有並行度的調節策略,可是,並行度的設置對於Spark SQL是不生效的,用戶設置的並行度只對於Spark SQL之外的全部Spark的stage生效。
Spark SQL的並行度不容許用戶本身指定,Spark SQL本身會默認根據hive表對應的HDFS文件的split個數自動設置Spark SQL所在的那個stage的並行度,用戶本身通 spark.default.parallelism 參數指定的並行度,只會在沒Spark SQL的stage中生效。
因爲Spark SQL所在stage的並行度沒法手動設置,若是數據量較大,而且此stage中後續的transformation操做有着複雜的業務邏輯,而Spark SQL自動設置的task數量不多,這就意味着每一個task要處理爲數很多的數據量,而後還要執行很是複雜的處理邏輯,這就可能表現爲第一個有Spark SQL的stage速度很慢,然後續的沒有Spark SQL的stage運行速度很是快。
爲了解決Spark SQL沒法設置並行度和task數量的問題,咱們可使用repartition算子。
repartition 算子使用先後對比圖以下:
Spark SQL這一步的並行度和task數量確定是沒有辦法去改變了,可是,對於Spark SQL查詢出來的RDD,當即使用repartition算子,去從新進行分區,這樣能夠從新分區爲多個partition,從repartition以後的RDD操做,因爲再也不涉及Spark SQL,所以stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少許的task去處理大量數據並執行復雜的算法邏輯。使用repartition算子的先後對好比上圖所示。
reduceByKey相較於普通的shuffle操做一個顯著的特色就是會進行map端的本地聚合,map端會先對本地的數據進行combine操做,而後將數據寫入給下個stage的每一個task建立的文件中,也就是在map端,對每個key對應的value,執行reduceByKey算子函數。
reduceByKey算子的執行過程以下圖所示:
使用reduceByKey對性能的提高以下:
基於reduceByKey的本地聚合特徵,咱們應該考慮使用reduceByKey代替其餘的shuffle算子,例如groupByKey。
groupByKey與reduceByKey的運行原理以下圖1和圖2所示:
根據上圖可知,groupByKey不會進行map端的聚合,而是將全部map端的數據shuffle到reduce端,而後在reduce端進行數據的聚合操做。因爲reduceByKey有map端聚合的特性,使得網絡傳輸的數據量減少,所以效率要明顯高於groupByKey。
Spark持久化在大部分狀況下是沒有問題的,可是有時數據可能會丟失,若是數據一旦丟失,就須要對丟失的數據從新進行計算,計算完後再緩存和使用,爲了不數據的丟失,能夠選擇對這個RDD進行checkpoint,也就是將數據持久化一份到容錯的文件系統上(好比HDFS)。
一個RDD緩存並checkpoint後,若是一旦發現緩存丟失,就會優先查看checkpoint數據存不存在,若是有,就會使用checkpoint數據,而不用從新計算。也便是說,checkpoint能夠視爲cache的保障機制,若是cache失敗,就使用checkpoint的數據。
使用checkpoint的優勢在於提升了Spark做業的可靠性,一旦緩存出現問題,不須要從新計算數據,缺點在於,checkpoint時須要將數據寫入HDFS等文件系統,對性能的消耗較大。
持久化設置以下:
sc.setCheckpointDir(‘HDFS’) rdd.cache/persist(memory_and_disk) rdd.checkpoint
默認狀況下,task中的算子中若是使用了外部的變量,每一個task都會獲取一份變量的複本,這就形成了內存的極大消耗。一方面,若是後續對RDD進行持久化,可能就沒法將RDD數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另外一方面,task在建立對象的時候,也許會發現堆內存沒法存放新建立的對象,這就會致使頻繁的GC,GC會致使工做線程中止,進而致使Spark暫停工做一段時間,嚴重影響Spark性能。
假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被全部task共用,此時會在500個task中產生500個副本,耗費集羣10G的內存,若是使用了廣播變量, 那麼每一個Executor保存一個副本,一共消耗400M內存,內存消耗減小了5倍。
廣播變量在每一個Executor保存一個副本,此Executor的全部task共用此廣播變量,這讓變量產生的副本數量大大減小。
在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中嘗試獲取變量,若是本地沒有,BlockManager就會從Driver或者其餘節點的BlockManager上遠程拉取變量的複本,並由本地的BlockManager進行管理;以後此Executor的全部task都會直接從本地的BlockManager中獲取變量。
對於多個Task可能會共用的數據能夠廣播到每一個Executor上:
val 廣播變量名= sc.broadcast(會被各個Task用到的變量,即須要廣播的變量) 廣播變量名.value//獲取廣播變量
默認狀況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不須要額外的配置,在算子中使用的變量實現Serializable接口便可,可是,Java序列化機制的效率不高,序列化速度慢而且序列化後的數據所佔用的空間依然較大。
Spark官方宣稱Kryo序列化機制比Java序列化機制性能提升10倍左右,Spark之因此沒有默認使用Kryo做爲序列化類庫,是由於它不支持全部對象的序列化,同時Kryo須要用戶在使用前註冊須要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。
Kryo序列化註冊方式的代碼以下:
public class MyKryoRegistrator implements KryoRegistrator{ @Override public void registerClasses(Kryo kryo){ kryo.register(StartupReportLogs.class); } }
配置Kryo序列化方式的代碼以下:
//建立SparkConf對象 val conf = new SparkConf().setMaster(…).setAppName(…) //使用Kryo序列化庫 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //在Kryo序列化庫中註冊自定義的類集合 conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator");
本文首發於公衆號:五分鐘學大數據,回覆【666】便可得到全套大數據筆面試教程