基於Spark通用計算平臺,能夠很好地擴展各類計算類型的應用,尤爲是Spark提供了內建的計算庫支持,像Spark Streaming、Spark SQL、MLlib、GraphX,這些內建庫都提供了高級抽象,能夠用很是簡潔的代碼實現複雜的計算邏輯、這也得益於Scala編程語言的簡潔性。這 裏,咱們基於1.3.0版本的Spark搭建了計算平臺,實現基於Spark Streaming的實時計算。
咱們的應用場景是分析用戶使用手機App的行爲,描述以下所示:html
手機客戶端會收集用戶的行爲事件(咱們以點擊事件爲例),將數據發送到數據服務器,咱們假設這裏直接進入到Kafka消息隊列java
後端的實時服務會從Kafka消費數據,將數據讀出來並進行實時分析,這裏選擇Spark Streaming,由於Spark Streaming提供了與Kafka整合的內置支持redis
通過Spark Streaming實時計算程序分析,將結果寫入Redis,能夠實時獲取用戶的行爲數據,並能夠導出進行離線綜合統計分析數據庫
Spark Streaming介紹apache
Spark Streaming提供了一個叫作DStream(Discretized Stream)的高級抽象,DStream表示一個持續不斷輸入的數據流,能夠基於Kafka、TCP Socket、Flume等輸入數據流建立。在內部,一個DStream其實是由一個RDD序列組成的。Sparking Streaming是基於Spark平臺的,也就繼承了Spark平臺的各類特性,如容錯(Fault-tolerant)、可擴展 (Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每一個DStream包含了一個時間間隔以內的數據項的集合,咱們能夠理解爲指定時間間隔以內的一個batch,每個batch就 構成一個RDD數據集,因此DStream就是一個個batch的有序序列,時間是連續的,按照時間間隔將數據流分割成一個個離散的RDD數據集,如圖所 示(來自官網):
咱們都知道,Spark支持兩種類型操做:Transformations和Actions。Transformation從一個已知的RDD數據集通過 轉換獲得一個新的RDD數據集,這些Transformation操做包括map、filter、flatMap、union、join等,並且 Transformation具備lazy的特性,調用這些操做並無馬上執行對已知RDD數據集的計算操做,而是在調用了另外一類型的Action操做才 會真正地執行。Action執行,會真正地對RDD數據集進行操做,返回一個計算結果給Driver程序,或者沒有返回結果,如將計算結果數據進行持久 化,Action操做包括reduceByKey、count、foreach、collect等。關於Transformations和Actions 更詳細內容,能夠查看官網文檔。
一樣、Spark Streaming提供了相似Spark的兩種操做類型,分別爲Transformations和Output操做,它們的操做對象是DStream,做 用也和Spark相似:Transformation從一個已知的DStream通過轉換獲得一個新的DStream,並且Spark Streaming還額外增長了一類針對Window的操做,固然它也是Transformation,可是能夠更靈活地控制DStream的大小(時間 間隔大小、數據元素個數),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操做容許咱們將DStream數據輸出到一個外部的存儲系統,如數據庫或文件系統等,執行Output操做相似執行 Spark的Action操做,使得該操做以前lazy的Transformation操做序列真正地執行。編程
Kafka+Spark Streaming+Redis編程實踐json
下面,咱們根據上面提到的應用場景,來編程實現這個實時計算應用。首先,寫了一個Kafka Producer模擬程序,用來模擬向Kafka實時寫入用戶行爲的事件數據,數據是JSON格式,示例以下:後端
1 |
{"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6} |
一個事件包含4個字段:api
uid:用戶編號服務器
event_time:事件發生時間戳
os_type:手機App操做系統類型
click_count:點擊次數
下面是咱們實現的代碼,以下所示:
01 |
package org.shirdrn.spark.streaming.utils |
03 |
import java.util.Properties |
04 |
import scala.util.Properties |
05 |
import org.codehaus.jettison.json.JSONObject |
06 |
import kafka.javaapi.producer.Producer |
07 |
import kafka.producer.KeyedMessage |
08 |
import kafka.producer.KeyedMessage |
09 |
import kafka.producer.ProducerConfig |
10 |
import scala.util.Random |
12 |
object KafkaEventProducer { |
14 |
private val users = Array( |
15 |
"4A4D769EB9679C054DE81B973ED5D768" , "8dfeb5aaafc027d89349ac9a20b3930f" , |
16 |
"011BBF43B89BFBF266C865DF0397AA71" , "f2a8474bf7bd94f0aabbd4cdd2c06dcf" , |
17 |
"068b746ed4620d25e26055a9f804385f" , "97edfc08311c70143401745a03a50706" , |
18 |
"d7f141563005d1b5d0d3dd30138f3f62" , "c8ee90aade1671a21336c721512b817a" , |
19 |
"6b67c8c700427dee7552f81f3228c927" , "a95f22eabc4fd4b580c011a3161a9d9d" ) |
21 |
private val random = new Random() |
23 |
private var pointer = - 1 |
25 |
def getUserID() : String = { |
27 |
if (pointer > = users.length) { |
35 |
def click() : Double = { |
39 |
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2 |
40 |
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list |
41 |
// bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events |
42 |
// bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning |
43 |
def main(args : Array[String]) : Unit = { |
44 |
val topic = "user_events" |
45 |
val brokers = "10.10.4.126:9092,10.10.4.127:9092" |
46 |
val props = new Properties() |
47 |
props.put( "metadata.broker.list" , brokers) |
48 |
props.put( "serializer.class" , "kafka.serializer.StringEncoder" ) |
50 |
val kafkaConfig = new ProducerConfig(props) |
51 |
val producer = new Producer[String, String](kafkaConfig) |
55 |
val event = new JSONObject() |
57 |
.put( "uid" , getUserID) |
58 |
.put( "event_time" , System.currentTimeMillis.toString) |
59 |
.put( "os_type" , "Android" ) |
60 |
.put( "click_count" , click) |
62 |
// produce event message |
63 |
producer.send( new KeyedMessage[String, String](topic, event.toString)) |
64 |
println( "Message sent: " + event) |
經過控制上面程序最後一行的時間間隔來控制模擬寫入速度。下面咱們來討論實現實時統計每一個用戶的點擊次數,它是按照用戶分組進行累加次數,邏輯比較簡單,關鍵是在實現過程當中要注意一些問題,如對象序列化等。先看實現代碼,稍後咱們再詳細討論,代碼實現以下所示:
01 |
object UserClickCountAnalytics { |
03 |
def main(args : Array[String]) : Unit = { |
04 |
var masterUrl = "local[1]" |
05 |
if (args.length > 0 ) { |
09 |
// Create a StreamingContext with the given master URL |
10 |
val conf = new SparkConf().setMaster(masterUrl).setAppName( "UserClickCountStat" ) |
11 |
val ssc = new StreamingContext(conf, Seconds( 5 )) |
13 |
// Kafka configurations |
14 |
val topics = Set( "user_events" ) |
15 |
val brokers = "10.10.4.126:9092,10.10.4.127:9092" |
16 |
val kafkaParams = Map[String, String]( |
17 |
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder" ) |
20 |
val clickHashKey = "app::users::click" |
22 |
// Create a direct stream |
23 |
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) |
25 |
val events = kafkaStream.flatMap(line = > { |
26 |
val data = JSONObject.fromObject(line. _ 2 ) |
30 |
// Compute user click times |
31 |
val userClicks = events.map(x = > (x.getString( "uid" ), x.getInt( "click_count" ))).reduceByKey( _ + _ ) |
32 |
userClicks.foreachRDD(rdd = > { |
33 |
rdd.foreachPartition(partitionOfRecords = > { |
34 |
partitionOfRecords.foreach(pair = > { |
36 |
val clickCount = pair. _ 2 |
37 |
val jedis = RedisClient.pool.getResource |
39 |
jedis.hincrBy(clickHashKey, uid, clickCount) |
40 |
RedisClient.pool.returnResource(jedis) |
46 |
ssc.awaitTermination() |
上面代碼使用了Jedis客戶端來操做Redis,將分組計數結果數據累加寫入Redis存儲,若是其餘系統須要實時獲取該數據,直接從Redis實時讀取便可。RedisClient實現代碼以下所示:
01 |
object RedisClient extends Serializable { |
02 |
val redisHost = "10.10.4.130" |
04 |
val redisTimeout = 30000 |
05 |
lazy val pool = new JedisPool( new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) |
07 |
lazy val hook = new Thread { |
09 |
println( "Execute hook thread: " + this ) |
13 |
sys.addShutdownHook(hook.run) |
上面代碼咱們分別在local[K]和Spark Standalone集羣模式下運行經過。
若是咱們是在開發環境進行調試的時候,也就是使用local[K]部署模式,在本地啓動K個Worker線程來計算,這K個Worker在同一個JVM實 例裏,上面的代碼默認狀況是,若是沒有傳參數則是local[K]模式,因此若是使用這種方式在建立Redis鏈接池或鏈接的時候,可能很是容易調試通 過,可是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集羣部署模式的時候,就會報錯,主要是因爲在處理Redis鏈接池或鏈接的時候出錯了。咱們能夠看一下Spark架構,如圖 所示(來自官網):
不管是在本地模式、Standalone模式,仍是在Mesos或YARN模式下,整個Spark集羣的結構均可以用上圖抽象表示,只是各個組件的運行環 境不一樣,致使組件多是分佈式的,或本地的,或單個JVM實例的。如在本地模式,則上圖表現爲在同一節點上的單個進程以內的多個組件;而在YARN Client模式下,Driver程序是在YARN集羣以外的一個節點上提交Spark Application,其餘的組件都運行在YARN集羣管理的節點上。
在Spark集羣環境部署Application後,在進行計算的時候會將做用於RDD數據集上的函數(Functions)發送到集羣中Worker上 的Executor上(在Spark Streaming中是做用於DStream的操做),那麼這些函數操做所做用的對象(Elements)必須是可序列化的,經過Scala也可使用 lazy引用來解決,不然這些對象(Elements)在跨節點序列化傳輸後,沒法正確地執行反序列化重構成實際可用的對象。上面代碼咱們使用lazy引 用(Lazy Reference)來實現的,代碼以下所示:
01 |
// lazy pool reference |
02 |
lazy val pool = new JedisPool( new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) |
04 |
partitionOfRecords.foreach(pair = > { |
06 |
val clickCount = pair. _ 2 |
07 |
val jedis = RedisClient.pool.getResource |
09 |
jedis.hincrBy(clickHashKey, uid, clickCount) |
10 |
RedisClient.pool.returnResource(jedis) |
另外一種方式,咱們將代碼修改成,把對Redis鏈接的管理放在操做DStream的Output操做範圍以內,由於咱們知道它是在特定的Executor中進行初始化的,使用一個單例的對象來管理,以下所示:
001 |
package org.shirdrn.spark.streaming |
003 |
import org.apache.commons.pool 2 .impl.GenericObjectPoolConfig |
004 |
import org.apache.spark.SparkConf |
005 |
import org.apache.spark.streaming.Seconds |
006 |
import org.apache.spark.streaming.StreamingContext |
007 |
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions |
008 |
import org.apache.spark.streaming.kafka.KafkaUtils |
010 |
import kafka.serializer.StringDecoder |
011 |
import net.sf.json.JSONObject |
012 |
import redis.clients.jedis.JedisPool |
014 |
object UserClickCountAnalytics { |
016 |
def main(args : Array[String]) : Unit = { |
017 |
var masterUrl = "local[1]" |
018 |
if (args.length > 0 ) { |
022 |
// Create a StreamingContext with the given master URL |
023 |
val conf = new SparkConf().setMaster(masterUrl).setAppName( "UserClickCountStat" ) |
024 |
val ssc = new StreamingContext(conf, Seconds( 5 )) |
026 |
// Kafka configurations |
027 |
val topics = Set( "user_events" ) |
028 |
val brokers = "10.10.4.126:9092,10.10.4.127:9092" |
029 |
val kafkaParams = Map[String, String]( |
030 |
"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder" ) |
033 |
val clickHashKey = "app::users::click" |
035 |
// Create a direct stream |
036 |
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) |
038 |
val events = kafkaStream.flatMap(line = > { |
039 |
val data = JSONObject.fromObject(line. _ 2 ) |
043 |
// Compute user click times |
044 |
val userClicks = events.map(x = > (x.getString( "uid" ), x.getInt( "click_count" ))).reduceByKey( _ + _ ) |
045 |
userClicks.foreachRDD(rdd = > { |
046 |
rdd.foreachPartition(partitionOfRecords = > { |
047 |
partitionOfRecords.foreach(pair = > { |
050 |
* Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool} |
052 |
object InternalRedisClient extends Serializable { |
054 |
@ transient private var pool : JedisPool = null |
056 |
def makePool(redisHost : String, redisPort : Int, redisTimeout : Int, |
057 |
maxTotal : Int, maxIdle : Int, minIdle : Int) : Unit = { |
058 |
makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true , false , 10000 ) |
061 |
def makePool(redisHost : String, redisPort : Int, redisTimeout : Int, |
062 |
maxTotal : Int, maxIdle : Int, minIdle : Int, testOnBorrow : Boolean, |
063 |
testOnReturn : Boolean, maxWaitMillis : Long) : Unit = { |
065 |
val poolConfig = new GenericObjectPoolConfig() |
066 |
poolConfig.setMaxTotal(maxTotal) |
067 |
poolConfig.setMaxIdle(maxIdle) |
068 |
poolConfig.setMinIdle(minIdle) |
069 |
poolConfig.setTestOnBorrow(testOnBorrow) |
070 |
poolConfig.setTestOnReturn(testOnReturn) |
071 |
poolConfig.setMaxWaitMillis(maxWaitMillis) |
072 |
pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout) |
074 |
val hook = new Thread{ |
075 |
override def run = pool.destroy() |
077 |
sys.addShutdownHook(hook.run) |
081 |
def getPool : JedisPool = { |
087 |
// Redis configurations |
091 |
val redisHost = "10.10.4.130" |
093 |
val redisTimeout = 30000 |
095 |
InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle) |
098 |
val clickCount = pair. _ 2 |
099 |
val jedis = InternalRedisClient.getPool.getResource |
100 |
jedis.select(dbIndex) |
101 |
jedis.hincrBy(clickHashKey, uid, clickCount) |
102 |
InternalRedisClient.getPool.returnResource(jedis) |
108 |
ssc.awaitTermination() |
上面代碼實現,得益於Scala語言的特性,能夠在代碼中任何位置進行class或object的定義,咱們將用來管理Redis鏈接的代碼放在了 特定操做的內部,就避免了瞬態(Transient)對象跨節點序列化的問題。這樣作還要求咱們可以瞭解Spark內部是如何操做RDD數據集的,更多可 以參考RDD或Spark相關文檔。
在集羣上,以Standalone模式運行,執行以下命令:
2 |
./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077 |
能夠查看集羣中各個Worker節點執行計算任務的狀態,也能夠很是方便地經過Web頁面查看。
下面,看一下咱們存儲到Redis中的計算結果,以下所示:
01 |
127.0.0.1:6379[1]> HGETALL app::users::click |
02 |
1) "4A4D769EB9679C054DE81B973ED5D768" |
04 |
3) "8dfeb5aaafc027d89349ac9a20b3930f" |
06 |
5) "011BBF43B89BFBF266C865DF0397AA71" |
08 |
7) "97edfc08311c70143401745a03a50706" |
10 |
9) "d7f141563005d1b5d0d3dd30138f3f62" |
12 |
11) "a95f22eabc4fd4b580c011a3161a9d9d" |
14 |
13) "6b67c8c700427dee7552f81f3228c927" |
16 |
15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf" |
18 |
17) "c8ee90aade1671a21336c721512b817a" |
20 |
19) "068b746ed4620d25e26055a9f804385f" |
有關更多關於Spark Streaming的詳細內容,能夠參考官方文檔。
附錄
這裏,附上前面開發的應用所對應的依賴,以及打包Spark Streaming應用程序的Maven配置,以供參考。若是使用maven-shade-plugin插件,配置有問題的話,打包後在Spark集羣上 提交Application時候可能會報錯Invalid signature file digest for Manifest main attributes。參考的Maven配置,以下所示:
003 |
< modelVersion >4.0.0</ modelVersion > |
004 |
< groupId >org.shirdrn.spark</ groupId > |
005 |
< artifactId >spark</ artifactId > |
006 |
< version >0.0.1-SNAPSHOT</ version > |
010 |
< groupId >org.apache.spark</ groupId > |
011 |
< artifactId >spark-core_2.10</ artifactId > |
012 |
< version >1.3.0</ version > |
015 |
< groupId >org.apache.spark</ groupId > |
016 |
< artifactId >spark-streaming_2.10</ artifactId > |
017 |
< version >1.3.0</ version > |
020 |
< groupId >net.sf.json-lib</ groupId > |
021 |
< artifactId >json-lib</ artifactId > |
022 |
< version >2.3</ version > |
025 |
< groupId >org.apache.spark</ groupId > |
026 |
< artifactId >spark-streaming-kafka_2.10</ artifactId > |
027 |
< version >1.3.0</ version > |
030 |
< groupId >redis.clients</ groupId > |
031 |
< artifactId >jedis</ artifactId > |
032 |
< version >2.5.2</ version > |
035 |
< groupId >org.apache.commons</ groupId > |
036 |
< artifactId >commons-pool2</ artifactId > |
037 |
< version >2.2</ version > |
042 |
< sourceDirectory >${basedir}/src/main/scala</ sourceDirectory > |
043 |
< testSourceDirectory >${basedir}/src/test/scala</ testSourceDirectory > |
046 |
< directory >${basedir}/src/main/resources</ directory > |
051 |
< directory >${basedir}/src/test/resources</ directory > |
056 |
< artifactId >maven-compiler-plugin</ artifactId > |
057 |
< version >3.1</ version > |
064 |
< groupId >org.apache.maven.plugins</ groupId > |
065 |
< artifactId >maven-shade-plugin</ artifactId > |
066 |
< version >2.2</ version > |
068 |
< createDependencyReducedPom >true</ createDependencyReducedPom > |
072 |
< phase >package</ phase > |
079 |
< include >*:*</ include > |
084 |
< artifact >*:*</ artifact > |
086 |
< exclude >META-INF/*.SF</ exclude > |
087 |
< exclude >META-INF/*.DSA</ exclude > |
088 |
< exclude >META-INF/*.RSA</ exclude > |
094 |
implementation = "org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> |
096 |
implementation = "org.apache.maven.plugins.shade.resource.AppendingTransformer" > |
097 |
< resource >reference.conf</ resource > |
100 |
implementation = "org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer" > |
101 |
< resource >log4j.properties</ resource > |
參考連接
本文基於署名-非商業性使用-相同方式共享 4.0許可協議發佈,歡迎轉載、使用、從新發布,但務必保留文章署名時延軍(包含連接:http://shiyanjun.cn),不得用於商業目的,基於本文修改後的做品務必以相同的許可發佈。若有任何疑問,請與我聯繫。