【大數據】SparkStreaming學習筆記

 

1Spark Streaming概述

1.1 Spark Streaming是什麼

Spark Streaming用於流式數據的處理。Spark Streaming支持的數據輸入源不少,例如:KafkaFlumeTwitterZeroMQ和簡單的TCP套接字等等。數據輸入後能夠用Spark的高度抽象原語如:mapreducejoinwindow等進行運算。而結果也能保存在不少地方,如HDFS,數據庫等。java

 

Spark基於RDD的概念很類似,Spark Streaming使用離散化流(discretized stream)做爲抽象表示,叫做DStreamDStream 是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而DStream是由這些RDD所組成的序列(所以得名離散化」)數據庫

1.2 Spark Streaming特色

1.易用apache

 

2.容錯windows

 

3.易整合到Spark體系網絡

 

1.3 SparkStreaming架構

 

1-1 SparkStreaming架構圖架構

 

2Dstream入門

2.1 WordCount案例實操

1.需求:使用netcat工具向9999端口不斷的發送數據,經過SparkStreaming讀取端口數據並統計不一樣單詞出現的次數app

2.添加依賴socket

<dependency>ide

    <groupId>org.apache.spark</groupId>函數

    <artifactId>spark-streaming_2.11</artifactId>

    <version>2.1.1</version>

</dependency>

3.編寫代碼

package com.atguigu

 

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.SparkConf

 

object StreamWordCount {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化Spark配置信息

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")

 

    //2.初始化SparkStreamingContext

    val ssc = new StreamingContext(sparkConf, Seconds(5))

 

    //3.經過監控端口建立DStream,讀進來的數據爲一行行

    val lineStreams = ssc.socketTextStream("hadoop102", 9999)

 

    //將每一行數據作切分,造成一個個單詞

    val wordStreams = lineStreams.flatMap(_.split(" "))

 

    //將單詞映射成元組(word,1

    val wordAndOneStreams = wordStreams.map((_, 1))

 

    //將相同的單詞次數作統計

    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)

 

    //打印

    wordAndCountStreams.print()

 

    //啓動SparkStreamingContext

    ssc.start()

    ssc.awaitTermination()

  }

}

4.啓動程序並經過NetCat發送數據:

[atguigu@hadoop102 spark]$ nc -lk 9999

hello atguigu

注意:若是程序運行時,log日誌太多,能夠將spark conf目錄下的log4j文件裏面的日誌級別改爲WARN

2.2 WordCount解析

Discretized StreamSpark Streaming的基礎抽象,表明持續性的數據流和通過各類Spark原語操做後的結果數據流。在內部實現上,DStream是一系列連續的RDD來表示。每一個RDD含有一段時間間隔內的數據,以下圖:

 

對數據的操做也是按照RDD爲單位來進行的

 

計算過程由Spark engine來完成

 

3Dstream建立

Spark Streaming原生支持一些不一樣的數據源。一些核心數據源已經被打包到Spark Streaming Maven 工件中,而其餘的一些則能夠經過 spark-streaming-kafka 等附加工件獲取。每一個接收器都以 Spark 執行器程序中一個長期運行的任務的形式運行,所以會佔據分配給應用的 CPU 核心。此外,咱們還須要有可用的 CPU 核心來處理數據。這意味着若是要運行多個接收器,就必須至少有和接收器數目相同的核心數,還要加上用來完成計算所須要的核心數。例如,若是咱們想要在流計算應用中運行 10 個接收器,那麼至少須要爲應用分配 11 CPU 核心。因此若是在本地模式運行,不要使用local[1]

3.1文件數據源

3.1.1 用法及說明

文件數據流:可以讀取全部HDFS API兼容的文件系統文件,經過fileStream方法進取,Spark Streaming 將會監控 dataDirectory 目錄並不斷處理移動進來的文件,記住目前不支持嵌套目錄。

streamingContext.textFileStream(dataDirectory)

注意事項:

1文件須要有相同的數據格式;

2)文件進入 dataDirectory的方式須要經過移動或者重命名來實現;

3一旦文件移動進目錄,則不能再修改,即使修改了也不會讀取新數據

