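updateStateByKey explained:
updateStateByKey aggregates the data in a DStream by key and then accumulates each key's results across batches.
Whenever new data arrives or existing data is updated, it lets you maintain whatever state you want for each key. Using it takes two steps:
1) Define the state: it can be of any data type.
2) Define the state update function: a function that specifies how to compute the new state from the previous state and the new values from the input stream.
For stateful operations, the RDDs of the current and all previous time slices have to be accumulated continuously, so the amount of data involved keeps growing as time goes on.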
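The two steps above can be tried out without a cluster. The following is a minimal plain-Scala sketch, not Spark's implementation. The helper `step` and the state type `WordState` are hypothetical and exist only for illustration. `step` models one batch the way updateStateByKey treats it:

* every key that already has state, or that has new values, is passed to the update function;
* a key with no new values receives an empty Seq;
* a key whose function returns None is dropped from the state.

```scala
object UpdateStateByKeySketch {
  // Step 1: the state can be any type; this per-word record is a made-up example.
  final case class WordState(total: Int, batchesSeen: Int)

  // Step 2: the update function gets this batch's values for one key (possibly
  // empty) and that key's previous state (None the first time the key is seen).
  def update(values: Seq[Int], state: Option[WordState]): Option[WordState] = {
    val prev = state.getOrElse(WordState(0, 0))
    val seenNow = if (values.nonEmpty) 1 else 0
    Some(WordState(prev.total + values.sum, prev.batchesSeen + seenNow))
  }

  // Local model of one updateStateByKey step on plain collections (not Spark code):
  // every key with existing state or new values is passed to updateFunc,
  // and keys for which updateFunc returns None are dropped from the state.
  def step[K, V, S](state: Map[K, S], batch: Seq[(K, V)])(
      updateFunc: (Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty[V]), state.get(k)).map(s => k -> s)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batches: Seq[Seq[(String, Int)]] = Seq(
      Seq("hello" -> 1, "world" -> 1, "hello" -> 1),
      Seq("spark" -> 1),
      Seq("hello" -> 1)
    )
    // Fold the batches into the state, one step per batch interval
    val finalState = batches.foldLeft(Map.empty[String, WordState]) { (state, batch) =>
      step(state, batch)(update)
    }
    finalState.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Note that "world" keeps its state in the later batches even though it never reappears. This is why the state keeps growing over time unless the update function returns None for keys that are no longer needed.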
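updateStateByKey source code: see PairDStreamFunctions.scala in the references below. Its scaladoc for the update function notes that if the function returns None, the corresponding state key-value pair is eliminated.
Code implementation:
StatefulNetworkWordCount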
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    // Per-key update: add this batch's counts to the previous running count
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    // Adapt updateFunc to the Iterator-based signature:
    // (key, new values, previous state) => (key, new state)
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }
    // The socket receiver occupies one core, so local mode needs at least 2 threads
    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[2]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")
    // Initial RDD input to updateStateByKey
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    // Update the cumulative count using updateStateByKey
    // This gives a DStream made of state (the cumulative count of each word)
    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
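The Iterator-based overload used above takes `updateFunc`, `partitioner`, `rememberPartitioner` and `initialRDD`. Its function parameter works on a whole partition: it receives an Iterator of `(key, new values, previous state)` and returns an Iterator of `(key, new state)`. `newUpdateFunc` adapts the per-key `updateFunc` to that shape.

Below is a generic plain-Scala version of the same adapter. The names `liftToIterator`, `PartitionUpdate` and `dropIdle` are hypothetical and used only for illustration. `dropIdle` shows a key being dropped when the per-key function returns None. It runs without Spark.

```scala
object IteratorUpdateSketch {
  // Partition-level shape of the Iterator-based updateStateByKey overload:
  // (key, new values in this batch, previous state) => (key, new state)
  type PartitionUpdate[K, V, S] = Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)]

  // Generic form of newUpdateFunc: lift a per-key function to a whole partition.
  // Keys whose per-key result is None are not emitted, so their state is dropped.
  def liftToIterator[K, V, S](f: (Seq[V], Option[S]) => Option[S]): PartitionUpdate[K, V, S] =
    iterator => iterator.flatMap { case (key, values, state) =>
      f(values, state).map(s => (key, s))
    }

  // Same running sum as updateFunc in StatefulNetworkWordCount.
  val sumCounts: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Hypothetical variant: forget a word as soon as a batch brings no new occurrences.
  val dropIdle: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => if (values.isEmpty) None else Some(values.sum + state.getOrElse(0))

  def main(args: Array[String]): Unit = {
    val partition: List[(String, Seq[Int], Option[Int])] =
      List(("hello", Seq(1, 1), Some(1)), ("world", Seq.empty[Int], Some(3)))
    println(liftToIterator[String, Int, Int](sumCounts)(partition.iterator).toList) // List((hello,3), (world,3))
    println(liftToIterator[String, Int, Int](dropIdle)(partition.iterator).toList)  // List((hello,3))
  }
}
```

Passing `liftToIterator(dropIdle)` instead of `newUpdateFunc` would bound the state to recently active words. Whether that is acceptable depends on the application.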
NetworkWordCount
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Needed for pair-DStream operations such as updateStateByKey before Spark 1.3
import org.apache.spark.streaming.StreamingContext._
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // A checkpoint directory must be set before updateStateByKey can be used
    ssc.checkpoint("hdfs://master:8020/spark/checkpoint")
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // Spark groups the values by key internally and passes in this key's values
      // from the current batch as a Seq; sum them to get the current batch's count
      val currentCount = currValues.sum
      // The value accumulated so far
      val previousCount = prevValueState.getOrElse(0)
      // Return the accumulated result as an Option[Int]
      Some(currentCount + previousCount)
    }
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    // Per-batch counts only (no state kept across batches):
    //val currWordCounts = pairs.reduceByKey(_ + _)
    //currWordCounts.print()
    val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
    totalWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
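The commented-out `pairs.reduceByKey(_ + _)` prints counts for the current batch only, while `updateStateByKey(addFunc)` prints running totals. The following plain-Scala sketch models both on local collections so the difference can be checked by hand. `groupBatch`, `batchCounts` and `runningCounts` are hypothetical helpers, not Spark APIs. In the real job, Spark does the grouping itself.

```scala
object BatchVsRunningCounts {
  // Same logic as addFunc in NetworkWordCount.
  val addFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (currValues, prevValueState) => Some(currValues.sum + prevValueState.getOrElse(0))

  // One batch of lines -> word -> Seq(1, 1, ...), i.e. the (word, 1) pairs grouped by key.
  def groupBatch(lines: Seq[String]): Map[String, Seq[Int]] =
    lines.flatMap(_.split(" ")).map(word => (word, 1))
      .groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2) }

  // What pairs.reduceByKey(_ + _) would report for this batch alone.
  def batchCounts(lines: Seq[String]): Map[String, Int] =
    groupBatch(lines).map { case (word, ones) => word -> ones.sum }

  // What updateStateByKey(addFunc) would hold after each batch: running totals.
  def runningCounts(batches: Seq[Seq[String]]): Seq[Map[String, Int]] =
    batches.scanLeft(Map.empty[String, Int]) { (state, lines) =>
      val grouped = groupBatch(lines)
      (state.keySet ++ grouped.keySet).flatMap { word =>
        addFunc(grouped.getOrElse(word, Seq.empty), state.get(word)).map(total => word -> total)
      }.toMap
    }.tail

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a b a"), Seq("b c"))
    batches.foreach(lines => println(batchCounts(lines)))
    runningCounts(batches).foreach(println)
  }
}
```

For the second batch, the per-batch view only knows about "b" and "c". The running view still carries "a" from the first batch.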
WebPagePopularityValueCalculator
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object WebPagePopularityValueCalculator {
  private val checkpointDir = "popularity-data-checkpoint"
  private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: WebPagePopularityValueCalculator zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
      System.exit(1)
    }
    val Array(zkServers, processingInterval) = args
    val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
    val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
    // Using updateStateByKey requires checkpointing to be enabled
    ssc.checkpoint(checkpointDir)
    val kafkaStream = KafkaUtils.createStream(
      // Spark streaming context
      ssc,
      // ZooKeeper quorum, e.g. zkserver1:2181,zkserver2:2181,...
      zkServers,
      // Kafka message consumer group ID
      msgConsumerGroup,
      // Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
      Map("user-behavior-topic" -> 3))
    // Keep only the message value of each (key, message) pair
    val msgDataRDD = kafkaStream.map(_._2)
    // For debugging only
    //println("Coming data in this interval...")
    //msgDataRDD.print()
    // Message format, e.g. page37|5|1.5119122|-1
    val popularityData = msgDataRDD.map { msgLine =>
      val dataArr: Array[String] = msgLine.split("\\|")
      val pageID = dataArr(0)
      // Calculate the popularity value
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      (pageID, popValue)
    }
    // Anonymous function that adds a page's previous popularity value to the
    // newly computed value, giving the page's latest popularity value
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap { case (pageID, newValues, prevValue) =>
        val newValue: Double = newValues.sum
        val stateValue: Double = prevValue.getOrElse(0.0)
        Some(newValue + stateValue).map(summedValue => (pageID, summedValue))
      }
    }
    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
    // Call the updateStateByKey primitive with the function above to update page popularity values
    val stateDStream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    // Set the checkpoint interval to avoid checkpointing too frequently,
    // which may significantly reduce operation throughput
    stateDStream.checkpoint(Duration(8 * processingInterval.toInt * 1000))
    // After the calculation, sort the results and print only the 10 most popular pages
    stateDStream.foreachRDD { rdd =>
      val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
      val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
      topKData.foreach(println)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
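The per-message formula and the top-10 step above can be checked without Kafka or Spark. This plain-Scala sketch assumes the `pageID|x|y|z` message layout from the example line. It applies the same weights (0.8, 0.8, 1), sums values per page as the update function does with `t._2.sum`, and keeps the k highest values. `parse`, `sumPerPage` and `topK` are hypothetical helper names. The sketch uses `toDouble` rather than the original's `toFloat`, so results can differ from the job in the last few digits.

```scala
object PopularitySketch {
  // Parse one message of the form "pageID|x|y|z" (e.g. "page37|5|1.5119122|-1") and
  // apply the weights used in WebPagePopularityValueCalculator: 0.8*x + 0.8*y + 1*z.
  // Uses toDouble instead of the original's toFloat to avoid float rounding.
  def parse(msgLine: String): (String, Double) = {
    val dataArr = msgLine.split("\\|")
    val popValue = dataArr(1).toDouble * 0.8 + dataArr(2).toDouble * 0.8 + dataArr(3).toDouble * 1
    (dataArr(0), popValue)
  }

  // Sum one batch's values per page, like t._2.sum in updatePopularityValue.
  def sumPerPage(values: Seq[(String, Double)]): Map[String, Double] =
    values.groupBy(_._1).map { case (page, vs) => page -> vs.map(_._2).sum }

  // Highest values first, keeping k entries: the local analogue of swapping to
  // (value, page), sortByKey(false), take(10), and swapping back.
  def topK(state: Seq[(String, Double)], k: Int): Seq[(String, Double)] =
    state.sortBy { case (_, value) => -value }.take(k)

  def main(args: Array[String]): Unit = {
    val batch = Seq("page37|5|1.5119122|-1", "page1|2|0.5|1", "page37|1|0.0|0")
    topK(sumPerPage(batch.map(parse)).toSeq, 10).foreach(println)
  }
}
```

In the Spark job itself, `rdd.top(10)(Ordering.by[(String, Double), Double](_._2))` may be worth trying instead of the swap and full `sortByKey`. It returns the k largest elements directly, but verify it against the Spark version in use.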
References:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
http://stackoverflow.com/questions/28998408/spark-streaming-example-calls-updatestatebykey-with-additional-parameters
http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation
Respect the original work; do not reproduce without the author's permission. Original article:
http://blog.csdn.net/stark_summer/article/details/47666337