Spark Streaming 編程指南[中]

基於Spark 2.0 Preview的材料翻譯,原[英]文地址: html

http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.htmljava

Streaming應用實戰,參考:http://my.oschina.net/u/2306127/blog/635518python

Spark Streaming編程指南

概覽

Spark Streaming 是基於Spark 核心API的擴展,使高伸縮性、高帶寬、容錯的流式數據處理成爲可能。數據能夠來自於多種源,如Kafka、Flume、Kinesis、或者TCP sockets等,並且可使用map、reducejoinwindow等高級接口實現複雜算法的處理。最終,處理的數據能夠被推送到數據庫、文件系統以及動態佈告板。實際上,你還能夠將Spark的機器學習( machine learning) 和圖計算 (graph processing )算法用於數據流的處理。git

Spark Streaming

內部工做流程以下。Spark Streaming接收數據流的動態輸入,而後將數據分批,每一批數據經過Spark建立一個結果數據集而後進行處理。github

Spark Streaming

Spark Streaming提供一個高級別的抽象-離散數據流(DStream),表明一個連續的數據流。DStreams能夠從Kafka, Flume, and Kinesis等源中建立,或者在其它的DStream上執行高級操做。在內部,DStream表明一系列的 RDDsweb

本指南將岩石如何經過DStreams開始編寫一個Spark Streaming程序。你可使用Scala、Java或者Python。能夠經過相應的鏈接切換去查看相應語言的代碼。算法

注意:這在Python裏有一些不一樣,不多部分API暫時沒有,本指南進行了Python API標註。sql


快速例程

在開始Spark Streaming編程以前咱們先看看一個簡單的Spark Streaming程序將長什麼樣子。咱們從基於TCP socket的數據服務器接收一個文本數據,而後對單詞進行計數。看起來像下面這個樣子。shell

首先,咱們導入Spark Streaming的類命名空間和一些StreamingContext的轉換工具。 StreamingContext 是全部的Spark Streaming功能的主入口點。咱們建立StreamingContext,指定兩個執行線程和分批間隔爲1秒鐘。數據庫

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用這個context,咱們能夠建立一個DStream,這是來自於TCP數據源 的流數據,咱們經過hostname (e.g. localhost) 和端口 (e.g. 9999)來指定這個數據源。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

這裏line是一個DStream對象,表明從服務器收到的流數據。每個DStream中的記錄是一個文本行。下一步,咱們將每一行中以空格分開的單詞分離出來。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是「一對多」的DStream操做,經過對源DStream的每個記錄產生多個新的記錄建立新DStream。這裏,每一行將被分解多個單詞,而且單詞流表明了words DStream。下一步,咱們對這些單詞進行計數統計。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words DStream而後映射爲(word, 1)的鍵值對的Dstream,而後用於統計單詞出現的頻度。最後,wordCounts.print()打印出每秒鐘建立出的計數值。

注意,上面這些代碼行執行的時候,僅僅是設定了計算執行的邏輯,並無真正的處理數據。在全部的設定完成後,爲了啓動處理,須要調用:

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整的代碼能夠Spark Streaming 的例程 NetworkWordCount 中找到。

若是已經下載和構建了Spark,你能夠按照下面的方法運行這個例子。首先運行Netcat(一個Unix風格的小工具)做爲數據服務器,以下所示:

$ nc -lk 9999

而後,到一個控制檯窗口,啓動例程:

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

而後,任何在netcat服務器運行控制檯鍵入的行都會被計數而後每隔一秒鐘在屏幕上打印出來,以下所示:

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
 
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


基本概念

下一步,咱們將離開這個簡單的例子,詳細闡述Spark Streaming的基本概念和功能。

連接

與Spark相似,Spark Streaming也能夠經過Maven中心庫訪問。爲了編寫你本身的Spark Streaming程序,您將加入下面的依賴到你的SBT或者Maven工程文件。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0-preview</version>
</dependency>

爲了從Kafka/Flume/Kinesis等非Spark Streaming核心API等數據源注入數據,咱們須要添加對應的spark-streaming-xyz_2.11到依賴中。例如,像下面的這樣:

Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]
   

對於最新的列表,參考Maven repository 得到全面的數據源河訪問部件的列表。


初始化StreamingContext

爲了初始化Spark Streaming程序,StreamingContext 對象必須首先建立做爲總入口。

 StreamingContext 對象能夠經過 SparkConf 對象建立,以下所示。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

這裏 appName參數是應用在集羣中的名稱。 masterSpark, Mesos 或 YARN cluster URL或者「local[*]」 字符串指示運行在 local 模式下。實踐中,當運行一個集羣, 您不該該硬編碼 master 參數在集羣中, 而是經過 launch the application with spark-submit 接收其參數。可是, 對於本地測試和單元測試, 你能夠傳遞「local[*]」 來運行 Spark Streaming 在進程內運行(自動檢測本地系統的CPU內核數量)。 注意,這裏內部建立了 SparkContext (全部的Spark 功能的入口點) ,能夠經過 ssc.sparkContext進行存取。

分批間隔時間基於應用延遲需求和可用的集羣資源進行設定(譯註:設定間隔要大於應用數據的最小延遲需求,同時不能設置過小以致於系統沒法在給定的週期內處理完畢),參考 Performance Tuning 部分得到更多信息。

StreamingContext 對象也能夠從已有的 SparkContext 對象中建立。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

 

在context建立以後,能夠接着開始以下的工做:

 

  1. 定義 input sources,經過建立 input DStreams完成。
  2. 定義 streaming 計算,經過DStreams的 transformation 和 output 操做實現。
  3. 啓動接收數據和處理,經過 streamingContext.start()
  4. 等待處理中止 (一般由於錯誤),經過streamingContext.awaitTermination().
  5. 處理過程能夠手動中止,經過 streamingContext.stop()

 

記住:

 

  • 一旦context啓動, 沒有新的streaming 計算能夠被設置和添加進來。
  • 一旦context被中止, 它不能被再次啓動。
  • 只有一個StreamingContext在JVM中在同一時間能夠被激活。
  • stop() 在StreamingContext執行時,同時中止了SparkContext。爲了僅終止StreamingContext, 在stopSparkContext的Stop時設置選項爲false。
  • SparkContext 能夠重用來建立多個 StreamingContexts, 一直到前一個StreamingContext被中止的時候 (不中止 SparkContext) ,才能建立下一個StreamingContext。

 DStreams離散數據流

離散數據流(DStream)是Spark Streaming最基本的抽象。它表明了一種連續的數據流,要麼從某種數據源提取數據,要麼從其餘數據流映射轉換而來。DStream內部是由一系列連 續的RDD組成的,每一個RDD都是不可變、分佈式的數據集(詳見Spark編程指南 – Spark Programming Guide)。每一個RDD都包含了特定時間間隔內的一批數據,以下圖所示:

Spark Streaming

任何做用於DStream的算子,其實都會被轉化爲對其內部RDD的操做。例如,在前面的例子中,咱們將 lines 這個DStream轉成words DStream對象,其實做用於lines上的flatMap算子,會施加於lines中的每一個RDD上,並生成新的對應的RDD,而這些新生成的RDD 對象就組成了words這個DStream對象。其過程以下圖所示:

Spark Streaming

底層的RDD轉換仍然是由Spark引擎來計算。DStream的算子將這些細節隱藏了起來,併爲開發者提供了更爲方便的高級API。後續會詳細討論這些高級算子。


輸入DStream和接收器

輸入DStream表明從某種流式數據源流入的數據流。在以前的例子裏,lines 對象就是輸入DStream,它表明從netcat server收到的數據流。每一個輸入DStream(除文件數據流外)都和一個接收器(Receiver – Scala docJava doc)相關聯,而接收器則是專門從數據源拉取數據到內存中的對象。