3.1.2 案例實操

1)在HDFS上建好目錄

[atguigu@hadoop102 spark]$ hadoop fs -mkdir /fileStream

2)在/opt/module/data建立三個文件

[atguigu@hadoop102 data]$ touch a.tsv

[atguigu@hadoop102 data]$ touch b.tsv

[atguigu@hadoop102 data]$ touch c.tsv

 

添加以下數據:

Hello atguigu

Hello spark

(3)編寫代碼

package com.atguigu

 

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.DStream

 

object FileStream {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化Spark配置信息

Val sparkConf = new SparkConf().setMaster("local[*]")

.setAppName("StreamWordCount")

 

    //2.初始化SparkStreamingContext

    val ssc = new StreamingContext(sparkConf, Seconds(5))

 

//3.監控文件夾建立DStream

    val dirStream = ssc.textFileStream("hdfs://hadoop102:9000/fileStream")

 

    //4.將每一行數據作切分,造成一個個單詞

    val wordStreams = dirStream.flatMap(_.split("\t"))

 

    //5.將單詞映射成元組(word,1

    val wordAndOneStreams = wordStreams.map((_, 1))

 

    //6.將相同的單詞次數作統計

    val wordAndCountStreams] = wordAndOneStreams.reduceByKey(_ + _)

 

    //7.打印

    wordAndCountStreams.print()

 

    //8.啓動SparkStreamingContext

    ssc.start()

    ssc.awaitTermination()

  }

}

(4)啓動程序並向fileStream目錄上傳文件

[atguigu@hadoop102 data]$ hadoop fs -put ./a.tsv /fileStream

[atguigu@hadoop102 data]$ hadoop fs -put ./b.tsv /fileStream

[atguigu@hadoop102 data]$ hadoop fs -put ./c.tsv /fileStream

(5)獲取計算結果

-------------------------------------------

Time: 1539073810000 ms

-------------------------------------------

 

-------------------------------------------

Time: 1539073815000 ms

-------------------------------------------

(Hello,4)

(spark,2)

(atguigu,2)

 

-------------------------------------------

Time: 1539073820000 ms

-------------------------------------------

(Hello,2)

(spark,1)

(atguigu,1)

 

-------------------------------------------

Time: 1539073825000 ms

-------------------------------------------

3.2 RDD隊列(瞭解)

3.2.1 用法及說明

測試過程當中,能夠經過使用ssc.queueStream(queueOfRDDs)來建立DStream,每個推送到這個隊列中的RDD,都會做爲一個DStream處理。

3.2.2 案例實操

1)需求:循環建立幾個RDD,將RDD放入隊列。經過SparkStream建立Dstream,計算WordCount

2)編寫代碼

package com.atguigu

 

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.{Seconds, StreamingContext}

 

import scala.collection.mutable

 

object RDDStream {

 

  def main(args: Array[String]) {

 

    //1.初始化Spark配置信息

    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")

 

    //2.初始化SparkStreamingContext

    val ssc = new StreamingContext(conf, Seconds(4))

 

    //3.建立RDD隊列

    val rddQueue = new mutable.Queue[RDD[Int]]()

 

    //4.建立QueueInputDStream

    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)

 

    //5.處理隊列中的RDD數據

    val mappedStream = inputStream.map((_,1))

    val reducedStream = mappedStream.reduceByKey(_ + _)

 

    //6.打印結果

    reducedStream.print()

 

    //7.啓動任務

    ssc.start()

 

//8.循環建立並向RDD隊列中放入RDD

    for (i <- 1 to 5) {

      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)

      Thread.sleep(2000)

    }

 

    ssc.awaitTermination()

  }

}

3)結果展現

-------------------------------------------

Time: 1539075280000 ms

-------------------------------------------

(4,60)

(0,60)

(6,60)

(8,60)

(2,60)

(1,60)

(3,60)

(7,60)

(9,60)

(5,60)

 

-------------------------------------------

Time: 1539075284000 ms

-------------------------------------------

(4,60)

(0,60)

(6,60)

(8,60)

(2,60)

(1,60)

(3,60)

(7,60)

(9,60)

