如何收集SparkSteaming運行日誌實時進入kafka中

用過sparkstreaming的人都知道,當使用sparkstreaming on yarn模式的時候,若是咱們想查看系統運行的log,是無法直接看的,就算能看也只是一部分。redis

這裏的log分:apache

(1)spark自己運行的log微信

(2)代碼裏面業務產生的logapp

spark on yarn模式,若是你的hadoop集羣有100臺,那麼意味着你的sparkstreaming的log有可能會隨機分佈在100臺中,你想查看log必須登陸上每臺機器上,一個個查看,若是經過Hadoop的8088頁面查看,你也得打開可能幾十個頁面才能看到全部的log,那麼問題來了?異步

能不能將這個job運行全部的log統一收集到某一個目錄裏面呢? 若是收集到一塊兒的話排查log就很是方便了。oop

答案是很遺憾,在sparkstreaming裏面無法作到,由於sparkstreaming程序永遠不停機,就算你開啓hadoop的log聚合也沒用,只有當sparkstreaming程序停掉,hadoop的log聚合才能把全部的log收集到一個目錄裏面,因此其餘的非sparkstreaming程序,好比MR,Spark 運行完後,若是開啓log聚合,hadoop會負責把運行在各個節點上的log給統一收集到HDFS上,這樣的話咱們查看log就很是方便了。性能

如今的問題是sparkstreaming不能停機,那麼還能集中收集log到指定的地方嗎?答案是能夠的,咱們使用log4j收集日誌而後異步發送至kafka裏面,最後再經過logstash收集kafka裏面的日誌進入es便可,這樣一條龍服務打通以後,出現任何異常均可以很是快和方便的在es中排查問題,效率大大提高。至於使用logstash從kafka收集到es裏面,不是本文的重點,有興趣的參考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。spa

下面會介紹下如何使用:調試

streaming項目中的log4j使用的是apache log4j日誌

<dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

sparkstreaming項目能夠單獨提交某個job的log4j文件,這樣就能定製每一個job的log輸出格式,若是提交的時候不提交log4j文件,那麼默認用的是spark安裝目錄下面的log4j文件。 看下咱們log4j文件的內容:

log4j.rootLogger=WARN,console,kafka

#log4j.logger.com.demo.kafka=DEBUG,kafka
# appender kafka
log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
log4j.appender.kafka.topic=kp_diag_log
# multiple brokers are separated by comma ",".
log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
#log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n

# appender console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n
#log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

最後看下提交腳本:

jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'`

echo $jars

#nohup /opt/bigdata/spark/bin/spark-submit  --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn    --deploy-mode cluster --executor-cores 3  --driver-memory 4g   --executor-memory 4g  --num-executors 10  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml"   --jars  $jars    kpdiag-stream-1.0.0-SNAPSHOT.jar  &> streaming.log  &


nohup /opt/bigdata/spark/bin/spark-submit    --class  com.bigdata.xuele.streaming.SparkStreamingKmd  --master yarn  --deploy-mode cluster \
 --files "/home/spark/x_spark_job/log4j.properties" \
 --executor-cores 3   --driver-memory 3g   --executor-memory 3g  --num-executors 12    --jars  $jars  \
 --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties"   \
 --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
 --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \
 --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar   \
 --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar  \
 kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &

注意上面提交腳本中,/opt/bigdata/jars/spark/這個路徑引用的jar包,必須在每臺hadoop機器上都要存在,sparkstreaming運行過程當中,會從本地加載jar包,此外log4j.properties文件以及參數裏面--jars 後面的依賴jar 能夠在提交機器上放一份便可,不須要每臺機器上都存放。

提交任務後,在kafka的節點上執行消費者命令就能看到對應的log輸出: 執行命令:

kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log

收集到的log內容以下:

[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0

[2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客觀題跳過:類型:0

[2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 題目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在

至此,咱們的log就統一收集成功了,後續咱們能夠把log從kafka導入到es中,就能夠任意分析和查詢了。

這裏須要注意一點,sparkstreaming運行時候,系統自己也有大量的log,若是把這個系統log也收集到kafka裏面自己的量是很是大的,並且好多信息不重要,其實 咱們只須要關注業務重點log便可,主要是WARN+ERROR級別的,調試的時候能夠把info級別打開,代碼裏重點關注的log都放在warn級別,異常什麼的放在ERROR便可 這樣排查問題時候也容易並且了避免了大量log的產生從應用自己性能的影響。

有什麼問題能夠掃碼關注微信公衆號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明

相關文章
相關標籤/搜索