Spark Streaming主要提供兩種內建的流式數據源:

  • 基礎數據源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系統,套接字鏈接或者Akka actor。
  • 高級數據源(Advanced sources): 須要依賴額外工具類的源,如:Kafka、Flume、Kinesis、Twitter等數據源。這些數據源都須要增長額外的依賴,詳見依賴連接(linking)這一節。

本節中,咱們將會從每種數據源中挑幾個繼續深刻討論。

注意,若是你須要同時從多個數據源拉取數據,那麼你就須要建立多個DStream對象(詳見後續的性能調優這一小節)。多個DStream對象其實也就同 時建立了多個數據流接收器。可是請注意,Spark的worker/executor 都是長期運行的,所以它們都會各自佔用一個分配給Spark Streaming應用的CPU。因此,在運行Spark Streaming應用的時候,須要注意分配足夠的CPU core(本地運行時,須要足夠的線程)來處理接收到的數據,同時還要足夠的CPU core來運行這些接收器。

要點
  • 若是本地運行Spark Streaming應用,記得不能將master設爲」local」 或 「local[1]」。這兩個值都只會在本地啓動一個線程。而若是此時你使用一個包含接收器(如:套接字、Kafka、Flume等)的輸入 DStream,那麼這一個線程只能用於運行這個接收器,而處理數據的邏輯就沒有線程來執行了。所以,本地運行時,必定要將master設 爲」local[n]」,其中 n > 接收器的個數(有關master的詳情請參考Spark Properties)。
  • 將Spark Streaming應用置於集羣中運行時,一樣,分配給該應用的CPU core數必須大於接收器的總數。不然,該應用就只會接收數據,而不會處理數據。

基礎數據源

前面的快速入門例子中,咱們已經看到,使用ssc.socketTextStream(…) 能夠從一個TCP鏈接中接收文本數據。而除了TCP套接字外,StreamingContext API 還支持從文件或者Akka actor中拉取數據。

  • 文件數據流(File Streams): 能夠從任何兼容HDFS API(包括:HDFS、S三、NFS等)的文件系統,建立方式以下:

    streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    • Spark Streaming將監視該dataDirectory目錄,並處理該目錄下任何新建的文件(目前還不支持嵌套目錄)。注意:

    • 各個文件數據格式必須一致。
    • dataDirectory中的文件必須經過moving或者renaming來建立。
    • 一旦文件move進dataDirectory以後,就不能再改動。因此若是這個文件後續還有寫入,這些新寫入的數據不會被讀取。
    • 另外,文件數據流不是基於接收器的,因此不須要爲其單獨分配一個CPU core。

      Python API fileStream目前暫時不可用,Python目前只支持textFileStream。

      對於簡單的文本文件,更簡單的方式是調用 streamingContext.textFileStream(dataDirectory)。

  • 基於自定義Actor的數據流(Streams based on Custom Actors): DStream能夠由Akka actor建立獲得,只需調用 streamingContext.actorStream(actorProps, actor-name)。詳見自定義接收器(Custom Receiver Guide)。actorStream暫時不支持Python API。

  • RDD隊列數據流(Queue of RDDs as a Stream): 若是須要測試Spark Streaming應用,你能夠建立一個基於一批RDD的DStream對象,只需調用 streamingContext.queueStream(queueOfRDDs)。RDD會被一個個依次推入隊列,而DStream則會依次以數據 流形式處理這些RDD的數據。

關於套接字、文件以及Akka actor數據流更詳細信息,請參考相關文檔:StreamingContext for Scala,JavaStreamingContext for Java, and StreamingContext for Python。

高級數據源

Python API自 Spark 2.0.0(譯註:1.6.1就已經支持了) 起,Kafka、Kinesis、Flume和MQTT這些數據源將支持Python。

使用這類數據源須要依賴一些額外的代碼庫,有些依賴還挺複雜的(如:Kafka、Flume)。所以爲了減小依賴項版本衝突問題,各個數據源 DStream的相關功能被分割到不一樣的代碼包中,只有用到的時候才須要連接打包進來。

例如,若是你須要使用Twitter的tweets做爲數據源,你 須要如下步驟:

  1. Linking: 將spark-streaming-twitter_2.10工件加入到SBT/Maven項目依賴中。
  2. Programming: 導入TwitterUtils class,而後調用 TwitterUtils.createStream 建立一個DStream,具體代碼見下放。
  3. Deploying: 生成一個uber Jar包,幷包含其全部依賴項(包括 spark-streaming-twitter_2.10及其自身的依賴樹),再部署這個Jar包。部署詳情請參考部署這一節(Deploying section)。

注意,高級數據源在spark-shell中不可用,所以不能用spark-shell來測試基於高級數據源的應用。若是真有須要的話,你須要自行下載相應數據源的Maven工件及其依賴項,並將這些Jar包部署到spark-shell的classpath中。

下面列舉了一些高級數據源:

自定義數據源

Python API自定義數據源目前還不支持Python。

輸入DStream也能夠用自定義的方式建立。你須要作的只是實現一個自定義的接收器(receiver),以便從自定義的數據源接收數據,而後將數據推入Spark中。詳情請參考自定義接收器指南(Custom Receiver Guide)。

接收器可靠性

從可靠性角度來劃分,大體有兩種數據源。其中,像Kafka、Flume這樣的數據源,它們支持對所傳輸的數據進行確認。系統收到這類可靠數據源過來的數據,而後發出確認信息,這樣就可以確保任何失敗狀況下,都不會丟數據。所以咱們能夠將接收器也相應地分爲兩類:

  1. 可靠接收器(Reliable Receiver) – 可靠接收器會在成功接收並保存好Spark數據副本後,向可靠數據源發送確認信息。
  2. 可靠接收器(Unreliable Receiver) – 不可靠接收器不會發送任何確認信息。不過這種接收器經常使用語於不支持確認的數據源,或者不想引入數據確認的複雜性的數據源。

自定義接收器指南(Custom Receiver Guide)中詳細討論瞭如何寫一個可靠接收器。


DStream支持的transformation算子

和RDD相似,DStream也支持從輸入DStream通過各類transformation算子映射成新的DStream。DStream支持不少RDD上常見的transformation算子,一些經常使用的見下表:

Transformation算子 用途
map(func) 返回會一個新的DStream,並將源DStream中每一個元素經過func映射爲新的元素
flatMap(func) 和map相似,不過每一個輸入元素再也不是映射爲一個輸出,而是映射爲0到多個輸出
filter(func) 返回一個新的DStream,幷包含源DStream中被func選中(func返回true)的元素
repartition(numPartitions) 更改DStream的並行度(增長或減小分區數)
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的並集
count() 返回一個包含單元素RDDs的DStream,其中每一個元素是源DStream中各個RDD中的元素個數
reduce(func) 返回一個包含單元素RDDs的DStream,其中每一個元素是經過源RDD中各個RDD的元素經func(func輸入兩個參數並返回一個同類型結果數據)聚合獲得的結果。func必須知足結合律,以便支持並行計算。
countByValue() 若是源DStream包含的元素類型爲K,那麼該算子返回新的DStream包含元素爲(K, Long)鍵值對,其中K爲源DStream各個元素,而Long爲該元素出現的次數。
reduceByKey(func, [numTasks]) 若是源DStream 包含的元素爲 (K, V) 鍵值對,則該算子返回一個新的也包含(K, V)鍵值對的DStream,其中V是由func聚合獲得的。注意:默認狀況下,該算子使用Spark的默認併發任務數(本地模式爲2,集羣模式下由 spark.default.parallelism 決定)。你能夠經過可選參數numTasks來指定併發任務個數。
join(otherStream, [numTasks]) 若是源DStream包含元素爲(K, V),同時otherDStream包含元素爲(K, W)鍵值對,則該算子返回一個新的DStream,其中源DStream和otherDStream中每一個K都對應一個 (K, (V, W))鍵值對元素。
cogroup(otherStream, [numTasks]) 若是源DStream包含元素爲(K, V),同時otherDStream包含元素爲(K, W)鍵值對,則該算子返回一個新的DStream,其中每一個元素類型爲包含(K, Seq[V], Seq[W])的tuple。
transform(func) 返回一個新的DStream,其包含的RDD爲源RDD通過func操做後獲得的結果。利用該算子能夠對DStream施加任意的操做。
updateStateByKey(func) 返回一個包含新」狀態」的DStream。源DStream中每一個key及其對應的values會做爲func的輸入,而func能夠用於對每一個key的「狀態」數據做任意的更新操做。
   

