Spark學習——性能調優(三)

其餘更多java基礎文章:
java基礎學習(目錄)java


繼續上一篇Spark學習——性能調優(二)的講解算法

使用MapPartition提高性能

何時比較適合用MapPartitions系列操做?

數據量不是特別大的時候,均可以用這種MapPartitions系列操做,性能仍是很是不錯的,是有提高的。緩存

在項目中,本身先去估算一下RDD的數據量,以及每一個partition的量,還有本身分配給每一個executor的內存資源。看看一會兒內存容納全部的partition數據,行不行。若是行,能夠試一下,能跑通就好。性能確定是有提高的。bash

可是試了一下之後,發現,不行,OOM了,那就放棄吧。網絡

filter事後使用coalesce減小分區數量

默認狀況下,通過了這種filter以後,RDD中的每一個partition的數據量,可能都不太同樣了。(本來每一個partition的數據量多是差很少的) 這可能會致使的問題:

  1. 每一個partition數據量變少了,可是在後面進行處理的時候,仍是要跟partition數量同樣數量的task,來進行處理;有點浪費task計算資源。
  2. 每一個partition的數據量不同,會致使後面的每一個task處理每一個partition的時候,每一個task要處理的數據量就不一樣,這個時候很容易發生數據傾斜。

針對上述的兩個問題,咱們但願應該可以怎麼樣?app

  1. 針對第一個問題,咱們但願能夠進行partition的壓縮吧,由於數據量變少了,那麼partition其實也徹底能夠對應的變少。好比原來是4個partition,如今徹底能夠變成2個partition。那麼就只要用後面的2個task來處理便可。就不會形成task計算資源的浪費。(沒必要要,針對只有一點點數據的partition,還去啓動一個task來計算)post

  2. 針對第二個問題,其實解決方案跟第一個問題是同樣的;也是去壓縮partition,儘可能讓每一個partition的數據量差很少。那麼這樣的話,後面的task分配到的partition的數據量也就差很少。不會形成有的task運行速度特別慢,有的task運行速度特別快。避免了數據傾斜的問題。性能

coalesce算子
主要就是用於在filter操做以後,針對每一個partition的數據量各不相同的狀況,來壓縮partition的數量。減小partition的數量,並且讓每一個partition的數據量都儘可能均勻緊湊。學習

從而便於後面的task進行計算操做,在某種程度上,可以必定程度的提高性能。優化

RDD.filter(XXX).coalesce(100);
複製代碼

使用foreachPartition優化

使用repatition解決Spark SQL低並行度

前說過,並行度是本身能夠調節,或者說是設置的。

一、spark.default.parallelism
二、textFile(),傳入第二個參數,指定partition數量(比較少用)
複製代碼

官方推薦,根據你的application的總cpu core數量(在spark-submit中能夠指定,好比 200個),本身手動設置spark.default.parallelism參數,指定爲cpu core總數的2~3倍。400~600個並行度。

你設置的這個並行度,在哪些狀況下會生效?哪些狀況下,不會生效?

若是你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application默認全部stage的並行度都是你設置的那個參數。(除非你使用coalesce算子縮減過partition數量)

問題來了,若是用Spark SQL,那含有Spark SQL的那個stage的並行度,你無法本身指定。Spark SQL本身會默認根據hive表對應的hdfs文件的block,自動設置Spark SQL查詢所在的那個stage的並行度。你本身經過spark.default.parallelism參數指定的並行度,只會在沒有Spark SQL的stage中生效。

好比你第一個stage,用了Spark SQL從hive表中查詢出了一些數據,而後作了一些transformation操做,接着作了一個shuffle操做(groupByKey);下一個stage,在shuffle操做以後,作了一些transformation操做。hive表,對應了一個hdfs文件,有20個block;你本身設置了spark.default.parallelism參數爲100。

你的第一個stage的並行度,是不受你的控制的,就只有20個task;第二個stage,纔會變成你本身設置的那個並行度,100。

問題在哪裏?

Spark SQL默認狀況下,它的那個並行度,我們無法設置。可能致使的問題,也可能沒什麼問題。Spark SQL所在的那個stage中,後面的那些transformation操做,可能會有很是複雜的業務邏輯,甚至說複雜的算法。若是你的Spark SQL默認把task數量設置的不多,20個,而後每一個task要處理爲數很多的數據量,而後還要執行特別複雜的算法。

這個時候,就會致使第一個stage的速度,特別慢。第二個stage,刷刷刷,很是快。

如何解決
repartition算子,你用Spark SQL這一步的並行度和task數量,確定是沒有辦法去改變了。可是呢,能夠將你用Spark SQL查詢出來的RDD,使用repartition算子,去從新進行分區,此時能夠分區成多個partition,好比從20個partition,分區成100個。

而後呢,從repartition之後的RDD,再日後,並行度和task數量,就會按照你預期的來了。就能夠避免跟Spark SQL綁定在一個stage中的算子,只能使用少許的task去處理大量數據以及複雜的算法邏輯。

return dataDF.javaRDD().repartition(1000);
複製代碼

使用reduceByKey本地聚合

用reduceByKey對性能的提高:

  1. 在本地進行聚合之後,在map端的數據量就變少了,減小磁盤IO。並且能夠減小磁盤空間的佔用。
  2. 下一個stage,拉取數據的量,也就變少了。減小網絡的數據傳輸的性能消耗。
  3. 在reduce端進行數據緩存的內存佔用變少了。
  4. reduce端,要進行聚合的數據量也變少了。
相關文章
相關標籤/搜索