Spark 之 算子調優(一)

這是我參與更文挑戰的第21天,活動詳情查看:更文挑戰算法

算子調優一:mapPartitions

  普通的map算子對RDD中的每個元素進行操做,而mapPartitions算子對RDD中每個分區進行操做。若是是普通的map算子,假設一個partition有1萬條數據,那麼map算子中的function要執行1萬次,也就是對每一個元素進行操做。數據庫

image.png

  若是是mapPartition算子,因爲一個task處理一個RDD的partition,那麼一個task只會執行一次function,function一次接收全部的partition數據,效率比較高。markdown

image.png

  好比,當要把RDD中的全部數據經過JDBC寫入數據,若是使用map算子,那麼須要對RDD中的每個元素都建立一個數據庫鏈接,這樣對資源的消耗很大,若是使用mapPartitions算子,那麼針對一個分區的數據,只須要創建一個數據庫鏈接。app

  mapPartitions算子也存在一些缺點:對於普通的map操做,一次處理一條數據,若是在處理了2000條數據後內存不足,那麼能夠將已經處理完的2000條數據從內存中垃圾回收掉;可是若是使用mapPartitions算子,但數據量很是大時,function一次處理一個分區的數據,若是一旦內存不足,此時沒法回收內存,就可能會OOM,即內存溢出。函數

  所以,mapPartitions算子適用於數據量不是特別大的時候,此時使用mapPartitions算子對性能的提高效果仍是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)post

  在項目中,應該首先估算一下RDD的數據量、每一個partition的數據量,以及分配給每一個Executor的內存資源,若是資源容許,能夠考慮使用mapPartitions算子代替map。性能

算子調優二:foreachPartition優化數據庫操做

  在生產環境中,一般使用foreachPartition算子來完成數據庫的寫入,經過foreachPartition算子的特性,能夠優化寫數據庫的性能。優化

  若是使用foreach算子完成數據庫的操做,因爲foreach算子是遍歷RDD的每條數據,所以,每條數據都會創建一個數據庫鏈接,這是對資源的極大浪費,所以,對於寫數據庫操做,咱們應當使用foreachPartition算子。ui

  與mapPartitions算子很是類似,foreachPartition是將RDD的每一個分區做爲遍歷對象,一次處理一個分區的數據,也就是說,若是涉及數據庫的相關操做,一個分區的數據只須要建立一次數據庫鏈接,如圖所示:url

image.png

  • 使用了foreachPartition算子後,能夠得到如下的性能提高:
    • 對於咱們寫的function函數,一次處理一整個分區的數據;
    • 對於一個分區內的數據,建立惟一的數據庫鏈接;
    • 只須要向數據庫發送一次SQL語句和多組參數;

  在生產環境中,所有都會使用foreachPartition算子完成數據庫操做。foreachPartition算子存在一個問題,與mapPartitions算子相似,若是一個分區的數據量特別大,可能會形成OOM,即內存溢出。

算子調優三:repartition解決SparkSQL低並行度問題

  在第一節的常規性能調優中咱們講解了並行度的調節策略,可是,並行度的設置對於Spark SQL是不生效的,用戶設置的並行度只對於Spark SQL之外的全部Spark的stage生效。

  Spark SQL的並行度不容許用戶本身指定,Spark SQL本身會默認根據hive表對應的HDFS文件的split個數自動設置Spark SQL所在的那個stage的並行度,用戶本身通spark.default.parallelism參數指定的並行度,只會在沒Spark SQL的stage中生效。

  因爲Spark SQL所在stage的並行度沒法手動設置,若是數據量較大,而且此stage中後續的transformation操做有着複雜的業務邏輯,而Spark SQL自動設置的task數量不多,這就意味着每一個task要處理爲數很多的數據量,而後還要執行很是複雜的處理邏輯,這就可能表現爲第一個有Spark SQL的stage速度很慢,然後續的沒有Spark SQL的stage運行速度很是快。

爲了解決Spark SQL沒法設置並行度和task數量的問題,咱們可使用repartition算子。 image.png

  Spark SQL這一步的並行度和task數量確定是沒有辦法去改變了,可是,對於Spark SQL查詢出來的RDD,當即使用repartition算子,去從新進行分區,這樣能夠從新分區爲多個partition,從repartition以後的RDD操做,因爲再也不設計Spark SQL,所以stage的並行度就會等於你手動設置的值,這樣就避免了Spark SQL所在的stage只能用少許的task去處理大量數據並執行復雜的算法邏輯。

相關文章
相關標籤/搜索