(5,60)

 

-------------------------------------------

Time: 1539075288000 ms

-------------------------------------------

(4,30)

(0,30)

(6,30)

(8,30)

(2,30)

(1,30)

(3,30)

(7,30)

(9,30)

(5,30)

 

-------------------------------------------

Time: 1539075292000 ms

-------------------------------------------

3.3 自定義數據源

3.3.1 用法及說明

須要繼承Receiver,並實現onStart、onStop方法來自定義數據源採集。

3.3.2 案例實操

1)需求:自定義數據源,實現監控某個端口號,獲取該端口號內容。

2)代碼實現

package com.atguigu

 

import java.io.{BufferedReader, InputStreamReader}

import java.net.Socket

import java.nio.charset.StandardCharsets

 

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.receiver.Receiver

 

class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

 

  //最初啓動的時候,調用該方法,做用爲:讀數據並將數據發送給Spark

  override def onStart(): Unit = {

    new Thread("Socket Receiver") {

      override def run() {

        receive()

      }

    }.start()

  }

 

  //讀數據並將數據發送給Spark

  def receive(): Unit = {

 

    //建立一個Socket

    var socket: Socket = new Socket(host, port)

 

    //定義一個變量,用來接收端口傳過來的數據

    var input: String = null

 

    //建立一個BufferedReader用於讀取端口傳來的數據

    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

 

    //讀取數據

    input = reader.readLine()

 

    //receiver沒有關閉而且輸入數據不爲空,則循環發送數據給Spark

    while (!isStopped() && input != null) {

      store(input)

      input = reader.readLine()

    }

 

    //跳出循環則關閉資源

    reader.close()

    socket.close()

 

    //重啓任務

    restart("restart")

  }

 

  override def onStop(): Unit = {}

}

3)使用自定義的數據源採集數據

package com.atguigu

 

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.DStream

 

object FileStream {

 

  def main(args: Array[String]): Unit = {

 

    //1.初始化Spark配置信息

Val sparkConf = new SparkConf().setMaster("local[*]")

.setAppName("StreamWordCount")

 

    //2.初始化SparkStreamingContext

    val ssc = new StreamingContext(sparkConf, Seconds(5))

 

//3.建立自定義receiverStreaming

val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))

 

    //4.將每一行數據作切分,造成一個個單詞

    val wordStreams = lineStream.flatMap(_.split("\t"))

 

    //5.將單詞映射成元組(word,1

    val wordAndOneStreams = wordStreams.map((_, 1))

 

    //6.將相同的單詞次數作統計

    val wordAndCountStreams] = wordAndOneStreams.reduceByKey(_ + _)

 

    //7.打印

    wordAndCountStreams.print()

 

    //8.啓動SparkStreamingContext

    ssc.start()

    ssc.awaitTermination()

  }

}

3.4 Kafka數據源(重點)

3.4.1 用法及說明

在工程中須要引入 Maven 工件 spark- streaming-kafka_2.10 來使用它。包內提供的 KafkaUtils 對象能夠在 StreamingContext JavaStreamingContext 中以你的 Kafka 消息建立出 DStream。因爲 KafkaUtils 能夠訂閱多個主題,所以它建立出的 DStream 由成對的主題和消息組成。要建立出一個流數據,須要使用 StreamingContext 實例、一個由逗號隔開的 ZooKeeper 主機列表字符串、消費者組的名字(惟一名字),以及一個從主題到針對這個主題的接收器線程數的映射表來調用 createStream() 方法

3.4.2 案例實操

1)需求1:經過SparkStreamingKafka讀取數據,並將讀取過來的數據作簡單計算(WordCount),最終打印到控制檯。

1)導入依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
</dependency>

2)編寫代碼

package com.atguigu

 

import kafka.serializer.StringDecoder

import org.apache.kafka.clients.consumer.ConsumerConfig

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.dstream.ReceiverInputDStream

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

 

object KafkaSparkStreaming {

 

