##Spark提供了一個滑動窗口給咱們使用,藉助於它,咱們很方便就能夠按照本身想要的時間間隔來處理數據。好比說:股票一分鐘的展現,一分鐘最新動態java
##前面的處理都同樣,該從Kafka哪裏讀數據仍是同樣。只是不同的是在reduceByKey變成了reduceByKeyWithWindow(updateFunc,Seconds(30),Seconds(20))apache
#若是是須要累加求和,那麼須要定義這個updateFunc方法。它裏面須要傳遞兩個參數,爲本身定義的case calss組裝的信息。數組
#代碼實現以下: package com.liufu.org.streaming import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils函數
/** * Created by liufu on 2016/11/20. */ object StreamAndWindow { //定義一個累計函數,將之前的數據和如今的數據加起來,而後繼續保持在checkpoint val updateFunc = (old:MsgBean, n:MsgBean) => { MsgBean(old.downFloe + n.downFloe, old.upflow + n.upflow) } def main(args: Array[String]): Unit = { //定義一個數組來對接main方法傳入的參數。 val Array(zkList,cumsumorGroup,topics,numThreads) = args //將topics和numThreads組裝成一個map。這是當去消費kafka數據的時候須要的。 val topic_numThread: Map[String, Int] = topics.split(",").map((_,numThreads.toInt)).toMap val conf = new SparkConf().setAppName("streamTest").setMaster("local[2]") //建立Streamingcomtext對象,而後指定處理數據的時間間隔 val ssc: StreamingContext = new StreamingContext(conf,Seconds(10)) //設置一個文件目錄,用於保存之前數據。 ssc.checkpoint("file:///E:/checkpoint") //讀取Kafka的數據。 val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkList,cumsumorGroup,topic_numThread,StorageLevel.MEMORY_AND_DISK) //將kafka的數據拿出來處理 val msgBeanSet: DStream[((String), MsgBean)] = kafkaStream.map(line => { val fields = line._2.split(",") ((fields(0)), MsgBean(fields(1).toInt, fields(2).toInt)) }) //既然是reduceByKey那麼數據必須是k-v類型。千方百計組裝便可。 val result: DStream[(String, MsgBean)] = msgBeanSet.reduceByKeyAndWindow(updateFunc,Seconds(40),Seconds(20)) result.print() //也能夠將這個result寫入到redies或者Hbase中。 //必定要啓動streamContext程序,而後一直等待,不然任務不會提交的。 ssc.start() ssc.awaitTermination() } } //定義一個case class ,用來組裝切分後的消息。 case class MsgBean(upflow:Int,downFloe:Int)
#總結:spa