大數據技術之_19_Spark學習_04_Spark Streaming 應用解析 + Spark Streaming 概述、運行、解析 + DStream 的輸入、轉換、輸出 + 優化

第1章 Spark Streaming 概述1.1 什麼是 Spark Streaming1.2 爲何要學習 Spark Streaming1.3 Spark 與 Storm 的對比第2章 運行 Spark Streaming第3章 架構與抽象第4章 Spark Streaming 解析4.1 初始化 StreamingContext4.2 什麼是 DStreams4.3 DStream 的輸入4.3.1 基本數據源4.3.2 高級數據源4.4 DStream 的轉換4.4.1 無狀態轉化操做4.4.2 有狀態轉化操做4.4.3 重要操做4.5 DStream 的輸出4.6 累加器和廣播變量4.7 DataFrame ans SQL Operations4.8 Caching / Persistence4.9 不間斷運行 7x24 小時4.9.1 檢查點機制4.9.2 驅動器程序容錯4.9.3 工做節點容錯4.9.4 接收器容錯4.9.5 處理保證4.10 性能考量php


第1章 Spark Streaming 概述

1.1 什麼是 Spark Streaming


  Spark Streaming 相似於 Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming 有高吞吐量和容錯能力強等特色。Spark Streaming 支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後能夠用 Spark 的高度抽象,如:map、reduce、join、window 等進行運算。而結果也能保存在不少地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。

 


  和 Spark 基於 RDD 的概念很類似,Spark Streaming 使用離散化流(discretized stream)做爲抽象表示,叫做 DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(所以得名「離散化」)。

 


  DStream 能夠從各類輸入源建立,好比 Flume、Kafka 或者 HDFS。建立出來的 DStream 支持兩種操做,一種是轉化操做(transformation),會生成一個新的 DStream,另外一種是輸出操做(output operation),能夠把數據寫入外部系統中。DStream 提供了許多與 RDD 所支持的操做相相似的操做支持,還增長了與時間相關的新操做,好比滑動窗口。

 

  Spark Streaming 的關鍵抽象html


  DStream:Discretized Stream 離散化流

1.2 爲何要學習 Spark Streaming

一、易用
二、容錯
三、易整合到 Spark 體系java

1.3 Spark 與 Storm 的對比

第2章 運行 Spark Streaming


經過 IDEA 編寫程序
pom.xml 加入如下依賴:
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <!-- provided 表示編譯期可用,運行期不可用 -->
            <!--<scope>provided</scope>-->
        </dependency>

示例代碼以下:sql

package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop102"9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val results = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    results.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate

    ssc.stop()
  }
}

安裝 Netcat 後,參考文章連接:https://www.cnblogs.com/chenmingjun/p/10785438.html
先啓動 Netcat,而後經過 Netcat 發送數據:數據庫

$ nc -l -p 9999         #監聽9999端口
hello world             #運行 jar 包後,發送測試數據

再按照 Spark Core 中的方式進行打包,並將程序上傳到Spark機器上。並運行:apache

/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.WorldCount /opt/software/sparkjars/networdcount-jar-with-dependencies.jar

注意:若是程序運行時,log 日誌太多,能夠將 spark 的 conf 目錄下的 log4j 文件裏面的日誌級別改爲 WARN。編程

第3章 架構與抽象

  Spark Streaming 使用「微批次」的架構,把流式計算看成一系列連續的小規模批處理來對待。Spark Streaming 從各類輸入源中讀取數據,並把數據分組爲小的批次。新的批次按均勻的時間間隔建立出來。在每一個時間區間開始的時候,一個新的批次就建立出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次中止增加。時間區間的大小是由批次間隔這個參數決定的。批次間隔通常設在 500 毫秒到幾秒之間,由應用開發者配置。每一個輸入批次都造成一個 RDD,以 Spark 做業的方式處理並生成其餘的 RDD。處理的結果能夠以批處理的方式傳給外部系統。高層次的架構以下圖所示:bootstrap

  Spark Streaming 的編程抽象是離散化流,也就是 DStream。它是一個 RDD 序列,每一個 RDD 表明數據流中一個時間片內的數據。windows

  Spark Streaming 在 Spark 的驅動器程序 -- 工做節點的結構的執行過程以下圖所示。Spark Streaming 爲每一個輸入源啓動對應的接收器。接收器以任務的形式運行在應用的執行器進程中,從輸入源收集數據並保存爲 RDD。它們收集到輸入數據後會把數據複製到另外一個執行器進程來保障容錯性(默認行爲)。數據保存在執行器進程的內存中,和緩存 RDD 的方式同樣。驅動器程序中的 StreamingContext 會週期性地運行 Spark 做業來處理這些數據,把數據與以前時間區間中的 RDD 進行整合。緩存

第4章 Spark Streaming 解析

4.1 初始化 StreamingContext

源碼:

import org.apache.spark._
import org.apache.spark.streaming._

// 能夠經過 ssc.sparkContext 來訪問 SparkContext
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

// 或者經過已經存在的 SparkContext 來建立 StreamingContext
import org.apache.spark.streaming._
val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

初始化完 Context 以後:
  1)定義消息輸入源來建立 DStreams。
  2)定義 DStreams 的轉化操做和輸出操做。
  3)經過 streamingContext.start() 來啓動消息採集和處理.
  4)等待程序終止,能夠經過 streamingContext.awaitTermination() 來設置。
  5)經過 streamingContext.stop() 來手動終止處理程序。
注意
  StreamingContext 一旦啓動,對 DStreams 的操做就不能修改了。
  在同一時間一個 JVM 中只有一個 StreamingContext 能夠啓動。
  stop() 方法將同時中止 SparkContext,能夠傳入參數 stopSparkContext 用於只中止 StreamingContext。

