Spark程序排錯

1.shuffle相關

報錯提示

org.apache.spark.shuffle.MetadataFetchFailedException: 
Missing an output location for shuffle 0
 
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to hostname/192.168.xx.xxx:50268

java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.memory.UnifiedMemoryManager.acquireExecutionMemory(UnifiedMemoryManager.scala:80)    java

原理分析

shuffle分爲shuffle writeshuffle read兩部分。
shuffle write的分區數由上一階段的RDD分區數控制,shuffle read的分區數則是由Spark提供的一些參數控制。sql

shuffle write能夠簡單理解爲相似於saveAsLocalDiskFile的操做,將計算的中間結果按某種規則臨時放到各個executor所在的本地磁盤上。apache

shuffle read的時候數據的分區數則是由spark提供的一些參數控制。能夠想到的是,若是這個參數值設置的很小,同時shuffle read的量很大,那麼將會致使一個task須要處理的數據很是大。結果致使JVM crash,從而致使取shuffle數據失敗,同時executor也丟失了,看到Failed to connect to host的錯誤,也就是executor lost的意思。有時候即便不會致使JVM crash也會形成長時間的gc。ssh

解決辦法

知道緣由後問題就好解決了,主要從shuffle的數據量和處理shuffle數據的分區數兩個角度入手。ide

  1. 減小shuffle數據ui

    思考是否可使用map side join或是broadcast join來規避shuffle的產生。spa

    將沒必要要的數據在shuffle前進行過濾,好比原始數據有20個字段,只要選取須要的字段進行處理便可,將會減小必定的shuffle數據。.net

  2. SparkSQL和DataFrame的join,group by等操做scala

    經過spark.sql.shuffle.partitions控制分區數,默認爲200,根據shuffle的量以及計算的複雜度提升這個值。code

  3. Rdd的join,groupBy,reduceByKey等操做

    經過spark.default.parallelism控制shuffle read與reduce處理的分區數,默認爲運行任務的core的總數(mesos細粒度模式爲8個,local模式爲本地的core總數),官方建議爲設置成運行任務的core的2-3倍。

  4. 提升executor的內存

    經過spark.executor.memory適當提升executor的memory值。

  5. 是否存在數據傾斜的問題

    空值是否已通過濾?異常數據(某個key數據特別大)是否能夠單獨處理?考慮改變數據分區規則。

參考

http://blog.csdn.net/lsshlsw/article/details/51213610

相關文章
相關標籤/搜索