194 Spark Streaming實現實時WordCount

架構圖:
在這裏插入圖片描述
1.安裝並啓動生成者
首先在一臺Linux(ip:192.168.10.101)上用YUM安裝nc工具apache

yum install -y nc

啓動一個服務端並監聽9999端口網絡

nc -lk 9999

2.編寫Spark Streaming程序架構

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    //設置日誌級別
    LoggerLevel.setStreamingLogLevels()
 
    //建立SparkConf並設置爲本地模式運行
    //注意local[2]表明開兩個線程
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 
    //設置DStream批次時間間隔爲2秒
    val ssc = new StreamingContext(conf, Seconds(2))
 
    //經過網絡讀取數據
    val lines = ssc.socketTextStream("192.168.10.101", 9999)
 
    //將讀到的數據用空格切成單詞
    val words = lines.flatMap(_.split(" "))
 
    //將單詞和1組成一個pair
    val pairs = words.map(word => (word, 1))
 
    //按單詞進行分組求相同單詞出現的次數
    val wordCounts = pairs.reduceByKey(_ + _)
 
    //打印結果到控制檯
    wordCounts.print()
 
    //開始計算
    ssc.start()
 
    //等待中止
    ssc.awaitTermination()
  }
}

3.啓動Spark Streaming程序:因爲使用的是本地模式"local[2]"因此能夠直接在本地運行該程序
注意:要指定並行度,如在本地運行設置setMaster(「local[2]」),至關於啓動兩個線程,一個給receiver,一個給computer。若是是在集羣中運行,必需要求集羣中可用core數大於1
在這裏插入圖片描述
4.在Linux端命令行中輸入單詞
在這裏插入圖片描述5.在IDEA控制檯中查看結果
在這裏插入圖片描述
問題:結果每次在Linux段輸入的單詞次數都被正確的統計出來,可是結果不能累加!若是須要累加須要使用updateStateByKey(func)來更新狀態,下面給出一個例子:socket

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Seconds}

object NetworkUpdateStateWordCount {
  /**
    * String : 單詞 hello
    * Seq[Int] :單詞在當前批次出現的次數
    * Option[Int] : 歷史結果
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
  }

  def main(args: Array[String]) {
    LoggerLevel.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
  
    //作checkpoint 寫入共享存儲中
    ssc.checkpoint("c://aaa")
    val lines = ssc.socketTextStream("192.168.10.100", 9999)
   
    //reduceByKey 結果不累加
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    //updateStateByKey結果能夠累加可是須要傳入一個自定義的累加函數:updateFunc
    val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

相關文章
相關標籤/搜索