4.2 什麼是 DStreams

  Discretized Stream 是 Spark Streaming 的基礎抽象,表明持續性的數據流和通過各類 Spark 原語操做後的結果數據流。在內部實現上,DStream 是一系列連續的 RDD 來表示。每一個 RDD 含有一段時間間隔內的數據,以下圖:

  對數據的操做也是按照 RDD 爲單位來進行的,以下圖:

  計算過程由 Spark Engine 來完成,以下圖:

4.3 DStream 的輸入

  Spark Streaming 原生支持一些不一樣的數據源。一些「核心」數據源已經被打包到 Spark Streaming 的 Maven 工件中,而其餘的一些則能夠經過 spark-streaming-kafka 等附加工件獲取。每一個接收器都以 Spark 執行器程序中一個長期運行的任務的形式運行,所以會佔據分配給應用的 CPU 核心。此外,咱們還須要有可用的 CPU 核心來處理數據。這意味着若是要運行多個接收器,就必須至少有和接收器數目相同的核心數,還要加上用來完成計算所須要的核心數。例如,若是咱們想要在流計算應用中運行 10 個接收器,那麼至少須要爲應用分配 11 個 CPU 核心。因此若是在本地模式運行,不要使用 local 或者 local[1]。

4.3.1 基本數據源

文件數據源(實際開發中這種方式用的比較少)
Socket 數據流前面的例子已經看到過。
文件數據流:可以讀取全部 HDFS API 兼容的文件系統文件,經過 fileStream 方法進行讀取。

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 將會監控 dataDirectory 目錄並不斷處理移動進來的文件,注意:目前不支持嵌套目錄。
  1)文件須要有相同的數據格式。
  2)文件進入 dataDirectory 的方式須要經過移動或者重命名來實現。
  3)一旦文件移動進目錄,則不能再修改,即使修改了也不會讀取新的數據。
若是文件比較簡單,則可使用 streamingContext.textFileStream(dataDirectory) 方法來讀取文件。文件流不須要接收器,不須要單獨分配 CPU 核。

Hdfs 讀取實例:(須要提早在 HDFS 上建好目錄

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc, Seconds(1))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4eb3b690

scala> val lines = ssc.textFileStream("hdfs://hadoop102:9000/data/")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@14c7ab73

scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@125bc00d

scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4a3363c9

scala> wordCounts.print()

scala> ssc.start()

上傳文件到 HDFS 進行測試:

[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -mkdir /data/
[atguigu@hadoop102 hadoop-2.7.2]$ ls
bin  data  etc  include  input  lib  libexec  LICENSE.txt  logs  NOTICE.txt  README.txt  safemode.sh  sbin  share  wcinput  wcoutput
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put ./LICENSE.txt /data/
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put ./README.txt /data/

獲取計算結果:

-------------------------------------------
Time: 1504665716000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665717000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665718000 ms
-------------------------------------------
(227.7202-1,2)
(created,2)
(offer,8)
(BUSINESS,11)
(agree,10)
(hereunder,,1)
(「control」,1)
(Grant,2)
(2.2.,2)
(include,11)
...
-------------------------------------------
Time1504665740000 ms
-------------------------------------------
(under,1)
(Technology,1)
(distribution,2)
(http://hadoop.apache.org/core/,1)
(Unrestricted,1)
(740.13),1)
(check,1)
(have,1)
(policies,1)
(uses,1)
...
-------------------------------------------
Time1504665741000 ms
-------------------------------------------

自定義數據源(實際開發中用的較多)
  經過繼承 Receiver,並實現 onStart、onStop 方法來自定義數據源採集。

// Receiver 須要提供一個類型參數,該類型參數是 Receiver 接收到的數據的類型
class CustomReceiver(hostStringportIntextends Receiver[String](StorageLevel.MEMORY_AND_DISK_2{

  override def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      // 定義一個新的線程去執行 receive() 方法
      override def run() {
        receive()
      }
    }.start()
  }

  override def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /**
    * Create a socket connection and receive data until receiver is stopped
    */

  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      // 獲取 Socket 的輸入對象
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))

      userInput = reader.readLine()
      // 當 Receiver 沒有中止而且 userInput 不爲空
      while (!isStopped && userInput != null) {
        // 經過 store() 方法將獲取到的 userInput 提交給 Spark 框架
        store(userInput)
        // 再獲取下一條
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

能夠經過 streamingContext.receiverStream(<instance of custom receiver>) 來使用自定義的數據採集源。

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...

模擬 Spark 內置的 Socket 連接,所有源碼以下:

package com.atguigu.streaming

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Receiver 須要提供一個類型參數,該類型參數是 Receiver 接收到的數據的類型
class CustomReceiver(hostStringportIntextends Receiver[String](StorageLevel.MEMORY_AND_DISK_2{

  override def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      // 定義一個新的線程去執行 receive() 方法
      override def run() {
        receive()
      }
    }.start()
  }

  override def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /**
    * Create a socket connection and receive data until receiver is stopped
    */

  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      // 獲取 Socket 的輸入對象
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))

      userInput = reader.readLine()
      // 當 Receiver 沒有中止而且 userInput 不爲空
      while (!isStopped && userInput != null) {
        // 經過 store() 方法將獲取到的 userInput 提交給 Spark 框架
        store(userInput)
        // 再獲取下一條
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

object CustomReceiverDemo {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.receiverStream(new CustomReceiver("hadoop102"9999))

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    // ssc.stop()
  }
}

先啓動 Netcat,而後經過 Netcat 發送數據:

$ nc -l -p 9999         #監聽9999端口
hello world             #運行 jar 包後,發送測試數據

按照 Spark Core 中的方式進行打包,並將程序上傳到Spark機器上。並運行:

