六、SparkStream的滑動窗口函數

##Spark提供了一個滑動窗口給咱們使用,藉助於它,咱們很方便就能夠按照本身想要的時間間隔來處理數據。好比說:股票一分鐘的展現,一分鐘最新動態java

##前面的處理都同樣,該從Kafka哪裏讀數據仍是同樣。只是不同的是在reduceByKey變成了reduceByKeyWithWindow(updateFunc,Seconds(30),Seconds(20))apache

#若是是須要累加求和,那麼須要定義這個updateFunc方法。它裏面須要傳遞兩個參數,爲本身定義的case calss組裝的信息。數組

#代碼實現以下: package com.liufu.org.streaming import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils函數

/**
  * Created by liufu on 2016/11/20.
  */
object StreamAndWindow {
  //定義一個累計函數,將之前的數據和如今的數據加起來,而後繼續保持在checkpoint
  val updateFunc = (old:MsgBean, n:MsgBean) => {
    MsgBean(old.downFloe + n.downFloe, old.upflow + n.upflow)
  }

  def main(args: Array[String]): Unit = {

    //定義一個數組來對接main方法傳入的參數。
    val Array(zkList,cumsumorGroup,topics,numThreads) = args

    //將topics和numThreads組裝成一個map。這是當去消費kafka數據的時候須要的。
    val topic_numThread: Map[String, Int] = topics.split(",").map((_,numThreads.toInt)).toMap

    val conf = new SparkConf().setAppName("streamTest").setMaster("local[2]")
    //建立Streamingcomtext對象,而後指定處理數據的時間間隔
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(10))

    //設置一個文件目錄,用於保存之前數據。
    ssc.checkpoint("file:///E:/checkpoint")

    //讀取Kafka的數據。
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkList,cumsumorGroup,topic_numThread,StorageLevel.MEMORY_AND_DISK)

    //將kafka的數據拿出來處理
    val msgBeanSet: DStream[((String), MsgBean)] = kafkaStream.map(line => {
      val fields = line._2.split(",")
      ((fields(0)), MsgBean(fields(1).toInt, fields(2).toInt))
    })

    //既然是reduceByKey那麼數據必須是k-v類型。千方百計組裝便可。
    val result: DStream[(String, MsgBean)] = msgBeanSet.reduceByKeyAndWindow(updateFunc,Seconds(40),Seconds(20))

    result.print()

//也能夠將這個result寫入到redies或者Hbase中。

    //必定要啓動streamContext程序,而後一直等待,不然任務不會提交的。
    ssc.start()
    ssc.awaitTermination()
  }
}

//定義一個case class ,用來組裝切分後的消息。
case class MsgBean(upflow:Int,downFloe:Int)

#總結:spa

  • 1:利用case class封裝字段數據,等價於javaBean
  • 2:能夠將result這個Dstream 寫入到Redies或者Hbase中。而Dstream可能有多個分區,咱們能夠以一個分區做爲單位,而後建立一個鏈接一次性寫入進去,最後在斷開這個鏈接。
  • 3:在流式計算中,最好搞一個redies或者Hbase的鏈接池,由於程序一直在運行,因此頻繁的建立和銷燬鏈接開銷很大。
  • 4:Redies3.0已經能夠集羣了。
相關文章
相關標籤/搜索