Spark Streaming——Spark第一代實時計算引擎

雖然SparkStreaming已經中止更新,Spark的重點也放到了 Structured Streaming ,但因爲Spark版本太低或者其餘技術選型問題,可能仍是會選擇SparkStreaming。
SparkStreaming對於時間窗口,事件時間雖然支撐較少,但仍是能夠知足部分的實時計算場景的,SparkStreaming資料較多,這裏也作一個簡單介紹。html

一. 什麼是Spark Streaming

Spark Streaming在當時是爲了與當時的Apache Storm競爭,也讓Spark能夠用於流式數據的處理。根據其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特色。Spark Streaming支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入後能夠用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在不少地方,如HDFS,數據庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
固然Storm目前已經漸漸淡出,Flink開始大放異彩。java

Spark與Storm的對比

2、SparkStreaming入門

Spark Streaming 是 Spark Core API 的擴展,它支持彈性的,高吞吐的,容錯的實時數據流的處理。數據能夠經過多種數據源獲取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也能夠經過例如 mapreducejoinwindow 等的高級函數組成的複雜算法處理。最終,處理後的數據能夠輸出到文件系統,數據庫以及實時儀表盤中。事實上,你還能夠在 data streams(數據流)上使用 [機器學習] 以及 [圖計算] 算法。
在內部,它工做原理以下,Spark Streaming 接收實時輸入數據流並將數據切分紅多個 batch(批)數據,而後由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結果)。redis

Spark Streaming 提供了一個名爲 discretized streamDStream 的高級抽象,它表明一個連續的數據流。DStream 能夠從數據源的輸入數據流建立,例如 Kafka,Flume 以及 Kinesis,或者在其餘 DStream 上進行高層次的操做以建立。在內部,一個 DStream 是經過一系列的 [RDDs] 來表示。算法

本指南告訴你如何使用 DStream 來編寫一個 Spark Streaming 程序。你可使用 Scala,Java 或者 Python(Spark 1.2 版本後引進)來編寫 Spark Streaming 程序。數據庫

在idea中新建maven項目apache

引入依賴bootstrap

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

Project Structure —— Global Libraries —— 把scala 添加到 add module設計模式

新建Scala Classapi

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object Demo {

  //屏蔽日誌
  Logger.getLogger("org.apache")setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {

    //local會有問題  最少兩個線程  一個拿數據 一個計算
    //val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")

    //時間間隔
    val ssc = new StreamingContext(conf,Seconds(1))

    //接收數據 處理

    //socket  demo
    val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)


    val words: DStream[String] = value.flatMap(_.split(" "))

    val wordsTuple: DStream[(String, Int)] = words.map((_, 1))

    val wordcount: DStream[(String, Int)] = wordsTuple.reduceByKey(_ + _)

    //觸發action
    wordcount.print()

    ssc.start()

    //保持流的運行  等待程序被終止
    ssc.awaitTermination()

  }

}

測試數組

下載一個win10 用的netcat

https://eternallybored.org/misc/netcat/

下載netcat 1.12

解壓 在目錄下啓動cmd

輸入

nc  -L -p 9999

開始輸入單詞 在idea中驗證接收

原理

初始化StreamingContext

爲了初始化一個 Spark Streaming 程序,一個 StreamingContext 對象必需要被建立出來,它是全部的 Spark Streaming 功能的主入口點。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName 參數是展現在集羣 UI 界面上的應用程序的名稱

master 是local 或者spark集羣的url(mesos yarn)

本地測試能夠用local[*] 注意要多於兩個線程

Second(1)定義的是batch interval 批處理間隔 就是間隔多久去拿一次數據

在定義一個 context 以後,您必須執行如下操做。

  1. 經過建立輸入 DStreams 來定義輸入源。
  2. 經過應用轉換和輸出操做 DStreams 定義流計算(streaming computations)。
  3. 開始接收輸入而且使用 streamingContext.start() 來處理數據。
  4. 使用 streamingContext.awaitTermination() 等待處理被終止(手動或者因爲任何錯誤)。
  5. 使用 streamingContext.stop() 來手動的中止處理。

須要記住的幾點:

  • 一旦一個 context 已經啓動,將不會有新的數據流的計算能夠被建立或者添加到它。
  • 一旦一個 context 已經中止,它不會被從新啓動。
  • 同一時間內在 JVM 中只有一個 StreamingContext 能夠被激活。
  • 在 StreamingContext 上的 stop() 一樣也中止了 SparkContext。爲了只中止 StreamingContext,設置 stop() 的可選參數,名叫 stopSparkContext 爲 false。
  • 一個 SparkContext 就能夠被重用以建立多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被建立以前中止(不中止 SparkContext)。