/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.CustomReceiverDemo /opt/software/sparkjars/sparkstreaming_customerReceiver-1.0-SNAPSHOT-jar-with-dependencies.jar

輸出結果截圖:


RDD 隊列(用在 Spark Streaming 與 RDD 的結合時,即混合程序)
  測試過程當中,能夠經過使用 streamingContext.queueStream(queueOfRDDs) 來建立 DStream,每個推送到這個隊列中的 RDD,都會做爲一個 DStream 處理。

package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueueRdd {

  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
    // 建立 RDD 隊列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

    // Create the QueueInputDStream and use it do some processing
    // 建立 QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue)

    // 處理隊列中的 RDD 數據
    val mappedStream = inputStream.map(x => (x % 101))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    // 打印結果
    reducedStream.print()

    // 啓動計算
    ssc.start()

    // Create and push some RDDs into
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 30010)
      Thread.sleep(2000)
      // 經過程序中止 StreamingContext 的運行
      // ssc.stop()
    }
  }
}

運行jar 包

/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.QueueRdd /opt/software/sparkjars/sparkstreaming_queueRdd-1.0-SNAPSHOT-jar-with-dependencies.jar

輸出結果以下:

[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.streaming.QueueRdd /opt/software/sparkjars/sparkstreaming_queueRdd-1.0-SNAPSHOT-jar-with-dependencies.jar
19/04/28 20:30:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------                                     
Time1556454615000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)

-------------------------------------------
Time1556454616000 ms
-------------------------------------------

-------------------------------------------
Time1556454617000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)

-------------------------------------------
Time1556454618000 ms
-------------------------------------------
......

4.3.2 高級數據源

除核心數據源外,還能夠用附加數據源接收器來從一些知名數據獲取系統中接收的數據,這些接收器都做爲 Spark Streaming 的組件進行獨立打包了。它們仍然是 Spark 的一部分,不過你須要在構建文件中添加額外的包才能使用它們。現有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及 ZeroMQ。能夠經過添加與 Spark 版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 來引入這些附加接收器。

Apache Kafka

在工程中須要引入 Maven 工件 spark- streaming-kafka_2.10 來使用它。包內提供的 KafkaUtils 對象能夠在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息建立出 DStream。因爲 KafkaUtils 能夠訂閱多個主題,所以它建立出的 DStream 由成對的主題和消息組成。要建立出一個流數據,須要使用 StreamingContext 實例、一個由逗號隔開的 ZooKeeper 主機列表字符串、消費者組的名字(惟一名字),以及一個從主題到針對這個主題的接收器線程數的映射表來調用 createStream() 方法。

import org.apache.spark.streaming.kafka._
...
// 建立一個從主題到接收器線程數的映射表
val topics = List(("pandas"1), ("logs"1)).toMap

val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) 
topicLines.map(_._2)

下面咱們進行一個實例,演示 SparkStreaming 如何從 Kafka 讀取消息,以及如何經過鏈接池方法把消息處理完成後再寫回 Kafka:

pom.xml 須要加入的依賴以下:

        <!-- 用來提供對象鏈接池 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>

        <!-- 用來鏈接 Kafka 的工具類 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

kafka Connection Pool 程序:

package com.atguigu.streaming

import java.util.Properties
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// 自定義的樣例類(是池化的對象)
case class KafkaProducerProxy(brokerList: String,
                              producerConfig: Properties = new Properties,
                              defaultTopic: Option[String] = None,
                              producer: Option[KafkaProducer[String, String]] = None)
 
{
  type Key = String
  type Val = String

  require(brokerList == null || !brokerList.isEmpty, "Must set broker list")

  private val p 
= producer getOrElse {
    val props: Properties = new Properties();
    props.put("bootstrap.servers", brokerList);
    props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");

    new KafkaProducer[String, String](props)
  }

  // 把我要發送的消息包裝成了 ProducerRecord
  private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
    val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))
    require(!t.isEmpty, "Topic must not be empty")
    key match {
      case Some(k) => new ProducerRecord(t, k, value)
      case _ => new ProducerRecord(t, value)
    }
  }

  def send(key: Key, value: Val, topic: Option[String] = None) {
    // 調用 KafkaProducer 對象的 send 方法來發送消息
    p.send(toMessage(value, Option(key), topic))
  }

  def send(value: Val, topic: Option[String]) {
    send(null, value, topic)
  }

  def send(value: Val, topic: String) {
    send(null, value, Option(topic))
  }

  def send(value: Val) {
    send(null, value, None)
  }

  def shutdown(): Unit = p.close()
}

abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {

  def newInstance(): KafkaProducerProxy
}

class BaseKafkaProducerFactory(brokerList: String,
                               config: Properties = new Properties,
                               defaultTopic: Option[String] = None)

  extends KafkaProducerFactory(brokerList, config, defaultTopic) 
{

  override def newInstance() new KafkaProducerProxy(brokerList, config, defaultTopic)
}

// 繼承一個基礎的鏈接池,須要提供池化的對象類型
class PooledKafkaProducerAppFactory(val factoryKafkaProducerFactory)
  extends BasePooledObjectFactory[KafkaProducerProxywith Serializable 
{

  // 用於鏈接池建立對象
  override def create(): KafkaProducerProxy = factory.newInstance()

  // 用於鏈接池包裝對象
  override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] new DefaultPooledObject(obj)

  // 用於鏈接池銷燬對象
  override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
    p.getObject.shutdown()
    super.destroyObject(p)
  }
}

KafkaStreaming main:

package com.atguigu.streaming

import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 單例對象(即保證了 kafka 鏈接池只有一個)
object createKafkaProducerPool {

