spark調優常見手段,在生產中經常會遇到各類各樣的問題,有事前緣由,有事中緣由,也有不規範緣由,spark調優總結下來能夠從下面幾個點來調優。html
分配更多的資源: 它是性能優化調優的王道,就是增長和分配更多的資源,這對於性能和速度上的提高是顯而易見的, 基本上,在必定範圍以內,增長資源與性能的提高,是成正比的;寫完了一個複雜的spark做業以後,進行性能調優的時候,首先第一步,就是要來調節最優的資源配置; 在這個基礎之上,若是說你的spark做業,可以分配的資源達到了你的能力範圍的頂端以後,沒法再分配更多的資源了,公司資源有限;那麼纔是考慮去作後面的這些性能調優的點。 相關問題: (1)分配哪些資源? (2)在哪裏能夠設置這些資源? (3)剖析爲何分配這些資源以後,性能能夠獲得提高?
executor-memory、executor-cores、driver-memory
1.2 在哪裏能夠設置這些資源java
在實際的生產環境中,提交spark任務時,使用spark-submit shell腳本,在裏面調整對應的參數。 提交任務的腳本: spark-submit \ --master spark://node1:7077 \ --class com.hoult.WordCount \ --num-executors 3 \ 配置executor的數量 --driver-memory 1g \ 配置driver的內存(影響不大) --executor-memory 1g \ 配置每個executor的內存大小 --executor-cores 3 \ 配置每個executor的cpu個數 /export/servers/wordcount.jar
先計算出公司spark集羣上的全部資源 每臺節點的內存大小和cpu核數, 好比:一共有20臺worker節點,每臺節點8g內存,10個cpu。 實際任務在給定資源的時候,能夠給20個executor、每一個executor的內存8g、每一個executor的使用的cpu個數10。
先計算出yarn集羣的全部大小,好比一共500g內存,100個cpu; 這個時候能夠分配的最大資源,好比給定50個executor、每一個executor的內存大小10g,每一個executor使用的cpu個數爲2。
在資源比較充足的狀況下,儘量的使用更多的計算資源,儘可能去調節到最大的大小
--executor-memory --total-executor-cores
spark做業中,各個stage的task的數量,也就表明了spark做業在各個階段stage的並行度! 當分配完所能分配的最大資源了,而後對應資源去調節程序的並行度,若是並行度沒有與資源相匹配,那麼致使你分配下去的資源都浪費掉了。同時並行運行,還可讓每一個task要處理的數量變少(很簡單的原理。合理設置並行度,能夠充分利用集羣資源,減小每一個task處理數據量,而增長性能加快運行速度。)
至少設置成與spark Application 的總cpu core 數量相同。 最理想狀況,150個core,分配150task,一塊兒運行,差很少同一時間運行完畢 官方推薦,task數量,設置成spark Application 總cpu core數量的2~3倍 。 好比150個cpu core ,基本設置task數量爲300~500. 與理想狀況不一樣的,有些task會運行快一點,好比50s就完了,有些task 可能會慢一點,要一分半才運行完,因此若是你的task數量,恰好設置的跟cpu core 數量相同,可能會致使資源的浪費。 由於好比150個task中10個先運行完了,剩餘140個還在運行,可是這個時候,就有10個cpu core空閒出來了,致使浪費。若是設置2~3倍,那麼一個task運行完之後,另一個task立刻補上來,儘可能讓cpu core不要空閒。同時儘可能提高spark運行效率和速度。提高性能。
設置參數spark.default.parallelism 默認是沒有值的,若是設置了值爲10,它會在shuffle的過程纔會起做用。 好比 val rdd2 = rdd1.reduceByKey(_+_) 此時rdd2的分區數就是10 能夠經過在構建SparkConf對象的時候設置,例如: new SparkConf().set("spark.defalut.parallelism","500")
使用rdd.repartition 來從新分區,該方法會生成一個新的rdd,使其分區數變大。 此時因爲一個partition對應一個task,那麼對應的task個數越多,經過這種方式也能夠提升並行度。
http://spark.apache.org/docs/2.3.3/sql-programming-guide.htmlnode
經過設置參數 spark.sql.shuffle.partitions=500 默認爲200; 能夠適當增大,來提升並行度。 好比設置爲 spark.sql.shuffle.partitions=500
專門針對sparkSQL來設置的算法
如上圖所示的計算邏輯: (1)當第一次使用rdd2作相應的算子操做獲得rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的文件,而後對rdd1作對應的算子操做獲得rdd2,再由rdd2計算以後獲得rdd3。一樣爲了計算獲得rdd4,前面的邏輯會被從新計算。 (3)默認狀況下屢次對一個rdd執行算子操做,去獲取不一樣的rdd,都會對這個rdd及以前的父rdd所有從新計算一次。 這種狀況在實際開發代碼的時候會常常遇到,可是咱們必定要避免一個rdd重複計算屢次,不然會致使性能急劇下降。 總結:能夠把屢次使用到的rdd,也就是公共rdd進行持久化,避免後續須要,再次從新計算,提高效率。
(1)cache方法默認是把數據持久化到內存中 ,例如:rdd.cache ,其本質仍是調用了persist方法 (2)persist方法中有豐富的緩存級別,這些緩存級別都定義在StorageLevel這個object中,能夠結合實際的應用場景合理的設置緩存級別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實現。
(1)若是正常將數據持久化在內存中,那麼可能會致使內存的佔用過大,這樣的話,也許會致使OOM內存溢出。 (2)當純內存沒法支撐公共RDD數據徹底存放的時候,就優先考慮使用序列化的方式在純內存中存儲。將RDD的每一個partition的數據,序列化成一個字節數組;序列化後,大大減小內存的空間佔用。 (3)序列化的方式,惟一的缺點就是,在獲取數據的時候,須要反序列化。可是能夠減小佔用的空間和便於網絡傳輸 (4)若是序列化純內存方式,仍是致使OOM,內存溢出;就只能考慮磁盤的方式,內存+磁盤的普通方式(無序列化)。 (5)爲了數據的高可靠性,並且內存充足,可使用雙副本機制,進行持久化 持久化的雙副本機制,持久化後的一個副本,由於機器宕機了,副本丟了,就仍是得從新計算一次; 持久化的每一個數據單元,存儲一份副本,放在其餘節點上面,從而進行容錯; 一個副本丟了,不用從新計算,還可使用另一份副本。這種方式,僅僅針對你的內存資源極度充足。 好比: StorageLevel.MEMORY_ONLY_2
在實際工做中可能會遇到這樣的狀況,因爲要處理的數據量很是大,這個時候可能會在一個stage中出現大量的task,好比有1000個task,這些task都須要一份相同的數據來處理業務,這份數據的大小爲100M,該數據會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。這裏會涉及大量的網絡傳輸開銷,同時至少須要的內存爲1000*100M=100G,這個內存開銷是很是大的。沒必要要的內存的消耗和佔用,就致使了你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;這對於spark任務處理來講就是一場災難。 因爲內存開銷比較大,task在建立對象的時候,可能會出現堆內存放不下全部對象,就會致使頻繁的垃圾回收器的回收GC。GC的時候必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。
Spark中分佈式執行的代碼須要傳遞到各個executor的task上運行。對於一些只讀、固定的數據,每次都須要Driver廣播到各個Task上,這樣效率低下。廣播變量容許將變量只廣播給各個executor。該executor上的各個task再從所在節點的BlockManager(負責管理某個executor對應的內存和磁盤上的數據)獲取變量,而不是從Driver獲取變量,從而提高了效率。
廣播變量,初始的時候,就在Drvier上有一份副本。經過在Driver把共享數據轉換成廣播變量。 task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的BlockManager中,嘗試獲取變量副本;若是本地沒有,那麼就從Driver遠程拉取廣播變量副本,並保存在本地的BlockManager中; 此後這個executor上的task,都會直接使用本地的BlockManager中的副本。那麼這個時候全部該executor中的task都會使用這個廣播變量的副本。也就是說一個executor只須要在第一個task啓動時,得到一份廣播變量數據,以後的task都從本節點的BlockManager中獲取相關數據。 executor的BlockManager除了從driver上拉取,也可能從其餘節點的BlockManager上拉取變量副本,網絡距離越近越好。
好比一個任務須要50個executor,1000個task,共享數據爲100M。 (1)在不使用廣播變量的狀況下,1000個task,就須要該共享數據的1000個副本,也就是說有1000份數須要大量的網絡傳輸和內存開銷存儲。耗費的內存大小1000*100=100G. (2)使用了廣播變量後,50個executor就只須要50個副本數據,並且不必定都是從Driver傳輸到每一個節點,還多是就近從最近的節點的executor的blockmanager上拉取廣播變量副本,網絡傳輸速度大大增長;內存開銷 50*100M=5G 總結: 不使用廣播變量的內存開銷爲100G,使用後的內存開銷5G,這裏就相差了20倍左右的網絡傳輸性能損耗和內存開銷,使用廣播變量後對於性能的提高和影響,仍是很可觀的。 廣播變量的使用不必定會對性能產生決定性的做用。好比運行30分鐘的spark做業,可能作了廣播變量之後,速度快了2分鐘,或者5分鐘。可是一點一滴的調優,聚沙成塔。最後仍是會有效果的。
(1)能不能將一個RDD使用廣播變量廣播出去? 不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。 (2)廣播變量只能在Driver端定義,不能在Executor端定義。 (3)在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。 (4)若是executor端用到了Driver的變量,若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。 (5)若是Executor端用到了Driver的變量,若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。
(1) 經過sparkContext的broadcast方法把數據轉換成廣播變量,類型爲Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6)) (2) 而後executor上的BlockManager就能夠拉取該廣播變量的副本獲取具體的數據。 獲取廣播變量中的值能夠經過調用其value方法 val array: Array[Int] = broadcastArray.value
spark中的shuffle涉及到數據要進行大量的網絡傳輸,下游階段的task任務須要經過網絡拉取上階段task的輸出數據,shuffle過程,簡單來講,就是將分佈在集羣中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操做。好比reduceByKey、join等算子,都會觸發shuffle操做。 若是有可能的話,要儘可能避免使用shuffle類算子。 由於Spark做業運行過程當中,最消耗性能的地方就是shuffle過程。
spark程序在開發的過程當中使用reduceByKey、join、distinct、repartition等算子操做,這裏都會產生shuffle,因爲shuffle這一塊是很是耗費性能的,實際開發中儘可能使用map類的非shuffle算子。這樣的話,沒有shuffle操做或者僅有較少shuffle操做的Spark做業,能夠大大減小性能開銷。
//錯誤的作法: // 傳統的join操做會致使shuffle操做。 // 由於兩個RDD中,相同的key都須要經過網絡拉取到一個節點上,由一個task進行join操做。 val rdd3 = rdd1.join(rdd2) //正確的作法: // Broadcast+map的join操做,不會致使shuffle操做。 // 使用Broadcast將一個數據量較小的RDD做爲廣播變量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在rdd1.map算子中,能夠從rdd2DataBroadcast中,獲取rdd2的全部數據。 // 而後進行遍歷,若是發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那麼就斷定能夠進行join。 // 此時就能夠根據本身須要的方式,將rdd1當前數據與rdd2中能夠鏈接的數據,拼接在一塊兒(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 注意,以上操做,建議僅僅在rdd2的數據量比較少(好比幾百M,或者一兩G)的狀況下使用。 // 由於每一個Executor的內存中,都會駐留一份rdd2的全量數據。
若是由於業務須要,必定要使用shuffle操做,沒法用map類的算子來替代,那麼儘可能使用能夠map-side預聚合的算子。 所謂的map-side預聚合,說的是在每一個節點本地對相同的key進行一次聚合操做,相似於MapReduce中的本地combiner。 map-side預聚合以後,每一個節點本地就只會有一條相同的key,由於多條相同的key都被聚合起來了。其餘節點在拉取全部節點上的相同key時,就會大大減小須要拉取的數據數量,從而也就減小了磁盤IO以及網絡傳輸開銷。 一般來講,在可能的狀況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。由於reduceByKey和aggregateByKey算子都會使用用戶自定義的函數對每一個節點本地的相同key進行預聚合。 而groupByKey算子是不會進行預聚合的,全量的數據會在集羣的各個節點之間分發和傳輸,性能相對來講比較差。 好比以下兩幅圖,就是典型的例子,分別基於reduceByKey和groupByKey進行單詞計數。其中第一張圖是groupByKey的原理圖,能夠看到,沒有進行任何本地聚合時,全部數據都會在集羣節點之間傳輸;第二張圖是reduceByKey的原理圖,能夠看到,每一個節點本地的相同key數據,都進行了預聚合,而後才傳輸到其餘節點上進行全局聚合。
groupByKey進行單詞計數原理sql
reduceByKey單詞計數原理shell
reduceByKey/aggregateByKey 能夠進行預聚合操做,減小數據的傳輸量,提高性能數據庫
groupByKey 不會進行預聚合操做,進行數據的全量拉取,性能比較低apache
mapPartitions類的算子,一次函數調用會處理一個partition全部的數據,而不是一次函數調用處理一條,性能相對來講會高一些。 可是有的時候,使用mapPartitions會出現OOM(內存溢出)的問題。由於單次函數調用就要處理掉一個partition全部的數據,若是內存不夠,垃圾回收時是沒法回收掉太多對象的,極可能出現OOM異常。因此使用這類操做時要慎重!
原理相似於「使用mapPartitions替代map」,也是一次函數調用處理一個partition的全部數據,而不是一次函數調用處理一條數據。 在實踐中發現,foreachPartitions類的算子,對性能的提高仍是頗有幫助的。好比在foreach函數中,將RDD中全部數據寫MySQL,那麼若是是普通的foreach算子,就會一條數據一條數據地寫,每次函數調用可能就會建立一個數據庫鏈接,此時就勢必會頻繁地建立和銷燬數據庫鏈接,性能是很是低下; 可是若是用foreachPartitions算子一次性處理一個partition的數據,那麼對於每一個partition,只要建立一個數據庫鏈接便可,而後執行批量插入操做,此時性能是比較高的。實踐中發現,對於1萬條左右的數據量寫MySQL,性能能夠提高30%以上。
一般對一個RDD執行filter算子過濾掉RDD中較多數據後(好比30%以上的數據),建議使用coalesce算子,手動減小RDD的partition數量,將RDD中的數據壓縮到更少的partition中去。 由於filter以後,RDD的每一個partition中都會有不少數據被過濾掉,此時若是照常進行後續的計算,其實每一個task處理的partition中的數據量並非不少,有一點資源浪費,並且此時處理的task越多,可能速度反而越慢。 所以用coalesce減小partition數量,將RDD中的數據壓縮到更少的partition以後,只要使用更少的task便可處理完全部的partition。在某些場景下,對於性能的提高會有必定的幫助。
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,若是須要在repartition重分區以後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。 由於該算子能夠一邊進行重分區的shuffle操做,一邊進行排序。shuffle與sort兩個操做同時進行,比先shuffle再sort來講,性能多是要高的。
Spark在進行任務計算的時候,會涉及到數據跨進程的網絡傳輸、數據的持久化,這個時候就須要對數據進行序列化。Spark默認採用Java的序列化器。默認java序列化的優缺點以下: 其好處: 處理起來方便,不須要咱們手動作其餘操做,只是在使用一個對象和變量的時候,須要實現Serializble接口。 其缺點: 默認的序列化機制的效率不高,序列化的速度比較慢;序列化之後的數據,佔用的內存空間相對仍是比較大。 Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。
Kryo序列化機制,一旦啓用之後,會生效的幾個地方: (1)算子函數中使用到的外部變量 算子中的外部變量可能來着與driver須要涉及到網絡傳輸,就須要用到序列化。 最終能夠優化網絡傳輸的性能,優化集羣中內存的佔用和消耗 (2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER 將rdd持久化時,對應的存儲級別裏,須要用到序列化。 最終能夠優化內存的佔用和消耗;持久化RDD佔用的內存越少,task執行的時候,建立的對象,就不至於頻繁的佔滿內存,頻繁發生GC。 (3) 產生shuffle的地方,也就是寬依賴 下游的stage中的task,拉取上游stage中的task產生的結果數據,跨網絡傳輸,須要用到序列化。最終能夠優化網絡傳輸的性能
// 建立SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設置序列化器爲KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 註冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue; fastutil可以提供更小的內存佔用,更快的存取速度;咱們使用fastutil提供的集合類,來替代本身平時使用的JDK的原生的Map、List、Set.
fastutil集合類,能夠減少內存的佔用,而且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度
(1)你可使用Broadcast廣播變量優化; (2)可使用Kryo序列化類庫,提高序列化性能和效率; (3)若是外部變量是某種比較大的集合,那麼能夠考慮使用fastutil改寫外部變量; 首先從源頭上就減小內存的佔用(fastutil),經過廣播變量進一步減小內存佔用,再經過Kryo序列化類庫進一步減小內存佔用。
在你的算子函數裏,也就是task要執行的計算邏輯裏面,若是有邏輯中,出現,要建立比較大的Map、List等集合, 可能會佔用較大的內存空間,並且可能涉及到消耗性能的遍歷、存取等集合操做; 那麼此時,能夠考慮將這些集合類型使用fastutil類庫重寫, 使用了fastutil集合類之後,就能夠在必定程度上,減小task建立出來的集合類型的內存佔用。 避免executor內存頻繁佔滿,頻繁喚起GC,致使性能降低。
第一步:在pom.xml中引用fastutil的包 <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> 第二步:平時使用List (Integer)的替換成IntList便可。 List<Integer>的list對應的到fastutil就是IntList類型 使用說明: 基本都是相似於IntList的格式,前綴就是集合的元素類型; 特殊的就是Map,Int2IntMap,表明了key-value映射的元素類型。
Spark在Driver上對Application的每個stage的task進行分配以前,都會計算出每一個task要計算的是哪一個分片數據,RDD的某個partition;Spark的task分配算法,優先會但願每一個task正好分配到它要計算的數據所在的節點,這樣的話就不用在網絡間傳輸數據; 可是一般來講,有時事與願違,可能task沒有機會分配到它的數據所在的節點,爲何呢,可能那個節點的計算資源和計算能力都滿了;因此這種時候,一般來講,Spark會等待一段時間,默認狀況下是3秒(不是絕對的,還有不少種狀況,對不一樣的本地化級別,都會去等待),到最後實在是等待不了了,就會選擇一個比較差的本地化級別,好比說將task分配到距離要計算的數據所在節點比較近的一個節點,而後進行計算。
(1)PROCESS_LOCAL:進程本地化 代碼和數據在同一個進程中,也就是在同一個executor中;計算數據的task由executor執行,數據在executor的BlockManager中;性能最好 (2)NODE_LOCAL:節點本地化 代碼和數據在同一個節點中;好比說數據做爲一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是數據和task在一個節點上的不一樣executor中;數據須要在進程間進行傳輸;性能其次 (3)RACK_LOCAL:機架本地化 數據和task在一個機架的兩個節點上;數據須要經過網絡在節點之間進行傳輸; 性能比較差 (4) ANY:無限制 數據和task可能在集羣中的任何地方,並且不在一個機架中;性能最差
spark.locality.wait,默認是3s 首先採用最佳的方式,等待3s後降級,仍是不行,繼續降級...,最後仍是不行,只可以採用最差的。
修改spark.locality.wait參數,默認是3s,能夠增長 下面是每一個數據本地化級別的等待時間,默認都是跟spark.locality.wait時間相同, 默認都是3s(可查看spark官網對應參數說明,以下圖所示) spark.locality.wait.node spark.locality.wait.process spark.locality.wait.rack
在代碼中設置: new SparkConf().set("spark.locality.wait","10") 而後把程序提交到spark集羣中運行,注意觀察日誌,spark做業的運行日誌,推薦你們在測試的時候,先用client模式,在本地就直接能夠看到比較全的日誌。 日誌裏面會顯示,starting task .... PROCESS LOCAL、NODE LOCAL..... 例如: Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes) 觀察大部分task的數據本地化級別 若是大多都是PROCESS_LOCAL,那就不用調節了。若是是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下數據本地化的等待時長。應該是要反覆調節,每次調節完之後,再來運行,觀察日誌 看看大部分的task的本地化級別有沒有提高;看看整個spark做業的運行時間有沒有縮短。 注意注意: 在調節參數、運行任務的時候,別本末倒置,本地化級別卻是提高了, 可是由於大量的等待時長,spark做業的運行時間反而增長了,那就仍是不要調節了。
Executor的內存主要分爲三塊數組
第一塊是讓task執行咱們本身編寫的代碼時使用;緩存
第二塊是讓task經過shuffle過程拉取了上一個stage的task的輸出後,進行聚合等操做時使用
第三塊是讓RDD緩存時使用
在spark1.6版本之前 spark的executor使用的靜態內存模型,可是在spark1.6開始,多增長了一個統一內存模型。 經過spark.memory.useLegacyMode 這個參數去配置 默認這個值是false,表明用的是新的動態內存模型; 若是想用之前的靜態內存模型,那麼就要把這個值改成true。
實際上就是把咱們的一個executor分紅了三部分, 一部分是Storage內存區域, 一部分是execution區域, 還有一部分是其餘區域。若是使用的靜態內存模型,那麼用這幾個參數去控制: spark.storage.memoryFraction:默認0.6 spark.shuffle.memoryFraction:默認0.2 因此第三部分就是0.2 若是咱們cache數據量比較大,或者是咱們的廣播變量比較大, 那咱們就把spark.storage.memoryFraction這個值調大一點。 可是若是咱們代碼裏面沒有廣播變量,也沒有cache,shuffle又比較多,那咱們要把spark.shuffle.memoryFraction 這值調大。
咱們配置好了Storage內存區域和execution區域後,咱們的一個任務假設execution內存不夠用了,可是它的Storage內存區域是空閒的,兩個之間不能互相借用,不夠靈活,因此纔出來咱們新的統一內存模型。
動態內存模型先是預留了300m內存,防止內存溢出。動態內存模型把總體內存分紅了兩部分, 由這個參數表示spark.memory.fraction 這個指的默認值是0.6 表明另外的一部分是0.4, 而後spark.memory.fraction 這部分又劃分紅爲兩個小部分。這兩小部分共佔總體內存的0.6 .這兩部分其實就是:Storage內存和execution內存。由spark.memory.storageFraction 這個參數去調配,由於兩個共佔0.6。若是spark.memory.storageFraction這個值配的是0.5,那說明這0.6裏面 storage佔了0.5,也就是executor佔了0.3 。
Storage內存和execution內存 能夠相互借用。不用像靜態內存模型那樣死板,可是是有規則的
爲何受傷的都是storage呢? 是由於execution裏面的數據是立刻就要用的,而storage裏的數據不必定立刻就要用。
bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
java.lang.OutOfMemoryError ExecutorLostFailure Executor exit code 爲143 executor lost hearbeat time out shuffle file lost 若是遇到以上問題,頗有可能就是內存除了問題,能夠先嚐試增長內存。若是仍是解決不了,那麼請聽下一次數據傾斜調優的課。
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注