Spark相關問題java
1) Spark的計算結果能夠放入內存,支持基於內存的迭代,MR不支持。node
2) Spark有DAG有向無環圖,能夠實現pipeline的計算模式。mysql
3) 資源調度模式:Spark粗粒度資源調度,MR是細粒度資源調度。算法
資源複用:Spark中的task能夠複用同一批Executor的資源。sql
MR裏面每個map task對應一個jvm,不能複用資源。數據庫
Driver進程:負責任務的分發和結果的回收。apache
Executor進程:負責具體任務的執行。數組
Master進程:Spark資源管理的主進程,負責資源調度。緩存
Worker進程:Spark資源管理的從進程,woker節點主要運行Executor安全
1) .搭建Spark集羣的時候要給Spark集羣足夠的資源(core,memory)
在spark安裝包的conf下spark-env.sh
SPARK_WORKER_CORES
SPARK_WORKER_MEMORY
SPARK_WORKER_INSTANCE
2) .在提交Application的時候給Application分配更多的資源。
提交命令選項:(在提交Application的時候使用選項)
--executor-cores
--executor-memory
--total-executor-cores
配置信息:(在Application的代碼中設置
在Spark-default.conf中設置)
spark.executor.cores
spark.executor.memory
spark.max.cores
原則:一個core通常分配2~3個task,每個task通常處理1G數據(task的複雜度相似wc)
提升並行度的方式:
1) .若是讀取的數據在HDFS上,下降block塊的大小
2) .sc.textFile(path,numPartitions)
3) sc.parallelize(list,numPartitions) 通常用於測試
4) coalesce、repartition能夠提升RDD的分區數。
5) 配置信息:
spark.default.parallelism not set (默認executor core的總個數)
spark.sql.shuffle.partitions 200
6) 自定義分區器
如何選擇一種最合適的持久化策略?
默認狀況下,性能最高的固然是MEMORY_ONLY,但前提是你的內存必須足夠足夠大,能夠綽綽有餘地存放下整個RDD的全部數據。由於不進行序列化與反序列化操做,就避免了這部分的性能開銷;對這個RDD的後續算子操做,都是基於純內存中的數據的操做,不須要從磁盤文件中讀取數據,性能也很高;並且不須要複製一份數據副本,並遠程傳送到其餘節點上。可是這裏必需要注意的是,在實際的生產環境中,恐怕可以直接用這種策略的場景仍是有限的,若是RDD中數據比較多時(好比幾十億),直接用這種持久化級別,會致使JVM的OOM內存溢出異常。
若是使用MEMORY_ONLY級別時發生了內存溢出,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD數據序列化後再保存在內存中,此時每一個partition僅僅是一個字節數組而已,大大減小了對象數量,並下降了內存佔用。這種級別比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。可是後續算子能夠基於純內存進行操做,所以性能整體仍是比較高的。此外,可能發生的問題同上,若是RDD中的數據量過多的話,仍是可能會致使OOM內存溢出的異常。
若是純內存的級別都沒法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由於既然到了這一步,就說明RDD的數據量很大,內存沒法徹底放下。序列化後的數據比較少,能夠節省內存和磁盤的空間開銷。同時該策略會優先儘可能嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。
一般不建議使用DISK_ONLY和後綴爲_2的級別:由於徹底基於磁盤文件進行數據的讀寫,會致使性能急劇下降,有時還不如從新計算一次全部RDD。後綴爲_2的級別,必須將全部數據都複製一份副本,併發送到其餘節點上,數據複製以及網絡傳輸會致使較大的性能開銷,除非是要求做業的高可用性,不然不建議使用。
持久化算子:
cache:
MEMORY_ONLY
persist:
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
通常不要選擇帶有_2的持久化級別。
checkpoint:
① 若是一個RDD的計算時間比較長或者計算起來比較複雜,通常將這個RDD的計算結果保存到HDFS上,這樣數據會更加安全。
② 若是一個RDD的依賴關係很是長,也會使用checkpoint,會切斷依賴關係,提升容錯的效率。
使用廣播變量來模擬使用join,使用狀況:一個RDD比較大,一個RDD比較小。
join算子=廣播變量+filter、廣播變量+map、廣播變量+flatMap
即儘可能使用有combiner的shuffle類算子。
combiner概念:
在map端,每個map task計算完畢後進行的局部聚合。
combiner好處:
1) 下降shuffle write寫磁盤的數據量。
2) 下降shuffle read拉取數據量的大小。
3) 下降reduce端聚合的次數。
有combiner的shuffle類算子:
1) reduceByKey:這個算子在map端是有combiner的,在一些場景中可使用reduceByKey代替groupByKey。
2) aggregateByKey(fun1,func2)
使用reduceByKey替代groupByKey
使用mapPartition替代map
使用foreachPartition替代foreach
filter後使用coalesce減小分區數
使用使用repartitionAndSortWithinPartitions替代repartition與sort類操做
使用repartition和coalesce算子操做分區。
開發過程當中,會遇到須要在算子函數中使用外部變量的場景(尤爲是大變量,好比100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提高性能,函數中使用到外部變量時,默認狀況下,Spark會將該變量複製多個副本,經過網絡傳輸到task中,此時每一個task都有一個變量副本。若是變量自己比較大的話(好比100M,甚至1G),那麼大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中佔用過多內存致使的頻繁GC,都會極大地影響性能。若是使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。廣播後的變量,會保證每一個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。這樣的話,能夠大大減小變量副本的數量,從而減小網絡傳輸的性能開銷,並減小對Executor內存的佔用開銷,下降GC的頻率。
廣播大變量發送方式:Executor一開始並無廣播變量,而是task運行須要用到廣播變量,會找executor的blockManager要,bloackManager找Driver裏面的blockManagerMaster要。
使用廣播變量能夠大大下降集羣中變量的副本數。不使用廣播變量,變量的副本數和task數一致。使用廣播變量變量的副本和Executor數一致。
使用廣播變量能夠大大的下降集羣中變量的副本數。
不使用廣播變量:變量的副本數和task數一致。
使用廣播變量:變量的副本數與Executor數一致。
廣播變量最大能夠是多大?
ExecutorMemory*60%*90%*80% = executorMemory *0.42
在Spark中,主要有三個地方涉及到了序列化:
1) 在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸。
2) 將自定義的類型做爲RDD的泛型類型時(好比JavaRDD<SXT>,SXT是自定義類型),全部自定義類型對象,都會進行序列化。所以這種狀況下,也要求自定義的類必須實現Serializable接口。
3) 使用可序列化的持久化策略時(好比MEMORY_ONLY_SER),Spark會將RDD中的每一個partition都序列化成一個大的字節數組。
Kryo序列化器介紹:
Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化後的數據要更小,大概是Java序列化機制的1/10。因此Kryo序列化優化之後,可讓網絡傳輸的數據變少;在集羣中耗費的內存資源大大減小。
對於這三種出現序列化的地方,咱們均可以經過使用Kryo序列化類庫,來優化序列化和反序列化的性能。Spark默認使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。可是Spark同時支持使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高不少。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。Spark之因此默認沒有使用Kryo做爲序列化類庫,是由於Kryo要求最好要註冊全部須要進行序列化的自定義類型,所以對於開發者來講,這種方式比較麻煩。
Spark中使用Kryo:
Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{SpeedSortKey.class}) |
java中有三種類型比較消耗內存:
1) 對象,每一個Java對象都有對象頭、引用等額外的信息,所以比較佔用內存空間。
2) 字符串,每一個字符串內部都有一個字符數組以及長度等額外信息。
3) 集合類型,好比HashMap、LinkedList等,由於集合類型內部一般會使用一些內部類來封裝集合元素,好比Map.Entry。
所以Spark官方建議,在Spark編碼實現中,特別是對於算子函數中的代碼,儘可能不要使用上述三種數據結構,儘可能使用字符串替代對象,使用原始類型(好比Int、Long)替代字符串,使用數組替代集合類型,這樣儘量地減小內存佔用,從而下降GC頻率,提高性能。
fasteutil介紹:
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue;fastutil可以提供更小的內存佔用,更快的存取速度;咱們使用fastutil提供的集合類,來替代本身平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,能夠減少內存的佔用,而且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設置元素的值的時候,提供更快的存取速度。fastutil的每一種集合類型,都實現了對應的Java中的標準接口(好比fastutil的map,實現了Java的Map接口),所以能夠直接放入已有系統的任何代碼中。
fastutil最新版本要求Java 7以及以上版本。
使用:
見RandomExtractCars.java類
1) PROCESS_LOCAL
task要計算的數據在本進程(Executor)的內存中。
2) NODE_LOCAL
① task所計算的數據在本節點所在的磁盤上。
② task所計算的數據在本節點其餘Executor進程的內存中。
3) NO_PREF
task所計算的數據在關係型數據庫中,如mysql。
4) RACK_LOCAL
task所計算的數據在同機架的不一樣節點的磁盤或者Executor進程的內存中
5) ANY
跨機架。
Spark中任務調度時,TaskScheduler在分發以前須要依據數據的位置來分發,最好將task分發到數據所在的節點上,若是TaskScheduler分發的task在默認3s依然沒法執行的話,TaskScheduler會從新發送這個task到相同的Executor中去執行,會重試5次,若是依然沒法執行,那麼TaskScheduler會下降一級數據本地化的級別再次發送task。
如上圖中,會先嚐試1,PROCESS_LOCAL數據本地化級別,若是重試5次每次等待3s,會默認這個Executor計算資源滿了,那麼會下降一級數據本地化級別到2,NODE_LOCAL,若是仍是重試5次每次等待3s仍是失敗,那麼仍是會下降一級數據本地化級別到3,RACK_LOCAL。這樣數據就會有網絡傳輸,下降了執行效率。
1) 如何提升數據本地化的級別?
能夠增長每次發送task的等待時間(默認都是3s),將3s倍數調大, 結合WEBUI來調節:
• spark.locality.wait
• spark.locality.wait.process
• spark.locality.wait.node
• spark.locality.wait.rack
注意:等待時間不能調大很大,調整數據本地化的級別不要本末倒置,雖然每個task的本地化級別是最高了,但整個Application的執行時間反而加長。
2) 如何查看數據本地化的級別?
經過日誌或者WEBUI
spark1.x 中有 兩種類型的shuffle (hashShuffleManager 另一個是sortShuffleManager)
到spark2.x之後 只有一種shuffle 機制 SortShuffle 管理器叫作SortShuffleManager
reduceByKey會將上一個RDD中的每個key對應的全部value聚合成一個value,而後生成一個新的RDD,元素類型是<key,value>對的形式,這樣每個key對應一個聚合起來的value。
問題:聚合以前,每個key對應的value不必定都是在一個partition中,也不太可能在同一個節點上,由於RDD是分佈式的彈性的數據集,RDD的partition極有可能分佈在各個節點上。
如何聚合?
– Shuffle Write:上一個stage的每一個map task就必須保證將本身處理的當前分區的數據相同的key寫入一個分區文件中,可能會寫入多個不一樣的分區文件中。
– Shuffle Read:reduce task就會從上一個stage的全部task所在的機器上尋找屬於己的那些分區文件,這樣就能夠保證每個key所對應的value都會匯聚到同一個節點上去處理和聚合。
Spark中有兩種Shuffle管理類型,HashShufflManager和SortShuffleManager,Spark1.2以前是HashShuffleManager, Spark1.2引入SortShuffleManager,在Spark 2.0+版本中已經將HashShuffleManager丟棄。
1) 普通機制
a) 每個map task將不一樣結果寫到不一樣的buffer中,每一個buffer的大小爲32K。buffer起到數據緩存的做用。
b) 每一個buffer文件最後對應一個磁盤小文件。
c) reduce task來拉取對應的磁盤小文件。
① .map task的計算結果會根據分區器(默認是hashPartitioner)來決定寫入到哪個磁盤小文件中去。ReduceTask會去Map端拉取相應的磁盤小文件。
② .產生的磁盤小文件的個數:
M(map task的個數)*R(reduce task的個數)
產生的磁盤小文件過多,會致使如下問題:
a) 在Shuffle Write過程當中會產生不少寫磁盤小文件的對象。
b) 在Shuffle Read過程當中會產生不少讀取磁盤小文件的對象。
c) 在JVM堆內存中對象過多會形成頻繁的gc,gc還沒法解決運行所須要的內存 的話,就會OOM。
d) 在數據傳輸過程當中會有頻繁的網絡通訊,頻繁的網絡通訊出現通訊故障的可能性大大增長,一旦網絡通訊出現了故障會致使shuffle file cannot find 因爲這個錯誤致使的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage。
2) 合併機制(considation機制)
產生磁盤小文件的個數:C(core的個數)*R(reduce的個數)
若是核數比較多的話 那麼產生的小文件個數 是否是也不少啊?
1) 普通機制
a) map task 的計算結果會寫入到一個內存數據結構裏面,內存數據結構默認是5M
b) 在shuffle的時候會有一個定時器,不按期的去估算這個內存結構的大小,當內存結構中的數據超過5M時,好比如今內存結構中的數據爲5.01M,那麼他會申請5.01*2-5=5.02M內存給內存數據結構。
c) 若是申請成功不會進行溢寫,若是申請不成功,這時候會發生溢寫磁盤。
d) 在溢寫以前內存結構中的數據會進行排序分區
e) 而後開始溢寫磁盤,寫磁盤是以batch的形式去寫,一個batch是1萬條數據,
f) map task執行完成後,會將這些磁盤小文件合併成一個大的磁盤文件,同時生成一個索引文件。
g) reduce task去map端拉取數據的時候,首先解析索引文件,根據索引文件再去拉取對應的數據。
產生磁盤小文件的個數: 2*M(map task的個數)
2) bypass機制
① .bypass運行機制的觸發條件以下:
shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的參數值。這個值默認是200。
② .產生的磁盤小文件爲:2*M(map task的個數)
1) MapOutputTracker
MapOutputTracker是Spark架構中的一個模塊,是一個主從架構。管理磁盤小文件的地址。
2) BlockManager
BlockManager塊管理者,是Spark架構中的一個模塊,也是一個主從架構。
BlockManagerMaster會在集羣中有用到廣播變量和緩存數據或者刪除緩存數據的時候,通知BlockManagerSlave傳輸或者刪除數據。
BlockManagerSlave會與BlockManagerSlave之間通訊。
¬ 不管在Driver端的BlockManager仍是在Excutor端的BlockManager都含有三個對象:
① DiskStore:負責磁盤的管理。
② MemoryStore:負責內存的管理。
③ BlockTransferService:負責數據的傳輸。
3) Shuffle文件尋址圖
4) Shuffle文件尋址流程
a) 當map task執行完成後,會將task的執行狀況和磁盤小文件的地址封裝到MpStatus對象中,經過MapOutputTrackerWorker對象向Driver中的MapOutputTrackerMaster彙報。
b) 在全部的map task執行完畢後,Driver中就掌握了全部的磁盤小文件的地址。
c) 在reduce task執行以前,會經過Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster獲取磁盤小文件的地址。
d) 獲取到磁盤小文件的地址後,會經過BlockManager鏈接數據所在節點,而後經過BlockTransferService進行數據的傳輸。
e) BlockTransferService默認啓動5個task去節點拉取數據。默認狀況下,5個task拉取數據量不能超過48M。
1) 在代碼中,不推薦使用,硬編碼。
new SparkConf().set(「spark.shuffle.file.buffer」,」64」)
2) 在提交spark任務的時候,推薦使用。
spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
3) 在conf下的spark-default.conf配置文件中,不推薦,由於是寫死後全部應用程序都要用。
Spark執行應用程序時,Spark集羣會啓動Driver和Executor兩種JVM進程,Driver負責建立SparkContext上下文,提交任務,task的分發等。Executor負責task的計算任務,並將結果返回給Driver。同時須要爲須要持久化的RDD提供儲存。Driver端的內存管理比較簡單,這裏所說的Spark內存管理針對Executor端的內存管理。
Spark內存管理分爲靜態內存管理和統一內存管理,Spark1.6以前使用的是靜態內存管理,Spark1.6以後引入了統一內存管理。
靜態內存管理中存儲內存、執行內存和其餘內存的大小在 Spark 應用程序運行期間均爲固定的,但用戶能夠應用程序啓動前進行配置。
統一內存管理與靜態內存管理的區別在於儲存內存和執行內存共享同一塊空間,能夠互相借用對方的空間。
Spark1.6以上版本默認使用的是統一內存管理,能夠經過參數spark.memory.useLegacyMode 設置爲true(默認爲false)使用靜態內存管理。
1) 減小每次拉取的數據量
2) 提升shuffle聚合的內存比例
3) 提升Excutor的總內存
好比咱們建立對象 先往伊甸園和s1 中放 滿了 發生minoGC 此時 會清空 伊甸園和s1 若是還有對象 那麼就往s2中放 若是s2放的下 就放在s2中 s2也滿了 會發生小型的minoGC 將對象清空
若是還有數據 將數據+1 加到15 會放入到老年代中
可是 老年代中的對象 都是經常使用的對象 好比數據庫鏈接池等 老年代若是滿了 會發生full GC 若是清空後 還不夠用 就會發生GC
咱們上面討論的問題 討論的task 的內存夠不夠用
JVM堆內存分爲一塊較大的Eden和兩塊較小的Survivor,每次只使用Eden和其中一塊Survivor,當回收時將Eden和Survivor中還存活着的對象一次性複製到另一塊Survivor上,最後清理掉Eden和剛纔用過的Survivor。也就是說當task建立出來對象會首先往Eden和survivor1中存放,survivor2是空閒的,當Eden和survivor1區域放滿之後就會觸發minor gc小型垃圾回收,清理掉再也不使用的對象。會將存活下來的對象放入survivor2中。
若是存活下來的對象大小大於survivor2的大小,那麼JVM就會將多餘的對象直接放入到老年代中。
若是這個時候年輕代的內存不是很大的話,就會常常的進行minor gc,頻繁的minor gc會致使短期內有些存活的對象(屢次垃圾回收都沒有回收掉,一直在用的又不能被釋放,這種對象每通過一次minor gc都存活下來)頻繁的倒來倒去,會致使這些短生命週期的對象(不必定長期使用)每進行一次垃圾回收就會長一歲。年齡過大,默認15歲,垃圾回收仍是沒有回收回去就會跑到老年代裏面去了。
這樣會致使在老年代中存放大量的短生命週期的對象,老年代應該存放的是數量比較少而且會長期使用的對象,好比數據庫鏈接池對象。這樣的話,老年代就會滿溢(full gc 由於原本老年代中的對象不多,不多進行full gc 所以採起了不太複雜可是消耗性能和時間的垃圾回收算法)。無論minor gc 仍是 full gc都會致使JVM的工做線程中止。
總結-堆內存不足形成的影響:
1) 頻繁的minor gc。
2) 老年代中大量的短生命週期的對象會致使full gc。
3) gc 多了就會影響Spark的性能和運行的速度。
Spark JVM調優主要是下降gc時間,能夠修改Executor內存的比例參數。
RDD緩存、task定義運行的算子函數,可能會建立不少對象,這樣會佔用大量的堆內存。堆內存滿了以後會頻繁的GC,若是GC還不可以知足內存的須要的話就會報OOM。好比一個task在運行的時候會建立N個對象,這些對象首先要放入到JVM年輕代中。好比在存數據的時候咱們使用了foreach來將數據寫入到內存,每條數據都會封裝到一個對象中存入數據庫中,那麼有多少條數據就會在JVM中建立多少個對象。
Spark中如何內存調優?
Spark Executor堆內存中存放(以靜態內存管理爲例):RDD的緩存數據和廣播變量(spark.storage.memoryFraction 0.6),shuffle聚合內存(spark.shuffle.memoryFraction 0.2),task的運行(0.2)那麼如何調優呢?
1) 提升Executor整體內存的大小
2) 下降儲存內存比例或者下降聚合內存比例
如何查看gc?
Spark WEBUI中job->stage->task
Spark底層shuffle的傳輸方式是使用netty傳輸,netty在進行網絡傳輸的過程會申請堆外內存(netty是零拷貝),因此使用了堆外內存。默認狀況下,這個堆外內存上限默認是每個executor的內存大小的10%;真正處理大數據的時候,這裏都會出現問題,致使spark做業反覆崩潰,沒法運行;此時就會去調節這個參數,到至少1G(1024M),甚至說2G、4G。
executor在進行shuffle write,優先從本身本地關聯的mapOutPutWorker中獲取某份數據,若是本地block manager沒有的話,那麼會經過TransferService,去遠程鏈接其餘節點上executor的block manager去獲取,嘗試創建遠程的網絡鏈接,而且去拉取數據。頻繁建立對象讓JVM堆內存滿溢,進行垃圾回收。正好碰到那個exeuctor的JVM在垃圾回收。處於垃圾回過程當中,全部的工做線程所有中止;至關於只要一旦進行垃圾回收,spark / executor中止工做,沒法提供響應,spark默認的網絡鏈接的超時時長是60s;若是卡住60s都沒法創建鏈接的話,那麼這個task就失敗了。task失敗了就會出現shuffle file cannot find的錯誤。
那麼如何調節等待的時長呢?
在./spark-submit提交任務的腳本里面添加:
--conf spark.core.connection.ack.wait.timeout=300
Executor因爲內存不足或者堆外內存不足了,掛掉了,對應的Executor上面的block manager也掛掉了,找不到對應的shuffle map output文件,Reducer端不可以拉取數據。咱們能夠調節堆外內存的大小,如何調節?
在./spark-submit提交任務的腳本里面添加
yarn下:
--conf spark.yarn.executor.memoryOverhead=2048 單位M
standalone下:
--conf spark.memory.offHeap.size=2048單位M
方案實現思路:
在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,好比reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,好比group by、join等,須要設置一個參數,即spark.sql.shuffle.partitions,該參數表明了shuffle read task的並行度,該值默認是200,對於不少場景來講都有點太小。
方案實現原理:
增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個不一樣的key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了。
方案適用場景:
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
方案實現思路:
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個key都打上一個隨機數,好比10之內的隨機數,此時原先同樣的key就變成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行reduceByKey等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。而後將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操做,就能夠獲得最終結果了,好比(hello, 4)。
方案實現原理:
將本來相同的key經過附加隨機前綴的方式,變成多個不一樣的key,就可讓本來被一個task處理的數據分散到多個task上去作局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就能夠獲得最終的結果。
若是一個RDD中有一個key致使數據傾斜,同時還有其餘的key,那麼通常先對數據集進行抽樣,而後找出傾斜的key,再使用filter對原始的RDD進行分離爲兩個RDD,一個是由傾斜的key組成的RDD1,一個是由其餘的key組成的RDD2,那麼對於RDD1可使用加隨機前綴進行多分區多task計算,對於另外一個RDD2正常聚合計算,最後將結果再合併起來。
BroadCast+filter(或者map)
方案適用場景:
在對RDD使用join類操做,或者是在Spark SQL中使用join語句時,並且join操做中的一個RDD或表的數據量比較小(好比幾百M或者一兩G),比較適用此方案。
方案實現思路:
不使用join算子進行鏈接操做,而使用Broadcast變量與map類算子實現join操做,進而徹底規避掉shuffle類的操做,完全避免數據傾斜的發生和出現。將較小RDD中的數據直接經過collect算子拉取到Driver端的內存中來,而後對其建立一個Broadcast變量;接着對另一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照鏈接key進行比對,若是鏈接key相同的話,那麼就將兩個RDD的數據用你須要的方式鏈接起來。
方案實現原理:
普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是map join,此時就不會發生shuffle操做,也就不會發生數據傾斜。
方案適用場景:
兩個RDD/Hive表進行join的時候,若是數據量都比較大,沒法採用「解決方案五」,那麼此時能夠看一下兩個RDD/Hive表中的key分佈狀況。若是出現數據傾斜,是由於其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另外一個RDD/Hive表中的全部key都分佈比較均勻,那麼採用這個解決方案是比較合適的。
方案實現思路:
對包含少數幾個數據量過大的key的那個RDD,經過sample算子採樣出一份樣原本,而後統計一下每一個key的數量,計算出來數據量最大的是哪幾個key。而後將這幾個key對應的數據從原來的RDD中拆分出來,造成一個單獨的RDD,並給每一個key都打上n之內的隨機數做爲前綴,而不會致使傾斜的大部分key造成另一個RDD。接着將須要join的另外一個RDD,也過濾出來那幾個傾斜key對應的數據並造成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會致使傾斜的大部分key也造成另一個RDD。再將附加了隨機前綴的獨立RDD與另外一個膨脹n倍的獨立RDD進行join,此時就能夠將原先相同的key打散成n份,分散到多個task中去進行join了。而另外兩個普通的RDD就照常join便可。最後將兩次join的結果使用union算子合併起來便可,就是最終的join結果 。
方案適用場景:
若是在進行join操做時,RDD中有大量的key致使數據傾斜,那麼進行分拆key也沒什麼意義,此時就只能使用最後一種方案來解決問題了。
方案實現思路:
該方案的實現思路基本和「解決方案六」相似,首先查看RDD/Hive表中的數據分佈狀況,找到那個形成數據傾斜的RDD/Hive表,好比有多個key都對應了超過1萬條數據。而後將該RDD的每條數據都打上一個n之內的隨機前綴。同時對另一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。最後將兩個處理後的RDD進行join便可。
1) connection timeout ----shuffle file cannot find
提升創建鏈接的超時時間,或者下降gc,下降gc了那麼spark不能堆外提供服務的時間就少了,那麼超時的可能就會下降。
2) fetch data fail ---- shuffle file cannot find
提升拉取數據的重試次數以及間隔時間。
3) OOM/executor lost ---- shuffle file cannot find
提升堆外內存大小,提升堆內內存大小。
BlockManager拉取的數據量大,reduce task處理的數據量小
解決方法:
1) 下降每次拉取的數據量
2) 提升shuffle聚合的內存比例
3) 提升Executor的內存比例
val rdd = rdd.map{x=>{
x+」~」;
}}
rdd.foreach{x=>{
System.out.println(x.getName())
}}