  // 用於返回真正的對象池 GenericObjectPool
  def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {
    val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
    val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
    // 指定 kafka 對象池的大小
    val poolConfig = {
      val c = new GenericObjectPoolConfig
      val maxNumProducers = 10
      c.setMaxTotal(maxNumProducers)
      c.setMaxIdle(maxNumProducers)
      c
    }
    // 返回一個對象池
    new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
  }
}

object KafkaStreaming {

  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // 建立 topic
    val brobrokers = "192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092" // kafka 集羣的地址
    val sourcetopic = "source"// kafka 的隊列名稱
    val targettopic = "target"// kafka 的隊列名稱

    // 建立消費者組
    val group = "con-consumer-group"
    // 消費者配置
    val kafkaParam = Map(
      "bootstrap.servers" -> brobrokers, // 用於初始化連接到集羣的地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],

      // 用於標識這個消費者屬於哪一個消費團體
      "group.id" -> group,

      // 若是沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可使用這個配置屬性
      // 可使用這個配置,latest 自動重置偏移量爲最新的偏移量
      "auto.offset.reset" -> "latest",

      // 若是是 true,則這個消費者的偏移量會在後臺自動提交
      "enable.auto.commit" -> (false: java.lang.Boolean)
    );

    // ssc.sparkContext.broadcast(pool)

    // 建立 DStream,返回接收到的輸入數據
    val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(sourcetopic), kafkaParam))

    // 每個 stream 都是一個 ConsumerRecord
    stream.map(s => ("id:" + s.key(), ">>>>:" + s.value())).foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        // Get a producer from the shared pool
        val pool = createKafkaProducerPool(brobrokers, targettopic)
        val p = pool.borrowObject()

        partitionOfRecords.foreach { message => System.out.println(message._2); p.send(message._2, Option(targettopic)) }

        // Returning the producer to the pool also shuts it down
        pool.returnObject(p)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

程序部署:
一、啓動 zookeeper 集羣和 kafka 集羣。

[atguigu@hadoop102 zookeeper-3.4.10]$ pwd
/opt/module/zookeeper-3.4.10
[atguigu@hadoop102 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ /bin/zkServer.sh start

[atguigu@hadoop102 kafka]$ pwd
/opt/module/kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties

二、建立兩個 topic,一個爲 source,一個爲 target

bin/kafka-topics.sh --create \
--zookeeper 192.168.25.102:2181,192.168.25.103:2181,192.168.25.104:2181 \
--replication-factor 2 \
--partitions 2 \
--topic source

bin/kafka-topics.sh --create \
--zookeeper 192.168.25.102:2181,192.168.25.103:2181,192.168.25.104:2181 \
--replication-factor 2 \
--partitions 2 \
--topic targe

三、啓動 kafka console producer 寫入 source topic

bin/kafka-console-producer.sh \
--broker-list 192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092 \
--topic source

四、啓動 kafka console consumer 監聽 target topic

bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092 \
--topic target

五、啓動 kafka Streaming 程序

[atguigu@hadoop102 ~]$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.streaming.KafkaStreaming \
/opt/software/sparkjars/kafkastreaming-jar-with-dependencies.jar

六、程序運行截圖
生產者


Spark Stream

消費者

kafka 知識補充:
kafka 集羣圖解


分片圖解

新舊 kafka 版本對比

Flume-ng

Spark 提供兩個不一樣的接收器來使用 Apache Flume(http://flume.apache.org)。
兩個接收器簡介以下。
  • 推式接收器:該接收器以 Avro 數據池的方式工做,由 Flume 向其中推數據。
  • 拉式接收器:該接收器能夠從自定義的中間數據池中拉數據,而其餘進程可使用 Flume 把數據推動該中間數據池。
兩種方式都須要從新配置 Flume,並在某個節點配置的端口上運行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一種方法,都須要在工程中引入 Maven 工件 spark-streaming-flume_2.10。

  推式接收器的方法設置起來很容易,可是它不使用事務來接收數據。在這種方式中,接收器以 Avro 數據池的方式工做,咱們須要配置 Flume 來把數據發到 Avro 數據池。咱們提供的 FlumeUtils 對象會把接收器配置在一個特定的工做節點的主機名及端口號 上。這些設置必須和 Flume 配置相匹配。

  雖然這種方式很簡潔,但缺點是沒有事務支持。這會增長運行接收器的工做節點發生錯誤 時丟失少許數據的概率。不只如此,若是運行接收器的工做節點發生故障,系統會嘗試從 另外一個位置啓動接收器,這時須要從新配置 Flume 才能將數據發給新的工做節點。這樣配 置會比較麻煩。

  較新的方式是拉式接收器(在Spark 1.1中引入),它設置了一個專用的Flume數據池供 Spark Streaming 讀取,並讓接收器主動從數據池中拉取數據。這種方式的優勢在於彈性較 好,Spark Streaming 經過事務從數據池中讀取並複製數據。在收到事務完成的通知前,這些數據還保留在數據池中。

  咱們須要先把自定義數據池配置爲 Flume 的第三方插件。安裝插件的最新方法請參考 Flume 文檔的相關部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party- plugins)。因爲插件是用 Scala 寫的,所以須要把插件自己以及 Scala 庫都添加到 Flume 插件 中。Spark 1.1 中對應的 Maven 索引以下所示。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume-sink_2.11</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.11</version>
</dependency>

當你把自定義 Flume 數據池添加到一個節點上以後,就須要配置 Flume 來把數據推送到這個數據池中。

a1.sinks = spark
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = receiver-hostname
a1.sinks.spark.port = port-used-for-sync-not-spark-port
a1.sinks.spark.channel = memoryChannel

等到數據已經在數據池中緩存起來,就能夠調用 FlumeUtils 來讀取數據了。

4.4 DStream 的轉換

DStream 上的原語與 RDD 的相似,分爲 Transformations(轉換)和 Output Operations(輸出)兩種,此外轉換操做中還有一些比較特殊的原語,如:updateStateByKey()、transform() 以及各類 Window 相關的原語。

DStream 的轉化操做能夠分爲無狀態(stateless)和有狀態(stateful)兩種。
  • 在無狀態轉化操做中,每一個批次的處理不依賴於以前批次的數據。常見的 RDD 轉化操做,例如 map()、filter()、reduceByKey() 等,都是無狀態轉化操做。
  • 相對地,有狀態轉化操做須要使用以前批次的數據或者是中間結果來計算當前批次的數據。有狀態轉化操做包括基於滑動窗口的轉化操做追蹤狀態變化的轉化操做

4.4.1 無狀態轉化操做


  無狀態轉化操做就是把簡單的 RDD 轉化操做應用到每一個批次上,也就是轉化 DStream 中的每個 RDD。部分無狀態轉化操做列在了下表中。注意,針對鍵值對的 DStream 轉化操做(好比 reduceByKey()) 要添加 import StreamingContext._ 才能在 Scala 中使用。

  須要記住的是,儘管這些函數看起來像做用在整個流上同樣,但事實上每一個 DStream 在內部是由許多 RDD(批次)組成,且無狀態轉化操做是分別應用到每一個 RDD 上的。例如,reduceByKey() 會歸約每一個時間區間中的數據,但不會歸約不一樣區間之間的數據。
  舉個例子,在以前的 wordcount 程序中,咱們只會統計1秒內接收到的數據的單詞個數,而不會累加。
  無狀態轉化操做也能在多個 DStream 間整合數據,不過也是在各個時間區間內。例如,鍵值對 DStream 擁有和 RDD 同樣的與鏈接相關的轉化操做,也就是 cogroup()、join()、leftOuterJoin() 等。咱們能夠在 DStream 上使用這些操做,這樣就對每一個批次分別執行了對應的 RDD 操做。
  咱們還能夠像在常規的 Spark 中同樣使用 DStream 的 union() 操做將它和另外一個 DStream 的內容合併起來,也可使用 StreamingContext.union() 來合併多個流。

4.4.2 有狀態轉化操做

特殊的 Transformations。

追蹤狀態變化 UpdateStateByKey

  updateStateByKey 原語是用於記錄歷史記錄,有時,咱們須要在 DStream 中跨批次維護狀態(例如流計算中累加 wordcount)。針對這種狀況,updateStateByKey() 爲咱們提供了對一個狀態變量的訪問,用於鍵值對形式的 DStream。給定一個由 (鍵,事件) 對構成的 DStream,並傳遞一個指定如何根據新的事件更新每一個鍵對應狀態的函數,它能夠構建出一個新的 DStream,其內部數據爲 (鍵,狀態) 對。
  updateStateByKey() 的結果會是一個新的 DStream,其內部的 RDD 序列是由每一個時間區間對應的 (鍵,狀態) 對組成的。
  updateStateByKey() 操做使得咱們能夠在用新信息進行更新時保持任意的狀態。爲使用這個功能,你須要作下面兩步:
  • 1)定義狀態,狀態能夠是一個任意的數據類型。
  • 2)定義狀態更新函數,用此函數闡明如何使用以前的狀態和來自輸入流的新值對狀態進行更新。
  使用 updateStateByKey 須要對檢查點目錄進行配置,會使用檢查點來保存狀態。

