transformation 操做,相似於MapReduce 中的combiner
算法
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect()
reduceByKey,相較於普通的shuffle操做(好比groupByKey),它的一個特色,就是說,
會進行map端的本地聚合。
對map端給下個stage每一個task建立的輸出文件中,寫數據以前,就會進行本地的combiner操做,
也就是說對每個key,對應的values,都會執行你的算子函數(_ + _)
用reduceByKey對性能的提高:
一、在本地進行聚合之後,在map端的數據量就變少了,減小磁盤IO。並且能夠減小磁盤空間的佔用。
二、下一個stage,拉取數據的量,也就變少了。減小網絡的數據傳輸的性能消耗。
三、在reduce端進行數據緩存的內存佔用變少了。
四、reduce端,要進行聚合的數據量也變少了。
總結:
reduceByKey在什麼狀況下使用呢?
一、很是普通的,好比說,就是要實現相似於wordcount程序同樣的,對每一個key對應的值,
進行某種數據公式或者算法的計算(累加、累乘)
二、對於一些相似於要對每一個key進行一些字符串拼接的這種較爲複雜的操做,能夠本身衡量一下,
其實有時,也是可使用reduceByKey來實現的。可是不太好實現。若是真可以實現出來,
對性能絕對是有幫助的。(shuffle基本上就佔了整個spark做業的90%以上的性能消耗,
主要能對shuffle進行必定的調優,都是有價值的)
咱們的程序沒有那麼去作!可是把這個看成一個課後思考題給你們,看你們能不能對咱們的聚合session
的操做應用上ReduceByKey來提升性能!
sql
算子調優之使用repartition解決Spark SQL低並行度的性能問題
spark.sql.shuffle.partitions 調整DataFrame的shuffle並行度
spark.default.parallelism 調整RDD的shuffle並行度
並行度:以前說過,並行度是本身能夠調節,或者說是設置的。
一、spark.default.parallelism
二、textFile(),傳入第二個參數,指定partition數量(比較少用)
我們的項目代碼中,沒有設置並行度,實際上,在生產環境中,是最好本身設置一下的。
官網有推薦的設置方式,你的spark-submit腳本中,會指定你的application總共要啓動多少個executor,
100個;每一個executor多少個cpu core,2~3個;總共application,有cpu core,200個。
官方推薦,根據你的application的總cpu core數量(在spark-submit中能夠指定,200個),
本身手動設置spark.default.parallelism參數,指定爲cpu core總數的2~3倍。400~600個並行度。600。
承上啓下
你設置的這個並行度,在哪些狀況下會生效?哪些狀況下,不會生效?
若是你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application默認全部stage的並行度
都是你設置的那個參數。(除非你使用coalesce算子縮減過partition數量)
問題來了,Spark SQL,用了。用Spark SQL的那個stage的並行度,你無法本身指定。
Spark SQL本身會默認根據hive表對應的hdfs文件的block,自動設置Spark SQL查詢所在的那個stage的
並行度。你本身經過spark.default.parallelism參數指定的並行度,只會在沒有Spark SQL的stage中生效。
好比你第一個stage,用了Spark SQL從hive表中查詢出了一些數據,而後作了一些transformation操做,
接着作了一個shuffle操做(groupByKey);下一個stage,在shuffle操做以後,
作了一些transformation操做。hive表,對應了一個hdfs文件,有20個block;
你本身設置了spark.default.parallelism參數爲100。
你的第一個stage的並行度,是不受你的控制的,就只有20個task;第二個stage,
纔會變成你本身設置的那個並行度,100。
問題在哪裏?
Spark SQL默認狀況下,它的那個並行度,我們無法設置。可能致使的問題,也許沒什麼問題,
也許頗有問題。Spark SQL所在的那個stage中,後面的那些transformation操做,
可能會有很是複雜的業務邏輯,甚至說複雜的算法。若是你的Spark SQL默認把task數量設置的不多,
20個,而後每一個task要處理爲數很多的數據量,而後還要執行特別複雜的算法。
這個時候,就會致使第一個stage的速度,特別慢。第二個stage,1000個task,刷刷刷,很是快。
解決上述Spark SQL沒法設置並行度和task數量的辦法,是什麼呢?
repartition算子,你用Spark SQL這一步的並行度和task數量,確定是沒有辦法去改變了。可是呢,
能夠將你用Spark SQL查詢出來的RDD,使用repartition算子,去從新進行分區,
此時能夠分區成多個partition,好比從20個partition,分區成100個。
而後呢,從repartition之後的RDD,再日後,並行度和task數量,就會按照你預期的來了。
就能夠避免跟Spark SQL綁定在一個stage中的算子,只能使用少許的task去處理大量數據以及
複雜的算法邏輯。
這裏就頗有可能發生上面說的問題
好比說,Spark SQl默認就給第一個stage設置了20個task,可是根據你的數據量以及算法的複雜度
實際上,你須要1000個task去並行執行
因此說,在這裏,就能夠對Spark SQL剛剛查詢出來的RDD執行repartition重分區操做
數據庫
默認狀況下,通過了這種filter以後,RDD中的每一個partition的數據量,可能都不太同樣了。
(本來每一個partition的數據量多是差很少的)
問題:
一、每一個partition數據量變少了,可是在後面進行處理的時候,仍是要跟partition數量同樣數量的task,
來進行處理;有點浪費task計算資源。
二、每一個partition的數據量不同,會致使後面的每一個task處理每一個partition的時候,
每一個task要處理的數據量就不一樣,這個時候很容易發生什麼問題?
數據傾斜。。。。
好比說,第二個partition的數據量才100;可是第三個partition的數據量是900;
那麼在後面的task處理邏輯同樣的狀況下,不一樣的task要處理的數據量可能差異達到了9倍,
甚至10倍以上;一樣也就致使了速度的差異在9倍,甚至10倍以上。
這樣的話呢,就會致使有些task運行的速度很快;有些task運行的速度很慢。這,就是數據傾斜。
針對上述的兩個問題,咱們但願應該可以怎麼樣?
一、針對第一個問題,咱們但願能夠進行partition的壓縮吧,由於數據量變少了,
那麼partition其實也徹底能夠對應的變少。好比原來是4個partition,如今徹底能夠變成2個partition。
那麼就只要用後面的2個task來處理便可。就不會形成task計算資源的浪費。
(沒必要要,針對只有一點點數據的partition,還去啓動一個task來計算)
二、針對第二個問題,其實解決方案跟第一個問題是同樣的;也是去壓縮partition,
儘可能讓每一個partition的數據量差很少。那麼這樣的話,後面的task分配到的partition的數據量
也就差很少。不會形成有的task運行速度特別慢,有的task運行速度特別快。避免了數據傾斜的問題。
有了解決問題的思路以後,接下來,咱們該怎麼來作呢?實現?
緩存
主要就是用於在filter操做以後,針對每一個partition的數據量各不相同的狀況,來壓縮partition的數量。
減小partition的數量,並且讓每一個partition的數據量都儘可能均勻緊湊。
從而便於後面的task進行計算操做,在某種程度上,可以必定程度的提高性能。
說明一下:
這兒,是對完整的數據進行了filter過濾,過濾出來點擊行爲的數據點擊行爲的數據其實只佔總數據的一小部分(譬如 20%)
因此過濾之後的RDD,每一個partition的數據量,頗有可能跟咱們以前說的同樣,會很不均勻並且數據量確定會變少不少
因此針對這種狀況,仍是比較合適用一下coalesce算子的,在filter事後去減小partition的數量
coalesce(100)
這個就是說通過filter以後再把數據壓縮的比較緊湊,壓縮爲100個數據分片,也就是造成了 100 個 partition
對這個coalesce操做作一個說明
若是運行模式都是local模式,主要是用來測試,因此local模式下,
不用去設置分區和並行度的數量
local模式本身自己就是進程內模擬的集羣來執行,自己性能就很高
並且對並行度、partition數量都有必定的內部的優化
這裏咱們再本身去設置,就有點多此一舉
可是就是跟你們說明一下,coalesce算子的使用,便可
網絡
foreach的寫庫原理
默認的foreach的性能缺陷在哪裏?
首先,對於每條數據,都要單獨去調用一次function,task爲每一個數據,都要去執行一次function函數。
若是100萬條數據,(一個partition),調用100萬次。性能比較差。
另一個很是很是重要的一點
若是每一個數據,你都去建立一個數據庫鏈接的話,那麼你就得建立100萬次數據庫鏈接。
可是要注意的是,數據庫鏈接的建立和銷燬,都是很是很是消耗性能的。雖然咱們以前已經用了
數據庫鏈接池,只是建立了固定數量的數據庫鏈接。
你仍是得屢次經過數據庫鏈接,往數據庫(MySQL)發送一條SQL語句,而後MySQL須要去執行這條SQL語句。
若是有100萬條數據,那麼就是100萬次發送SQL語句。
以上兩點(數據庫鏈接,屢次發送SQL語句),都是很是消耗性能的。
foreachPartition,在生產環境中,一般來講,都使用foreachPartition來寫數據庫的
使用批處理操做(一條SQL和多組參數)
發送一條SQL語句,發送一次
一會兒就批量插入100萬條數據。
用了foreachPartition算子以後,好處在哪裏?
一、對於咱們寫的function函數,就調用一次,一次傳入一個partition全部的數據
二、主要建立或者獲取一個數據庫鏈接就能夠
三、只要向數據庫發送一次SQL語句和多組參數便可
在實際生產環境中,清一色,都是使用foreachPartition操做;可是有個問題,跟mapPartitions操做同樣,
若是一個partition的數量真的特別特別大,好比真的是100萬,那基本上就不太靠譜了。
一會兒進來,頗有可能會發生OOM,內存溢出的問題。
一組數據的對比:生產環境
一個partition大概是1千條左右
用foreach,跟用foreachPartition,性能的提高達到了2~3分鐘。
實際項目操做:
首先JDBCHelper裏面已經封裝好了一次批量插入操做!
批量插入session detail
惟一不同的是咱們須要ISessionDetailDAO裏面去實現一個批量插入
List<SessionDetail> sessionDetails
session