下面咱們會挑幾個transformation算子深刻討論一下。

updateStateByKey算子

updateStateByKey 算子支持維護一個任意的狀態。要實現這一點,只須要兩步:

  1. 定義狀態 – 狀態數據能夠是任意類型。
  2. 定義狀態更新函數 – 定義好一個函數,其輸入爲數據流以前的狀態和新的數據流數據,且可其更新步驟1中定義的輸入數據流的狀態。

在每個批次數據到達後,Spark都會調用狀態更新函數,來更新全部已有key(無論key是否存在於本批次中)的狀態。若是狀態更新函數返回None,則對應的鍵值對會被刪除。

舉例以下。假設你須要維護一個流式應用,統計數據流中每一個單詞的出現次數。這裏將各個單詞的出現次數這個整型數定義爲狀態。咱們接下來定義狀態更新函數以下:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

該狀態更新函數能夠做用於一個包括(word, 1) 鍵值對的DStream上(見本文開頭的例子)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

該狀態更新函數會爲每一個單詞調用一次,且相應的newValues是一個包含不少個」1″的數組(這些1來自於(word,1)鍵值對),而runningCount包含以前該單詞的計數。本例的完整代碼請參考 StatefulNetworkWordCount.scala

注意,調用updateStateByKey前須要配置檢查點目錄,後續對此有詳細的討論,見檢查點(checkpointing)這節。

transform算子

transform算子(及其變體transformWith)能夠支持任意的RDD到RDD的映射操做。也就是說,你能夠用tranform算子來包裝 任何DStream API所不支持的RDD算子。例如,將DStream每一個批次中的RDD和另外一個Dataset進行關聯(join)操做,這個功能DStream API並無直接支持。不過你能夠用transform來實現這個功能,可見transform其實爲DStream提供了很是強大的功能支持。好比說, 你能夠用事先算好的垃圾信息,對DStream進行實時過濾。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

注意,這裏transform包含的算子,其調用時間間隔和批次間隔是相同的。因此你能夠基於時間改變對RDD的操做,如:在不一樣批次,調用不一樣的RDD算子,設置不一樣的RDD分區或者廣播變量等。

基於窗口(window)的算子

Spark Streaming一樣也提供基於時間窗口的計算,也就是說,你能夠對某一個滑動時間窗內的數據施加特定tranformation算子。以下圖所示:

Spark Streaming

如上圖所示,每次窗口滑動時,源DStream中落入窗口的RDDs就會被合併成新的windowed DStream。在上圖的例子中,這個操做會施加於3個RDD單元,而滑動距離是2個RDD單元。由此能夠得出任何窗口相關操做都須要指定一下兩個參數:

  • (窗口長度)window length – 窗口覆蓋的時間長度(上圖中爲3)
  • (滑動距離)sliding interval – 窗口啓動的時間間隔(上圖中爲2)

注意,這兩個參數都必須是DStream批次間隔(上圖中爲1)的整數倍.

下面我們舉個例子。假設,你須要擴展前面的那個小栗子,你須要每隔10秒統計一下前30秒內的單詞計數。爲此,咱們須要在包含(word, 1)鍵值對的DStream上,對最近30秒的數據調用reduceByKey算子。不過這些均可以簡單地用一個 reduceByKeyAndWindow搞定。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

如下列出了經常使用的窗口算子。全部這些算子都有前面提到的那兩個參數 – 窗口長度 和 滑動距離。

Transformation窗口算子 用途
window(windowLengthslideInterval) 將源DStream窗口化,並返回轉化後的DStream
countByWindow(windowLength,slideInterval) 返回數據流在一個滑動窗口內的元素個數
reduceByWindow(funcwindowLength,slideInterval) 基於數據流在一個滑動窗口內的元素,用func作聚合,返回一個單元素數據流。func必須知足結合律,以便支持並行計算。
reduceByKeyAndWindow(func,windowLengthslideInterval, [numTasks]) 基於(K, V)鍵值對DStream,將一個滑動窗口內的數據進行聚合,返回一個新的包含(K,V)鍵值對的DStream,其中每一個value都是各個key通過func聚合後的結果。
注意:若是不指定numTasks,其值將使用Spark的默認並行任務數(本地模式下爲2,集羣模式下由 spark.default.parallelism決定)。固然,你也能夠經過numTasks來指定任務個數。
reduceByKeyAndWindow(funcinvFunc,windowLength,slideInterval, [numTasks]) 和前面的reduceByKeyAndWindow() 相似,只是這個版本會用以前滑動窗口計算結果,遞增地計算每一個窗口的歸約結果。當新的數據進入窗口時,這些values會被輸入func作歸約計算,而這 些數據離開窗口時,對應的這些values又會被輸入 invFunc 作」反歸約」計算。舉個簡單的例子,就是把新進入窗口數據中各個單詞個數「增長」到各個單詞統計結果上,同時把離開窗口數據中各個單詞的統計個數從相應的 統計結果中「減掉」。不過,你的本身定義好」反歸約」函數,即:該算子不只有歸約函數(見參數func),還得有一個對應的」反歸約」函數(見參數中的 invFunc)。和前面的reduceByKeyAndWindow() 相似,該算子也有一個可選參數numTasks來指定並行任務數。注意,這個算子須要配置好檢查點(checkpointing)才能用。
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基於包含(K, V)鍵值對的DStream,返回新的包含(K, Long)鍵值對的DStream。其中的Long value都是滑動窗口內key出現次數的計數。
和前面的reduceByKeyAndWindow() 相似,該算子也有一個可選參數numTasks來指定並行任務數。
   

Join相關算子

最後,值得一提的是,你在Spark Streaming中作各類關聯(join)操做很是簡單。

流-流(Stream-stream)關聯

一個數據流能夠和另外一個數據流直接關聯。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

上面代碼中,stream1的每一個批次中的RDD會和stream2相應批次中的RDD進行join。一樣,你能夠相似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你還能夠基於窗口來join不一樣的數據流,其實現也很簡單,以下;)

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
流-數據集(stream-dataset)關聯

其實這種狀況已經在前面的DStream.transform算子中介紹過了,這裏再舉個基於滑動窗口的例子。

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

實際上,在上面代碼裏,你能夠動態地該表join的數據集(dataset)。傳給tranform算子的操做函數會在每一個批次從新求值,因此每次該函數都會用最新的dataset值,因此不一樣批次間你能夠改變dataset的值。

完整的DStream transformation算子列表見API文檔。Scala請參考 DStream 和 PairDStreamFunctions. Java請參考 JavaDStream 和 JavaPairDStream. Python見 DStream



DStream輸出算子

輸出算子能夠將DStream的數據推送到外部系統,如:數據庫或者文件系統。由於輸出算子會將最終完成轉換的數據輸出到外部系統,所以只有輸出算 子調用時,纔會真正觸發DStream transformation算子的真正執行(這一點相似於RDD 的action算子)。目前所支持的輸出算子以下表:

