spark 使用中會遇到的一些問題及解決思路

7 內存溢出問題php

    在Spark中使用hql方法執行hive語句時,因爲其在查詢過程當中調用的是Hive的獲取元數據信息、SQL解析,而且使用Cglib等進行序列化反序列化,中間可能產生較多的class文件,致使JVM中的持久代使用較多,若是配置不當,可能引發相似於以下的OOM問題:html

  1. Exception in thread "Thread-2" java.lang.OutOfMemoryError: PermGen space

複製代碼java

緣由是實際使用時,若是用的是JDK1.6版本,Server模式的持久代默認大小是64M,Client模式的持久代默認大小是32M,而Driver端進行SQL處理時,其持久代的使用可能會達到90M,致使OOM溢出,任務失敗。sql

解決方法就是在Spark的conf目錄中的spark-defaults.conf裏,增長對Driver的JVM配置,由於Driver才負責SQL的解析和元數據獲取。配置以下:apache

  1. spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M   

複製代碼
可是,上述狀況是在yarn-cluster模式下出現,yarn-client模式運行時卻是正常的,原來在$SPARK_HOME/bin/spark-class文件中已經設置了持久代大小:app

  1. JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"
  2.  

複製代碼tcp

當以yarn-client模式運行時,driver就運行在客戶端的spark-submit進程中,其JVM參數是取的spark-class文件中的設置,所謂未出現持久代溢出現象。工具

    總結一下Spark中各個角色的JVM參數設置:   oop

(1)Driver的JVM參數:
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。
PermSize,若是是yarn-client模式,則是默認讀取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;若是是yarn-cluster模式,讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。
GC方式,若是是yarn-client模式,默認讀取的是spark-class文件中的JAVA_OPTS;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的參數值。
以上值最後都可被spark-submit工具中的--driver-java-options參數覆蓋。大數據

 

(2)Executor的JVM參數:
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
PermSize,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
GC方式,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。

 

(3)Executor數目及所佔CPU個數
若是是yarn-client模式,Executor數目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每一個實例的數目由SPARK_EXECUTOR_CORES指定;若是是yarn-cluster模式,Executor的數目由spark-submit工具的--num-executors參數指定,默認是2個實例,而每一個Executor使用的CPU數目由--executor-cores指定,默認爲1核。
每一個Executor運行時的信息能夠經過yarn logs命令查看到,相似於以下:

  1. 14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)

複製代碼

  其中,akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler表示當前的Executor進程所在節點,後面的1表示Executor編號,sparktest2表示ApplicationMaster的host,接着的3表示當前Executor所佔用的CPU數目。

8 序列化異常

在Spark上執行hive語句的時候,出現相似於以下的異常:

  1. org.apache.spark.SparkDriverExecutionException: Execution error
  2.     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849)
  3.     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
  4.     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  5.     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  6.     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  7.     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  8.     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  9.     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  10.     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  11.     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  12.     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  13. Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
  14.     at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$anonfun$7.apply(joins.scala:336)
  15.     at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:813)
  16.     at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:810)
  17.     at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
  18.     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)

複製代碼
排查其先後的日誌,發現大都是序列化的東西:

  1. 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms
  2. 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)

複製代碼
而在spark-default.conf中,事先設置了序列化方式爲Kryo:

  1. spark.serializer org.apache.spark.serializer.KryoSerializer

複製代碼

根據異常信息,可見是HashSet轉爲BitSet類型轉換失敗,Kryo把鬆散的HashSet轉換爲了緊湊的BitSet,把序列化方式註釋掉以後,任務能夠正常執行。難道Spark的Kryo序列化作的還不到位?此問題須要進一步跟蹤。

9 Executor僵死問題

    運行一個Spark任務,發現其運行速度遠遠慢於執行一樣SQL語句的Hive的執行,甚至出現了OOM的錯誤,最後卡住達幾小時!而且Executor進程在瘋狂GC。

    截取其一Task的OOM異常信息:

能夠看到這是在序列化過程當中發生的OOM。根據節點信息,找到對應的Executor進程,觀察其Jstack信息:

  1. Thread 36169: (state = BLOCKED)
  2. - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame)
  3. - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame)
  4. - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame)
  5. - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
  6. - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame)
  7. - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame)
  8. - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
  9. - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
  10. - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
  11. - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
  12. - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
  13. - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
  14. - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
  15. - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame)
  16. - org.apache.spark.serializer.DeserializationStream$anon$1.getNext() @bci=10, line=125 (Compiled frame)
  17. - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame)
  18. - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame)
  19. - scala.collection.Iterator$anon$13.hasNext() @bci=4, line=371 (Compiled frame)
  20. - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame)
  21. - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame)
  22. - scala.collection.Iterator$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
  23. - org.apache.spark.sql.execution.HashJoin$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame)
  24. - org.apache.spark.sql.execution.HashJoin$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame)
  25. - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame)
  26. - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)

複製代碼

有大量的BLOCKED線程,繼續觀察GC信息,發現大量的FULL GC。

    分析,在插入Hive表的時候,實際上須要寫HDFS,在此過程的HashJoin時,伴隨着大量的Shuffle寫操做,JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同時超過必定大小的數據會直接寫到老生代,當新生代寫滿了以後,也會把老的數據搞到老生代,若是老生代空間不足了,就觸發FULL GC,仍是空間不夠,那就OOM錯誤了,此時線程被Blocked,致使整個Executor處理數據的進程被卡住。

當處理大數據的時候,若是JVM配置不當就容易引發上述問題。解決的方法就是增大Executor的使用內存,合理配置新生代和老生代的大小,能夠將老生代的空間適當的調大點

相關文章
相關標籤/搜索