org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostname/192.168.xx.xxx:50268
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.memory.UnifiedMemoryManager.acquireExecutionMemory(UnifiedMemoryManager.scala:80) java
shuffle分爲shuffle write
和shuffle read
兩部分。
shuffle write的分區數由上一階段的RDD分區數控制,shuffle read的分區數則是由Spark提供的一些參數控制。sql
shuffle write能夠簡單理解爲相似於saveAsLocalDiskFile
的操做,將計算的中間結果按某種規則臨時放到各個executor所在的本地磁盤上。apache
shuffle read的時候數據的分區數則是由spark提供的一些參數控制。能夠想到的是,若是這個參數值設置的很小,同時shuffle read的量很大,那麼將會致使一個task須要處理的數據很是大。結果致使JVM crash,從而致使取shuffle數據失敗,同時executor也丟失了,看到Failed to connect to host
的錯誤,也就是executor lost的意思。有時候即便不會致使JVM crash也會形成長時間的gc。ssh
知道緣由後問題就好解決了,主要從shuffle的數據量和處理shuffle數據的分區數兩個角度入手。ide
減小shuffle數據ui
思考是否可使用map side join
或是broadcast join
來規避shuffle的產生。spa
將沒必要要的數據在shuffle前進行過濾,好比原始數據有20個字段,只要選取須要的字段進行處理便可,將會減小必定的shuffle數據。.net
SparkSQL和DataFrame的join,group by等操做scala
經過spark.sql.shuffle.partitions
控制分區數,默認爲200,根據shuffle的量以及計算的複雜度提升這個值。code
Rdd的join,groupBy,reduceByKey等操做
經過spark.default.parallelism
控制shuffle read與reduce處理的分區數,默認爲運行任務的core的總數(mesos細粒度模式爲8個,local模式爲本地的core總數),官方建議爲設置成運行任務的core的2-3倍。
提升executor的內存
經過spark.executor.memory
適當提升executor的memory值。
是否存在數據傾斜的問題
空值是否已通過濾?異常數據(某個key數據特別大)是否能夠單獨處理?考慮改變數據分區規則。
參考