輸出算子 用途
print() 在驅動器(driver)節點上打印DStream每一個批次中的頭十個元素。
Python API 對應的Python API爲 pprint()
saveAsTextFiles(prefix, [suffix]) 將DStream的內容保存到文本文件。
每一個批次一個文件,各文件命名規則爲 「prefix-TIME_IN_MS[.suffix]」
saveAsObjectFiles(prefix, [suffix]) 將DStream內容以序列化Java對象的形式保存到順序文件中。
每一個批次一個文件,各文件命名規則爲 「prefix-TIME_IN_MS[.suffix]」Python API 暫不支持Python
saveAsHadoopFiles(prefix, [suffix]) 將DStream內容保存到Hadoop文件中。
每一個批次一個文件,各文件命名規則爲 「prefix-TIME_IN_MS[.suffix]」Python API 暫不支持Python
foreachRDD(func) 這是最通用的輸出算子了,該算子接收一個函數func,func將做用於DStream的每一個RDD上。
func應該實現將每一個RDD的數據推到外部系統中,好比:保存到文件或者寫到數據庫中。
注意,func函數是在streaming應用的驅動器進程中執行的,因此若是其中包含RDD的action算子,就會觸發對DStream中RDDs的實際計算過程。

使用foreachRDD的設計模式

DStream.foreachRDD是一個很是強大的原生工具函數,用戶能夠基於此算子將DStream數據推送到外部系統中。不過用戶須要瞭解如何正確而高效地使用這個工具。如下列舉了一些常見的錯誤。

一般,對外部系統寫入數據須要一些鏈接對象(如:遠程server的TCP鏈接),以便發送數據給遠程系統。所以,開發人員可能會不經意地在Spark驅動器(driver)進程中建立一個鏈接對象,而後又試圖在Spark worker節點上使用這個鏈接。以下例所示:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

這段代碼是錯誤的,由於它須要把鏈接對象序列化,再從驅動器節點發送到worker節點。而這些鏈接對象一般都是不能跨節點(機器)傳遞的。好比,鏈接對 象一般都不能序列化,或者在另外一個進程中反序列化後再次初始化(鏈接對象一般都須要初始化,所以從驅動節點發到worker節點後可能須要從新初始化) 等。解決此類錯誤的辦法就是在worker節點上建立鏈接對象。

然而,有些開發人員可能會走到另外一個極端 – 爲每條記錄都建立一個鏈接對象,例如:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常來講,鏈接對象是有時間和資源開銷限制的。所以,對每條記錄都進行一次鏈接對象的建立和銷燬會增長不少沒必要要的開銷,同時也大大減少了系統的吞吐量。 一個比較好的解決方案是使用 rdd.foreachPartition – 爲RDD的每一個分區建立一個單獨的鏈接對象,示例以下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

這樣一來,鏈接對象的建立開銷就攤到不少條記錄上了。

最後,還有一個更優化的辦法,就是在多個RDD批次之間複用鏈接對象。開發者能夠維護一個靜態鏈接池來保存鏈接對象,以便在不一樣批次的多個RDD之間共享同一組鏈接對象,示例以下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

注意,鏈接池中的鏈接應該是懶惰建立的,而且有肯定的超時時間,超時後自動銷燬。這個實現應該是目前發送數據最高效的實現方式。

其餘要點:

  • DStream的轉化執行也是懶惰的,須要輸出算子來觸發,這一點和RDD的懶惰執行由action算子觸發很相似。特別地,DStream輸出 算子中包含的RDD action算子會強制觸發對所接收數據的處理。所以,若是你的Streaming應用中沒有輸出算子,或者你用了 dstream.foreachRDD(func)卻沒有在func中調用RDD action算子,那麼這個應用只會接收數據,而不會處理數據,接收到的數據最後只是被簡單地丟棄掉了。
  • 默認地,輸出算子只能一次執行一個,且按照它們在應用程序代碼中定義的順序執行。

累加器和廣播變量

首先須要注意的是,累加器(Accumulators)和廣播變量(Broadcast variables)是沒法從Spark Streaming的檢查點中恢復回來的。因此若是你開啓了檢查點功能,並同時在使用累加器和廣播變量,那麼你最好是使用懶惰實例化的單例模式,由於這樣累加器和廣播變量才能在驅動器(driver)故障恢復後從新實例化。代碼示例以下:

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts
})

這裏有完整代碼:source code


DataFrame和SQL相關算子

在Streaming應用中能夠調用DataFrames and SQL來 處理流式數據。開發者能夠用經過StreamingContext中的SparkContext對象來建立一個SQLContext,而且,開發者須要確保一旦驅動器(driver)故障恢復後,該SQLContext對象能從新建立出來。一樣,你仍是可使用懶惰建立的單例模式來實例化 SQLContext,以下面的代碼所示,這裏咱們將最開始的那個例子作了一些修改,使用DataFrame和SQL來統計單詞計數。其實就是,將每一個 RDD都轉化成一個DataFrame,而後註冊成臨時表,再用SQL查詢這些臨時表。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SQLContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

 

這裏有完整代碼:source code

你也能夠在其餘線程裏執行SQL查詢(異步查詢,即:執行SQL查詢的線程和運行StreamingContext的線程不一樣)。不過這種狀況下, 你須要確保查詢的時候 StreamingContext 沒有把所需的數據丟棄掉,不然StreamingContext有可能已將老的RDD數據丟棄掉了,那麼異步查詢的SQL語句也可能沒法獲得查詢結果。舉 個栗子,若是你須要查詢上一個批次的數據,可是你的SQL查詢可能要執行5分鐘,那麼你就須要StreamingContext至少保留最近5分鐘的數 據:streamingContext.remember(Minutes(5)) (這是Scala爲例,其餘語言差很少)

更多DataFrame和SQL的文檔見這裏: DataFrames and SQL


MLlib算子

MLlib 提供了不少機器學習算法。首先,你須要關注的是流式計算相關的機器學習算法(如:Streaming Linear RegressionStreaming KMeans),這些流式算法能夠在流式數據上一邊學習訓練模型,一邊用最新的模型處理數據。除此之外,對更多的機器學習算法而言,你須要離線訓練這些模型,而後將訓練好的模型用於在線的流式數據。詳見MLlib


緩存/持久化

和RDD相似,DStream也支持將數據持久化到內存中。只須要調用 DStream的persist() 方法,該方法內部會自動調用DStream中每一個RDD的persist方法進而將數據持久化到內存中。這對於可能須要計算不少次的DStream很是有 用(例如:對於同一個批數據調用多個算子)。對於基於滑動窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或 者有狀態的算子,如:updateStateByKey,數據持久化就更重要了。所以,滑動窗口算子產生的DStream對象默認會自動持久化到內存中 (不須要開發者調用persist)。

對於從網絡接收數據的輸入數據流(如:Kafka、Flume、socket等),默認的持久化級別會將數據持久化到兩個不一樣的節點上互爲備份副本,以便支持容錯。

注意,與RDD不一樣的是,DStream的默認持久化級別是將數據序列化到內存中。進一步的討論見性能調優這一小節。關於持久化級別(或者存儲級別)的更詳細說明見Spark編程指南(Spark Programming Guide)。


檢查點

通常來講Streaming 應用都須要7*24小時長期運行,因此必須對一些與業務邏輯無關的故障有很好的容錯(如:系統故障、JVM崩潰等)。對於這些可能性,Spark Streaming 必須在檢查點保存足夠的信息到一些可容錯的外部存儲系統中,以便可以隨時從故障中恢復回來。因此,檢查點須要保存如下兩種數據:

  • 元數據檢查點(Metadata checkpointing) – 保存流式計算邏輯的定義信息到外部可容錯存儲系統(如:HDFS)。主要用途是用於在故障後回覆應用程序自己(後續詳談)。元數包括:
    • Configuration – 建立Streaming應用程序的配置信息。
    • DStream operations – 定義流式處理邏輯的DStream操做信息。
    • Incomplete batches – 已經排隊但未處理完的批次信息。
  • 數據檢查點(Data checkpointing) – 將生成的RDD保存到可靠的存儲中。這對一些須要跨批次組合數據或者有狀態的算子來講頗有必要。在這種轉換算子中,每每新生成的RDD是依賴於前幾個批次 的RDD,所以隨着時間的推移,有可能產生很長的依賴鏈條。爲了不在恢復數據的時候須要恢復整個依賴鏈條上全部的數據,檢查點須要週期性地保存一些中間 RDD狀態信息,以斬斷無限制增加的依賴鏈條和恢復時間。

