架構圖:
1.安裝並啓動生成者
首先在一臺Linux(ip:192.168.10.101)上用YUM安裝nc工具apache
yum install -y nc
啓動一個服務端並監聽9999端口網絡
nc -lk 9999
2.編寫Spark Streaming程序架構
package cn.itcast.spark.streaming import cn.itcast.spark.util.LoggerLevel import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object NetworkWordCount { def main(args: Array[String]) { //設置日誌級別 LoggerLevel.setStreamingLogLevels() //建立SparkConf並設置爲本地模式運行 //注意local[2]表明開兩個線程 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") //設置DStream批次時間間隔爲2秒 val ssc = new StreamingContext(conf, Seconds(2)) //經過網絡讀取數據 val lines = ssc.socketTextStream("192.168.10.101", 9999) //將讀到的數據用空格切成單詞 val words = lines.flatMap(_.split(" ")) //將單詞和1組成一個pair val pairs = words.map(word => (word, 1)) //按單詞進行分組求相同單詞出現的次數 val wordCounts = pairs.reduceByKey(_ + _) //打印結果到控制檯 wordCounts.print() //開始計算 ssc.start() //等待中止 ssc.awaitTermination() } }
3.啓動Spark Streaming程序:因爲使用的是本地模式"local[2]"因此能夠直接在本地運行該程序
注意:要指定並行度,如在本地運行設置setMaster(「local[2]」),至關於啓動兩個線程,一個給receiver,一個給computer。若是是在集羣中運行,必需要求集羣中可用core數大於1
4.在Linux端命令行中輸入單詞5.在IDEA控制檯中查看結果
問題:結果每次在Linux段輸入的單詞次數都被正確的統計出來,可是結果不能累加!若是須要累加須要使用updateStateByKey(func)來更新狀態,下面給出一個例子:socket
package cn.itcast.spark.streaming import cn.itcast.spark.util.LoggerLevel import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.{StreamingContext, Seconds} object NetworkUpdateStateWordCount { /** * String : 單詞 hello * Seq[Int] :單詞在當前批次出現的次數 * Option[Int] : 歷史結果 */ val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => { //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x))) iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))} } def main(args: Array[String]) { LoggerLevel.setStreamingLogLevels() val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount") val ssc = new StreamingContext(conf, Seconds(5)) //作checkpoint 寫入共享存儲中 ssc.checkpoint("c://aaa") val lines = ssc.socketTextStream("192.168.10.100", 9999) //reduceByKey 結果不累加 //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) //updateStateByKey結果能夠累加可是須要傳入一個自定義的累加函數:updateFunc val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) results.print() ssc.start() ssc.awaitTermination() } }