  def main(args: Array[String]): Unit = {

 

    //1.建立SparkConf並初始化SSC

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

 

    //2.定義kafka參數

    val brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092"

    val topic = "source"

    val consumerGroup = "spark"

 

    //3.kafka參數映射爲map

    val kafkaParam: Map[String, String] = Map[String, String](

      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",

      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",

      ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,

      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers

    )

 

    //4.經過KafkaUtil建立kafkaDSteam

    val kafkaDSteam: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](

      ssc,

      kafkaParam,

      Set(topic),

      StorageLevel.MEMORY_ONLY

    )

 

    //5.kafkaDSteam作計算(WordCount

    kafkaDSteam.foreachRDD {

      rdd => {

        val word: RDD[String] = rdd.flatMap(_._2.split(" "))

        val wordAndOne: RDD[(String, Int)] = word.map((_, 1))

        val wordAndCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

        wordAndCount.collect().foreach(println)

      }

    }

 

    //6.啓動SparkStreaming

    ssc.start()

    ssc.awaitTermination()

  }

}

4 DStream轉換

DStream上的原語與RDD的相似,分爲Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操做中還有一些比較特殊的原語,如:updateStateByKey()transform()以及各類Window相關的原語。

4.1 無狀態轉化操做

無狀態轉化操做就是把簡單的RDD轉化操做應用到每一個批次上,也就是轉化DStream中的每個RDD。部分無狀態轉化操做列在了表中。注意,針對鍵值對的DStream轉化操做(好比 reduceByKey())要添加import StreamingContext._才能在Scala中使用。

 

須要記住的是,儘管這些函數看起來像做用在整個流上同樣,但事實上每一個DStream在內部是由許多RDD(批次)組成,且無狀態轉化操做是分別應用到每一個RDD上的。例如,reduceByKey()會歸約每一個時間區間中的數據,但不會歸約不一樣區間之間的數據。

舉個例子,在以前的wordcount程序中,咱們只會統計5秒內接收到的數據的單詞個數,而不會累加。 

無狀態轉化操做也能在多個DStream間整合數據,不過也是在各個時間區間內。例如,鍵 值對DStream擁有和RDD同樣的與鏈接相關的轉化操做,也就是cogroup()、join()leftOuterJoin() 等。咱們能夠在DStream上使用這些操做,這樣就對每一個批次分別執行了對應的RDD操做。

咱們還能夠像在常規的Spark 中同樣使用 DStreamunion() 操做將它和另外一個DStream 的內容合併起來,也可使用StreamingContext.union()來合併多個流。

4.2 有狀態轉化操做(重點)

4.2.1 UpdateStateByKey

UpdateStateByKey原語用於記錄歷史記錄,有時,咱們須要在 DStream 中跨批次維護狀態(例如流計算中累加wordcount)。針對這種狀況,updateStateByKey() 爲咱們提供了對一個狀態變量的訪問,用於鍵值對形式的 DStream。給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如何根據新的事件 更新每一個鍵對應狀態的函數,它能夠構建出一個新的 DStream,其內部數據爲(鍵,狀態) 對。

updateStateByKey() 的結果會是一個新的 DStream,其內部的 RDD 序列是由每一個時間區間對應的(鍵,狀態)對組成的。

updateStateByKey操做使得咱們能夠在用新信息進行更新時保持任意的狀態。爲使用這個功能,你須要作下面兩步: 
1. 定義狀態,狀態能夠是一個任意的數據類型。 
2. 定義狀態更新函數,用此函數闡明如何使用以前的狀態和來自輸入流的新值對狀態進行更新。

使用updateStateByKey須要對檢查點目錄進行配置,會使用檢查點來保存狀態。

更新版的wordcount

1)編寫代碼

package com.atguigu.streaming

 

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

 

object WorldCount {

 

  def main(args: Array[String]) {

 

    // 定義更新狀態方法,參數values爲當前批次單詞頻度,state爲以往批次單詞頻度

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {

      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)

    }

 

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    val ssc = new StreamingContext(conf, Seconds(3))

    ssc.checkpoint("hdfs://hadoop102:9000/streamCheck")

 

    // Create a DStream that will connect to hostname:port, like hadoop102:9999

    val lines = ssc.socketTextStream("hadoop102", 9999)

 

    // Split each line into words

