Kafka + Spark Streaming + Redis: A Real-Time Computation Integration Practice

Spark is a general-purpose computation platform that scales well to many kinds of computing applications, especially because it ships with built-in libraries such as Spark Streaming, Spark SQL, MLlib, and GraphX. These libraries offer high-level abstractions that let you implement complex computation logic in very concise code, helped further by the conciseness of Scala. Here we built a computation platform on Spark 1.3.0 and implemented real-time computation with Spark Streaming.
Our application scenario is analyzing users' behavior in a mobile app, described as follows:

  • The mobile client collects user behavior events (we use click events as the example) and sends the data to a data server; here we assume the data goes straight into a Kafka message queue

  • A real-time backend service consumes the data from Kafka and analyzes it as it arrives; we choose Spark Streaming because it provides built-in support for Kafka integration

  • The Spark Streaming job writes its analysis results to Redis, so user behavior data can be fetched in real time and also exported for offline aggregate analysis

An Introduction to Spark Streaming

Spark Streaming provides a high-level abstraction called the DStream (Discretized Stream). A DStream represents a continuous stream of input data and can be created from sources such as Kafka, TCP sockets, or Flume. Internally, a DStream is a sequence of RDDs. Because Spark Streaming runs on the Spark platform, it inherits Spark's characteristics such as fault tolerance, scalability, and high throughput.
In Spark Streaming, each DStream holds the collection of data items that arrived within one time interval; think of it as a batch over that interval, with each batch forming an RDD. A DStream is therefore an ordered sequence of batches: time is continuous, and the stream is cut into discrete RDDs by the batch interval, as shown in the figure (from the official documentation):
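
To make the batch interval concrete, here is a minimal, self-contained sketch that is not part of the application built below; it assumes a plain TCP text source, and the host and port are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamBatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamBatchSketch")
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval

    // Each 5-second slice of this DStream arrives as one RDD.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD((rdd, time) => println(s"Batch at $time contains ${rdd.count()} lines"))

    ssc.start()
    ssc.awaitTermination()
  }
}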

As we know, Spark supports two kinds of operations: transformations and actions. A transformation derives a new RDD from an existing RDD; transformations include map, filter, flatMap, union, join, and so on. Transformations are lazy: calling them does not immediately run any computation on the underlying RDD; the computation only runs when an operation of the other kind, an action, is invoked. When an action executes, it actually operates on the RDD and either returns a result to the Driver program or returns nothing, for example when the result is persisted; actions include reduce, count, foreach, collect, and so on. See the official documentation for more details on transformations and actions.
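
A tiny illustration of this laziness, assuming an existing SparkContext named sc (for example, in the spark-shell):

val nums = sc.parallelize(1 to 10)

// Transformations: nothing runs yet, Spark only records the lineage.
val evens   = nums.filter(_ % 2 == 0)
val doubled = evens.map(_ * 2)

// Actions: these trigger the actual computation and return results to the Driver.
val total = doubled.reduce(_ + _)   // 60
val count = doubled.count()         // 5
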
Similarly, Spark Streaming offers two kinds of operations on DStreams, transformations and output operations, which behave much like their Spark counterparts: a transformation derives a new DStream from an existing DStream. Spark Streaming additionally provides a family of window operations; they are still transformations, but they give more flexible control over the extent of a DStream (the window length and the number of elements it covers), for example window(windowLength, slideInterval), countByWindow(windowLength, slideInterval), and reduceByWindow(func, windowLength, slideInterval). Output operations let us write DStream data to an external storage system such as a database or a file system; executing an output operation is analogous to executing a Spark action, in that it forces the lazy chain of transformations before it to actually run.
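
For example, here is a hedged sketch of a window operation plus an output operation, assuming a DStream of parsed JSON events like the events stream built later in this article:

// Total clicks over the last 60 seconds, recomputed every 10 seconds.
val clicksPerWindow = events
  .map(e => e.getInt("click_count"))
  .reduceByWindow(_ + _, Seconds(60), Seconds(10))

// Output operation: print() is what finally forces the computation to run.
clicksPerWindow.print()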

Kafka + Spark Streaming + Redis Programming Practice

Now let's implement the real-time computation application for the scenario described above. First, we wrote a Kafka producer simulator that continuously writes user behavior events to Kafka as JSON, for example:

{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6}

An event contains four fields (a minimal Scala representation is sketched after the list):

  • uid: user ID

  • event_time: timestamp of when the event occurred

  • os_type: operating system type of the mobile app

  • click_count: number of clicks
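
For illustration only, here is a minimal Scala representation of the event together with a parser based on json-lib (the same JSON library used by the streaming job below); the case class and helper are assumptions, not part of the original code:

import net.sf.json.JSONObject

case class UserClickEvent(uid: String, eventTime: Long, osType: String, clickCount: Int)

def parseEvent(json: String): UserClickEvent = {
  val obj = JSONObject.fromObject(json)
  UserClickEvent(
    obj.getString("uid"),
    obj.getString("event_time").toLong, // the timestamp is sent as a string
    obj.getString("os_type"),
    obj.getInt("click_count"))
}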

The producer code we implemented is shown below:

package org.shirdrn.spark.streaming.utils

import java.util.Properties
import scala.util.Random

import org.codehaus.jettison.json.JSONObject

import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig

object KafkaEventProducer {

  private val users = Array(
      "4A4D769EB9679C054DE81B973ED5D768", "8dfeb5aaafc027d89349ac9a20b3930f",
      "011BBF43B89BFBF266C865DF0397AA71", "f2a8474bf7bd94f0aabbd4cdd2c06dcf",
      "068b746ed4620d25e26055a9f804385f", "97edfc08311c70143401745a03a50706",
      "d7f141563005d1b5d0d3dd30138f3f62", "c8ee90aade1671a21336c721512b817a",
      "6b67c8c700427dee7552f81f3228c927", "a95f22eabc4fd4b580c011a3161a9d9d")

  private val random = new Random()

  private var pointer = -1

  // Cycle through the predefined user IDs.
  def getUserID(): String = {
    pointer = pointer + 1
    if (pointer >= users.length) {
      pointer = 0
    }
    users(pointer)
  }

  // Random click count between 0 and 9.
  def click(): Int = random.nextInt(10)

  // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2
  // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list
  // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events
  // bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --topic test_json_basis_event --from-beginning
  def main(args: Array[String]): Unit = {
    val topic = "user_events"
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID)
        .put("event_time", System.currentTimeMillis.toString)
        .put("os_type", "Android")
        .put("click_count", click)

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}

The write rate of the simulator is controlled by the sleep interval on the last line of the loop above. Next, we implement the real-time count of each user's clicks. The counts are simply accumulated per user, so the logic is straightforward; the key is to watch out for a few issues during implementation, such as object serialization. Let's look at the code first and discuss those details afterwards:

object UserClickCountAnalytics {

  def main(args: Array[String]): Unit = {
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    val dbIndex = 1
    val clickHashKey = "app::users::click"

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = RedisClient.pool.getResource
          jedis.select(dbIndex)
          jedis.hincrBy(clickHashKey, uid, clickCount)
          RedisClient.pool.returnResource(jedis)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()

  }
}

The code above uses the Jedis client to talk to Redis and accumulates the grouped counts into a Redis hash; any other system that needs this data in real time can read it directly from Redis (a small reader sketch follows the RedisClient code). RedisClient is implemented as follows:

object RedisClient extends Serializable {
  val redisHost = "10.10.4.130"
  val redisPort = 6379
  val redisTimeout = 30000
  lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}
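
As an aside, another JVM-based service that wants these counters in real time could read them with the same pool; the following is only a sketch under the assumptions of this article (database index 1 and the app::users::click hash), not part of the original application:

import scala.collection.JavaConverters._

object ClickCountReader {
  def main(args: Array[String]): Unit = {
    val jedis = RedisClient.pool.getResource
    try {
      jedis.select(1) // same dbIndex the streaming job writes to
      val counts = jedis.hgetAll("app::users::click").asScala
      counts.foreach { case (uid, clicks) => println(s"$uid -> $clicks") }
    } finally {
      RedisClient.pool.returnResource(jedis)
    }
  }
}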

We have run the code above successfully both in local[K] mode and on a Spark Standalone cluster.
When debugging in a development environment, that is, in the local[K] deploy mode, Spark starts K worker threads on the local machine and they all live in the same JVM instance. The code above defaults to local[K] when no argument is passed, so creating a Redis connection pool or connection in that mode is very likely to just work during debugging. However, when deploying in Spark Standalone, YARN client (or YARN cluster), or Mesos mode, the job fails with errors, mainly because of how the Redis connection pool or connection is handled. Let's look at the Spark architecture, as shown in the figure (from the official documentation):

Whether running in local mode, Standalone mode, Mesos, or YARN, the overall structure of a Spark cluster can be represented by the figure above; only the runtime environments of the components differ, so a component may be distributed, local, or confined to a single JVM instance. In local mode, for example, everything in the figure runs inside a single process on one node, while in YARN client mode the Driver program runs on a node outside the YARN cluster, where the Spark application is submitted, and the remaining components run on nodes managed by YARN.
After an application is deployed to a Spark cluster, the functions applied to the RDDs (in Spark Streaming, the operations applied to the DStreams) are shipped to the Executors on the cluster's Worker nodes, so the objects these functions reference must be serializable; otherwise, once serialized and transferred across nodes, they cannot be correctly deserialized back into usable objects. In Scala this can also be handled with a lazy reference, which is how the code above deals with it, as shown below:

// lazy pool reference
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
...
partitionOfRecords.foreach(pair => {
  val uid = pair._1
  val clickCount = pair._2
  val jedis = RedisClient.pool.getResource
  jedis.select(dbIndex)
  jedis.hincrBy(clickHashKey, uid, clickCount)
  RedisClient.pool.returnResource(jedis)
})
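
A closely related idiom, shown here as a minimal sketch rather than code from the application above, is to mark such a field as @transient lazy: the field is skipped when the enclosing object is serialized with a closure, and each Executor re-creates it on first use:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

class RedisSink(host: String, port: Int) extends Serializable {
  // Not serialized with the closure; built lazily on whichever JVM first touches it.
  @transient lazy val pool = new JedisPool(new GenericObjectPoolConfig(), host, port)
}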

Alternatively, we can change the code so that the management of the Redis connection lives entirely inside the DStream output operation, because we know that code is initialized on a specific Executor, and manage it with a singleton-style object, as shown below:

package org.shirdrn.spark.streaming

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import redis.clients.jedis.JedisPool

object UserClickCountAnalytics {

  def main(args: Array[String]): Unit = {
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "10.10.4.126:9092,10.10.4.127:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    val dbIndex = 1
    val clickHashKey = "app::users::click"

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {

        /**
         * Internal Redis client for managing the Jedis connection pool. It is defined
         * inside the output operation, so it is instantiated on the Executor that
         * processes this partition and is never serialized from the Driver.
         */
        object InternalRedisClient extends Serializable {

          @transient private var pool: JedisPool = null

          def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
              maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
            makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
          }

          def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
              maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
              testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
            if (pool == null) {
              val poolConfig = new GenericObjectPoolConfig()
              poolConfig.setMaxTotal(maxTotal)
              poolConfig.setMaxIdle(maxIdle)
              poolConfig.setMinIdle(minIdle)
              poolConfig.setTestOnBorrow(testOnBorrow)
              poolConfig.setTestOnReturn(testOnReturn)
              poolConfig.setMaxWaitMillis(maxWaitMillis)
              pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

              val hook = new Thread {
                override def run = pool.destroy()
              }
              sys.addShutdownHook(hook.run)
            }
          }

          def getPool: JedisPool = {
            assert(pool != null)
            pool
          }
        }

        // Redis configurations: build the pool once per partition, not once per record
        val maxTotal = 10
        val maxIdle = 10
        val minIdle = 1
        val redisHost = "10.10.4.130"
        val redisPort = 6379
        val redisTimeout = 30000
        InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)

        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = InternalRedisClient.getPool.getResource
          jedis.select(dbIndex)
          jedis.hincrBy(clickHashKey, uid, clickCount)
          InternalRedisClient.getPool.returnResource(jedis)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()

  }
}

Thanks to Scala's ability to define a class or object at any point in the code, we placed the Redis connection management inside the specific operation itself, which avoids serializing a transient object across nodes. Doing this also requires understanding how Spark operates on RDD datasets internally; see the RDD and Spark documentation for more details.
To run the job on the cluster in Standalone mode, execute the following commands:

cd /usr/local/spark
./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.1-SNAPSHOT.jar spark://hadoop1:7077

You can check the status of the computation tasks on each Worker node of the cluster, most conveniently through the web UI.
Now let's look at the results stored in Redis:

127.0.0.1:6379[1]> HGETALL app::users::click
1) "4A4D769EB9679C054DE81B973ED5D768"
2) "7037"
3) "8dfeb5aaafc027d89349ac9a20b3930f"
4) "6992"
5) "011BBF43B89BFBF266C865DF0397AA71"
6) "7021"
7) "97edfc08311c70143401745a03a50706"
8) "6874"
9) "d7f141563005d1b5d0d3dd30138f3f62"
10) "7057"
11) "a95f22eabc4fd4b580c011a3161a9d9d"
12) "7092"
13) "6b67c8c700427dee7552f81f3228c927"
14) "7266"
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf"
16) "7188"
17) "c8ee90aade1671a21336c721512b817a"
18) "6950"
19) "068b746ed4620d25e26055a9f804385f"

For more details about Spark Streaming, see the official documentation.

Appendix

Here, for reference, are the dependencies of the application developed above and the Maven configuration used to package the Spark Streaming application. If the maven-shade-plugin is misconfigured, submitting the packaged application to the Spark cluster may fail with the error "Invalid signature file digest for Manifest main attributes". A reference Maven configuration is shown below:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.shirdrn.spark</groupId>
  <artifactId>spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.3</version>
      <!-- json-lib is published with JDK classifiers; use the jdk15 jar -->
      <classifier>jdk15</classifier>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.2</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
    <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
    <resources>
      <resource>
        <directory>${basedir}/src/main/resources</directory>
      </resource>
    </resources>
    <testResources>
      <testResource>
        <directory>${basedir}/src/test/resources</directory>
      </testResource>
    </testResources>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <createDependencyReducedPom>true</createDependencyReducedPom>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <includes>
                  <include>*:*</include>
                </includes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <!-- strip signature files to avoid "Invalid signature file digest" on spark-submit -->
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                  <resource>log4j.properties</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>



This article is published under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, but you must retain the author attribution 時延軍 (including the link: http://shiyanjun.cn), must not use it for commercial purposes, and must release works derived from it under the same license. If you have any questions, please contact me.
