package com.xp.cn.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf}

/**
 * Created by xupan on 2017/12/16.
 *
 * Stateful word count over a Kafka topic: every batch, the per-word counts of
 * the new records are added to the running totals kept by `updateStateByKey`.
 *
 * The streaming context is obtained through `StreamingContext.getOrCreate`, so
 * when the application is restarted the running totals are restored from the
 * checkpoint directory instead of starting again from zero.
 */
object UpdateStateByKeyWordCount {

  /** Checkpoint location; may be an HDFS path or a local file-system path. */
  private val CheckpointDir = "data/checkpoint"

  /** ZooKeeper quorum used by the receiver-based Kafka consumer. */
  private val ZkQuorum = "xupan001:2181,xupan002:2181,xupan003:2181"

  /** Kafka consumer group id. */
  private val GroupId = "g1"

  /** Topic name -> number of consumer threads for that topic. */
  private val Topics = Map[String, Int]("test001" -> 1)

  /**
   * Builds a brand-new streaming context together with the complete DStream graph.
   *
   * `StreamingContext.getOrCreate` calls this only when no usable checkpoint
   * exists. The whole graph has to be defined in here: when a context is
   * recovered, the graph is rebuilt from the checkpoint and this function
   * is not invoked.
   *
   * @return a configured, not yet started, streaming context
   */
  private def createContext(): StreamingContext = {
    // Spark Streaming needs at least two local threads:
    // one to receive data and one to process it.
    val conf = new SparkConf().setAppName("FirstStreaming").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // A new batch is produced every 10 seconds.
    val ssc = new StreamingContext(sc, Seconds(10))

    // updateStateByKey requires checkpointing to be enabled.
    ssc.checkpoint(CheckpointDir)

    // Author's note: createDirectStream (low-level API) is what is used in production;
    // createStream is less efficient and may lose data.
    val data: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, ZkQuorum, GroupId, Topics)

    // Records are (key, message) pairs; only the message text is needed.
    val lines = data.map(_._2)

    // For each word seen by the state update:
    //   String      - the word
    //   Seq[Int]    - the counts of that word in the current batch
    //   Option[Int] - the running total from previous batches (None the first time)
    val updateFUN = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
      iter.map { case (word, batchCounts, previousTotal) =>
        (word, batchCounts.sum + previousTotal.getOrElse(0))
      }

    val countSD = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      // Last argument: keep the partitioner on the generated state RDDs.
      .updateStateByKey(
        updateFUN,
        new HashPartitioner(ssc.sparkContext.defaultMinPartitions),
        true)

    // Output operation: prints the first elements of the running totals each batch.
    countSD.print()

    ssc
  }

  /**
   * Entry point: recovers the streaming context from the checkpoint directory when
   * one exists, otherwise creates a new one, then runs until terminated.
   *
   * NOTE: a recovered context uses the DStream graph stored in the checkpoint, so
   * the checkpoint directory must be cleared after changing the streaming logic.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Reuse checkpointed state if present so that counts survive a restart.
    val ssc = StreamingContext.getOrCreate(CheckpointDir, createContext _)

    // Start receiving and processing data.
    ssc.start()
    // Block until the computation is stopped.
    ssc.awaitTermination()
  }
}
-------------------------------------------
Time: 1513416280000 ms
-------------------------------------------
(d,1)
(s,1)
(a,1)
-------------------------------------------
Time: 1513416290000 ms
-------------------------------------------
(d,3)
(s,3)
(a,3)
-------------------------------------------
Time: 1513416300000 ms
-------------------------------------------
(d,3)
(w,1)
(s,3)
(e,1)
(a,3)
(q,1)
-------------------------------------------
Time: 1513416310000 ms
-------------------------------------------
(d,3)
(w,2)
(s,3)
(e,2)
(a,3)
(q,2)
-------------------------------------------
Time: 1513416320000 ms
-------------------------------------------
(d,4)
(w,2)
(s,4)
(e,2)
(a,4)
(q,2)
問題:
若是程序退出後重啓時直接 new StreamingContext,不會累加以前的數據;需用 StreamingContext.getOrCreate 從 checkpoint 目錄恢復,才能接續歷史狀態。