Discretized Stream or DStream

Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象。它表明了一個連續的數據流。多是數據源接收的流,也多是轉換後的流。

DStream就是多個和時間相關的一系列連續RDD的集合,好比本例就是間隔一秒的一堆RDD的集合

DStream也是有依賴關係的

flatMap 操做也是直接做用在DStream上的,就和做用於RDD同樣 這樣很好理解

咱們先來看數據源接收的流 這種叫作Input DStreams 他會經過Receivers接收器去不一樣的數據源接收數據。

Spark Streaming內置了兩種數據源:

  • 基礎的數據源:好比剛纔用的socket接收 還有file systems
  • 高級的數據源:好比kafka 還有flume kinesis等等

注意本地運行時,不要用local或者local[1],一個線程不夠。放到集羣上時分配給SparkStreaming的核數必須大於接收器的數量,留一個核去處理數據。

咱們也能夠自定義數據源,那咱們就須要本身開發一個接收器。

Transformations

在咱們接收到Dstreams以後能夠進行轉換操做,常見轉換以下:

Transformation(轉換) Meaning(含義)
map(func) 利用函數 func 處理原 DStream 的每一個元素,返回一個新的 DStream。
flatMap(func) 與 map 類似,可是每一個輸入項可用被映射爲 0 個或者多個輸出項。。
filter(func) 返回一個新的 DStream,它僅僅包含原 DStream 中函數 func 返回值爲 true 的項。
repartition(numPartitions) 經過建立更多或者更少的 partition 以改變這個 DStream 的並行級別(level of parallelism)。
union(otherStream) 返回一個新的 DStream,它包含源 DStream 和 otherDStream 的全部元素。
count() 經過 count 源 DStream 中每一個 RDD 的元素數量,返回一個包含單元素(single-element)RDDs 的新 DStream。
reduce(func) 利用函數 func 彙集源 DStream 中每一個 RDD 的元素,返回一個包含單元素(single-element)RDDs 的新 DStream。函數應該是相關聯的,以使計算能夠並行化。
countByValue() 在元素類型爲 K 的 DStream上,返回一個(K,long)pair 的新的 DStream,每一個 key 的值是在原 DStream 的每一個 RDD 中的次數。
reduceByKey(func, [numTasks]) 當在一個由 (K,V) pairs 組成的 DStream 上調用這個算子時,返回一個新的,由 (K,V) pairs 組成的 DStream,每個 key 的值均由給定的 reduce 函數聚合起來。注意:在默認狀況下,這個算子利用了 Spark 默認的併發任務數去分組。你能夠用 numTasks 參數設置不一樣的任務數。
join(otherStream, [numTasks]) 當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, (V, W)) 對的新 DStream。
cogroup(otherStream, [numTasks]) 當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, Seq[V], Seq[W]) 的 tuples(元組)。
transform(func) 經過對源 DStream 的每一個 RDD 應用 RDD-to-RDD 函數,建立一個新的 DStream。這個能夠在 DStream 中的任何 RDD 操做中使用。
updateStateByKey(func) 返回一個新的 "狀態" 的 DStream,其中每一個 key 的狀態經過在 key 的先前狀態應用給定的函數和 key 的新 valyes 來更新。這能夠用於維護每一個 key 的任意狀態數據。

這裏咱們特別介紹一下updateStateByKey

咱們若是須要對歷史數據進行統計,可能須要去kafka裏拿一下以前留存的數據,也能夠用updateStateByKey這個方法。

//保存狀態  聚合相同的單詞
    val  wordcount = wordsTuple.updateStateByKey[Int](
      //updateFunction _
      (newValues: Seq[Int], runningCount: Option[Int])=> {
        val newCount = Some(newValues.sum + runningCount.getOrElse(0))
        newCount
      }
    )

好比剛纔的單詞計數,咱們只能統計每一次發過來的消息,可是若是但願統計屢次消息就須要用到這個,咱們要指定一個checkpoint,就是從哪開始算。

//增長成員變量
val checkpointDir = "./ckp"

//在方法中加入checkpoint
ssc.checkpoint(checkpointDir)
    val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    value.checkpoint(Seconds(4))//官方建議批次時間的1-5倍

