Spark的on Yarn模式,其資源分配是交給Yarn的ResourceManager來進行管理的,可是目前的Spark版本,Application日誌的查看,只能經過Yarn的yarn logs命令實現。java
在部署和運行Spark Application的過程當中,若是不注意一些小的細節,也許會致使一些問題的出現。node
部署好Spark的包和配置文件,on yarn的兩種模式都沒法運行,在NodeManager端的日誌都是說Connection Refused,鏈接不上Driver所在的客戶端節點,可是客戶端的80端口能夠正常訪問!同時,在日誌中有相似信息出現:git
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
內存確定是夠的,但就是沒法獲取資源!檢查防火牆,果真客戶端只開啓的對80端口的訪問,其餘都禁止了!若是你的程序在運行的時候也有相似鏈接被拒絕的狀況,最好也是先檢查下防火牆的配置!github
部署完Spark後,分別使用yarn-cluster模式和yarn-client模式運行Spark自帶的計算pi的示例。算法
Spark的一些配置文件除了一些基本屬性外,均未作配置,結果運行的時候兩種運行模式出現了不一樣的情況。yarn-cluster模式能夠正常運行,yarn-client模式老是運行失敗。查看ResourceManager、NodeManager端的日誌,發現程序老是找不到ApplicationMaster,這就奇怪了!而且,客戶端的Driver程序開啓的端口,在NodeManager端訪問被拒絕!非Spark的其餘MR任務,可以正常執行。sql
檢查客戶端配置文件,發現原來在客戶端的/etc/hosts文件中,客戶端的一個IP對應了多個Host,Driver程序會默認去取最後對應的那個Host,好比是hostB,可是在NodeManager端是配置的其餘Host,hostA,因此致使程序沒法訪問。爲了避免影響其餘的程序使用客戶端的Host列表,這裏在Spark配置文件spark-defaults.conf中使用屬性spark.driver.host來指定yarn-client模式運行中和Yarn通訊的DriverHost,此時yarn-client模式能夠正常運行。數據庫
上面配置完了以後,發現yarn-cluster模式又不能運行了!想一想緣由,確定是上面那個配置參數搞的鬼,註釋掉以後,yarn-cluster模式能夠繼續運行。緣由是,yarn-cluster模式下,spark的入口函數是在客戶端運行,可是Driver的其餘功能是在ApplicationMaster中運行的,上面的那個配置至關於指定了ApplicationMaster的地址,實際上的ApplicationMaster在yarn-master模式下是由ResourceManager隨機指定的。apache
測試環境下,經過yarn logs -applicationId xxx能夠查看運行結束的Application的日誌,可是搞到另外一個環境下發現使用上述命令查看日誌時,老是提示以下信息:app
Logs not available at /tmp/nm/remote/logs/hadoop/logs/application_xxx_xxxtcp
Log aggregation has not completed or is not enabled.
去對應的NodeManger目錄下,確實找不到日誌文件。可是/tmp/nm/remote/logs倒是在yarn-site.xml中指定了的目錄,這個是對的,到底什麼緣由呢?難道是Yarn的日誌彙集沒有起做用?
去NodeManager上查看對應Application的日誌:
2014-08-04 09:14:47,513 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Starting aggregate log-file for app application_xxx_xxx at /tmp/nm/remote/logs/spark/logs/application_xxx_xxx/hostB.tmp 2014-08-04 09:14:47,525 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_01_000007. Current good log dirs are /data/nm/log 2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_000001. Current good log dirs are /data/nm/log 2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Deleting path : /data/nm/log/application_xxx_xxx 2014-08-04 09:14:47,607 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Finished aggregate log-file for app application_xxx_xxx
可見,日誌彙集確實起做用了,可是爲何經過命令不能查看!猛然見看到日誌中「/tmp/nm/remote/logs/spark/logs/ application_xxx_xxx/hostB.tmp」,日誌的路徑有問題,在使用yarn logs命令查看的時候,用的是hadoop用戶,實際Spark Application的提交執行用的是spark用戶,而yarn logs命令默認去找的是當前用戶的路徑,這就是查看不到日誌的緣由。切換到spark用戶再查看,日誌終於出來了!
若是在Spark中使用了LZO做爲EventLog的的壓縮算法等,就得實現安裝好LZO這個東東,不然會出現相似於以下的異常:
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found. at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:134) at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:174) at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) ... 66 more Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1680) at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:127) ... 68 more
或者
[ERROR] [2014-08-05 10:34:41 933] com.hadoop.compression.lzo.GPLNativeCodeLoader [main] (GPLNativeCodeLoader.java:36) Could not load native gpl library java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
解決辦法就是得安裝好LZO,而且在HDFS、SPARK中配置好相關的包、文件等,具體步驟見:
http://find.searchhub.org/document/a128707a98fe4ec6
https://github.com/twitter/hadoop-lzo/blob/master/README.md
http://hsiamin.com/posts/2014/05/03/enable-lzo-compression-on-hadoop-pig-and-spark/
生產環境下,節點之間確定是有防火牆限制的,並且Hive的元數據庫Mysql,更是對請求的IP和用戶等限制的嚴格,若是在Spark集羣中使用yarn-cluster模式進行提交Spark的Application,其運行時Driver是和ApplicationMaster運行在一塊兒,由Yarn的ResourceManager負責分配到集羣中的某個NodeManager節點上,若是在Hive-site.xml中只配置了Mysql數據庫而沒有配置MetaStore的話,也許會遇到鏈接元數據庫失敗的問題,此時,就得看下Hive-site.xml的配置,是否Mysql的相關權限配置正確、MetaStore服務是否能夠正常鏈接。
在Spark中使用hql方法執行hive語句時,因爲其在查詢過程當中調用的是Hive的獲取元數據信息、SQL解析,而且使用Cglib等進行序列化反序列化,中間可能產生較多的class文件,致使JVM中的持久代使用較多,若是配置不當,可能引發相似於以下的OOM問題:
Exception in thread "Thread-2" java.lang.OutOfMemoryError: PermGen space
緣由是實際使用時,若是用的是JDK1.6版本,Server模式的持久代默認大小是64M,Client模式的持久代默認大小是32M,而Driver端進行SQL處理時,其持久代的使用可能會達到90M,致使OOM溢出,任務失敗。
解決方法就是在Spark的conf目錄中的spark-defaults.conf裏,增長對Driver的JVM配置,由於Driver才負責SQL的解析和元數據獲取。配置以下:
spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M
可是,上述狀況是在yarn-cluster模式下出現,yarn-client模式運行時卻是正常的,原來在$SPARK_HOME/bin/spark-class文件中已經設置了持久代大小:
JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"
當以yarn-client模式運行時,driver就運行在客戶端的spark-submit進程中,其JVM參數是取的spark-class文件中的設置,所謂未出現持久代溢出現象。
總結一下Spark中各個角色的JVM參數設置:
(1)Driver的JVM參數:
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。
PermSize,若是是yarn-client模式,則是默認讀取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;若是是yarn-cluster模式,讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。
GC方式,若是是yarn-client模式,默認讀取的是spark-class文件中的JAVA_OPTS;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的參數值。
以上值最後都可被spark-submit工具中的--driver-java-options參數覆蓋。
(2)Executor的JVM參數:
-Xmx,-Xms,若是是yarn-client模式,則默認讀取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值同樣大小;若是是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
PermSize,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
GC方式,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。
(3)Executor數目及所佔CPU個數
若是是yarn-client模式,Executor數目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每一個實例的數目由SPARK_EXECUTOR_CORES指定;若是是yarn-cluster模式,Executor的數目由spark-submit工具的--num-executors參數指定,默認是2個實例,而每一個Executor使用的CPU數目由--executor-cores指定,默認爲1核。
每一個Executor運行時的信息能夠經過yarn logs命令查看到,相似於以下:
14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
其中,akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler表示當前的Executor進程所在節點,後面的1表示Executor編號,sparktest2表示ApplicationMaster的host,接着的3表示當前Executor所佔用的CPU數目。
在Spark上執行hive語句的時候,出現相似於以下的異常:
org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$$anonfun$7.apply(joins.scala:336) at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:813) at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:810) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)
排查其先後的日誌,發現大都是序列化的東西:
14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)
而在spark-default.conf中,事先設置了序列化方式爲Kryo:
spark.serializer org.apache.spark.serializer.KryoSerializer
根據異常信息,可見是HashSet轉爲BitSet類型轉換失敗,Kryo把鬆散的HashSet轉換爲了緊湊的BitSet,把序列化方式註釋掉以後,任務能夠正常執行。難道Spark的Kryo序列化作的還不到位?此問題須要進一步跟蹤。
運行一個Spark任務,發現其運行速度遠遠慢於執行一樣SQL語句的Hive的執行,甚至出現了OOM的錯誤,最後卡住達幾小時!而且Executor進程在瘋狂GC。
截取其一Task的OOM異常信息:
能夠看到這是在序列化過程當中發生的OOM。根據節點信息,找到對應的Executor進程,觀察其Jstack信息:
Thread 36169: (state = BLOCKED) - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame) - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame) - org.apache.spark.serializer.DeserializationStream$$anon$1.getNext() @bci=10, line=125 (Compiled frame) - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame) - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame) - scala.collection.Iterator$$anon$13.hasNext() @bci=4, line=371 (Compiled frame) - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame) - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame) - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame) - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame) - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame) - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame) - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)
有大量的BLOCKED線程,繼續觀察GC信息,發現大量的FULL GC。
分析,在插入Hive表的時候,實際上須要寫HDFS,在此過程的HashJoin時,伴隨着大量的Shuffle寫操做,JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同時超過必定大小的數據會直接寫到老生代,當新生代寫滿了以後,也會把老的數據搞到老生代,若是老生代空間不足了,就觸發FULL GC,仍是空間不夠,那就OOM錯誤了,此時線程被Blocked,致使整個Executor處理數據的進程被卡住。
當處理大數據的時候,若是JVM配置不當就容易引發上述問題。解決的方法就是增大Executor的使用內存,合理配置新生代和老生代的大小,能夠將老生代的空間適當的調大點。
問題是比較嚴重,Application都直接沒法運行了,可是引發問題的緣由都比較小,歸根結底仍是部署的時候環境較爲複雜,不夠仔細!再接再厲!之後遇到相關的問題,會再這裏持續更新,方便本身,也方便遇到相似問題的朋友們!
-------------------------------------------------------------------------------
若是您看了本篇博客,以爲對您有所收穫,請點擊右下角的 [推薦]
若是您想轉載本博客,請註明出處
若是您對本文有意見或者建議,歡迎留言
感謝您的閱讀,請關注個人後續博客