WordCount 第二版:
代碼以下:

package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("hdfs://192.168.25.102:9000/spark/checkpoints"// 設置一個檢查點的目錄

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop102"9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" ")) // DStream[RDD[String]]

    // import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1)) // 將每個單詞映射成一個元組 (word,1)

    // 定義更新狀態方法,參數 values 爲當前批次單詞頻度,state 爲以往批次單詞頻度(該參數是由框架提供的)
    val updateFunc = (values: Seq[Int], state: Option[Int]) => { // 匿名函數
      // 計算當前批次相同 key 的單詞總數
      val currentCount = values.foldLeft(0)(_ + _)
      // 獲取上一次保存的單詞總數
      val previousCount = state.getOrElse(0)
      // 返回新的單詞總數
      Some(currentCount + previousCount)
    }

    // 使用 updateStateByKey 來更新狀態,統計從運行開始以來單詞總的次數
    val stateDstream = pairs.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    // 以 text 文件形式存儲這個 DStream 的內容。第一個參數是存儲路徑,第二個參數是文件的後綴名。
    stateDstream.saveAsTextFiles("hdfs://192.168.25.102:9000/stateful""abc")

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    // ssc.stop()
  }
}

更新狀態方法 updateFunc 圖解以下:

測試:
先啓動 netcat,再啓動統計程序,再經過 netcat 發送測試數據

[atguigu@hadoop102 ~]$ nc -l -p 9999
hello hello china world             #發送第一個 RRD
hello hello china china             #發送第二個 RRD

啓動統計程序

bin/spark-submit \
--class com.atguigu.streaming.WorldCount \
/opt/software/sparkjars/statefulwordcount-jar-with-dependencies.jar

