大數據開發-Spark調優經常使用手段

Spark調優

spark調優常見手段,在生產中經常會遇到各類各樣的問題,有事前緣由,有事中緣由,也有不規範緣由,spark調優總結下來能夠從下面幾個點來調優。html

1. 分配更多的資源

分配更多的資源:
  它是性能優化調優的王道,就是增長和分配更多的資源,這對於性能和速度上的提高是顯而易見的,
  基本上,在必定範圍以內,增長資源與性能的提高,是成正比的;寫完了一個複雜的spark做業以後,進行性能調優的時候,首先第一步,就是要來調節最優的資源配置;
  在這個基礎之上,若是說你的spark做業,可以分配的資源達到了你的能力範圍的頂端以後,沒法再分配更多的資源了,公司資源有限;那麼纔是考慮去作後面的這些性能調優的點。

相關問題:
(1)分配哪些資源?
(2)在哪裏能夠設置這些資源?
(3)剖析爲何分配這些資源以後,性能能夠獲得提高?

1.1 分配哪些資源

executor-memory、executor-cores、driver-memory

1.2 在哪裏能夠設置這些資源java

在實際的生產環境中,提交spark任務時,使用spark-submit shell腳本,在裏面調整對應的參數。
   
 提交任務的腳本:
 spark-submit \
 --master spark://node1:7077 \
 --class com.hoult.WordCount \
 --num-executors 3 \    配置executor的數量
 --driver-memory 1g \   配置driver的內存(影響不大)
 --executor-memory 1g \ 配置每個executor的內存大小
 --executor-cores 3 \   配置每個executor的cpu個數
 /export/servers/wordcount.jar

1.2 參數調節到多大,算是最大

  • Standalone模式
先計算出公司spark集羣上的全部資源 每臺節點的內存大小和cpu核數,
   好比:一共有20臺worker節點,每臺節點8g內存,10個cpu。
   實際任務在給定資源的時候,能夠給20個executor、每一個executor的內存8g、每一個executor的使用的cpu個數10。
  • Yarn模式
先計算出yarn集羣的全部大小,好比一共500g內存,100個cpu;
   這個時候能夠分配的最大資源,好比給定50個executor、每一個executor的內存大小10g,每一個executor使用的cpu個數爲2。
  • 使用原則
在資源比較充足的狀況下,儘量的使用更多的計算資源,儘可能去調節到最大的大小

1.3 爲何調大資源之後性能能夠提高

--executor-memory

--total-executor-cores

2. 提升並行度

2.1 Spark的並行度指的是什麼

spark做業中,各個stage的task的數量,也就表明了spark做業在各個階段stage的並行度!
    當分配完所能分配的最大資源了,而後對應資源去調節程序的並行度,若是並行度沒有與資源相匹配,那麼致使你分配下去的資源都浪費掉了。同時並行運行,還可讓每一個task要處理的數量變少(很簡單的原理。合理設置並行度,能夠充分利用集羣資源,減小每一個task處理數據量,而增長性能加快運行速度。)

2.2 如何提升並行度

2.2.1 能夠設置task的數量

至少設置成與spark Application 的總cpu core 數量相同。
最理想狀況,150個core,分配150task,一塊兒運行,差很少同一時間運行完畢
官方推薦,task數量,設置成spark Application 總cpu core數量的2~3倍 。
  
好比150個cpu core ,基本設置task數量爲300~500. 與理想狀況不一樣的,有些task會運行快一點,好比50s就完了,有些task 可能會慢一點,要一分半才運行完,因此若是你的task數量,恰好設置的跟cpu core 數量相同,可能會致使資源的浪費。
  由於好比150個task中10個先運行完了,剩餘140個還在運行,可是這個時候,就有10個cpu core空閒出來了,致使浪費。若是設置2~3倍,那麼一個task運行完之後,另一個task立刻補上來,儘可能讓cpu core不要空閒。同時儘可能提高spark運行效率和速度。提高性能。

2.2.2 如何設置task數量來提升並行度

設置參數spark.default.parallelism
   默認是沒有值的,若是設置了值爲10,它會在shuffle的過程纔會起做用。
   好比 val rdd2 = rdd1.reduceByKey(_+_) 
   此時rdd2的分區數就是10
   