總之,元數據檢查點主要是爲了恢復驅動器節點上的故障,而數據或RDD檢查點是爲了支持對有狀態轉換操做的恢復。

什麼時候啓用檢查點

若是有如下狀況出現,你就必須啓用檢查點了:

  • 使用了有狀態的轉換算子(Usage of stateful transformations) – 無論是用了 updateStateByKey 仍是用了 reduceByKeyAndWindow(有」反歸約」函數的那個版本),你都必須配置檢查點目錄來週期性地保存RDD檢查點。
  • 支持驅動器故障中恢復(Recovering from failures of the driver running the application) – 這時候須要元數據檢查點以便恢復流式處理的進度信息。

注意,一些簡單的流式應用,若是沒有用到前面所說的有狀態轉換算子,則徹底能夠不開啓檢查點。不過這樣的話,驅動器(driver)故障恢復後,有 可能會丟失部分數據(有些已經接收但還未處理的數據可能會丟失)。不過一般這點丟失時可接受的,不少Spark Streaming應用也是這樣運行的。對非Hadoop環境的支持將來還會繼續改進。

如何配置檢查點

檢查點的啓用,只須要設置好保存檢查點信息的檢查點目錄便可,通常會會將這個目錄設爲一些可容錯的、可靠性較高的文件系統(如:HDFS、S3 等)。開發者只須要調用 streamingContext.checkpoint(checkpointDirectory)。設置好檢查點,你就可使用前面提到的有狀態轉換 算子了。另外,若是你須要你的應用可以支持從驅動器故障中恢復,你可能須要重寫部分代碼,實現如下行爲:

  • 若是程序是首次啓動,就須要new一個新的StreamingContext,並定義好全部的數據流處理,而後調用StreamingContext.start()。
  • 若是程序是故障後重啓,就須要從檢查點目錄中的數據中從新構建StreamingContext對象。
  • Scala
  • Java
  • Python

不過這個行爲能夠用StreamingContext.getOrCreate來實現,示例以下:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

 

若是 checkpointDirectory 目錄存在,則context對象會從檢查點數據從新構建出來。若是該目錄不存在(如:首次運行),則 functionToCreateContext 函數會被調用,建立一個新的StreamingContext對象並定義好DStream數據流。完整的示例請參見RecoverableNetworkWordCount,這個例子會將網絡數據中的單詞計數統計結果添加到一個文件中。

除了使用getOrCreate以外,開發者還須要確保驅動器進程能在故障後重啓。這一點只能由應用的部署環境基礎設施來保證。進一步的討論見部署(Deployment)這一節。

另外須要注意的是,RDD檢查點會增長額外的保存數據的開銷。這可能會致使數據流的處理時間變長。所以,你必須仔細的調整檢查點間隔時間。若是批次 間隔過小(好比:1秒),那麼對每一個批次保存檢查點數據將大大減少吞吐量。另外一方面,檢查點保存過於頻繁又會致使血統信息和任務個數的增長,這一樣會影響 系統性能。對於須要RDD檢查點的有狀態轉換算子,默認的間隔是批次間隔的整數倍,且最小10秒。開發人員能夠這樣來自定義這個間 隔:dstream.checkpoint(checkpointInterval)。通常推薦設爲批次間隔時間的5~10倍。


部署應用

本節中將主要討論一下如何部署Spark Streaming應用。

前提條件

要運行一個Spark Streaming 應用,你首先須要具有如下條件:

  • 集羣以及集羣管理器 – 這是通常Spark應用的基本要求,詳見 deployment guide
  • 給Spark應用打個JAR包 – 你須要將你的應用打成一個JAR包。若是使用spark-submit 提交應用,那麼你不須要提供Spark和Spark Streaming的相關JAR包。可是,若是你使用了高級數據源(advanced sources – 如:Kafka、Flume、Twitter等),那麼你須要將這些高級數據源相關的JAR包及其依賴一塊兒打包並部署。例如,若是你使用了 TwitterUtils,那麼就必須將spark-streaming-twitter_2.10及其相關依賴都打到應用的JAR包中。
  • 爲執行器(executor)預留足夠的內存 – 執行器必須配置預留好足夠的內存,由於接受到的數據都得存在內存裏。注意,若是某些窗口長度達到10分鐘,那也就是說你的系統必須知道保留10分鐘的數據在內存裏。可見,到底預留多少內存是取決於你的應用處理邏輯的。
  • 配置檢查點 – 若是你的流式應用須要檢查點,那麼你須要配置一個Hadoop API兼容的可容錯存儲目錄做爲檢查點目錄,流式應用的信息會寫入這個目錄,故障恢復時會用到這個目錄下的數據。詳見前面的檢查點小節。
  • 配置驅動程序自動重啓 – 流式應用自動恢復的前提就是,部署基礎設施可以監控驅動器進程,而且可以在其故障時,自動重啓之。不一樣的集羣管理器有不一樣的工具來實現這一功能:
    • Spark獨立部署 – Spark獨立部署集羣能夠支持將Spark應用的驅動器提交到集羣的某個worker節點上運行。同時,Spark的集羣管理器能夠對該驅動器進程進行 監控,一旦驅動器退出且返回非0值,或者因worker節點原始失敗,Spark集羣管理器將自動重啓這個驅動器。詳見Spark獨立部署指南(Spark Standalone guide)。
    • YARN – YARN支持和獨立部署相似的重啓機制。詳細請參考YARN的文檔。
    • Mesos – Mesos上須要用Marathon來實現這一功能。
  • 配置WAL(write ahead log)- 從Spark 1.2起,咱們引入了write ahead log來提升容錯性。若是啓用這個功能,則全部接收到的數據都會以write ahead log形式寫入配置好的檢查點目錄中。這樣就能確保數據零丟失(容錯語義有詳細的討論)。用戶只需將 spark.streaming.receiver.writeAheadLog 設爲true。不過,這一樣可能會致使接收器的吞吐量降低。不過你能夠啓動多個接收器並行接收數據,從而提高總體的吞吐量(more receivers in parallel)。 另外,建議在啓用WAL後禁用掉接收數據多副本功能,由於WAL其實已是存儲在一個多副本存儲系統中了。你只須要把存儲級別設爲 StorageLevel.MEMORY_AND_DISK_SER。若是是使用S3(或者其餘不支持flushing的文件系統)存儲WAL,必定要記 得啓用這兩個標識:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更詳細請參考: Spark Streaming Configuration
  • 設置好最大接收速率 – 若是集羣可用資源不足以跟上接收數據的速度,那麼能夠在接收器設置一下最大接收速率,即:每秒接收記錄的條數。相關的主要配置 有:spark.streaming.receiver.maxRate,若是使用Kafka Direct API 還須要設置 spark.streaming.kafka.maxRatePerPartition。從Spark 1.5起,咱們引入了backpressure的概念來動態地根據集羣處理速度,評估並調整該接收速率。用戶只需將 spark.streaming.backpressure.enabled設爲true便可啓用該功能。

升級應用代碼

