7 內存溢出問題php
在Spark中使用hql方法執行hive語句時,因爲其在查詢過程當中調用的是Hive的獲取元數據信息、SQL解析,而且使用Cglib等進行序列化反序列化,中間可能產生較多的class文件,致使JVM中的持久代使用較多,若是配置不當,可能引發相似於以下的OOM問題:html
- Exception in thread "Thread-2" java.lang.OutOfMemoryError: PermGen space
複製代碼java
緣由是實際使用時,若是用的是JDK1.6版本,Server模式的持久代默認大小是64M,Client模式的持久代默認大小是32M,而Driver端進行SQL處理時,其持久代的使用可能會達到90M,致使OOM溢出,任務失敗。sql
解決方法就是在Spark的conf目錄中的spark-defaults.conf裏,增長對Driver的JVM配置,由於Driver才負責SQL的解析和元數據獲取。配置以下:apache
- spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M
複製代碼
可是,上述狀況是在yarn-cluster模式下出現,yarn-client模式運行時卻是正常的,原來在$SPARK_HOME/bin/spark-class文件中已經設置了持久代大小:app
- JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"
-
複製代碼tcp
當以yarn-client模式運行時,driver就運行在客戶端的spark-submit進程中,其JVM參數是取的spark-class文件中的設置,所謂未出現持久代溢出現象。工具
總結一下Spark中各個角色的JVM參數設置: oop
(1)Driver的JVM參數:
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。
PermSize,若是是yarn-client模式,則是默認讀取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;若是是yarn-cluster模式,讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。
GC方式,若是是yarn-client模式,默認讀取的是spark-class文件中的JAVA_OPTS;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的參數值。
以上值最後都可被spark-submit工具中的--driver-java-options參數覆蓋。大數據
(2)Executor的JVM參數:
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
PermSize,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
GC方式,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
(3)Executor數目及所佔CPU個數
若是是yarn-client模式,Executor數目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每一個實例的數目由SPARK_EXECUTOR_CORES指定;若是是yarn-cluster模式,Executor的數目由spark-submit工具的--num-executors參數指定,默認是2個實例,而每一個Executor使用的CPU數目由--executor-cores指定,默認爲1核。
每一個Executor運行時的信息能夠經過yarn logs命令查看到,相似於以下:
- 14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
複製代碼
其中,akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler表示當前的Executor進程所在節點,後面的1表示Executor編號,sparktest2表示ApplicationMaster的host,接着的3表示當前Executor所佔用的CPU數目。
8 序列化異常
在Spark上執行hive語句的時候,出現相似於以下的異常:
- org.apache.spark.SparkDriverExecutionException: Execution error
- at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849)
- at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
- at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
- at akka.actor.ActorCell.invoke(ActorCell.scala:456)
- at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
- at akka.dispatch.Mailbox.run(Mailbox.scala:219)
- at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
- at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
- at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
- at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
- at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
- Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
- at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$anonfun$7.apply(joins.scala:336)
- at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:813)
- at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:810)
- at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
- at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)
複製代碼
排查其先後的日誌,發現大都是序列化的東西:
- 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms
- 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)
複製代碼
而在spark-default.conf中,事先設置了序列化方式爲Kryo:
- spark.serializer org.apache.spark.serializer.KryoSerializer
複製代碼
根據異常信息,可見是HashSet轉爲BitSet類型轉換失敗,Kryo把鬆散的HashSet轉換爲了緊湊的BitSet,把序列化方式註釋掉以後,任務能夠正常執行。難道Spark的Kryo序列化作的還不到位?此問題須要進一步跟蹤。
9 Executor僵死問題
運行一個Spark任務,發現其運行速度遠遠慢於執行一樣SQL語句的Hive的執行,甚至出現了OOM的錯誤,最後卡住達幾小時!而且Executor進程在瘋狂GC。
截取其一Task的OOM異常信息:
能夠看到這是在序列化過程當中發生的OOM。根據節點信息,找到對應的Executor進程,觀察其Jstack信息:
- Thread 36169: (state = BLOCKED)
- - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame)
- - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame)
- - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
- - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
- - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
- - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
- - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame)
- - org.apache.spark.serializer.DeserializationStream$anon$1.getNext() @bci=10, line=125 (Compiled frame)
- - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame)
- - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame)
- - scala.collection.Iterator$anon$13.hasNext() @bci=4, line=371 (Compiled frame)
- - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame)
- - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame)
- - scala.collection.Iterator$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
- - org.apache.spark.sql.execution.HashJoin$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame)
- - org.apache.spark.sql.execution.HashJoin$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame)
- - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame)
- - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)
複製代碼
有大量的BLOCKED線程,繼續觀察GC信息,發現大量的FULL GC。
分析,在插入Hive表的時候,實際上須要寫HDFS,在此過程的HashJoin時,伴隨着大量的Shuffle寫操做,JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同時超過必定大小的數據會直接寫到老生代,當新生代寫滿了以後,也會把老的數據搞到老生代,若是老生代空間不足了,就觸發FULL GC,仍是空間不夠,那就OOM錯誤了,此時線程被Blocked,致使整個Executor處理數據的進程被卡住。
當處理大數據的時候,若是JVM配置不當就容易引發上述問題。解決的方法就是增大Executor的使用內存,合理配置新生代和老生代的大小,能夠將老生代的空間適當的調大點