能夠經過在構建SparkConf對象的時候設置,例如:
   new SparkConf().set("spark.defalut.parallelism","500")

2.2.3 給RDD從新設置partition的數量

使用rdd.repartition 來從新分區,該方法會生成一個新的rdd,使其分區數變大。
此時因爲一個partition對應一個task,那麼對應的task個數越多,經過這種方式也能夠提升並行度。

2.2.4 提升sparksql運行的task數量

http://spark.apache.org/docs/2.3.3/sql-programming-guide.htmlnode

經過設置參數 spark.sql.shuffle.partitions=500  默認爲200;
能夠適當增大,來提升並行度。 好比設置爲 spark.sql.shuffle.partitions=500

專門針對sparkSQL來設置的算法

3. RDD的重用和持久化

3.1 實際開發遇到的狀況說明

如上圖所示的計算邏輯:
(1)當第一次使用rdd2作相應的算子操做獲得rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的文件,而後對rdd1作對應的算子操做獲得rdd2,再由rdd2計算以後獲得rdd3。一樣爲了計算獲得rdd4,前面的邏輯會被從新計算。

(3)默認狀況下屢次對一個rdd執行算子操做,去獲取不一樣的rdd,都會對這個rdd及以前的父rdd所有從新計算一次。
這種狀況在實際開發代碼的時候會常常遇到,可是咱們必定要避免一個rdd重複計算屢次,不然會致使性能急劇下降。

總結:能夠把屢次使用到的rdd,也就是公共rdd進行持久化,避免後續須要,再次從新計算,提高效率。

3.2 如何對rdd進行持久化

  • 能夠調用rdd的cache或者persist方法。
(1)cache方法默認是把數據持久化到內存中 ,例如:rdd.cache ,其本質仍是調用了persist方法
(2)persist方法中有豐富的緩存級別,這些緩存級別都定義在StorageLevel這個object中,能夠結合實際的應用場景合理的設置緩存級別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實現。

3.3 rdd持久化的時能夠採用序列化

(1)若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許會致使OOM內存溢出。
(2)當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個字節數組;序列化後,大大減小內存的空間佔用。
(3)序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。可是能夠減小佔用的空間和便於網絡傳輸
(4)若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。
(5)爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化
  持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次;
  持久化的每一個數據單元,存儲一份副本,放在其餘節點上面,從而進行容錯;
  一個副本丟了,不用從新計算,還可使用另一份副本。這種方式,僅僅針對你的內存資源極度充足。
   好比: StorageLevel.MEMORY_ONLY_2

4. 廣播變量的使用

4.1 場景描述

在實際工做中可能會遇到這樣的狀況,因爲要處理的數據量很是大,這個時候可能會在一個stage中出現大量的task,好比有1000個task,這些task都須要一份相同的數據來處理業務,這份數據的大小爲100M,該數據會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。這裏會涉及大量的網絡傳輸開銷,同時至少須要的內存爲1000*100M=100G,這個內存開銷是很是大的。沒必要要的內存的消耗和佔用,就致使了你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;這對於spark任務處理來講就是一場災難。

    因爲內存開銷比較大,task在建立對象的時候,可能會出現堆內存放不下全部對象,就會致使頻繁的垃圾回收器的回收GC。GC的時候必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。

4.2 廣播變量引入

Spark中分佈式執行的代碼須要傳遞到各個executor的task上運行。對於一些只讀、固定的數據,每次都須要Driver廣播到各個Task上,這樣效率低下。廣播變量容許將變量只廣播給各個executor。該executor上的各個task再從所在節點的BlockManager(負責管理某個executor對應的內存和磁盤上的數據)獲取變量,而不是從Driver獲取變量,從而提高了效率。
廣播變量,初始的時候,就在Drvier上有一份副本。經過在Driver把共享數據轉換成廣播變量。

  task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中,嘗試獲取變量副本;若是本地沒有,那麼就從Driver遠程拉取廣播變量副本,並保存在本地的BlockManager中;
  
  此後這個executor上的task,都會直接使用本地的BlockManager中的副本。那麼這個時候全部該executor中的task都會使用這個廣播變量的副本。也就是說一個executor只須要在第一個task啓動時,得到一份廣播變量數據,以後的task都從本節點的BlockManager中獲取相關數據。

  executor的BlockManager除了從driver上拉取,也可能從其餘節點的BlockManager上拉取變量副本,網絡距離越近越好。