升級Spark Streaming應用程序代碼,可使用如下兩種方式:

  • 新的Streaming程序和老的並行跑一段時間,新程序完成初始化之後,再關閉老的。注意,這種方式適用於能同時發送數據到多個目標的數據源(即:數據源同時將數據發給新老兩個Streaming應用程序)。
  • 老程序可以優雅地退出(參考  StreamingContext.stop(...) or JavaStreamingContext.stop(...) ), 即:確保所收到的數據都已經處理完畢後再退出。而後再啓動新的Streaming程序,而新程序將接着在老程序退出點上繼續拉取數據。注意,這種方式須要 數據源支持數據緩存(或者叫數據堆積,如:Kafka、Flume),由於在新舊程序交接的這個空檔時間,數據須要在數據源處緩存。目前還不能支持從檢查 點重啓,由於檢查點存儲的信息包含老程序中的序列化對象信息,在新程序中將其反序列化可能會出錯。這種狀況下,只能要麼指定一個新的檢查點目錄,要麼刪除 老的檢查點目錄。

應用監控

除了Spark自身的監控能力(monitoring capabilities)以外,對Spark Streaming還有一些額外的監控功能可用。若是實例化了StreamingContext,那麼你能夠在Spark web UI上看到多出了一個Streaming tab頁,上面顯示了正在運行的接收器(是否活躍,接收記錄的條數,失敗信息等)和處理完的批次信息(批次處理時間,查詢延時等)。這些信息均可以用來監控streaming應用。

web UI上有兩個度量特別重要:

  • 批次處理耗時(Processing Time) – 處理單個批次耗時
  • 批次調度延時(Scheduling Delay) -各批次在隊列中等待時間(等待上一個批次處理完)

若是批次處理耗時一直比批次間隔時間大,或者批次調度延時持續上升,就意味着系統處理速度跟不上數據接收速度。這時候你就得考慮一下怎麼把批次處理時間降下來(reducing)。

Spark Streaming程序的處理進度能夠用StreamingListener接口來監聽,這個接口能夠監聽到接收器的狀態和處理時間。不過須要注意的是,這是一個developer API接口,換句話說這個接口將來極可能會變更(可能會增長更多度量信息)。



 

性能調優

要得到Spark Streaming應用的最佳性能須要一點點調優工做。本節將深刻解釋一些可以改進Streaming應用性能的配置和參數。整體上來講,你須要考慮這兩方面的事情:

  1. 提升集羣資源利用率,減小單批次處理耗時。
  2. 設置合適的批次大小,以便使數據處理速度能跟上數據接收速度。

 

減小批次處理時間

有很多優化手段均可以減小Spark對每一個批次的處理時間。細節將在優化指南(Tuning Guide)中詳談。這裏僅列舉一些最重要的。

 

數據接收併發度

跨網絡接收數據(如:從Kafka、Flume、socket等接收數據)須要在Spark中序列化並存儲數據。

若是接收數據的過程是系統瓶頸,那麼能夠考慮增長數據接收的並行度。注意,每一個輸入DStream只包含一個單獨的接收器(receiver,運行 約worker節點),每一個接收器單獨接收一路數據流。因此,配置多個輸入DStream就能從數據源的不一樣分區分別接收多個數據流。例如,能夠將從 Kafka拉取兩個topic的數據流分紅兩個Kafka輸入數據流,每一個數據流拉取其中一個topic的數據,這樣一來會同時有兩個接收器並行地接收數 據,於是增長了整體的吞吐量。同時,另外一方面咱們又能夠把這些DStream數據流合併成一個,而後能夠在合併後的DStream上使用任何可用的 transformation算子。示例代碼以下:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

 

另外一個能夠考慮優化的參數就是接收器的阻塞間隔,該參數由配置參數(configuration parameter)spark.streaming.blockInterval 決定。大多數接收器都會將數據合併成一個個數據塊,而後再保存到spark內存中。對於map類算子來講,每一個批次中數據塊的個數將會決定處理這批數據並 行任務的個數,每一個接收器每批次數據處理任務數約等於 (批次間隔 / 數據塊間隔)。例如,對於2秒的批次間隔,若是數據塊間隔爲200ms,則建立的併發任務數爲10。若是任務數太少(少於單機cpu core個數),則資源利用不夠充分。如需增長這個任務數,對於給定的批次間隔來講,只須要減小數據塊間隔便可。不過,咱們仍是建議數據塊間隔至少要 50ms,不然任務的啓動開銷佔比就過高了。

另外一個切分接收數據流的方法是,顯示地將輸入數據流劃分爲多個分區(使用 inputStream.repartition(<number of partitions>))。該操做會在處理前,將數據散開從新分發到集羣中多個節點上。

數據處理併發度

在計算各個階段(stage)中,任何一個階段的併發任務數不足都有可能形成集羣資源利用率低。例如,對於reduce類的算子, 如:reduceByKey 和 reduceByKeyAndWindow,其默認的併發任務數是由 spark.default.parallelism 決定的。你既能夠修改這個默認值(spark.default.parallelism),也能夠經過參數指定這個併發數量(見PairDStreamFunctions)。

數據的序列化

調整數據的序列化格式能夠大大減小數據序列化的開銷。在spark Streaming中主要有兩種類型的數據須要序列化:

  • 輸入數據: 默認地,接收器收到的數據是以 StorageLevel.MEMORY_AND_DISK_SER_2 的 存儲級別存儲到執行器(executor)內存中的。也就是說,收到的數據會被序列化以減小GC開銷,同時保存兩個副本以容錯。同時,數據會優先保存在內 存裏,當內存不足時才吐出到磁盤上。很明顯,這個過程當中會有數據序列化的開銷 – 接收器首先將收到的數據反序列化,而後再以spark所配置指定的格式來序列化數據。
  • Streaming算子所生產的持久化的RDDs: Streaming計算所生成的RDD可能會持久化到內存中。例如,基於窗口的算子會將數據持久化到內存,由於窗口數據可能會屢次處理。所不一樣的是,spark core默認用 StorageLevel.MEMORY_ONLY 級別持久化RDD數據,而spark streaming默認使用StorageLevel.MEMORY_ONLY_SER 級別持久化接收到的數據,以便儘可能減小GC開銷。

無論是上面哪種數據,均可以使用Kryo序列化來減小CPU和內存開銷,詳見Spark Tuning Guide。另,對於Kryo,你能夠考慮這些優化:註冊自定義類型,禁用對象引用跟蹤(詳見Configuration Guide)。

在一些特定的場景下,若是數據量不是很大,那麼你能夠考慮不用序列化格式,不過你須要注意的是取消序列化是否會致使大量的GC開銷。例如,若是你的 批次間隔比較短(幾秒)而且沒有使用基於窗口的算子,這種狀況下你能夠考慮禁用序列化格式。這樣能夠減小序列化的CPU開銷以優化性能,同時GC的增加也 很少。

 

任務啓動開銷

若是每秒啓動的任務數過多(好比每秒50個以上),那麼將任務發送給slave節點的開銷會明顯增長,那麼你也就很難達到亞秒級(sub-second)的延遲。不過如下兩個方法能夠減小任務的啓動開銷:

  • 任務序列化(Task Serialization): 使用Kryo來序列化任務,以減小任務自己的大小,從而提升發送任務的速度。任務的序列化格式是由 spark.closure.serializer 屬性決定的。不過,目前還不支持閉包序列化,將來的版本可能會增長對此的支持。
  • 執行模式(Execution mode): Spark獨立部署或者Mesos粗粒度模式下任務的啓動時間比Mesos細粒度模式下的任務啓動時間要短。詳見Running on Mesos guide

這些調整有可能可以減小100ms的批次處理時間,這也使得亞秒級的批次間隔成爲可能。


 

設置合適的批次間隔

要想streaming應用在集羣上穩定運行,那麼系統處理數據的速度必須能跟上其接收數據的速度。換句話說,批次數據的處理速度應該和其生成速度同樣快。對於特定的應用來講,能夠從其對應的監控(monitoring)頁面上觀察驗證,頁面上顯示的處理耗時應該要小於批次間隔時間。

