用過sparkstreaming的人都知道,當使用sparkstreaming on yarn模式的時候,若是咱們想查看系統運行的log,是無法直接看的,就算能看也只是一部分。redis
這裏的log分:apache
(1)spark自己運行的log微信
(2)代碼裏面業務產生的logapp
spark on yarn模式,若是你的hadoop集羣有100臺,那麼意味着你的sparkstreaming的log有可能會隨機分佈在100臺中,你想查看log必須登陸上每臺機器上,一個個查看,若是經過Hadoop的8088頁面查看,你也得打開可能幾十個頁面才能看到全部的log,那麼問題來了?異步
能不能將這個job運行全部的log統一收集到某一個目錄裏面呢? 若是收集到一塊兒的話排查log就很是方便了。oop
答案是很遺憾,在sparkstreaming裏面無法作到,由於sparkstreaming程序永遠不停機,就算你開啓hadoop的log聚合也沒用,只有當sparkstreaming程序停掉,hadoop的log聚合才能把全部的log收集到一個目錄裏面,因此其餘的非sparkstreaming程序,好比MR,Spark 運行完後,若是開啓log聚合,hadoop會負責把運行在各個節點上的log給統一收集到HDFS上,這樣的話咱們查看log就很是方便了。性能
如今的問題是sparkstreaming不能停機,那麼還能集中收集log到指定的地方嗎?答案是能夠的,咱們使用log4j收集日誌而後異步發送至kafka裏面,最後再經過logstash收集kafka裏面的日誌進入es便可,這樣一條龍服務打通以後,出現任何異常均可以很是快和方便的在es中排查問題,效率大大提高。至於使用logstash從kafka收集到es裏面,不是本文的重點,有興趣的參考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。spa
下面會介紹下如何使用:調試
streaming項目中的log4j使用的是apache log4j日誌
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
sparkstreaming項目能夠單獨提交某個job的log4j文件,這樣就能定製每一個job的log輸出格式,若是提交的時候不提交log4j文件,那麼默認用的是spark安裝目錄下面的log4j文件。 看下咱們log4j文件的內容:
log4j.rootLogger=WARN,console,kafka #log4j.logger.com.demo.kafka=DEBUG,kafka # appender kafka log4j.appender.kafka=kafka.producer.KafkaLog4jAppender log4j.appender.kafka.topic=kp_diag_log # multiple brokers are separated by comma ",". log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=false log4j.appender.kafka.layout=org.apache.log4j.PatternLayout #log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n # appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n #log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
最後看下提交腳本:
jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'` echo $jars #nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster --executor-cores 3 --driver-memory 4g --executor-memory 4g --num-executors 10 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml" --jars $jars kpdiag-stream-1.0.0-SNAPSHOT.jar &> streaming.log & nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster \ --files "/home/spark/x_spark_job/log4j.properties" \ --executor-cores 3 --driver-memory 3g --executor-memory 3g --num-executors 12 --jars $jars \ --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \ --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &
注意上面提交腳本中,/opt/bigdata/jars/spark/這個路徑引用的jar包,必須在每臺hadoop機器上都要存在,sparkstreaming運行過程當中,會從本地加載jar包,此外log4j.properties文件以及參數裏面--jars 後面的依賴jar 能夠在提交機器上放一份便可,不須要每臺機器上都存放。
提交任務後,在kafka的節點上執行消費者命令就能看到對應的log輸出: 執行命令:
kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log
收集到的log內容以下:
[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0 [2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0 [2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 題目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在
至此,咱們的log就統一收集成功了,後續咱們能夠把log從kafka導入到es中,就能夠任意分析和查詢了。
這裏須要注意一點,sparkstreaming運行時候,系統自己也有大量的log,若是把這個系統log也收集到kafka裏面自己的量是很是大的,並且好多信息不重要,其實 咱們只須要關注業務重點log便可,主要是WARN+ERROR級別的,調試的時候能夠把info級別打開,代碼裏重點關注的log都放在warn級別,異常什麼的放在ERROR便可 這樣排查問題時候也容易並且了避免了大量log的產生從應用自己性能的影響。
有什麼問題能夠掃碼關注微信公衆號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。