4.3 使用廣播變量後的性能分析

好比一個任務須要50個executor,1000個task,共享數據爲100M。
(1)在不使用廣播變量的狀況下,1000個task,就須要該共享數據的1000個副本,也就是說有1000份數須要大量的網絡傳輸和內存開銷存儲。耗費的內存大小1000*100=100G.

(2)使用了廣播變量後,50個executor就只須要50個副本數據,並且不必定都是從Driver傳輸到每一個節點,還多是就近從最近的節點的executor的blockmanager上拉取廣播變量副本,網絡傳輸速度大大增長;內存開銷 50*100M=5G

總結:
  不使用廣播變量的內存開銷爲100G,使用後的內存開銷5G,這裏就相差了20倍左右的網絡傳輸性能損耗和內存開銷,使用廣播變量後對於性能的提高和影響,仍是很可觀的。
  
  廣播變量的使用不必定會對性能產生決定性的做用。好比運行30分鐘的spark做業,可能作了廣播變量之後,速度快了2分鐘,或者5分鐘。可是一點一滴的調優,聚沙成塔。最後仍是會有效果的。

4.4 廣播變量使用注意事項

(1)能不能將一個RDD使用廣播變量廣播出去?

       不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。

(2)廣播變量只能在Driver端定義,不能在Executor端定義。

(3)在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。

(4)若是executor端用到了Driver的變量,若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。

(5)若是Executor端用到了Driver的變量,若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。

4.5 如何使用廣播變量

  • 例如
(1) 經過sparkContext的broadcast方法把數據轉換成廣播變量,類型爲Broadcast,
  val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
  
(2) 而後executor上的BlockManager就能夠拉取該廣播變量的副本獲取具體的數據。
    獲取廣播變量中的值能夠經過調用其value方法
   val array: Array[Int] = broadcastArray.value

5. 儘可能避免使用shuffle類算子

5.1 shuffle描述

spark中的shuffle涉及到數據要進行大量的網絡傳輸,下游階段的task任務須要經過網絡拉取上階段task的輸出數據,shuffle過程,簡單來講,就是將分佈在集羣中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操做。好比reduceByKey、join等算子,都會觸發shuffle操做。
  
  若是有可能的話,要儘可能避免使用shuffle類算子。
  由於Spark做業運行過程當中,最消耗性能的地方就是shuffle過程。

5.2 哪些算子操做會產生shuffle

spark程序在開發的過程當中使用reduceByKey、join、distinct、repartition等算子操做,這裏都會產生shuffle,因爲shuffle這一塊是很是耗費性能的,實際開發中儘可能使用map類的非shuffle算子。這樣的話,沒有shuffle操做或者僅有較少shuffle操做的Spark做業,能夠大大減小性能開銷。

5.3 如何避免產生shuffle

  • 小案例
//錯誤的作法:
// 傳統的join操做會致使shuffle操做。
// 由於兩個RDD中,相同的key都須要經過網絡拉取到一個節點上,由一個task進行join操做。
val rdd3 = rdd1.join(rdd2)
    
//正確的作法:
// Broadcast+map的join操做,不會致使shuffle操做。
// 使用Broadcast將一個數據量較小的RDD做爲廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,能夠從rdd2DataBroadcast中,獲取rdd2的全部數據。
// 而後進行遍歷,若是發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那麼就斷定能夠進行join。
// 此時就能夠根據本身須要的方式,將rdd1當前數據與rdd2中能夠鏈接的數據,拼接在一塊兒(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操做,建議僅僅在rdd2的數據量比較少(好比幾百M,或者一兩G)的狀況下使用。
// 由於每一個Executor的內存中,都會駐留一份rdd2的全量數據。

5.4 使用map-side預聚合的shuffle操做

  • map-side預聚合