這時候咱們創建StreamingContext的方法就要改變了 咱們把剛纔的建立過程提取成方法。

def creatingFunc():StreamingContext = {

    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))

    ssc.checkpoint(checkpointDir)


    val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    value.checkpoint(Seconds(4))//官方建議批次時間的1-5倍

    val words: DStream[String] = value.flatMap(_.split(" "))

    val wordsTuple: DStream[(String, Int)] = words.map((_, 1))


    //保存狀態  聚合相同的單詞
    val  wordcount = wordsTuple.updateStateByKey[Int](
      //updateFunction _
      (newValues: Seq[Int], runningCount: Option[Int])=> {
        val newCount = Some(newValues.sum + runningCount.getOrElse(0))
        newCount
      }
    )

    //觸發action
    wordcount.print()
    ssc
  }

在mian函數中修改成:

def main(args: Array[String]): Unit = {
      val ssc = StreamingContext.getOrCreate(checkpointDir,creatingFunc _)
      ssc.start()
      //保持流的運行  等待程序被終止
      ssc.awaitTermination()
}

這樣就是,若是有checkpoint,程序會在checkpoint中把程序加載回來(程序被保存爲二進制),沒有checkpoint的話纔會建立。

將目錄下的checkpoint刪除,就能夠將狀態刪除。

生產中updateStateByKey因爲會將數據備份要慎重使用,能夠考慮用hbase,redis等作替代。或者藉助kafka作聚合處理。

//若是不用updatestateByKey  能夠考慮redis
    wordsTuple.foreachRDD(rdd => {
      rdd.foreachPartition(i =>
        {
          //redis
        }
      )
    })
窗口操做

Spark Streaming 也支持 _windowed computations(窗口計算),它容許你在數據的一個滑動窗口上應用 transformation(轉換)。

如上圖顯示,窗口在源 DStream 上 _slides(滑動),任何一個窗口操做都須要指定兩個參數:

  • window length(窗口長度) - 窗口的持續時間。
  • sliding interval(滑動間隔) - 執行窗口操做的間隔。

好比計算過去30秒的詞頻:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些經常使用的窗口操做以下所示,這些操做都須要用到上文提到的兩個參數 - windowLength(窗口長度)slideInterval(滑動的時間間隔)

Transformation(轉換) Meaning(含義)
window(windowLength, slideInterval) 返回一個新的 DStream,它是基於 source DStream 的窗口 batch 進行計算的。
countByWindow(windowLength, slideInterval) 返回 stream(流)中滑動窗口元素的數
reduceByWindow(func, windowLength, slideInterval) 返回一個新的單元素 stream(流),它經過在一個滑動間隔的 stream 中使用 func 來聚合以建立。該函數應該是 associative(關聯的)且 commutative(可交換的),以便它能夠並行計算
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, V) pairs 的 Stream,其中的每一個 key 的 values 是在滑動窗口上的 batch 使用給定的函數 func 來聚合產生的。Note(注意): 默認狀況下,該操做使用 Spark 的默認並行任務數量(local model 是 2,在 cluster mode 中的數量經過 spark.default.parallelism 來肯定)來作 grouping。您能夠經過一個可選的 numTasks 參數來設置一個不一樣的 tasks(任務)數量。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上述 reduceByKeyAndWindow() 的更有效的一個版本,其中使用前一窗口的 reduce 值逐漸計算每一個窗口的 reduce值。這是經過減小進入滑動窗口的新數據,以及 「inverse reducing(逆減)」 離開窗口的舊數據來完成的。一個例子是當窗口滑動時」添加」 和 「減」 keys 的數量。然而,它僅適用於 「invertible reduce functions(可逆減小函數)」,即具備相應 「inverse reduce(反向減小)」 函數的 reduce 函數(做爲參數 invFunc </ i>)。像在 reduceByKeyAndWindow 中的那樣,reduce 任務的數量能夠經過可選參數進行配置。請注意,針對該操做的使用必須啓用 checkpointing.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, Long) pairs 的 DStream,其中每一個 key 的 value 是它在一個滑動窗口以內的頻次。像 code>reduceByKeyAndWindow 中的那樣,reduce 任務的數量能夠經過可選參數進行配置。
Join操做

在 Spark Streaming 中能夠執行不一樣類型的 join

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
//也能夠用窗口
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
DStreams輸出操做

輸出操做容許將 DStream 的數據推送到外部系統,如數據庫或文件系統。

會觸發全部變換的執行,相似RDD的action操做。有以下操做:

