Flume+Kafka+SparkStreaming已經發展爲一個比較成熟的實時日誌收集與計算架構,利用Kafka,便可以支持將用於離線分析的數據流到HDFS,又能夠同時支撐多個消費者實時消費數據,包括SparkStreaming。然而,在SparkStreaming程序中若是有複雜業務邏輯的統計,使用scala代碼實現起來比較困難,也不易於別人理解。但若是在SparkSteaming中也使用SQL來作統計分析,是否是就簡單的多呢?java
本文介紹將SparkSQL與SparkStreaming結合起來,使用SQL完成實時的日誌數據統計。SparkStreaming程序以yarn-cluster模式運行在YARN上,不單獨部署Spark集羣。sql
Hadoop-2.3.0-cdh5.0.0(YARN)typescript
spark-1.5.0-bin-hadoop2.3數據庫
kafka_2.10-0.8.2.1apache
另外,還編譯了SparkStreaming用於讀取Kafka數據的插件:cookie
spark-streaming-kafka_2.10-1.5.0.jar架構
相關環境的部署本文不作介紹,請參考文章最後的相關閱讀。oop
以60秒爲間隔,統計60秒內的pv,ip數,uvui
最終結果包括:spa
時間點:pv:ips:uv
2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7 2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B1611D0E00003857808 2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BD0100016F2E76F 2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E 2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E 2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E3512790001039FDB9
每條日誌包含7個字段,分隔符爲|~|,其中,第3列爲ip,第7列爲cookieid。假設原始日誌已經由Flume流到Kafka中。
程序中使用下面的SQL語句完成對一個批次的數據統計:
SELECT date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') AS time, COUNT(1) AS pv, COUNT(DISTINCT ip) AS ips, COUNT(DISTINCT cookieid) as uv FROM daplog
SparkStreaming程序代碼:
package com.lxw.test
import scala.reflect.runtime.universe import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Time import org.apache.spark.streaming.kafka.KafkaUtils /** * auth:lxw1234 * http://lxw1234.com */ object DapLogStreaming { def main (args : Array[String]) { val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("DapLogStreaming") //每60秒一個批次 val ssc = new StreamingContext(sparkConf, Seconds(60)) //從Kafka中讀取數據,topic爲daplog,該topic包含兩個分區 val kafkaStream = KafkaUtils.createStream( ssc, "bj11-65:2181", //Kafka集羣使用的zookeeper "group_spark_streaming", //該消費者使用的group.id Map[String, Int]("daplog" -> 0,"daplog" -> 1), //日誌在Kafka中的topic及其分區 StorageLevel.MEMORY_AND_DISK_SER) .map(x => x._2.split("\\|~\\|", -1)) //日誌以|~|爲分隔符 kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) => { val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._ //構造case class: DapLog,提取日誌中相應的字段 val logDataFrame = rdd.map(w => DapLog(w(0).substring(0, 10),w(2),w(6))).toDF() //註冊爲tempTable logDataFrame.registerTempTable("daplog") //查詢該批次的pv,ip數,uv val logCountsDataFrame = sqlContext.sql("select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time,count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog") //打印查詢結果 logCountsDataFrame.show() }) ssc.start() ssc.awaitTermination() } } case class DapLog(day:String, ip:String, cookieid:String) object SQLContextSingleton { @transient private var instance: SQLContext = _ def getInstance(sparkContext: SparkContext): SQLContext = { if (instance == null) { instance = new SQLContext(sparkContext) } instance } }
示例中只是將實時統計的結果打印到標準輸出,真實場景通常是將結果持久化到數據庫中。
將該程序打包成DapLogStreaming.jar,上傳至網關機。
進入$SPARK_HOME/bin執行下面的命令,將SparkStreaming程序提交到YARN:
./spark-submit \
--class com.lxw.test.DapLogStreaming \
--master yarn-cluster \
--executor-memory 2G \
--num-executors 6 \
--jars /home/liuxiaowen/kafka-clients-0.8.2.1.jar,/home/liuxiaowen/metrics-core-2.2.0.jar,/home/liuxiaowen/zkclient-0.3.jar,/home/liuxiaowen/spark-streaming-kafka_2.10-1.5.0.jar,/home/liuxiaowen/kafka_2.10-0.8.2.1.jar \ /home/liuxiaowen/DapLogStreaming.jar
注意:SparkStreaming及Kafka插件運行時候須要依賴相應的jar包。
進入YARN ResourceManager的WEB界面,找到該程序對應的Application,點擊ApplicationMaster的連接,進入SparkMaster界面:
每一個批次(60秒),會生成一個Job。
點擊TAB頁」Streaming」,進入Streaming的監控頁面:
在最下方,顯示正在處理的批次和已經完成的批次,包括每一個批次的events數量。
最後,最主要的,點擊ApplicationMaster的logs連接,查看stdout標準輸出:
已經按照SQL中統計的字段,打印出統計結果,每60秒一個批次打印一次。
因爲kafka_2.10-0.8.2.1是基於Scala2.10的,所以Spark、Spark的Kafka插件、SparkStreaming應用程序都須要使用Scala2.10,若是使用Scala2.11,運行時候會報出因Scala版本不一致而形成的錯誤,好比:
15/11/11 15:36:26 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:59) at com.lxw.test.DapLogStreaming$.main(DapLogStreaming.scala:23) at com.lxw.test.DapLogStreaming.main(DapLogStreaming.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.sc