Big Data Learning: Spark Streaming

 

Official documentation: http://spark.apache.org/docs/latest/streaming-programming-guide.html

1.1.  Implementing real-time WordCount with Spark Streaming

1. Install and start the data producer

First, install the nc tool with YUM on a Linux host (IP: 192.168.10.101):

yum install -y nc

 

Start a server listening on port 9999:

nc -lk 9999

 

 

2. Write the Spark Streaming program

package org.apache.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object TCPWordCount {
  def main(args: Array[String]) {
    // setMaster("local[2]"): run two threads locally, one to receive data and one to do the computation
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext with a batch interval of about 5 seconds;
    // with a much shorter interval the console output gets flushed away too quickly to read
    val scc = new StreamingContext(conf, Seconds(5))
    // Data source: collect text from the given IP and port
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    // Process the RDDs: split lines into words, map to (word, 1), and reduce by key
    val results = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print the results to the console
    results.print()
    // Start the streaming computation
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}

 

3. Start the Spark Streaming program: since it uses local mode ("local[2]"), the program can be run directly on the local machine

Note: the parallelism must be specified. Setting setMaster("local[2]") for a local run starts two threads, one for the receiver and one for the computation. When running on a cluster, the cluster must have more than one available core.
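As a minimal sketch (not from the original article), the hard-coded master can be made optional so the same program runs locally during development and picks up the master from spark-submit on a cluster; the object name TCPWordCountSubmit and the fallback logic are illustrative assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TCPWordCountSubmit {
  def main(args: Array[String]): Unit = {
    // If spark-submit already supplied a master (spark.master), keep it;
    // otherwise fall back to local[2]: one thread for the receiver, one for computation.
    val conf = new SparkConf().setAppName("TCPWordCount")
    if (!conf.contains("spark.master")) {
      conf.setMaster("local[2]")
    }

    val scc = new StreamingContext(conf, Seconds(5))
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    scc.start()
    scc.awaitTermination()
  }
}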

4. Type words into the command line on the Linux side

 

5. Check the results in the IDEA console

 

Problem: the words typed on the Linux side are counted correctly in each batch, but the results are not accumulated across batches! To accumulate counts, you need updateStateByKey(func) to maintain state. An example is given below:

package org.apache.spark

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object TCPWordCountUpdate {
  /**
    * String: a word
    * Seq: [1,1,1,1,1,1], the occurrences of that word in the current batch
    * Option: the accumulated (historical) sum for that word
    */

  val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    //iter.map{case(x,y,z)=>(x,y.sum+z.getOrElse(0))}
  }

  def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    Some(newValues.sum + runningCount.getOrElse(0))
  }


  def main(args: Array[String]) {
    // setMaster("local[2]"): run two threads locally, one to receive data and one to do the computation
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCount")
    // Create the StreamingContext with a batch interval of about 5 seconds;
    // with a much shorter interval the console output gets flushed away too quickly to read
    val scc = new StreamingContext(conf, Seconds(5))
    // A checkpoint directory is required by updateStateByKey
    scc.checkpoint("./")
    // Data source: collect text from the given IP and port
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    /**
      * updateStateByKey() arguments:
      * 1. the state update function
      * 2. the partitioner
      * 3. a boolean: whether to remember the partitioner for the generated RDDs
      */
    //val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction2 _)
    val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction, new HashPartitioner(scc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    results.print()
    // Start the streaming computation
    scc.start()
    // Wait for termination
    scc.awaitTermination()
  }
}
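updateFunction2 above uses the simpler per-key signature (Seq[Int], Option[Int]) => Option[Int]. As a minimal sketch (assuming the same socket source and checkpoint directory), it can be wired in through the single-argument updateStateByKey overload, as the commented-out line in the example hints; the object name below is illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TCPWordCountUpdateSimple {
  // New counts for the current batch plus the previously accumulated total
  def updateFunction2(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TCPWordCountUpdateSimple")
    val scc = new StreamingContext(conf, Seconds(5))
    scc.checkpoint("./") // required by updateStateByKey
    val lines = scc.socketTextStream("192.168.74.100", 9999)
    // Single-argument overload: Spark uses the default partitioner here
    val results = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction2 _)
    results.print()
    scc.start()
    scc.awaitTermination()
  }
}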

 

1.2.  Computing per-minute data with reduceByKeyAndWindow

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkSqlTest {
  def main(args: Array[String]) {
    // LoggerLevels is a helper class (not shown here) that lowers the streaming log level
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Checkpoint directory used by stateful and windowed operations
    ssc.checkpoint("./")
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.74.100", 9999)
    // reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration):
    // here the window length and the slide interval are both 5 seconds
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
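The heading refers to per-minute data, while the example above uses a 5-second window for easy testing. A minimal sketch with a true one-minute window and slide (same socket source; the object name is illustrative) would be:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinuteWindowWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("minuteWindow").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.74.100", 9999)
    // Window length of 60 seconds, sliding every 60 seconds:
    // each output batch covers exactly one minute of input
    val result: DStream[(String, Int)] = textStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(60))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}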

1.3.  Real-time website click-stream statistics with Spark Streaming and Kafka

 

 

1. Install and configure ZooKeeper

2. Install and configure Kafka

3. Start ZooKeeper

4. Start Kafka

5. Create a topic

bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \
--replication-factor 3 --partitions 3 --topic urlcount

6. Write the Spark Streaming application

package cn.itcast.spark.streaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
  }

  def main(args: Array[String]) {
    // Receive parameters from the command line
    // val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    val Array(zkQuorum, groupId, topics, numThreads) = Array[String]("master1ha:2181,master2:2181,master2ha:2181", "g1", "wangsf-test", "2")
    // Create the SparkConf and set the AppName
    val conf = new SparkConf().setAppName("UrlCount")
    // Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    // Set the checkpoint directory (when reading the arguments from args, an HDFS path can be passed in instead)
    ssc.checkpoint("./")
    // Set up the topic information
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Pull data from Kafka and create the DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    // Split each line and extract the URL the user clicked (the 7th space-separated field)
    val urls = lines.map(x => (x.split(" ")(6), 1))
    // Count URL clicks, accumulating across batches
    val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Print the results to the console
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Produce data to test:

kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test
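The UrlCount job splits every Kafka message on spaces and takes the 7th field (index 6) as the clicked URL, so lines typed into the console producer need to follow that layout. A small sketch with a hypothetical log line shows the extraction:

object UrlFieldCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical click-log line: the 7th space-separated field (index 6) is the URL
    val line = "1.2.3.4 - user1 [10/Oct/2018:13:55:36] GET 200 /index.html"
    val url = line.split(" ")(6)
    println(url) // prints /index.html
  }
}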
