Spark學習之Spark調優與調試(一)

1、使用SparkConf配置Spark

  對 Spark 進行性能調優,一般就是修改 Spark 應用的運行時配置選項。Spark 中最主要的配置機制是經過 SparkConf 類對 Spark 進行配置。當建立出一個 SparkContext 時,就須要建立出一個 SparkConf 的實例。shell

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    
    // 建立一個conf對象
    val conf = new SparkConf()
    conf.set("spark.app.name", "My Spark App")
    conf.set("spark.master", "local[4]")
    conf.set("spark.ui.port", "36000")   // 重載默認端口配置   
    // 使用這個配置對象建立一個SparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別
    
  }
}

  Spark 容許經過 spark-submit 工具動態設置配置項。當應用被 spark-submit 腳本啓動時,腳本會把這些配置項設置到運行環境中。當一個新的 SparkConf 被建立出來時,這些環境變量會被檢測出來而且自動配好。這樣,在使用spark-submit 時,用戶應用中只要建立一個「空」的 SparkConf ,並直接傳給 SparkContext的構造方法就好了。apache

  spark-submit 工具爲經常使用的 Spark 配置項參數提供了專用的標記,還有一個通用標記--conf 來接收任意 Spark 配置項的值。併發

$ bin/spark-submit \
    --class com.example.MyApp \
    --master local[4] \
    --name "My Spark App" \
    --conf spark.ui.port=36000 \
    myApp.jar

  spark-submit 也支持從文件中讀取配置項的值。這對於設置一些與環境相關的配置項比較有用,方便不一樣用戶共享這些配置(好比默認的 Spark 主節點)。默認狀況下, spark-submit 腳本會在 Spark 安裝目錄中找到 conf/spark-defaults.conf 文件,嘗試讀取該文件中以空格隔開的鍵值對數據。你也能夠經過 spark-submit 的 --properties-File 標記,自定義該文件的路徑。app

$ bin/spark-submit \
    --class com.example.MyApp \
    --properties-file my-config.conf \
    myApp.jar

## Contents of my-config.conf ##
spark.master local[4]
spark.app.name "My Spark App"
spark.ui.port 36000 

  有時,同一個配置項可能在多個地方被設置了。例如,某用戶可能在程序代碼中直接調用了 setAppName() 方法,同時也經過 spark-submit 的 --name 標記設置了這個值。針對這種狀況,Spark 有特定的優先級順序來選擇實際配置。優先級最高的是在用戶代碼中顯式調用 set() 方法設置的選項。其次是經過 spark-submit 傳遞的參數,再次是寫在配置文件中的值,最後是系統的默認值。 下表列出了一些經常使用的配置項。工具

  

  

2、Spark執行的組成部分:做業、任務和步驟

  下面經過一個示例應用來展現 Spark 執行的各個階段,以瞭解用戶代碼如何被編譯爲下層的執行計劃。咱們實現的是一個簡單的日誌分析應用。輸入數據是一個由不一樣嚴重等級的日誌消息和一些分散的空行組成的文本文件,咱們但願計算其中各級別的日誌消息的條數。性能

  

    val input = sc.textFile("words.txt")  // 讀取輸入文件
    // 切分爲單詞而且刪掉空行 若是大於0的話刪除不掉空行
    val tokenized = input.map(line=>line.split(" ")).filter(words=>words.size>1)  //若是大於0的話刪除不掉空行
    val counts = tokenized.map(words=>(words(0),1)).reduceByKey((a,b)=>a+b) // 提取出日誌等級並進行計數

  這一系列代碼生成了一個叫做 counts 的 RDD,其中包含各級別日誌對應的條目數。在shell 中執行完這些命令以後,程序沒有執行任何行動操做。相反,程序定義了一個 RDD對象的有向無環圖(DAG),咱們能夠在稍後行動操做被觸發時用它來進行計算。每一個RDD 維護了其指向一個或多個父節點的引用,以及表示其與父節點之間關係的信息。好比,當你在 RDD 上調用 val b = a.map() 時, b 這個 RDD 就存下了對其父節點 a 的一個引用。這些引用使得 RDD 能夠追蹤到其全部的祖先節點。ui

  Spark 提供了 toDebugString() 方法來查看 RDD 的譜系。spa

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object Test {
  def main(args: Array[String]): Unit = {
    
    // 建立一個conf對象
    val conf = new SparkConf()
    conf.set("spark.app.name", "My Spark App")
    conf.set("spark.master", "local[4]")
    // conf.set("spark.ui.port", "36000")   // 重載默認端口配置   
    // 使用這個配置對象建立一個SparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")  // 設置日誌顯示級別
    val input = sc.textFile("words.txt")  // 讀取輸入文件
    // 切分爲單詞而且刪掉空行 若是大於0的話刪除不掉空行
    val tokenized = input.map(line=>line.split(" ")).filter(words=>words.size>1)  //若是大於0的話刪除不掉空行
    val counts = tokenized.map(words=>(words(0),1)).reduceByKey((a,b)=>a+b) // 提取出日誌等級並進行計數
    
    println(input.toDebugString)  // 經過toDebugString查看RDD的譜系
    println("====================================================")
    println(tokenized.toDebugString)
    println("====================================================")
    println(counts.toDebugString)
    
  }
}

  

  在調用行動操做以前,RDD 都只是存儲着可讓咱們計算出具體數據的描述信息。要觸發實際計算,須要對 counts 調用一個行動操做,好比使用 collect() 將數據收集到驅動器程序中。scala

    counts.collect().foreach(println)

  Spark 調度器會建立出用於計算行動操做的 RDD 物理執行計劃。咱們在此處調用 RDD 的collect() 方法,因而 RDD 的每一個分區都會被物化出來併發送到驅動器程序中。Spark 調度器從最終被調用行動操做的 RDD(在本例中是 counts )出發,向上回溯全部必須計算的 RDD。調度器會訪問 RDD 的父節點、父節點的父節點,以此類推,遞歸向上生成計算全部必要的祖先 RDD 的物理計劃。咱們以最簡單的狀況爲例,調度器爲有向圖中的每一個RDD 輸出計算步驟,步驟中包括 RDD 上須要應用於每一個分區的任務。而後以相反的順序執行這些步驟,計算得出最終所求的 RDD。日誌

相關文章
相關標籤/搜索