Output Operation Meaning
print() 在運行流應用程序的 driver 節點上的DStream中打印每批數據的前十個元素。這對於開發和調試頗有用。
Python API 這在 Python API 中稱爲 pprint()
saveAsTextFiles(prefix, [suffix]) 將此 DStream 的內容另存爲文本文件。每一個批處理間隔的文件名是根據 前綴後綴_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。
saveAsObjectFiles(prefix, [suffix]) 將此 DStream 的內容另存爲序列化 Java 對象的 SequenceFiles。每一個批處理間隔的文件名是根據 前綴後綴_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。
Python API 這在Python API中是不可用的。
saveAsHadoopFiles(prefix, [suffix]) 將此 DStream 的內容另存爲 Hadoop 文件。每一個批處理間隔的文件名是根據 前綴後綴_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。
Python API 這在Python API中是不可用的。
foreachRDD(func) 對從流中生成的每一個 RDD 應用函數 func 的最通用的輸出運算符。此功能應將每一個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或將其經過網絡寫入數據庫。請注意,函數 func 在運行流應用程序的 driver 進程中執行,一般會在其中具備 RDD 動做,這將強制流式傳輸 RDD 的計算。
foreachRDD設計模式使用

dstream.foreachRDD容許將數據發送到外部系統。

但咱們不要每次都建立一個鏈接,解決方案以下:

減小開銷,分區分攤開銷

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

更好的作法是用靜態資源池:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

鏈接Kafka

Apache Kafka是一個高性能的消息系統,由Scala 寫成。是由Apache 軟件基金會開發的一個開源消息系統項目。

Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該項目的目標是爲處理實時數據提供一個統1、高通量、低等待(低延時)的平臺。

更多kafka相關請查看Kafka入門寶典(詳細截圖版)

Spark Streaming 2.4.4兼容 kafka 0.10.0 或者更高的版本

Spark Streaming在2.3.0版本以前是提供了對kafka 0.8 和 0.10的支持的 ,不過在2.3.0之後對0.8的支持取消了。

Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.

spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
Broker Version 0.8.2.1 or higher 0.10.0 or higher
API Maturity Deprecated Stable
Language Support Scala, Java, Python Scala, Java
Receiver DStream Yes No
Direct DStream Yes Yes
SSL / TLS Support No Yes
Offset Commit API No Yes
Dynamic Topic Subscription No Yes
Receiver

這裏簡單介紹一下對kafka0.8的一種支持方式:基於Receiver

依賴:

groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.12
 version = 2.4.4
import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

這種狀況 程序停掉數據會丟失,爲了避免丟失本身又寫了一份,這種是不少餘的。

因爲採用了kafka高階api,偏移量offset不可控。

Direct

Kafka 0.10.0版本之後,採用了更好的一種Direct方式,這種咱們須要本身維護偏移量offset。

直連方式 並行度會更高 生產環境用的最多,0.8版本須要在zk或者redis等地方本身維護偏移量。咱們使用0.10以上版本支持本身設置偏移量,咱們只須要本身將偏移量寫回kafka就能夠。

依賴

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.4

kafka 0.10之後 能夠將offset寫回kafka 咱們不須要本身維護offset了,具體代碼以下:

val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(2))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      //latest  none   earliest
      "auto.offset.reset" -> "earliest",
      //自動提交偏移量 false
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //val topics = Array("topicA", "topicB")
    val topics = Array("test_topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      // 與kafka broker不在一個節點上  用不一樣策略
      //在一個節點用 PreferBrokers策略  不多見
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      //普通的RDD不能強轉HasOffsetRanges   但kafkaRDD有 with這個特性 能夠強轉
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //處理數據 計算邏輯
      rdd.foreachPartition { iter =>
        //一次處理一個分區的數據  獲取這個分區的偏移量
        //計算完之後修改偏移量  要開啓事務 相似數據庫 connection -> conn.setAutoCommit(false) 各類操做  conn.commit(); conn.rollback()
        //獲取偏移量  若是要本身記錄的話這個
        //val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        //處理數據
         iter.foreach(println)
      }
      //kafka 0.10新特性  處理完數據後  將偏移量寫回kafka
      // some time later, after outputs have completed
      //kafka有一個特殊的topic  保存偏移量
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

更多Flink,Kafka,Spark等相關技術博文,科技資訊,歡迎關注實時流式計算 公衆號後臺回覆 「電子書」 下載300頁Flink實戰電子書

相關文章
相關標籤/搜索