    val words = lines.flatMap(_.split(" "))

 

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

    // Count each word in each batch

    val pairs = words.map(word => (word, 1))

 

 

    // 使用updateStateByKey來更新狀態,統計從運行開始以來單詞總的次數

    val stateDstream = pairs.updateStateByKey[Int](updateFunc)

    stateDstream.print()

 

    //val wordCounts = pairs.reduceByKey(_ + _)

 

    // Print the first ten elements of each RDD generated in this DStream to the console

    //wordCounts.print()

 

    ssc.start()             // Start the computation

    ssc.awaitTermination()  // Wait for the computation to terminate

    //ssc.stop()

  }

 

}

2)啓動程序並向9999端口發送數據

[atguigu@hadoop102 kafka]$ nc -lk 9999

ni shi shui

ni hao ma

3)結果展現

-------------------------------------------

Time: 1504685175000 ms

-------------------------------------------

-------------------------------------------

Time: 1504685181000 ms

-------------------------------------------

(shi,1)

(shui,1)

(ni,1)

-------------------------------------------

Time: 1504685187000 ms

-------------------------------------------

(shi,1)

(ma,1)

(hao,1)

(shui,1)

(ni,2)

4.2.2 Window Operations

Window Operations能夠設置窗口的大小和滑動窗口的間隔來動態的獲取當前Steaming的容許狀態。基於窗口的操做會在一個比 StreamingContext 的批次間隔更長的時間範圍內,經過整合多個批次的結果,計算出整個窗口的結果。

 

注意:全部基於窗口的操做都須要兩個參數,分別爲窗口時長以及滑動步長,二者都必須是 StreamContext 的批次間隔的整數倍。

窗口時長控制每次計算最近的多少個批次的數據,其實就是最近的 windowDuration/batchInterval 個批次。若是有一個以 10 秒爲批次間隔的源 DStream,要建立一個最近 30 秒的時間窗口(即最近 3 個批次),就應當把 windowDuration 設爲 30 秒。而滑動步長的默認值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。若是源 DStream 批次間隔爲 10 秒,而且咱們只但願每兩個批次計算一次窗口結果, 就應該把滑動步長設置爲 20 秒。

假設,你想拓展前例從而每隔十秒對持續30秒的數據生成word count。爲作到這個,咱們須要在持續30秒數據的(word,1)DStream上應用reduceByKey。使用操做reduceByKeyAndWindow.

# reduce last 30 seconds of data, every 10 second

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)

 

關於Window的操做有以下原語:

1window(windowLength, slideInterval): 基於對源DStream窗化的批次進行計算返回一個新的Dstream

2countByWindow(windowLength, slideInterval):返回一個滑動窗口計數流中的元素。

3reduceByWindow(func, windowLength, slideInterval):經過使用自定義函數整合滑動區間流元素來建立一個新的單元素流。

4reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):當在一個(K,V)對的DStream上調用此函數,會返回一個新(K,V)對的DStream,此處經過對滑動窗口中批次數據使用reduce函數來整合每一個keyvalue值。Note:默認狀況下,這個操做使用Spark的默認數量並行任務(本地是2),在集羣模式中依據配置屬性(spark.default.parallelism)來作grouping。你能夠經過設置可選參數numTasks來設置不一樣數量的tasks

5reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):這個函數是上述函數的更高效版本,每一個窗口的reduce值都是經過用前一個窗的reduce值來遞增計算。經過reduce進入到滑動窗口數據並」反向reduce」離開窗口的舊數據來實現這個操做。一個例子是隨着窗口滑動對keys的「加」「減」計數。經過前邊介紹能夠想到,這個函數只適用於」可逆的reduce函數」,也就是這些reduce函數有相應的」反reduce」函數(以參數invFunc形式傳入)。如前述函數,reduce任務的數量經過可選參數來配置。注意:爲了使用這個操做,檢查點必須可用。 

6countByValueAndWindow(windowLength,slideInterval, [numTasks]):對(K,V)對的DStream調用,返回(K,Long)對的新DStream,其中每一個key的值是其在滑動窗口中頻率。如上,可配置reduce任務數量。