[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ bin/spark-submit \
> --class com.atguigu.streaming.WorldCount \
> /opt/software/sparkjars/statefulwordcount-jar-with-dependencies.jar
19/04/29 11:26:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1556508402000 ms
-------------------------------------------

19/04/29 11:26:44 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/04/29 11:26:44 WARN BlockManager: Block input-0-1556508404000 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1556508405000 ms
-------------------------------------------
(hello,2)
(world,1)
(china,1)

19/04/29 11:26:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/04/29 11:26:47 WARN BlockManager: Block input-0-1556508407400 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1556508408000 ms
-------------------------------------------
(hello,4)
(world,1)
(china,3)

-------------------------------------------
Time: 1556508411000 ms
-------------------------------------------
(hello,4)
(world,1)
(china,3)

在 HDFS 上查看檢查點目錄


Window Operations

  Window Operations 有點相似於 Storm中 的 State,能夠設置窗口的大小和滑動窗口的間隔來動態的獲取當前 Steaming 的容許狀態。基於窗口的操做會在一個比 StreamingContext 的批次間隔更長的時間範圍內,經過整合多個批次的結果,計算出整個窗口的結果。

  全部基於窗口的操做都須要兩個參數,分別爲窗口時長以及滑動步長,二者都必須是 StreamContext 的批次間隔的整數倍。窗口時長控制每次計算最近的多少個批次的數據,其實就是最近的 windowDuration/batchInterval 個批次,以下圖所示。若是有一個以 10 秒爲批次間隔的源 DStream,要建立一個最近 30 秒的時間窗口(即最近 3 個批次),就應當把 windowDuration 設爲 30 秒。而滑動步長的默認值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。若是源 DStream 批次間隔爲 10 秒,而且咱們只但願每兩個批次計算一次窗口結果,就應該把滑動步長設置爲 20 秒。假設,你想拓展前例從而每隔十秒對持續 30 秒的數據生成 wordcount。爲作到這個,咱們須要在持續 30 秒數據的 (word,1) 對 DStream上應用 reduceByKey。

使用操做 reduceByKeyAndWindow:

# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 3020)

Window Operations 經常使用函數


reduceByWindow() 和 reduceByKeyAndWindow() 讓咱們能夠對每一個窗口 更高效地進行歸約操做。它們接收一個歸約函數,在整個窗口上執行,好比 +。除此之外,它們還有一種特殊形式,經過只考慮新進入窗口的數據和離開窗 口的數據,讓 Spark 增量計算歸約結果。這種特殊形式須要提供歸約函數的一個 逆函數,好比 + 對應的逆函數爲 -。對於較大的窗口,提供逆函數能夠大大提升執行效率。以下圖所示:

示例代碼:

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(

  {(x, y) => x + y},
  {(x, y) => x - y},
  Seconds(30),
  Seconds(10))
  // 加上新進入窗口的批次中的元素 // 移除離開窗口的老批次中的元素 // 窗口時長// 滑動步長 

countByWindow() 和 countByValueAndWindow() 做爲對數據進行計數操做的簡寫。countByWindow() 返回一個表示每一個窗口中元素個數的 DStream,而 countByValueAndWindow() 返回的 DStream 則包含窗口中每一個值的個數。

val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}

val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) 
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

WordCount 第三版:3 秒一個批次,窗口 12 秒,滑步 6 秒。

package com.atguigu.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WorldCount {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("hdfs://192.168.25.102:9000/spark/checkpoints"// 設置一個檢查點的目錄

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("hadoop102"9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))

    // 3 秒一個批次,窗口 12 秒,會有 12 / 3 = 4 個批次
    // 滑動步長 6 秒,會有 6 / 3 = 2 個批次
    val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
    // ssc.stop()
  }
}

測試:
先啓動 netcat,再啓動統計程序,再經過 netcat 發送測試數據

[atguigu@hadoop102 ~]$ nc -l -p 9999
hello hello china world             #發送第一個 RRD
hello hello china china             #發送第二個 RRD

啓動統計程序

bin/spark-submit \
--class com.atguigu.streaming.WorldCount \
/opt/software/sparkjars/windowwordcount-jar-with-dependencies.jar

4.4.3 重要操做

Transform Operation
Transform 原語容許 DStream 上執行任意的 RDD-to-RDD 函數。即便這些函數並無在 DStream 的 API 中暴露出來,經過該函數能夠方便的擴展 Spark API。
該函數每一批次調度一次。
好比下面的例子,在進行單詞統計的時候,想要過濾掉 spam 的信息。
其實也就是對 DStream 中的 RDD 應用轉換。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

Join 操做
鏈接操做(leftOuterJoin, rightOuterJoin, fullOuterJoin 也能夠),能夠鏈接 stream-stream,windows-stream to windows-stream、stream-dataset
1)stream-stream joins

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

2)stream-dataset joins

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

4.5 DStream 的輸出

輸出操做指定了對流數據經轉化操做獲得的數據所要執行的操做(例如把結果推入外部數據庫或輸出到屏幕上)。與 RDD 中的惰性求值相似,若是一個 DStream 及其派生出的 DStream 都沒有被執行輸出操做,那麼這些 DStream 就都不會被求值。若是 StreamingContext 中沒有設定輸出操做,那麼整個 context 就都不會啓動。

通用的輸出操做 foreachRDD(),它用來對 DStream 中的 RDD 運行任意計算。這和transform() 有些相似,均可以讓咱們訪問任意 RDD。在 foreachRDD() 中,能夠重用咱們在 Spark 中實現的全部行動操做。好比,常見的用例之一是把數據寫到諸如 MySQL 的外部數據庫中。
須要注意的:
  • 1)鏈接不能寫在 driver 層面。
  • 2)若是寫在 foreach 中則每一個 RDD 都建立,得不償失。
  • 3)增長 foreachPartition,在分區建立。
  • 4)能夠考慮使用鏈接池優化。

dstream.foreachRDD { rdd =>
  // error val connection = createNewConnection()  // executed at the driver 序列化錯誤

  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record) // executed at the worker
    )
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

4.6 累加器和廣播變量

累加器(Accumulators)和廣播變量(Broadcast variables)不能從 Spark Streaming 的檢查點中恢復。若是你啓用檢查並也使用了累加器和廣播變量,那麼你必須建立累加器和廣播變量的延遲單實例從而在驅動因失效重啓後他們能夠被從新實例化。以下例述:

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] 
= {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a""b""c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator 
= {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("["", ""]")
  val output = "Counts at time " + time + " " + counts
})