若是由於業務須要,必定要使用shuffle操做,沒法用map類的算子來替代,那麼儘可能使用能夠map-side預聚合的算子。

  所謂的map-side預聚合,說的是在每一個節點本地對相同的key進行一次聚合操做,相似於MapReduce中的本地combiner。
  map-side預聚合以後,每一個節點本地就只會有一條相同的key,由於多條相同的key都被聚合起來了。其餘節點在拉取全部節點上的相同key時,就會大大減小須要拉取的數據數量,從而也就減小了磁盤IO以及網絡傳輸開銷。
  一般來講,在可能的狀況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。由於reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每一個節點本地的相同key進行預聚合。
  而groupByKey算子是不會進行預聚合的,全量的數據會在集羣的各個節點之間分發和傳輸,性能相對來講比較差。
  
  好比以下兩幅圖,就是典型的例子,分別基於reduceByKey和groupByKey進行單詞計數。其中第一張圖是groupByKey的原理圖,能夠看到,沒有進行任何本地聚合時,全部數據都會在集羣節點之間傳輸;第二張圖是reduceByKey的原理圖,能夠看到,每一個節點本地的相同key數據,都進行了預聚合,而後才傳輸到其餘節點上進行全局聚合。
  • groupByKey進行單詞計數原理sql

  • reduceByKey單詞計數原理shell

6. 使用高性能的算子

6.1 使用reduceByKey/aggregateByKey替代groupByKey

  • reduceByKey/aggregateByKey 能夠進行預聚合操做,減小數據的傳輸量,提高性能數據庫

  • groupByKey 不會進行預聚合操做,進行數據的全量拉取,性能比較低apache

6.2 使用mapPartitions替代普通map

mapPartitions類的算子,一次函數調用會處理一個partition全部的數據,而不是一次函數調用處理一條,性能相對來講會高一些。
  可是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。由於單次函數調用就要處理掉一個partition全部的數據,若是內存不夠,垃圾回收時是沒法回收掉太多對象的,極可能出現OOM異常。因此使用這類操做時要慎重!

6.3 使用foreachPartition替代foreach

原理相似於「使用mapPartitions替代map」,也是一次函數調用處理一個partition的全部數據,而不是一次函數調用處理一條數據。
  在實踐中發現,foreachPartitions類的算子,對性能的提高仍是頗有幫助的。好比在foreach函數中,將RDD中全部數據寫MySQL,那麼若是是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會建立一個數據庫鏈接,此時就勢必會頻繁地建立和銷燬數據庫鏈接,性能是很是低下;  可是若是用foreachPartitions算子一次性處理一個partition的數據,那麼對於每一個partition,只要建立一個數據庫鏈接便可,而後執行批量插入操做,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能能夠提高30%以上。

6.4 使用filter以後進行coalesce操做

一般對一個RDD執行filter算子過濾掉RDD中較多數據後(好比30%以上的數據),建議使用coalesce算子,手動減小RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。
  由於filter以後,RDD的每一個partition中都會有不少數據被過濾掉,此時若是照常進行後續的計算,其實每一個task處理的partition中的數據量並非不少,有一點資源浪費,並且此時處理的task越多,可能速度反而越慢。
  所以用coalesce減小partition數量,將RDD中的數據壓縮到更少的partition以後,只要使用更少的task便可處理完全部的partition。在某些場景下,對於性能的提高會有必定的幫助。

6.5 使用repartitionAndSortWithinPartitions替代repartition與sort類操做

repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,若是須要在repartition重分區以後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。
  由於該算子能夠一邊進行重分區的shuffle操做,一邊進行排序。shuffle與sort兩個操做同時進行,比先shuffle再sort來講,性能多是要高的。

7. 使用Kryo優化序列化性能

7.1 spark序列化介紹

Spark在進行任務計算的時候,會涉及到數據跨進程的網絡傳輸、數據的持久化,這個時候就須要對數據進行序列化。Spark默認採用Java的序列化器。默認java序列化的優缺點以下:
其好處:
  處理起來方便,不須要咱們手動作其餘操做,只是在使用一個對象和變量的時候,須要實現Serializble接口。
其缺點:
  默認的序列化機制的效率不高,序列化的速度比較慢;序列化之後的數據,佔用的內存空間相對仍是比較大。

Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。

7.2 Kryo序列化啓用後生效的地方

Kryo序列化機制,一旦啓用之後,會生效的幾個地方:
(1)算子函數中使用到的外部變量
  算子中的外部變量可能來着與driver須要涉及到網絡傳輸,就須要用到序列化。
      最終能夠優化網絡傳輸的性能,優化集羣中內存的佔用和消耗
    
