一、使用Sparkconf配置Sparkapache
對Spark進行性能調優,一般就是修改Spark應用的運行時配置選項。緩存
Spark中最主要的配置機制經過SparkConf類對Spark進行配置,當建立出一個SparkContext時,就須要建立出一個SparkConf實例。網絡
Sparkconf實例包含用戶要重載的配置選項的鍵值對。調用set()方法來添加配置項的設置,而後把這個對象傳給SparkContext的構造方法。app
調用setAppName()和setMaster()來分別設置spark.app.name和spark.master的值。工具
例如:性能
//建立一個conf對象 val conf = new SparkConf() conf.set("spark.app.name","My Spark App") conf.set("spark.master","local[4]") conf.set("spark.ui.port","36000") //使用這個配置對象建立一個SparkContext val sc = new SparkContext(conf)
Spark運行經過spark-submit工具動態設置配置項。當應用被spark-submit腳本啓動時,腳本會把這些配置項設置到運行環境中。優化
例如:ui
$ bin/spark-submit \ --class com.example.MyAPP \ --master local[4] \ --name "My Spark App" --conf spark.ui.port=36000 \ myApp.jar
Spark有特定的優先級順序來選擇實際配置,優先級最高的是在用戶代碼中顯示調用set()方法設置的選項,其次是經過spark-submit傳遞的參數,再次是寫在配置文件中的值,最後是系統的默認值。spa
二、Spark執行的組成部分:做業、任務和步驟scala
經過Spark示例展現Spark執行的各個階段,以瞭解用戶代碼如何被編譯爲下層的執行計劃。
val input = sc.textFile("input.txt") val tokenized = input.map(line => line.split(" ")).filter(words => words.size > 0) val counts = tokenized.map(words => (words(0),1)).reduceByKey{(a,b) => a+b}
以上示例執行了三次轉化操做,最終生成一個叫作counts的RDD。程序定義了一個RDD對象的有向無環圖,每一個RDD維護了其指向一個或多個父節點的引用,以及表示其與父節點之間關係的信息。
這裏counts的譜系圖以下:
在調用行動操做以前,RDD都只是存儲着可讓咱們計算出具體數據的描述信息。要觸發實際計算,須要對counts調用一個行動操做,好比使用collect()將數據收集到驅動器程序。
counts.collect()
Spark調度器會建立出用於計算行動操做的RDD物理執行計劃。Spark調度器從最終須要被調用行動操做的RDD出發,向上回溯全部必須計算的RDD。調度器會訪問RDD的父節點,父節點的父節點,以此類推,遞歸向上生成計算全部必要的祖先RDD的物理計劃。以下:
流水線執行:當RDD不須要混洗數據就能夠從父節點計算出來時,調度器就會自動進行流水線執行。在物理執行時,執行計劃輸出的縮進等級與父節點相同的RDD會與父節點在同一個步驟中進行流水線執行。
除了流水線執行的優化,當一個RDD已經緩存在集羣內存或磁盤上時,Spark的內部調度器也會自動截短RDD譜系圖。這種狀況下,Spark會短路求值,直接基於緩存下來的RDD進行計算。
特定的行動操做所生成的步驟的集合被稱爲一個做業。
一個物理步驟會啓動不少任務,每一個任務都是在不一樣的數據分區上作一樣的事情。任務內部的流程是同樣的,包括:(1)從數據存儲或已有RDD或數據混洗的輸出中獲取輸入數據。(2)執行必要的操做來計算出這些操做所表明的RDD。(3)把輸出寫到一個數據混洗文件中,寫入外部存儲或者是發回驅動器程序。
三、Spark優化的關鍵性能
RDD的邏輯表示實際上是一個對象集合。在物理執行期間,RDD會被分爲一系列的分區,每一個分區都是整個數據的子集。當Spark調度並運行任務時,Spark會爲每一個分區中的數據建立出一個任務。輸入RDD通常會根據其底層的存儲系統選擇並行度。
並行度會從兩方面影響程序的性能:當並行度太低時,Spark集羣會出現資源閒置的狀況,而當並行度太高時,每一個分區產生的間接開銷累計起來就會更大。
Spark有兩種方法來對操做的並行度進行調優:一種是在數據混洗操做時,使用參數的方式爲混洗後的RDD指定並行度。第二種方法是對於任何已有的RDD,能夠進行從新分區來獲取更多或者更少的分區數。可使用repartition()實現從新分區操做,該操做會把RDD隨機打亂並分紅設定的分區數目。使用coalesce()操做沒有打亂數據,比repartition()更爲高效。
當Spark須要經過網絡傳輸數據,或者將數據溢寫到磁盤上時,Spark須要把數據序列化爲二進制格式。序列化會在數據進行混洗操做時發生,此時有可能須要經過網絡傳輸大量數據。
Spark默認會使用Java內建的序列化庫。Spark也支持第三方序列化庫Kryo,能夠提供比Java的序列化工具更短的序列化時間和更高壓縮比的二進制表示。
使用Kryo序列化工具示例以下:
val conf = new SparkConf().setMaster("local").setAppName("partitions") conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //嚴格要求註冊類 得到最佳性能 conf.set("spark.kryo.registrationRequired","true") conf.registerKryoClasses(Array(classOf[MyClass],classOf[MyotherClass]))
Spark內存有如下用途:
能夠經過調整調整內存各區域比例獲得更好的性能表現。
其它優化:
Spark默認的cache()操做會以MEMORY_ONLY的存儲等級持久化數據,當緩存新的RDD時分區空間不夠,舊的分區會被刪除。當用到這些分取數據時,在進行重算。使用persist()方法以MEMORY_AND_DISK存儲等級存儲,內存中放不下的分區會被寫入磁盤,須要時再從磁盤讀取回來。這種方式會有更好的性能。
還有一種是緩存序列化後的對象而非直接緩存。經過MEMORY_ONLY_SER 或者 MEMORY_AND_DISK_SER的存儲等級實現。
提供給Spark的硬件資源會顯著影響應用的完成時間,影響集羣規模的主要參數包括:分配給沒各執行器節點的內存大小,每一個執行器節點佔用的核心數,執行器節點總數,以及用來存儲臨時數據的本地磁盤數量。