UpdateStateByKeyWordCount單詞累加統計

package com.xp.cn.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf}

/**
 * Created by xupan on 2017/12/16.
 */
object UpdateStateByKeyWordCount {


  def main(args: Array[String]) {


    Logger.getLogger("org").setLevel(Level.ERROR)

    //建立conf,spark streaming至少要啓動兩個線程,一個負責接受數據,一個負責處理數據
    val conf = new SparkConf().setAppName("FirstStreaming").setMaster("local[2]")

    //建立SparkContext
    val sc = new SparkContext(conf)

    //建立StreamingContext,每隔10秒產生一個批次
    val ssc = new StreamingContext(sc, Seconds(10))

    //設置checkpoint,hdfs或則本地文件系統
    ssc.checkpoint("data/checkpoint")


    val zkQuorum = "xupan001:2181,xupan002:2181,xupan003:2181"
    val groupId = "g1"

    val topic = Map[String,Int]("test001" -> 1)
    //建立kafkaDStream
    //createDirectStream:上產環境用,底層API,
    //createStream,效率低,數據可能丟失
    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

    val lines = data.map(_._2)

    //Option[Int]:歷史結果
    //Seq[Int]:單詞出現的個數
    //String:單詞
    val updateFUN = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      //iter.map(v => (v._1, v._2.sum + v._3.getOrElse(0)))
      iter.map { case (x, y, z) => { (x, y.sum + z.getOrElse(0)) } } //上下等價
    }

    val countSD = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey(updateFUN, new HashPartitioner(ssc.sparkContext.defaultMinPartitions), true)

    //action觸發,每次計算只是計算當前批次的結果
    countSD.print()

    //啓動,開始接收數據並用streamingContext.start()
    ssc.start()

    //等待處理中止,streamingContext.awaitTermination()
    ssc.awaitTermination()

  }


}

-------------------------------------------
Time: 1513416280000 ms
-------------------------------------------
(d,1)
(s,1)
(a,1)apache

-------------------------------------------
Time: 1513416290000 ms
-------------------------------------------
(d,3)
(s,3)
(a,3)spa

-------------------------------------------
Time: 1513416300000 ms
-------------------------------------------
(d,3)
(w,1)
(s,3)
(e,1)
(a,3)
(q,1)線程

-------------------------------------------
Time: 1513416310000 ms
-------------------------------------------
(d,3)
(w,2)
(s,3)
(e,2)
(a,3)
(q,2)scala

-------------------------------------------
Time: 1513416320000 ms
-------------------------------------------
(d,4)
(w,2)
(s,4)
(e,2)
(a,4)
(q,2)code

 

問題:get

若是程序退出,不會累加以前的數據kafka

相關文章
相關標籤/搜索