4.7 DataFrame ans SQL Operations

你能夠很容易地在流數據上使用 DataFrames 和 SQL。你必須使用 SparkContext 來建立 StreamingContext 要用的 SQLContext。此外,這一過程能夠在驅動失效後重啓。咱們經過建立一個實例化的 SQLContext 單實例來實現這個工做。以下例所示。咱們對前例 wordcount 進行修改從而使用 DataFrames 和 SQL 來產生 wordcounts。每一個 RDD 被轉換爲 DataFrame,以臨時表格配置並用 SQL 進行查詢。

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
  spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

你也能夠從不一樣的線程在定義流數據的表上運行 SQL 查詢(也就是說,異步運行 StreamingContext)。僅肯定你設置 StreamingContext 記住了足夠數量的流數據以使得查詢操做能夠運行。不然,StreamingContext 不會意識到任何異步的 SQL 查詢操做,那麼其就會在查詢完成以後刪除舊的數據。例如,若是你要查詢最後一批次,可是你的查詢會運行 5 分鐘,那麼你須要調用 streamingContext.remember(Minutes(5))
(in Scala 或者其餘語言的等價操做)。

4.8 Caching / Persistence

和 RDDs 相似,DStream 一樣容許開發者將流數據保存在內存中。也就是說,在 DStream 上使用 persist() 方法將會自動把 DStream 中的每一個 RDD 保存在內存中。當 DStream 中的數據要被屢次計算時,這個很是有用(如在一樣數據上的屢次操做)。對於像 reduceByWindow 和 reduceByKeyAndWindow 以及基於狀態的 (updateStateByKey) 這種操做,保存在內存中是隱含默認的。所以,即便開發者沒有調用 persist(),由基於窗操做產生的 DStream 會自動保存在內存中

4.9 不間斷運行 7x24 小時

4.9.1 檢查點機制

檢查點機制是咱們在 Spark Streaming 中用來保障容錯性的主要機制。與應用程序邏輯無關的錯誤(即系統錯位,JVM 崩潰等)有迅速恢復的能力。
它可使 Spark Streaming 階段性地把應用數據存儲到諸如 HDFS 或 Amazon S3 這樣的可靠存儲系統中,以供恢復時使用。具體來講,檢查點機制主要爲如下兩個目的服務:
  • 1)控制發生失敗時須要重算的狀態數。SparkStreaming 能夠經過轉化圖的譜系圖來重算狀態,檢查點機制則能夠控制須要在轉化圖中回溯多遠。
  • 2)提供驅動器程序容錯。若是流計算應用中的驅動器程序崩潰了,你能夠重啓驅動器程序並讓驅動器程序從檢查點恢復,這樣 Spark Streaming 就能夠讀取以前運行的程序處理數據的進度,並從那裏繼續。

爲了實現這個,Spark Streaming 須要爲容錯存儲系統 checkpoint 提供足夠的信息從而使得其能夠從失敗中恢復過來。有兩種類型的數據設置檢查點:

Metadata checkpointing:將定義流計算的信息存入容錯的系統如 HDFS。元數據包括:
  配置 – 用於建立流應用的配置。
  DStreams 操做 – 定義流應用的 DStreams 操做集合。
  不完整批次 – 批次的工做已進行排隊可是並未完成。

Data checkpointing:將產生的 RDDs 存入可靠的存儲空間。對於在多批次間合併數據的狀態轉換,這個頗有必要。在這樣的轉換中,RDDs 的產生基於以前批次的 RDDs,這樣依賴鏈長度隨着時間遞增。爲了不在恢復期這種無限的時間增加(和鏈長度成比例),狀態轉換中間的 RDDs 週期性寫入可靠地存儲空間(如 HDFS)從而切短依賴鏈。

總而言之,元數據檢查點在由驅動失效中恢復是首要須要的。而數據或者 RDD 檢查點甚至在使用了狀態轉換的基礎函數中也是必要的。
出於這些緣由,檢查點機制對於任何生產環境中的流計算應用都相當重要。你能夠經過向 ssc.checkpoint() 方法傳遞一個路徑參數 (HDFS、S3 或者本地路徑都可) 來配置檢查點機制,同時你的應用應該可以使用檢查點的數據。
  • 1)當程序首次啓動,其將建立一個新的 StreamingContext,設置全部的流並調用 start()。
  • 2)當程序在失效後重啓,其將依據檢查點目錄的檢查點數據從新建立一個 StreamingContext。經過使用 StraemingContext.getOrCreate 很容易得到這個性能。

ssc.checkpoint("hdfs://...") 


# 建立和設置一個新的 StreamingContext
def functionToCreateContext():
    sc 
= SparkContext(...) # new context
    ssc = new StreamingContext(...)
    lines = ssc.socketTextStream(...) # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory) # 設置檢查點目錄
    return ssc

# 從檢查點數據中獲取 StreamingContext 或者從新建立一個
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# 在須要完成的 context 上作額外的配置

# 不管其有沒有啓動
context ...

# 啓動 context
context.start()
contaxt.awaitTermination()

  若是檢查點目錄(checkpointDirectory)存在,那麼 context 將會由檢查點數據從新建立。若是目錄不存在(首次運行),那麼函數 functionToCreateContext 將會被調用來建立一個新的 context 並設置 DStreams。
  注意:RDDs 的檢查點引發存入可靠內存的開銷。在 RDDs 須要檢查點的批次裏,處理的時間會所以而延長。因此,檢查點的間隔須要很仔細地設置。在小尺寸批次(1秒鐘)。每一批次檢查點會顯著減小操做吞吐量。反之,檢查點設置的過於頻繁致使「血統」和任務尺寸增加,這會有很很差的影響。對於須要 RDD 檢查點設置的狀態轉換,默認間隔是批次間隔的乘數通常至少爲 10 秒鐘。能夠經過 dstream.checkpoint(checkpointInterval)。一般,檢查點設置間隔是 5-10 個 DStream 的滑動間隔。

