Official guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html
1. Install and start the producer
First, install the nc (netcat) tool with YUM on a Linux machine (ip: 192.168.10.101):
yum install -y nc
Start a server that listens on port 9999:
nc -lk 9999
2. Write the Spark Streaming program
package org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object TCPWordCount {
  def main(args: Array[String]) {
    // setMaster("local[2]"): run two threads locally, one to receive messages and one to compute
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext and pass in the batch interval; keep it around 5 seconds,
    // otherwise the console output gets flushed away too quickly
    val scc = new StreamingContext(conf, Seconds(5))
    // Data source: collect data from the given ip and port
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    // RDD-style processing
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print the results to the console
    results.print()
    // Start Spark Streaming
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
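To build and run this program outside of an IDE, only the Spark core and streaming artifacts need to be on the classpath. A minimal build.sbt sketch, assuming Scala 2.10 and Spark 1.6.x (both versions are assumptions; match them to your cluster):

// build.sbt (sketch; versions are assumptions)
name := "spark-streaming-wordcount"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.6.3",
  "org.apache.spark" %% "spark-streaming" % "1.6.3"
)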
3. Start the Spark Streaming program: since it uses local mode ("local[2]"), it can be run directly on the local machine.
Note: you must specify the parallelism. Running locally with setMaster("local[2]") starts two threads, one for the receiver and one for the computation. When running on a cluster, the cluster must have more than one available core.
4. Type some words at the Linux command line (in the nc session).
5. Check the results in the IDEA console.
Problem: the words typed on the Linux side are counted correctly in each batch, but the counts do not accumulate across batches! To accumulate them, use updateStateByKey(func) to maintain the state. An example is given below:
package org.apache.spark

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object TCPWordCountUpdate {
  /**
   * String: a word
   * Seq: [1,1,1,1,1,1], the counts of this word in the current batch
   * Option: the historical sum for this word
   */
  val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    //iter.map{ case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    Some(newValues.sum + runningCount.getOrElse(0))
  }

  def main(args: Array[String]) {
    // setMaster("local[2]"): run two threads locally, one to receive messages and one to compute
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext and pass in the batch interval; keep it around 5 seconds,
    // otherwise the console output gets flushed away too quickly
    val scc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey needs a checkpoint directory to store the state
    scc.checkpoint("./")
    // Data source: collect data from the given ip and port
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    // RDD-style processing
    /**
     * updateStateByKey() updates the state, taking:
     * 1. the function that implements the update
     * 2. the partitioner
     * 3. a boolean flag (whether to remember the partitioner for the state RDDs)
     */
    //val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction2 _)
    val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction,
      new HashPartitioner(scc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    results.print()
    // Start Spark Streaming
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
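To see what the update function does for a single key, the snippet below calls the simpler updateFunction2 variant by hand, with no Spark involved; it is only an illustration of how updateStateByKey carries the count across two batches:

// Standalone illustration of the per-key update logic (no Spark required).
object UpdateFunctionDemo {
  def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // Batch 1: the word appears 3 times and there is no previous state
    val afterBatch1 = updateFunction2(Seq(1, 1, 1), None)     // Some(3)
    // Batch 2: the word appears 2 more times; the previous state is Some(3)
    val afterBatch2 = updateFunction2(Seq(1, 1), afterBatch1) // Some(5)
    println(s"after batch 1: $afterBatch1, after batch 2: $afterBatch2")
  }
}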
The example below uses reduceByKeyAndWindow to count words over a sliding window (here both the window length and the slide interval are 5 seconds, the same as the batch interval):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkSqlTest {
  def main(args: Array[String]) {
    // Helper from the course project that lowers Spark's logging verbosity
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("./")
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.74.100", 9999)
    // Count words over a window of 5 seconds, sliding every 5 seconds
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(5), Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
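With the window length and slide interval both equal to the batch interval, the window version behaves the same as a plain per-batch reduceByKey. Below is a sketch of a more typical setup, assuming a 30-second window that slides every 10 seconds (both values are illustrative and must be multiples of the batch interval):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: the same word count, but over a 30-second window recomputed every 10 seconds.
object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WindowedWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("192.168.74.100", 9999)
    // Window length 30s, slide interval 10s; both are multiples of the 5s batch interval
    val windowed: DStream[(String, Int)] = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowed.print()
    ssc.start()
    ssc.awaitTermination()
  }
}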
The following steps combine Spark Streaming with Kafka.
1. Install and configure ZooKeeper
2. Install and configure Kafka
3. Start ZooKeeper
4. Start Kafka
5. Create a topic
bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \
--replication-factor 3 --partitions 3 --topic wangsf-test
6. Write the Spark Streaming application
package cn.itcast.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(n => (x, n)) }
  }

  def main(args: Array[String]) {
    // Receive the parameters from the command line
    // val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    val Array(zkQuorum, groupId, topics, numThreads) =
      Array[String]("master1ha:2181,master2:2181,master2ha:2181", "g1", "wangsf-test", "2")
    // Create SparkConf and set the AppName
    val conf = new SparkConf().setAppName("UrlCount")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    // Set the checkpoint directory (use an HDFS path, e.g. the commented-out hdfs argument, on a cluster)
    ssc.checkpoint("./")
    // Set the topic information
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Pull data from Kafka and create the DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap,
      StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split each line and extract the url the user clicked (the 7th space-separated field)
    val urls = lines.map(x => (x.split(" ")(6), 1))
    // Count the clicks per URL, keeping state across batches
    val result = urls.updateStateByKey(updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the result to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
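The receiver-based KafkaUtils.createStream used above lives in a separate artifact that is not bundled with spark-streaming. A build.sbt addition as a sketch (the version is an assumption and must match your Spark and Scala versions):

// Extra dependency for the Kafka receiver API (sketch; version is an assumption)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"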
Produce test data (note that the program takes the 7th space-separated field of each line as the URL, i.e. x.split(" ")(6), so each test line needs at least seven space-separated fields):
kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test
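To sanity-check the field indexing without starting Kafka, here is a standalone sketch of the extraction on a made-up, access-log-style line (the sample line is purely illustrative, not real data):

// Standalone check of the x.split(" ")(6) extraction used in UrlCount.
object FieldExtractionDemo {
  def main(args: Array[String]): Unit = {
    // Made-up example line in a common access-log layout
    val line = """192.168.10.1 - - [01/Jan/2019:00:00:00 +0800] "GET /index.html HTTP/1.1" 200 1024"""
    val url = line.split(" ")(6) // the 7th space-separated field
    println(url)                 // prints /index.html
  }
}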