Spark學習筆記6:Spark調優與調試

 一、使用Sparkconf配置Sparkapache

  對Spark進行性能調優,一般就是修改Spark應用的運行時配置選項。緩存

  Spark中最主要的配置機制經過SparkConf類對Spark進行配置,當建立出一個SparkContext時,就須要建立出一個SparkConf實例。網絡

  Sparkconf實例包含用戶要重載的配置選項的鍵值對。調用set()方法來添加配置項的設置,而後把這個對象傳給SparkContext的構造方法。app

  調用setAppName()和setMaster()來分別設置spark.app.name和spark.master的值。工具

例如:性能

//建立一個conf對象
val conf = new SparkConf()
conf.set("spark.app.name","My Spark App")
conf.set("spark.master","local[4]")
conf.set("spark.ui.port","36000")

//使用這個配置對象建立一個SparkContext
val sc = new SparkContext(conf)

  Spark運行經過spark-submit工具動態設置配置項。當應用被spark-submit腳本啓動時,腳本會把這些配置項設置到運行環境中。優化

例如:ui

$ bin/spark-submit \
   --class com.example.MyAPP \ 
   --master local[4] \
   --name "My Spark App"
   --conf spark.ui.port=36000 \
   myApp.jar

  Spark有特定的優先級順序來選擇實際配置,優先級最高的是在用戶代碼中顯示調用set()方法設置的選項,其次是經過spark-submit傳遞的參數,再次是寫在配置文件中的值,最後是系統的默認值。spa

 

 二、Spark執行的組成部分:做業、任務和步驟scala

  經過Spark示例展現Spark執行的各個階段,以瞭解用戶代碼如何被編譯爲下層的執行計劃。

val  input = sc.textFile("input.txt")

val tokenized = input.map(line => line.split(" ")).filter(words => words.size > 0)

val counts = tokenized.map(words => (words(0),1)).reduceByKey{(a,b) => a+b}

  以上示例執行了三次轉化操做,最終生成一個叫作counts的RDD。程序定義了一個RDD對象的有向無環圖,每一個RDD維護了其指向一個或多個父節點的引用,以及表示其與父節點之間關係的信息。

這裏counts的譜系圖以下:

  

  在調用行動操做以前,RDD都只是存儲着可讓咱們計算出具體數據的描述信息。要觸發實際計算,須要對counts調用一個行動操做,好比使用collect()將數據收集到驅動器程序。

counts.collect()

  Spark調度器會建立出用於計算行動操做的RDD物理執行計劃。Spark調度器從最終須要被調用行動操做的RDD出發,向上回溯全部必須計算的RDD。調度器會訪問RDD的父節點,父節點的父節點,以此類推,遞歸向上生成計算全部必要的祖先RDD的物理計劃。以下:

 

  流水線執行:當RDD不須要混洗數據就能夠從父節點計算出來時,調度器就會自動進行流水線執行。在物理執行時,執行計劃輸出的縮進等級與父節點相同的RDD會與父節點在同一個步驟中進行流水線執行。

  除了流水線執行的優化,當一個RDD已經緩存在集羣內存或磁盤上時,Spark的內部調度器也會自動截短RDD譜系圖。這種狀況下,Spark會短路求值,直接基於緩存下來的RDD進行計算。

  特定的行動操做所生成的步驟的集合被稱爲一個做業。

  一個物理步驟會啓動不少任務,每一個任務都是在不一樣的數據分區上作一樣的事情。任務內部的流程是同樣的,包括:(1)從數據存儲或已有RDD或數據混洗的輸出中獲取輸入數據。(2)執行必要的操做來計算出這些操做所表明的RDD。(3)把輸出寫到一個數據混洗文件中,寫入外部存儲或者是發回驅動器程序。

 

 三、Spark優化的關鍵性能   

  • 並行度

  RDD的邏輯表示實際上是一個對象集合。在物理執行期間,RDD會被分爲一系列的分區,每一個分區都是整個數據的子集。當Spark調度並運行任務時,Spark會爲每一個分區中的數據建立出一個任務。輸入RDD通常會根據其底層的存儲系統選擇並行度。

  並行度會從兩方面影響程序的性能:當並行度太低時,Spark集羣會出現資源閒置的狀況,而當並行度太高時,每一個分區產生的間接開銷累計起來就會更大。

  Spark有兩種方法來對操做的並行度進行調優:一種是在數據混洗操做時,使用參數的方式爲混洗後的RDD指定並行度。第二種方法是對於任何已有的RDD,能夠進行從新分區來獲取更多或者更少的分區數。可使用repartition()實現從新分區操做,該操做會把RDD隨機打亂並分紅設定的分區數目。使用coalesce()操做沒有打亂數據,比repartition()更爲高效。

  • 序列化格式

  當Spark須要經過網絡傳輸數據,或者將數據溢寫到磁盤上時,Spark須要把數據序列化爲二進制格式。序列化會在數據進行混洗操做時發生,此時有可能須要經過網絡傳輸大量數據。

  Spark默認會使用Java內建的序列化庫。Spark也支持第三方序列化庫Kryo,能夠提供比Java的序列化工具更短的序列化時間和更高壓縮比的二進制表示。

  使用Kryo序列化工具示例以下:

    val conf = new SparkConf().setMaster("local").setAppName("partitions")
    conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    //嚴格要求註冊類 得到最佳性能
    conf.set("spark.kryo.registrationRequired","true")
    conf.registerKryoClasses(Array(classOf[MyClass],classOf[MyotherClass]))
  • 內存管理

  Spark內存有如下用途:

  1. RDD存儲,默認佔60%    當調用RDD的persist()或cache()方法時,這個RDD的分區會被存儲到緩存區中。
  2. 數據混洗與聚合的緩存區,默認佔20%    當進行數據混洗操做時,Spark會建立出一些中間緩存區來存儲數據混洗的輸出數據。這些緩存區用來存儲聚合操做的中間結果以及數據混洗操做中直接輸出的部分緩存數據。
  3. 用戶代碼,默認佔20%    Spark能夠執行任意的用戶代碼,用戶自行申請大量的內存。

  能夠經過調整調整內存各區域比例獲得更好的性能表現。

  其它優化:

  Spark默認的cache()操做會以MEMORY_ONLY的存儲等級持久化數據,當緩存新的RDD時分區空間不夠,舊的分區會被刪除。當用到這些分取數據時,在進行重算。使用persist()方法以MEMORY_AND_DISK存儲等級存儲,內存中放不下的分區會被寫入磁盤,須要時再從磁盤讀取回來。這種方式會有更好的性能。

  還有一種是緩存序列化後的對象而非直接緩存。經過MEMORY_ONLY_SER 或者 MEMORY_AND_DISK_SER的存儲等級實現。

  • 硬件供給

  提供給Spark的硬件資源會顯著影響應用的完成時間,影響集羣規模的主要參數包括:分配給沒各執行器節點的內存大小,每一個執行器節點佔用的核心數,執行器節點總數,以及用來存儲臨時數據的本地磁盤數量。

相關文章
相關標籤/搜索