Spark學習之Spark調優與調試(二) [看圖說話] 基於Spark UI性能優化與調試——初級篇

  下面來看看更復雜的狀況,好比,當調度器進行流水線執行(pipelining),或把多個 RDD 合併到一個步驟中時。當RDD 不須要混洗數據就能夠從父節點計算出來時,調度器就會自動進行流水線執行。上一篇博文結尾處輸出的譜系圖使用不一樣縮進等級來展現 RDD 是否會在物理步驟中進行流水線執行。在物理執行時,執行計劃輸出的縮進等級與其父節點相同的 RDD 會與其父節點在同一個步驟中進行流水線執行。例如,當計算 counts 時,儘管有不少級父 RDD,但從縮進來看總共只有兩級。這代表物理執行只須要兩個步驟。因爲執行序列中有幾個連續的篩選和映射操做,因此這個例子中才出現了流水線執行。下圖展現了計算 counts 這個RDD 時的兩個執行步驟。html

   

  除了流水線執行的優化,當一個 RDD 已經緩存在集羣內存或磁盤上時,Spark 的內部調度器也會自動截短 RDD 譜系圖。在這種狀況下,Spark 會「短路」求值,直接基於緩存下來的 RDD 進行計算。還有一種截短 RDD 譜系圖的狀況發生在當 RDD 已經在以前的數據混洗中做爲副產品物化出來時,哪怕該 RDD 並無被顯式調用 persist() 方法。這種內部優化是基於 Spark 數據混洗操做的輸出均被寫入磁盤的特性,同時也充分利用了 RDD 圖的某些部分會被屢次計算的事實。git

  一個物理步驟會啓動不少任務,每一個任務都是在不一樣的數據分區上作一樣的事情。任務內部的流程是同樣的,以下所述。
   (1) 從數據存儲(若是該 RDD 是一個輸入 RDD)或已有 RDD(若是該步驟是基於已經緩存的數據)或數據混洗的輸出中獲取輸入數據。
   (2) 執行必要的操做來計算出這些操做所表明的 RDD。例如,對輸入數據執行 filter() 和map() 函數,或者進行分組或歸約操做。
   (3) 把輸出寫到一個數據混洗文件中,寫入外部存儲,或者是發回驅動器程序(若是最終RDD 調用的是相似 count() 這樣的行動操做)。github

  概括一下,Spark 執行時有下面所列的這些流程。
   • 用戶代碼定義RDD的有向無環圖
    RDD 上的操做會建立出新的 RDD,並引用它們的父節點,這樣就建立出了一個圖。
   • 行動操做把有向無環圖強制轉譯爲執行計劃
    當你調用 RDD 的一個行動操做時,這個 RDD 就必須被計算出來。這也要求計算出該RDD 的父節點。Spark 調度器提交一個做業來計算全部必要的 RDD。這個做業會包含一個或多個步驟,每一個步驟其實也就是一波並行執行的計算任務。一個步驟對應有向無環圖中的一個或多個 RDD,一個步驟對應多個 RDD 是由於發生了流水線執行。
   • 任務於集羣中調度並執行
    步驟是按順序處理的,任務則獨立地啓動來計算出 RDD 的一部分。一旦做業的最後一個步驟結束,一個行動操做也就執行完畢了。
  在一個給定的 Spark 應用中,因爲須要建立一系列新的 RDD,所以上述階段會連續發生不少次。apache

3、查找信息

  Spark 在應用執行時記錄詳細的進度信息和性能指標。這些內容能夠在兩個地方找到:Spark 的網頁用戶界面以及驅動器進程和執行器進程生成的日誌文件中。瀏覽器

一、Spark網頁用戶界面

        Spark 內建的網頁用戶界面是瞭解 Spark 應用的行爲和性能表現的第一站。默認狀況下,它在驅動器程序所在機器的 4040 端口上。緩存

  關於Spark網頁用戶界面推薦一個博客,寫得很詳細[看圖說話] 基於Spark UI性能優化與調試——初級篇。還有一個轉載後的博客spark的UI界面性能優化

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Spark_8 {
  def main(args: Array[String]): Unit = {
    
    // 建立一個conf對象
    val conf = new SparkConf()
    conf.set("spark.app.name", "My Spark App")
    conf.set("spark.master", "local[4]")
    // conf.set("spark.ui.port", "36000")   // 重載默認端口配置   
    // 使用這個配置對象建立一個SparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別
    val input = sc.textFile("words.txt")  // 讀取輸入文件
    // 切分爲單詞而且刪掉空行 若是大於0的話刪除不掉空行
    val tokenized = input.map(line=>line.split(" ")).filter(words=>words.size>1) 
    val counts = tokenized.map(words=>(words(0),1)).reduceByKey((a,b)=>a+b) // 提取出日誌等級並進行計數
    
    // 緩存RDD
    counts.cache()
    
    println(input.toDebugString)  // 經過toDebugString查看RDD的譜系
    println("====================================================")
    println(tokenized.toDebugString)
    println("====================================================")
    println(counts.toDebugString)
    // countRDD已經緩存 第一次求值運行仍然須要兩個步驟
    counts.collect().foreach(println)
    println("====================================================")
    // 該次求值只有一個步驟
    counts.collect().foreach(println)
    
    Thread.sleep(60000)  // 爲了訪問 http://localhost:4040  線程睡眠
    
  }
}

  在本地模式程序運行下里面的日誌信息包含了Spark網頁用戶界面的URL。網絡

