Spark Streaming updateStateByKey Usage

  1. What updateStateByKey does:
    It reduces the data in each DStream batch by key, then accumulates the per-key results across batches.
    Whenever new data arrives, it lets you maintain arbitrary state for each key and keep updating it. Using it takes two steps (see the sketch below):
    1) Define the state: it can be of any data type.
    2) Define the state update function: a function that specifies how to update the state from the previous state and the new values arriving in the input stream.
    For stateful operations, the RDDs of the current and all historical time slices are accumulated, so the amount of state carried by the computation keeps growing as time passes.
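    A minimal sketch of these two steps (the state here is a simple per-key Int count; updateCount and pairs are illustrative names, not taken from the examples below):

      // Step 1: the state -- here a running count per key (any type is allowed)
      // Step 2: the update function -- combines this batch's values with the previous state
      val updateCount: (Seq[Int], Option[Int]) => Option[Int] =
        (newValues, runningCount) => Some(newValues.sum + runningCount.getOrElse(0))

      // Applied to a DStream of (key, value) pairs, e.g. a DStream[(String, Int)] named pairs:
      // val stateDStream = pairs.updateStateByKey[Int](updateCount)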

  2. updateStateByKey source code (a short sketch of the None-elimination behaviour follows the listing):

    /**
     * Return a new "state" DStream where the state for each key is updated by applying
     * the given function on the previous state of the key and the new values of the key.
     * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
     * @param updateFunc State update function. If this function returns None, then
     *                   corresponding state key-value pair will be eliminated.
     * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
     *                    DStream.
     * @param initialRDD initial state value of each key.
     * @tparam S State type
     */
    def updateStateByKey[S: ClassTag](
        updateFunc: (Seq[V], Option[S]) => Option[S],
        partitioner: Partitioner,
        initialRDD: RDD[(K, S)]
      ): DStream[(K, S)] = {
      val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
        iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
      }
      updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
    }
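    As the scaladoc above notes, returning None from the update function removes that key's state. A small hypothetical sketch (the threshold and the name expiringUpdate are illustrative, not part of the Spark API or the examples below):

      // Hypothetical update function that drops a key's state once the total exceeds a threshold.
      // Returning None eliminates that key-value pair from the state DStream.
      val expiringUpdate: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
        val total = values.sum + state.getOrElse(0)
        if (total > 1000) None else Some(total)
      }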
  3. Example code

    • StatefulNetworkWordCount

      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.{HashPartitioner, SparkConf}
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object StatefulNetworkWordCount {
        def main(args: Array[String]) {
          if (args.length < 2) {
            System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
            System.exit(1)
          }

          Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

          // Per-key update function: sum this batch's values and add the previous count
          val updateFunc = (values: Seq[Int], state: Option[Int]) => {
            val currentCount = values.sum
            val previousCount = state.getOrElse(0)
            Some(currentCount + previousCount)
          }

          // Wrap updateFunc into the (key, values, state) iterator form required by the
          // updateStateByKey overload that accepts an initial RDD
          val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
            iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
          }

          // Use at least two local threads: one for the socket receiver and one for processing
          val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[2]")
          // Create the context with a 1 second batch size
          val ssc = new StreamingContext(sparkConf, Seconds(1))
          ssc.checkpoint(".")

          // Initial RDD input to updateStateByKey
          val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

          // Create a ReceiverInputDStream on target ip:port and count the
          // words in the input stream of \n delimited text (e.g. generated by 'nc')
          val lines = ssc.socketTextStream(args(0), args(1).toInt)
          val words = lines.flatMap(_.split(" "))
          val wordDstream = words.map(x => (x, 1))

          // Update the cumulative count using updateStateByKey
          // This will give a DStream made of state (which is the cumulative count of the words)
          val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
            new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
          stateDstream.print()
          ssc.start()
          ssc.awaitTermination()
        }
      }
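      Since updateStateByKey requires checkpointing (ssc.checkpoint(".") above), the driver can also be made recoverable by rebuilding the StreamingContext from that checkpoint directory. A sketch, assuming the stream setup above is moved into a hypothetical createContext() helper:

        // Hypothetical restart pattern: reload the StreamingContext from the checkpoint
        // directory if one exists, otherwise build a fresh one via createContext().
        def createContext(): StreamingContext = {
          val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[2]")
          val ssc = new StreamingContext(sparkConf, Seconds(1))
          ssc.checkpoint(".")
          // ... define the DStream graph here, exactly as in the example above ...
          ssc
        }

        val ssc = StreamingContext.getOrCreate(".", createContext _)
        ssc.start()
        ssc.awaitTermination()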
    • NetworkWordCount

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }


    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // Checkpointing must be enabled before using updateStateByKey
    ssc.checkpoint("hdfs://master:8020/spark/checkpoint")

    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // Spark has already grouped this key's values for the current batch into currValues,
      // so summing it gives the batch total for the key
      val currentCount = currValues.sum
      // the count accumulated so far for this key
      val previousCount = prevValueState.getOrElse(0)
      // return the accumulated result as an Option[Int]
      Some(currentCount + previousCount)
    }

    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    //val currWordCounts = pairs.reduceByKey(_ + _)
    //currWordCounts.print()

    val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
    totalWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
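For contrast with the commented-out reduceByKey lines above, the two approaches count different things. A sketch with hypothetical input ("hello world" arriving in the first 10-second batch, "hello" in the second):

// Per-batch counts via reduceByKey (state is NOT carried across batches):
//   batch 1 -> (hello,1), (world,1)
//   batch 2 -> (hello,1)
// Cumulative counts via updateStateByKey (state accumulates across batches):
//   batch 1 -> (hello,1), (world,1)
//   batch 2 -> (hello,2), (world,1)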
  • WebPagePopularityValueCalculator


package com.spark.streaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

/**
 * Module Desc: web page popularity value calculator
 * User: wangyue
 * DateTime: 15-11-9 10:50
 */
object WebPagePopularityValueCalculator {

  private val checkpointDir = "popularity-data-checkpoint"
  private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"

  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: WebPagePopularityValueCalculator zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
      System.exit(1)
    }
    val Array(zkServers, processingInterval) = args
    val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
    val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
    // using updateStateByKey asks for enabling checkpoint
    ssc.checkpoint(checkpointDir)

    val kafkaStream = KafkaUtils.createStream(
      // Spark streaming context
      ssc,
      // zookeeper quorum, e.g. zkserver1:2181,zkserver2:2181,...
      zkServers,
      // kafka message consumer group ID
      msgConsumerGroup,
      // Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
      Map("user-behavior-topic" -> 3))
    val msgDataRDD = kafkaStream.map(_._2)
    // for debug use only
    // println("Coming data in this interval...")
    // msgDataRDD.print()

    // each record looks like: page37|5|1.5119122|-1
    val popularityData = msgDataRDD.map { msgLine =>
      val dataArr: Array[String] = msgLine.split("\\|")
      val pageID = dataArr(0)
      // calculate the popularity value
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      (pageID, popValue)
    }

    // Anonymous function that adds a page's previously accumulated popularity value
    // to the value newly computed for this batch
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap { t =>
        val newValue: Double = t._2.sum
        val stateValue: Double = t._3.getOrElse(0)
        Some(newValue + stateValue).map(sumedValue => (t._1, sumedValue))
      }
    }

    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))

    // Call updateStateByKey with the anonymous function above to keep the popularity values up to date
    val stateDStream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

    // set the checkpoint interval to avoid too frequent data checkpointing, which may
    // significantly reduce operation throughput
    stateDStream.checkpoint(Duration(8 * processingInterval.toInt * 1000))

    // after calculation, sort the results and only show the top 10 hot pages
    stateDStream.foreachRDD { rdd =>
      val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
      val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
      topKData.foreach(x => println(x))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
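As a quick sanity check of the popularity formula, applying it to the sample record from the code comment above ("page37|5|1.5119122|-1") gives:

// popValue = 5 * 0.8 + 1.5119122 * 0.8 + (-1) * 1
//          = 4.0 + 1.2095298 - 1.0
//          ≈ 4.2095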

References:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
http://stackoverflow.com/questions/28998408/spark-streaming-example-calls-updatestatebykey-with-additional-parameters
http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation

Please respect the original author; do not repost without permission:
http://blog.csdn.net/stark_summer/article/details/47666337