根據spark streaming計算的性質,在必定的集羣資源限制下,批次間隔的值會極大地影響系統的數據處理能力。例如,在WordCountNetwork示例 中,對於特定的數據速率,一個系統可能可以在批次間隔爲2秒時跟上數據接收速度,但若是把批次間隔改成500毫秒系統可能就處理不過來了。因此,批次間隔 須要謹慎設置,以確保生產系統可以處理得過來。

要找出適合的批次間隔,你能夠從一個比較保守的批次間隔值(如5~10秒)開始測試。要驗證系統是否能跟上當前的數據接收速率,你可能須要檢查一下端到端的批次處理延遲(能夠看看Spark驅動器log4j日誌中的Total delay,也能夠用StreamingListener接 口來檢測)。若是這個延遲能保持和批次間隔差很少,那麼系統基本就是穩定的。不然,若是這個延遲持久在增加,也就是說系統跟不上數據接收速度,那也就意味 着系統不穩定。一旦系統文檔下來後,你就能夠嘗試提升數據接收速度,或者減小批次間隔值。不過須要注意,瞬間的延遲增加能夠只是暫時的,只要這個延遲後續 會自動降下來就沒有問題(如:降到小於批次間隔值)


 

內存調優

Spark應用內存佔用和GC調優已經在調優指南(Tuning Guide)中有詳細的討論。牆裂建議你讀一讀那篇文檔。本節中,咱們只是討論一下幾個專門用於Spark Streaming的調優參數。

Spark Streaming應用在集羣中佔用的內存量嚴重依賴於具體所使用的tranformation算子。例如,若是想要用一個窗口算子操縱最近10分鐘的數 據,那麼你的集羣至少須要在內存裏保留10分鐘的數據;另外一個例子是updateStateByKey,若是key不少的話,相對應的保存的key的 state也會不少,而這些都須要佔用內存。而若是你的應用只是作一個簡單的 「映射-過濾-存儲」(map-filter-store)操做的話,那須要的內存就不多了。

通常狀況下,streaming接收器接收到的數據會以 StorageLevel.MEMORY_AND_DISK_SER_2 這個存儲級別存到spark中,也就是說,若是內存裝不下,數據將被吐到磁盤上。數據吐到磁盤上會大大下降streaming應用的性能,所以仍是建議根 據你的應用處理的數據量,提供充足的內存。最好就是,一邊小規模地放大內存,再觀察評估,而後再放大,再評估。

另外一個內存調優的方向就是垃圾回收。由於streaming應用每每都須要低延遲,因此確定不但願出現大量的或耗時較長的JVM垃圾回收暫停。