(2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
  將rdd持久化時,對應的存儲級別裏,須要用到序列化。
      最終能夠優化內存的佔用和消耗;持久化RDD佔用的內存越少,task執行的時候,建立的對象,就不至於頻繁的佔滿內存,頻繁發生GC。
    
(3)  產生shuffle的地方,也就是寬依賴
  下游的stage中的task,拉取上游stage中的task產生的結果數據,跨網絡傳輸,須要用到序列化。最終能夠優化網絡傳輸的性能

7.3 如何開啓Kryo序列化機制

// 建立SparkConf對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器爲KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 註冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

8. 使用fastutil優化數據格式

8.1 fastutil介紹

fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;

fastutil可以提供更小的內存佔用,更快的存取速度;咱們使用fastutil提供的集合類,來替代本身平時使用的JDK的原生的Map、List、Set.

8.2 fastutil好處

fastutil集合類,能夠減少內存的佔用,而且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度

8.3 Spark中應用fastutil的場景和使用

8.3.1 算子函數使用了外部變量

(1)你可使用Broadcast廣播變量優化;

(2)可使用Kryo序列化類庫,提高序列化性能和效率;

(3)若是外部變量是某種比較大的集合,那麼能夠考慮使用fastutil改寫外部變量;

首先從源頭上就減小內存的佔用(fastutil),經過廣播變量進一步減小內存佔用,再經過Kryo序列化類庫進一步減小內存佔用。

8.3.2 算子函數裏使用了比較大的集合Map/List

在你的算子函數裏,也就是task要執行的計算邏輯裏面,若是有邏輯中,出現,要建立比較大的Map、List等集合,
可能會佔用較大的內存空間,並且可能涉及到消耗性能的遍歷、存取等集合操做; 
那麼此時,能夠考慮將這些集合類型使用fastutil類庫重寫,

使用了fastutil集合類之後,就能夠在必定程度上,減小task建立出來的集合類型的內存佔用。 
避免executor內存頻繁佔滿,頻繁喚起GC,致使性能降低。

8.3.3 fastutil的使用

第一步:在pom.xml中引用fastutil的包
    <dependency>
      <groupId>fastutil</groupId>
      <artifactId>fastutil</artifactId>
      <version>5.0.9</version>
    </dependency>
    
第二步:平時使用List (Integer)的替換成IntList便可。 
  List<Integer>的list對應的到fastutil就是IntList類型
  
  
使用說明:
基本都是相似於IntList的格式,前綴就是集合的元素類型; 
特殊的就是Map,Int2IntMap,表明了key-value映射的元素類型。

9. 調節數據本地化等待時長

Spark在Driver上對Application的每個stage的task進行分配以前,都會計算出每一個task要計算的是哪一個分片數據,RDD的某個partition;Spark的task分配算法,優先會但願每一個task正好分配到它要計算的數據所在的節點,這樣的話就不用在網絡間傳輸數據;

  可是一般來講,有時事與願違,可能task沒有機會分配到它的數據所在的節點,爲何呢,可能那個節點的計算資源和計算能力都滿了;因此這種時候,一般來講,Spark會等待一段時間,默認狀況下是3秒(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後實在是等待不了了,就會選擇一個比較差的本地化級別,好比說將task分配到距離要計算的數據所在節點比較近的一個節點,而後進行計算。

9.1 本地化級別

(1)PROCESS_LOCAL:進程本地化
  代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好
(2)NODE_LOCAL:節點本地化
  代碼和數據在同一個節點中;好比說數據做爲一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是數據和task在一個節點上的不一樣executor中;數據須要在進程間進行傳輸;性能其次
(3)RACK_LOCAL:機架本地化  
  數據和task在一個機架的兩個節點上;數據須要經過網絡在節點之間進行傳輸; 性能比較差
(4)  ANY:無限制
  數據和task可能在集羣中的任何地方,並且不在一個機架中;性能最差

9.2 數據本地化等待時長

spark.locality.wait,默認是3s
首先採用最佳的方式,等待3s後降級,仍是不行,繼續降級...,最後仍是不行,只可以採用最差的。

9.3 如何調節參數而且測試

修改spark.locality.wait參數,默認是3s,能夠增長

下面是每一個數據本地化級別的等待時間,默認都是跟spark.locality.wait時間相同,
默認都是3s(可查看spark官網對應參數說明,以下圖所示)
spark.locality.wait.node
spark.locality.wait.process
spark.locality.wait.rack
在代碼中設置:
new SparkConf().set("spark.locality.wait","10")

而後把程序提交到spark集羣中運行,注意觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。 
日誌裏面會顯示,starting task .... PROCESS LOCAL、NODE LOCAL.....
例如:
Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes)

觀察大部分task的數據本地化級別 
若是大多都是PROCESS_LOCAL,那就不用調節了。若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長。應該是要反覆調節,每次調節完之後,再來運行,觀察日誌 
看看大部分的task的本地化級別有沒有提高;看看整個spark做業的運行時間有沒有縮短。

注意注意:
在調節參數、運行任務的時候,別本末倒置,本地化級別卻是提高了, 可是由於大量的等待時長,spark做業的運行時間反而增長了,那就仍是不要調節了。

10. 基於Spark內存模型調優

10.1 spark中executor內存劃分

  • Executor的內存主要分爲三塊數組

    • 第一塊是讓task執行咱們本身編寫的代碼時使用;緩存

    • 第二塊是讓task經過shuffle過程拉取了上一個stage的task的輸出後,進行聚合等操做時使用

    • 第三塊是讓RDD緩存時使用

10.2 spark的內存模型

在spark1.6版本之前 spark的executor使用的靜態內存模型,可是在spark1.6開始,多增長了一個統一內存模型。
  經過spark.memory.useLegacyMode 這個參數去配置
      默認這個值是false,表明用的是新的動態內存模型;
      若是想用之前的靜態內存模型,那麼就要把這個值改成true。

10.2.1 靜態內存模型

實際上就是把咱們的一個executor分紅了三部分,
  一部分是Storage內存區域,
  一部分是execution區域,
  還有一部分是其餘區域。若是使用的靜態內存模型,那麼用這幾個參數去控制:
  
spark.storage.memoryFraction:默認0.6
spark.shuffle.memoryFraction:默認0.2  
因此第三部分就是0.2

若是咱們cache數據量比較大,或者是咱們的廣播變量比較大,
  那咱們就把spark.storage.memoryFraction這個值調大一點。
  可是若是咱們代碼裏面沒有廣播變量,也沒有cache,shuffle又比較多,那咱們要把spark.shuffle.memoryFraction 這值調大。
  • 靜態內存模型的缺點
咱們配置好了Storage內存區域和execution區域後,咱們的一個任務假設execution內存不夠用了,可是它的Storage內存區域是空閒的,兩個之間不能互相借用,不夠靈活,因此纔出來咱們新的統一內存模型。

10.2.2 統一內存模型

動態內存模型先是預留了300m內存,防止內存溢出。動態內存模型把總體內存分紅了兩部分,
由這個參數表示spark.memory.fraction 這個指的默認值是0.6 表明另外的一部分是0.4,

而後spark.memory.fraction 這部分又劃分紅爲兩個小部分。這兩小部分共佔總體內存的0.6 .這兩部分其實就是:Storage內存和execution內存。由spark.memory.storageFraction 這個參數去調配,由於兩個共佔0.6。若是spark.memory.storageFraction這個值配的是0.5,那說明這0.6裏面 storage佔了0.5,也就是executor佔了0.3 。
  • 統一內存模型有什麼特色呢?
Storage內存和execution內存 能夠相互借用。不用像靜態內存模型那樣死板,可是是有規則的
爲何受傷的都是storage呢?

是由於execution裏面的數據是立刻就要用的,而storage裏的數據不必定立刻就要用。

10.2.3 任務提交腳本參考

  • 如下是一份spark-submit命令的示例,你們能夠參考一下,並根據本身的實際狀況進行調節
bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

10.2.4 我的經驗

java.lang.OutOfMemoryError
ExecutorLostFailure
Executor exit code 爲143
executor lost
hearbeat time out
shuffle file lost

若是遇到以上問題,頗有可能就是內存除了問題,能夠先嚐試增長內存。若是仍是解決不了,那麼請聽下一次數據傾斜調優的課。

吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注
file

相關文章
相關標籤/搜索