reduceByWindow() reduceByKeyAndWindow() 讓咱們能夠對每一個窗口更高效地進行歸約操做。它們接收一個歸約函數,在整個窗口上執行,好比 +。除此之外,它們還有一種特殊形式,經過只考慮新進入窗口的數據和離開窗口的數據,讓 Spark 增量計算歸約結果。這種特殊形式須要提供歸約函數的一個逆函數,比 如 + 對應的逆函數爲 -。對於較大的窗口,提供逆函數能夠大大提升執行效率

 

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  {(x, y) => x + y},
  {(x, y) => x - y},
  Seconds(30),
  Seconds(10))
  // 加上新進入窗口的批次中的元素 // 移除離開窗口的老批次中的元素 // 窗口時長// 滑動步長

countByWindow()countByValueAndWindow()做爲對數據進行計數操做的簡寫。countByWindow()返回一個表示每一個窗口中元素個數的DStream,而countByValueAndWindow()返回的DStream則包含窗口中每一個值的個數

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}

val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))

val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

WordCount第三版:3秒一個批次,窗口12秒,滑步6秒。

package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

  def main(args: Array[String]) {

    // 定義更新狀態方法,參數values爲當前批次單詞頻度,state爲以往批次單詞頻度
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint(".")

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop102", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))

    val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    //ssc.stop()
  }

}

4.3 其餘重要操做

4.3.1 Transform

Transform原語容許DStream上執行任意的RDD-to-RDD函數。即便這些函數並無在DStreamAPI中暴露出來,經過該函數能夠方便的擴展Spark API。該函數每一批次調度一次。其實也就是對DStream中的RDD應用轉換。

好比下面的例子,在進行單詞統計的時候,想要過濾掉spam的信息。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

4.3.2 Join

鏈接操做(leftOuterJoin, rightOuterJoin, fullOuterJoin也能夠),能夠鏈接Stream-Streamwindows-stream to windows-streamstream-dataset

Stream-Stream Joins

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

5DStream輸出

輸出操做指定了對流數據經轉化操做獲得的數據所要執行的操做(例如把結果推入外部數據庫或輸出到屏幕上)。與RDD中的惰性求值相似,若是一個DStream及其派生出的DStream都沒有被執行輸出操做,那麼這些DStream就都不會被求值。若是StreamingContext中沒有設定輸出操做,整個context就都不會啓動。

輸出操做以下:

1print():在運行流程序的驅動結點上打印DStream中每一批次數據的最開始10個元素。這用於開發和調試。在Python API中,一樣的操做叫print()

2saveAsTextFiles(prefix, [suffix]):以text文件形式存儲這個DStream的內容。每一批次的存儲文件名基於參數中的prefixsuffix。」prefix-Time_IN_MS[.suffix]

3saveAsObjectFiles(prefix, [suffix]):Java對象序列化的方式將Stream中的數據保存爲 SequenceFiles . 每一批次的存儲文件名基於參數中的爲"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

4saveAsHadoopFiles(prefix, [suffix]):將Stream中的數據保存爲 Hadoop files. 每一批次的存儲文件名基於參數中的爲"prefix-TIME_IN_MS[.suffix]"。
Python API Python中目前不可用。

5foreachRDD(func):這是最通用的輸出操做,即將函數 func 用於產生於 stream的每個RDD。其中參數傳入的函數func應該實現將每個RDD中數據推送到外部系統,如將RDD存入文件或者經過網絡將其寫入數據庫。注意:函數func在運行流應用的驅動中被執行,同時其中通常函數RDD操做從而強制其對於流RDD的運算。

通用的輸出操做foreachRDD(),它用來對DStream中的RDD運行任意計算。這和transform() 有些相似,均可以讓咱們訪問任意RDD。在foreachRDD()中,能夠重用咱們在Spark中實現的全部行動操做。

好比,常見的用例之一是把數據寫到諸如MySQL的外部數據庫中。 注意:

1)鏈接不能寫在driver層面;

2)若是寫在foreach則每一個RDD都建立,得不償失;

3)增長foreachPartition,在分區建立。

相關文章
相關標籤/搜索