如下是一些可以幫助你減小內存佔用和GC開銷的參數或手段:

  • DStream持久化級別(Persistence Level of DStreams): 前面數據序列化(Data Serialization) 這小節已經提到過,默認streaming的輸入RDD會被持久化成序列化的字節流。相對於非序列化數據,這樣能夠減小內存佔用和GC開銷。若是啓用 Kryo序列化,還能進一步減小序列化數據大小和內存佔用量。若是你還須要進一步減小內存佔用的話,能夠開啓數據壓縮(經過 spark.rdd.compress這個配置設定),只不過數據壓縮會增長CPU消耗。
  • 清除老數據(Clearing old data): 默認狀況下,全部的輸入數據以及DStream的transformation算子產生的持久化RDD都是自動清理的。Spark Streaming會根據所使用的transformation算子來清理老數據。例如,你用了一個窗口操做處理最近10分鐘的數據,那麼Spark Streaming會保留至少10分鐘的數據,而且會主動把更早的數據都刪掉。固然,你能夠設置 streamingContext.remember 以保留更長時間段的數據(好比:你可能會須要交互式地查詢更老的數據)。
  • CMS垃圾回收器(CMS Garbage Collector): 爲了儘可能減小GC暫停的時間,咱們牆裂建議使用CMS垃圾回收器(concurrent mark-and-sweep GC)。雖然CMS GC會稍微下降系統的整體吞吐量,但咱們仍建議使用它,由於CMS GC能使批次處理的時間保持在一個比較恆定的水平上。最後,你須要確保在驅動器(經過spark-submit中的–driver-java- options設置)和執行器(使用spark.executor.extraJavaOptions配置參數)上都設置了CMS GC。
  • 其餘提示: 若是還想進一步減小GC開銷,如下是更進一步的能夠嘗試的手段:
    • 配合Tachyon使用堆外內存來持久化RDD。詳見Spark編程指南(Spark Programming Guide
    • 使用更多可是更小的執行器進程。這樣GC壓力就會分散到更多的JVM堆中。


容錯語義

本節中,咱們將討論Spark Streaming應用在出現失敗時的具體行爲。

 

背景

要理解Spark Streaming所提供的容錯語義,咱們首先須要回憶一下Spark RDD所提供的基本容錯語義。

  1. RDD是不可變的,可重算的,分佈式數據集。每一個RDD都記錄了其建立算子的血統信息,其中每一個算子都以可容錯的數據集做爲輸入數據。
  2. 若是RDD的某個分區由於節點失效而丟失,則該分區能夠根據RDD的血統信息以及相應的原始輸入數據集從新計算出來。
  3. 假定全部RDD transformation算子計算過程都是肯定性的,那麼經過這些算子獲得的最終RDD老是包含相同的數據,而與Spark集羣的是否故障無關。

Spark主要操做一些可容錯文件系統的數據,如:HDFS或S3。所以,全部從這些可容錯數據源產生的RDD也是可容錯的。然而,對於Spark Streaming並不是如此,由於多數狀況下Streaming須要從網絡遠端接收數據,這回致使Streaming的數據源並不可靠(尤爲是對於使用了 fileStream的應用)。要實現RDD相同的容錯屬性,數據接收就必須用多個不一樣worker節點上的Spark執行器來實現(默認副本因子是 2)。所以一旦出現故障,系統須要恢復兩種數據:

  1. 接收並保存了副本的數據 – 數據不會由於單個worker節點故障而丟失,由於有副本!
  2. 接收但還沒有保存副本數據 – 由於數據並無副本,因此一旦故障,只能從數據源從新獲取。

此外,還有兩種可能的故障類型須要考慮:

  1. Worker節點故障 – 任何運行執行器的worker節點一旦故障,節點上內存中的數據都會丟失。若是這些節點上有接收器在運行,那麼其包含的緩存數據也會丟失。
  2. Driver節點故障 – 若是Spark Streaming的驅動節點故障,那麼很顯然SparkContext對象就沒了,全部執行器及其內存數據也會丟失。

有了以上這些基本知識,下面咱們就進一步瞭解一下Spark Streaming的容錯語義。

 

定義

流式系統的可靠度語義能夠據此來分類:單條記錄在系統中被處理的次數保證。一個流式系統可能提供保證一定是如下三種之一(無論系統是否出現故障):

  1. 至多一次(At most once): 每條記錄要麼被處理一次,要麼就沒有處理。
  2. 至少一次(At least once): 每條記錄至少被處理過一次(一次或屢次)。這種保證能確保沒有數據丟失,比「至多一次」要強。但有可能出現數據重複。
  3. 精確一次(Exactly once): 每條記錄都精確地只被處理一次 – 也就是說,既沒有數據丟失,也不會出現數據重複。這是三種保證中最強的一種。

 

基礎語義

任何流式處理系統通常都會包含如下三個數據處理步驟:

  1. 數據接收(Receiving the data): 從數據源拉取數據。
  2. 數據轉換(Transforming the data): 將接收到的數據進行轉換(使用DStream和RDD transformation算子)。
  3. 數據推送(Pushing out the data): 將轉換後最終數據推送到外部文件系統,數據庫或其餘展現系統。

若是Streaming應用須要作到端到端的「精確一次」的保證,那麼就必須在以上三個步驟中各自都保證精確一次:即,每條記錄必須,只接收一次、處理一次、推送一次。下面讓咱們在Spark Streaming的上下文環境中來理解一下這三個步驟的語義:

  1. 數據接收: 不一樣數據源提供的保證不一樣,下一節再詳細討論。
  2. 數據轉換: 全部的數據都會被「精確一次」處理,這要歸功於RDD提供的保障。即便出現故障,只要數據源還能訪問,最終所轉換獲得的RDD老是包含相同的內容。
  3. 數據推送: 輸出操做默認保證「至少一次」的語義,是否能「精確一次」還要看所使用的輸出算子(是否冪等)以及下游系統(是否支持事務)。不過用戶也能夠開發本身的事務機制來實現「精確一次」語義。這個後續會有詳細討論。

 

接收數據語義

不一樣的輸入源提供不一樣的數據可靠性級別,從「至少一次」到「精確一次」。

從文件接收數據

若是全部的輸入數據都來源於可容錯的文件系統,如HDFS,那麼Spark Streaming就能在任何故障中恢復並處理全部的數據。這種狀況下就能保證精確一次語義,也就是說無論出現什麼故障,全部的數據老是精確地只處理一次,很少也很多。

基於接收器接收數據

對於基於接收器的輸入源,容錯語義將同時依賴於故障場景和接收器類型。前面也已經提到過,spark Streaming主要有兩種類型的接收器:

  1. 可靠接收器 – 這類接收器會在數據接收並保存好副本後,向可靠數據源發送確認信息。這類接收器故障時,是不會給緩存的(已接收但還沒有保存副本)數據發送確認信息。所以,一旦接收器重啓,沒有收到確認的數據,會從新從數據源再獲取一遍,因此即便有故障也不會丟數據。
  2. 不可靠接收器 – 這類接收器不會發送確認信息,所以一旦worker和driver出現故障,就有可能會丟失數據。

對於不一樣的接收器,咱們能夠得到以下不一樣的語義。若是一個worker節點故障了,對於可靠接收器來書,不會有數據丟失。而對於不可靠接收器,緩存 的(接收但還沒有保存副本)數據可能會丟失。若是driver節點故障了,除了接收到的數據以外,其餘的已經接收且已經保存了內存副本的數據都會丟失,這將 會影響有狀態算子的計算結果。

爲了不丟失已經收到且保存副本的數,從 spark 1.2 開始引入了WAL(write ahead logs),以便將這些數據寫入到可容錯的存儲中。只要你使用可靠接收器,同時啓用WAL(write ahead logs enabled),那麼久不再用爲數據丟失而擔憂了。而且這時候,還能提供「至少一次」的語義保證。

下表總結了故障狀況下的各類語義:

部署場景 Worker 故障 Driver 故障
Spark 1.1及之前版本 或者
Spark 1.2及之後版本,且未開啓WAL
若使用不可靠接收器,則可能丟失緩存(已接收但還沒有保存副本)數據;
若使用可靠接收器,則沒有數據丟失,且提供至少一次處理語義
若使用不可靠接收器,則緩存數據和已保存數據均可能丟失;
若使用可靠接收器,則沒有緩存數據丟失,但已保存數據可能丟失,且不提供語義保證
Spark 1.2及之後版本,並啓用WAL 若使用可靠接收器,則沒有數據丟失,且提供至少一次語義保證 若使用可靠接收器和文件,則無數據丟失,且提供至少一次語義保證

 

 

從Kafka Direct API接收數據

從Spark 1.3開始,咱們引入Kafka Direct API,該API能爲Kafka數據源提供「精確一次」語義保證。有了這個輸入API,再加上輸出算子的「精確一次」保證,你就能真正實現端到端的「精確 一次」語義保證。(改功能截止Spark 1.6.1仍是實驗性的)更詳細的說明見:Kafka Integration Guide

輸出算子的語義

輸出算子(如 foreachRDD)提供「至少一次」語義保證,也就是說,若是worker故障,單條輸出數據可能會被屢次寫入外部實體中。不過這對於文件系統來講是 能夠接受的(使用saveAs***Files 屢次保存文件會覆蓋以前的),因此咱們須要一些額外的工做來實現「精確一次」語義。主要有兩種實現方式:

  • 冪等更新(Idempotent updates): 就是說屢次操做,產生的結果相同。例如,屢次調用saveAs***Files保存的文件老是包含相同的數據。
  • 事務更新(Transactional updates): 全部的更新都是事務性的,這樣一來就能保證更新的原子性。如下是一種實現方式:
    • 用批次時間(在foreachRDD中可用)和分區索引建立一個惟一標識,該標識表明流式應用中惟一的一個數據塊。
    • 基於這個標識創建更新事務,並使用數據塊數據更新外部系統。也就是說,若是該標識未被提交,則原子地將標識表明的數據更新到外部系統。不然,就認爲該標識已經被提交,直接忽略之。
      dstream.foreachRDD { (rdd, time) =>
        rdd.foreachPartition { partitionIterator =>
          val partitionId = TaskContext.get.partitionId()
          val uniqueId = generateUniqueId(time.milliseconds, partitionId)
          // 使用uniqueId做爲事務的惟一標識,基於uniqueId實現partitionIterator所指向數據的原子事務提交
        }
      }



移植:從 0.9.1 如下到 1.x

在Spark 0.9.1和Spark 1.0之間,有一些API接口變動,變動目的是爲了保障將來版本API的穩定。本節將詳細說明一下從已有版本遷移升級到1.0所需的工做。

輸入DStream(Input DStreams): 全部建立輸入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值再也不是DStream(對Java來講是JavaDStream),而是 InputDStream / ReceiverInputDStream(對Java來講是JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream)。這樣才能確保特定輸入流的功能可以在將來持續增長到這些class中,而不會打破二進制兼容性。注意,已有的Spark Streaming應用應該不須要任何代碼修改(新的返回類型都是DStream的子類),只不過須要基於Spark 1.0從新編譯一把。

定製網絡接收器(Custom Network Receivers): 自從Spark Streaming發佈以來,Scala就能基於NetworkReceiver來定製網絡接收器。但因爲錯誤處理和彙報API方便的限制,該類型不能在Java中使用。因此Spark 1.0開始,用 Receiver 來替換掉這個NetworkReceiver,主要的好處以下:

  • 該類型新增了stop和restart方法,便於控制接收器的生命週期。詳見custom receiver guide
  • 定製接收器用Scala和Java都能實現。

 

爲了將已有的基於NetworkReceiver的自定義接收器遷移到Receiver上來,你須要以下工做:

  • 首先你的自定義接收器類型須要從 org.apache.spark.streaming.receiver.Receiver繼承,而再也不是org.apache.spark.streaming.dstream.NetworkReceiver。
  • 原先,咱們須要在自定義接收器中建立一個BlockGenerator來保存接收到的數據。你必須顯示的實現onStart() 和 onStop() 方法。而在新的Receiver class中,這些都不須要了,你只須要調用它的store系列的方法就能將數據保存到Spark中。因此你接下來須要作的遷移工做就是,刪除 BlockGenerator對象(這個類型在Spark 1.0以後也沒有了~),而後用store(…)方法來保存接收到的數據。

 

基於Actor的接收器(Actor-based Receivers): 從actor class繼承後,並實現了org.apache.spark.streaming.receiver.Receiver 後,便可從Akka Actors中獲取數據。獲取數據的類被重命名爲  org.apache.spark.streaming.receiver.ActorHelper ,而保存數據的pushBlocks(…)方法也被重命名爲 store(…)。其餘org.apache.spark.streaming.receivers包中的工具類也被移到  org.apache.spark.streaming.receiver 包下並重命名,新的類名應該比以前更加清晰。



更多的參考資料

相關文章
相關標籤/搜索