19/04/21 20:23:12 INFO MemoryStore: MemoryStore started with capacity 884.7 MB
19/04/21 20:23:12 INFO SparkEnv: Registering OutputCommitCoordinator
19/04/21 20:23:12 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/04/21 20:23:12 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.11.1:4040
19/04/21 20:23:12 INFO Executor: Starting executor ID driver on host localhost

  注意,若是DAG Visualization這兒沒有圖片顯示的話,那說明是瀏覽器的問題。app

  

  我換了一個火狐瀏覽器,就能出來圖片了。函數

  

二、驅動器進程和執行器進程的日誌

  在某些狀況下,用戶須要深刻研讀驅動器進程和執行器進程所生成的日誌來獲取更多信息。日誌會更詳細地記錄各類異常事件,例如內部的警告以及用戶代碼輸出的詳細異常信息。這些數據對於尋找錯誤緣由頗有用。

4、關鍵性能考量

一、並行度

  並行度會從兩方面影響程序的性能。首先,當並行度太低時,Spark 集羣會出現資源閒置的狀況。好比,假設你的應用有 1000 個可以使用的計算核心,但所運行的步驟只有 30 個任務,你就應該提升並行度來充分利用更多的計算核心。而當並行度太高時,每一個分區產生的間接開銷累計起來就會更大。評判並行度是否太高的標準包括任務是不是幾乎在瞬間(毫秒級)完成的,或者是否觀察到任務沒有讀寫任何數據。

  Spark 提供了兩種方法來對操做的並行度進行調優。第一種方法是在數據混洗操做時,使用參數的方式爲混洗後的 RDD 指定並行度。第二種方法是對於任何已有的 RDD,能夠進行從新分區來獲取更多或者更少的分區數。從新分區操做經過 repartition() 實現,該操做會把 RDD 隨機打亂並分紅設定的分區數目。若是你肯定要減小 RDD 分區,可使用coalesce() 操做。因爲沒有打亂數據,該操做比 repartition() 更爲高效。若是你認爲當前的並行度太高或者太低,能夠利用這些方法對數據分佈進行從新調整。

  舉個例子,假設咱們從 S3 上讀取了大量數據,而後立刻進行 filter() 操做篩選掉數據集中的絕大部分數據。默認狀況下, filter() 返回的 RDD 的分區數和其父節點同樣,這樣可能會產生不少空的分區或者只有不多數據的分區。在這樣的狀況下,能夠經過合併獲得分區更少的 RDD 來提升應用性能。

二、序列化格式

  當 Spark 須要經過網絡傳輸數據,或是將數據溢寫到磁盤上時,Spark 須要把數據序列化爲二進制格式。序列化會在數據進行混洗操做時發生,此時有可能須要經過網絡傳輸大量數據。默認狀況下,Spark 會使用 Java 內建的序列化庫。Spark 也支持使用第三方序列化庫 Kryo(https://github.com/EsotericSoftware/kryo),能夠提供比 Java 的序列化工具更短的序列化時間和更高壓縮比的二進制表示,但不能直接序列化所有類型的對象。幾乎全部的應用都在遷移到 Kryo 後得到了更好的性能。

三、內存管理

  在各個執行器進程中,內存有如下所列幾種用途。
   • RDD存儲
   • 數據混洗與聚合的緩存區
   • 用戶代碼

  對於默認緩存策略的另外一個改進是緩存序列化後的對象而非直接緩存。咱們能夠經過MEMORY_ONLY_SER 或者 MEMORY_AND_DISK_SER 的存儲等級來實現這一點。緩存序列化後的對象會使緩存過程變慢,由於序列化對象也會消耗一些代價,不過這能夠顯著減小 JVM 的垃圾回收時間,由於不少獨立的記錄如今能夠做爲單個序列化的緩存而存儲。

四、硬件供給

  提供給 Spark 的硬件資源會顯著影響應用的完成時間。影響集羣規模的主要參數包括分配給每一個執行器節點的內存大小、每一個執行器節點佔用的核心數、執行器節點總數,以及用來存儲臨時數據的本地磁盤數量。

  這篇博文主要來自《Spark快速大數據分析》這本書裏面的第八章,內容有刪減,還有本書的一些代碼的實驗結果。

相關文章
相關標籤/搜索