4.9.2 驅動器程序容錯

驅動器程序的容錯要求咱們以特殊的方式建立 StreamingContext。咱們須要把檢查點目錄提供給 StreamingContext。與直接調用 new StreamingContext 不一樣,應該使用 StreamingContext.getOrCreate() 函數。

4.9.3 工做節點容錯

爲了應對工做節點失敗的問題,Spark Streaming 使用與 Spark 的容錯機制相同的方法。全部從外部數據源中收到的數據都在多個工做節點上備份。全部從備份數據轉化操做的過程當中建立出來的 RDD 都能容忍一個工做節點的失敗,由於根據 RDD 譜系圖,系統能夠把丟失的數據從倖存的輸入數據備份中重算出來

4.9.4 接收器容錯

  運行接收器的工做節點的容錯也是很重要的。若是這樣的節點發生錯誤,Spark Streaming 會在集羣中別的節點上重啓失敗的接收器。然而,這種狀況會不會致使數據的丟失取決於數據源的行爲(數據源是否會重發數據) 以及接收器的實現(接收器是否會向數據源確認收到數據)。舉個例子,使用 Flume 做爲數據源時,兩種接收器的主要區別在於數據丟失時的保障。在 「接收器從數據池中拉取數據」 的模型中,Spark 只會在數據已經在集羣中備份時纔會從數據池中移除元素。而在 「向接收器推數據」 的模型中,若是接收器在數據備份以前失敗,一些數據可能就會丟失。總的來講,對於任意一個接收器,必須同時考慮上游數據源的容錯性(是否支持事務)來確保零數據丟失。
  總的來講,接收器提供如下保證:
  • 全部從可靠文件系統中讀取的數據 (好比經過 StreamingContext.hadoopFiles 讀取的) 都是可靠的,由於底層的文件系統是有備份的。Spark Streaming 會記住哪些數據存放到了檢查點中,並在應用崩潰後從檢查點處繼續執行。
  • 對於像 Kafka、推式 Flume、Twitter 這樣的不可靠數據源,Spark 會把輸入數據複製到其餘節點上,可是若是接收器任務崩潰,Spark 仍是會丟失數據。在 Spark 1.1 以及更早的版本中,收到的數據只被備份到執行器進程的內存中,因此一旦驅動器程序崩潰(此時全部的執行器進程都會丟失鏈接),數據也會丟失。在 Spark 1.2 中,收到的數據被記錄到諸如 HDFS 這樣的可靠的文件系統中,這樣即便驅動器程序重啓也不會致使數據丟失。
  綜上所述,確保全部數據都被處理的最佳方式是使用可靠的數據源(例如 HDFS、拉式 Flume 等)。若是你還要在批處理做業中處理這些數據,使用可靠數據源是最佳方式,由於這種方式確保了你的批處理做業流計算做業能讀取到相同的數據,於是能夠獲得相同的結果。

4.9.5 處理保證

  因爲 Spark Streaming 工做節點的容錯保障,Spark Streaming 能夠爲全部的轉化操做提供 「精確一次」 執行的語義,即便一個工做節點在處理部分數據時發生失敗,最終的轉化結果(即轉化操做獲得的 RDD)仍然與數據只被處理一次獲得的結果同樣。
  然而,當把轉化操做獲得的結果使用輸出操做推入外部系統中時,寫結果的任務可能因故障而執行屢次,一些數據可能也就被寫了屢次。因爲這引入了外部系統,所以咱們須要專門針對各系統的代碼來處理這樣的狀況。咱們能夠使用事務操做來寫入外部系統(即原子化地將一個 RDD 分區一次寫入),或者設計冪等的更新操做(即屢次運行同一個更新操做仍生成相同的結果)。好比 Spark Streaming 的 saveAs…File 操做會在一個文件寫完時自動將其原子化地移動到最終位置上,以此確保每一個輸出文件只存在一份。

4.10 性能考量

  最多見的問題是 Spark Streaming 可使用的最小批次間隔是多少。總的來講,500 毫秒已經被證明爲對許多應用而言是比較好的最小批次大小。尋找最小批次大小的最佳實踐是從一個比較大的批次大小(10 秒左右)開始,不斷使用更小的批次大小。若是 Streaming 用戶界面中顯示的處理時間保持不變,你就能夠進一步減少批次大小。若是處理時間開始增長,你可能已經達到了應用的極限。
  類似地,對於窗口操做,計算結果的間隔(也就是滑動步長)對於性能也有巨大的影響。當計算代價巨大併成爲系統瓶頸時,就應該考慮提升滑動步長了。
  減小批處理所消耗時間的常見方式還有提升並行度。有如下三種方式能夠提升並行度:
  • 增長接收器數目。有時若是記錄太多會致使單臺機器來不及讀入並分發的話,接收器會成爲系統瓶頸。這時你就須要經過建立多個輸入 DStream(這樣會建立多個接收器) 來增長接收器數目,而後使用 union 來把數據合併爲一個數據源。
  • 將收到的數據顯式地從新分區。若是接收器數目沒法再增長,你能夠經過使用 DStream.repartition 來顯式從新分區輸入流(或者合併多個流獲得的數據流) 來從新分配收到的數據。
  • 提升聚合計算的並行度。對於像 reduceByKey() 這樣的操做,你能夠在第二個參數中指定並行度,咱們在介紹 RDD 時提到過相似的手段。

相關文章
相關標籤/搜索