Apache Spark 2.2.0 中文文檔 - Spark Streaming 編程指南

Spark Streaming 編程指南
html

概述java

一個入門示例node

基礎概念python

依賴git

初始化 StreamingContextgithub

Discretized Streams (DStreams)(離散化流)web

Input DStreams 和 Receivers(接收器)算法

DStreams 上的 Transformations(轉換)sql

DStreams 上的輸出操做shell

DataFrame 和 SQL 操做

MLlib 操做

緩存 / 持久性

Checkpointing

Accumulators, Broadcast 變量, 和 Checkpoint

應用程序部署

Monitoring Applications (監控應用程序)

Performance Tuning (性能調優)

Reducing the Batch Processing Times (減小批處理時間)

Setting the Right Batch Interval (設置正確的批次間隔)

Memory Tuning (內存調優)

Fault-tolerance Semantics (容錯語義)

快速連接

概述

Spark Streaming 是 Spark Core API 的擴展, 它支持彈性的, 高吞吐的, 容錯的實時數據流的處理. 數據能夠經過多種數據源獲取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也能夠經過例如map,reduce,join,window等的高級函數組成的複雜算法處理. 最終, 處理後的數據能夠輸出到文件系統, 數據庫以及實時儀表盤中. 事實上, 你還能夠在 data streams(數據流)上使用機器學習以及圖形處理算法.

 

在內部, 它工做原理以下, Spark Streaming 接收實時輸入數據流並將數據切分紅多個 batch(批)數據, 而後由 Spark 引擎處理它們以生成最終的 stream of results in batches(分批流結果).

Spark Streaming 提供了一個名爲discretized stream或DStream的高級抽象, 它表明一個連續的數據流. DStream 能夠從數據源的輸入數據流建立, 例如 Kafka, Flume 以及 Kinesis, 或者在其餘 DStream 上進行高層次的操做以建立. 在內部, 一個 DStream 是經過一系列的RDDs來表示.

本指南告訴你如何使用 DStream 來編寫一個 Spark Streaming 程序. 你可使用 Scala , Java 或者 Python(Spark 1.2 版本後引進)來編寫 Spark Streaming 程序. 全部這些都在本指南中介紹. 您能夠在本指南中找到標籤, 讓您能夠選擇不一樣語言的代碼段.

Note(注意):在 Python 有些 API 可能會有不一樣或不可用. 在本指南, 您將找到Python API的標籤來高亮顯示不一樣的地方.

一個入門示例

在咱們詳細介紹如何編寫你本身的 Spark Streaming 程序的細節以前, 讓咱們先來看一看一個簡單的 Spark Streaming 程序的樣子. 比方說, 咱們想要計算從一個監聽 TCP socket 的數據服務器接收到的文本數據(text data)中的字數. 你須要作的就是照着下面的步驟作.

Scala

Java

Python

首先, 咱們導入了 Spark Streaming 類和部分從 StreamingContext 隱式轉換到咱們的環境的名稱, 目的是添加有用的方法到咱們須要的其餘類(如 DStream).StreamingContext是全部流功能的主要入口點. 咱們建立了一個帶有 2 個執行線程和間歇時間爲 1 秒的本地 StreamingContext.

importorg.apache.spark._importorg.apache.spark.streaming._importorg.apache.spark.streaming.StreamingContext._// 自從 Spark 1.3 開始, 再也不是必要的了// 建立一個具備兩個工做線程(working thread)而且批次間隔爲 1 秒的本地 StreamingContext .// master 須要 2 個核, 以防止飢餓狀況(starvation scenario).valconf=newSparkConf().setMaster("local[2]").setAppName("NetworkWordCount")valssc=newStreamingContext(conf,Seconds(1))

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g.localhost) and port (e.g.9999). 使用該 context, 咱們能夠建立一個表明從 TCP 源流數據的離散流(DStream), 指定主機名(hostname)(例如 localhost)和端口(例如 9999).

// 建立一個將要鏈接到 hostname:port 的 DStream,如 localhost:9999vallines=ssc.socketTextStream("localhost",9999)

上一步的這個linesDStream 表示將要從數據服務器接收到的數據流. 在這個離散流(DStream)中的每一條記錄都是一行文本(text). 接下來,咱們想要經過空格字符(space characters)拆分這些數據行(lines)成單詞(words).

// 將每一行拆分紅 words(單詞)valwords=lines.flatMap(_.split(" "))

flatMap是一種 one-to-many(一對多)的離散流(DStream)操做,它會經過在源離散流(source DStream)中根據每一個記錄(record)生成多個新紀錄的形式建立一個新的離散流(DStream). 在這種狀況下,在這種狀況下,每一行(each line)都將被拆分紅多個單詞(words)和表明單詞離散流(words DStream)的單詞流. 接下來,咱們想要計算這些單詞.

importorg.apache.spark.streaming.StreamingContext._// not necessary since Spark 1.3// 計算每個 batch(批次)中的每個 word(單詞)valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)// 在控制檯打印出在這個離散流(DStream)中生成的每一個 RDD 的前十個元素// 注意: 必須要觸發 action(不少初學者會忘記觸發 action 操做,致使報錯:No output operations registered, so nothing to execute)wordCounts.print()

上一步的wordsDStream 進行了進一步的映射(一對一的轉換)爲一個 (word, 1) paris 的離散流(DStream),這個 DStream 而後被規約(reduce)來得到數據中每一個批次(batch)的單詞頻率. 最後,wordCounts.print()將會打印一些每秒生成的計數.

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call

請注意,當這些行(lines)被執行的時候, Spark Streaming 僅僅設置了計算, 只有在啓動時纔會執行,並無開始真正地處理. 爲了在全部的轉換都已經設置好以後開始處理,咱們在最後調用:

ssc.start()// 開始計算ssc.awaitTermination()// 等待計算被中斷

該部分完整的代碼能夠在 Spark Streaming 示例NetworkWordCount中找到.

若是你已經下載而且構建Spark, 您可使用以下方式來運行該示例. 你首先須要運行 Netcat(一個在大多數類 Unix 系統中的小工具)做爲咱們使用的數據服務器.

$ nc -lk9999

而後,在另外一個不一樣的終端,你能夠經過執行以下命令來運行該示例:

Scala

Java

Python

$ ./bin/run-example streaming.NetworkWordCount localhost9999

而後,在運行在 netcat 服務器上的終端輸入的任何行(lines),都將被計算,而且每一秒都顯示在屏幕上,它看起來就像下面這樣:

# TERMINAL 1:# Running Netcat$ nc -lk9999hello world...

Scala

Java

Python

# TERMINAL 2: RUNNING NetworkWordCount$ ./bin/run-example streaming.NetworkWordCount localhost9999...-------------------------------------------Time:1357008430000ms-------------------------------------------(hello,1)(world,1)...

基礎概念

接下來,咱們瞭解完了簡單的例子,開始闡述 Spark Streaming 的基本知識。

依賴

與 Spark 相似,Spark Streaming 能夠經過 Maven 來管理依賴. 爲了編寫你本身的 Spark Streaming 程序,你必須添加如下的依賴到你的 SBT 或者 Maven 項目中.

Maven

SBT

org.apache.spark

spark-streaming_2.11

2.2.0

針對從 Spark Streaming Core API 中不存在的數據源中獲取數據,如 Kafka, Flume,Kinesis ,你必須添加相應的座標spark-streaming-xyz_2.11到依賴中. 例如,有一些常見的依賴以下.

Source(數據源)Artifact(座標)

Kafkaspark-streaming-kafka-0-8_2.11

Flumespark-streaming-flume_2.11

Kinesis

spark-streaming-kinesis-asl_2.11 [Amazon Software License]

想要查看一個實時更新的列表,請參閱Maven repository來了解支持的 sources(數據源)和 artifacts(座標)的完整列表。

初始化 StreamingContext

爲了初始化一個 Spark Streaming 程序, 一個StreamingContext對象必需要被建立出來,它是全部的 Spark Streaming 功能的主入口點。

Scala

Java

Python

一個StreamingContext對象能夠從一個SparkConf對象中來建立.

importorg.apache.spark._importorg.apache.spark.streaming._valconf=newSparkConf().setAppName(appName).setMaster(master)valssc=newStreamingContext(conf,Seconds(1))

這個appName參數是展現在集羣 UI 界面上的應用程序的名稱.master是一個Spark, Mesos or YARN cluster URL, 或者一個特殊的「local[*]」字符串以使用 local mode(本地模式)來運行. 在實踐中,當在集羣上運行時,你不會想在應用程序中硬編碼master,而是使用spark-submit來啓動應用程序, 而且接受該參數. 然而,對於本地測試和單元測試,你能夠傳遞 「local[*]」 來運行 Spark Streaming 進程(檢測本地系統中內核的個數). 請注意,作個內部建立了一個SparkContext(全部 Spark 功能的出發點),它能夠像 ssc.sparkContext 這樣被訪問.

這個 batch interval(批間隔)必須根據您的應用程序和可用的集羣資源的等待時間要求進行設置. 更多詳情請參閱優化指南部分.

一個StreamingContext對象也能夠從一個現有的SparkContext對象來建立.

importorg.apache.spark.streaming._valsc=...// 已存在的 SparkContextvalssc=newStreamingContext(sc,Seconds(1))

在定義一個 context 以後,您必須執行如下操做.

經過建立輸入 DStreams 來定義輸入源.

經過應用轉換和輸出操做 DStreams 定義流計算(streaming computations).

開始接收輸入而且使用streamingContext.start()來處理數據.

使用streamingContext.awaitTermination()等待處理被終止(手動或者因爲任何錯誤).

使用streamingContext.stop()來手動的中止處理.

須要記住的幾點:

一旦一個 context 已經啓動,將不會有新的數據流的計算能夠被建立或者添加到它。.

一旦一個 context 已經中止,它不會被從新啓動.

同一時間內在 JVM 中只有一個 StreamingContext 能夠被激活.

在 StreamingContext 上的 stop() 一樣也中止了 SparkContext 。爲了只中止 StreamingContext ,設置stop()的可選參數,名叫stopSparkContext爲 false.

一個 SparkContext 就能夠被重用以建立多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被建立以前中止(不中止 SparkContext).

Discretized Streams (DStreams)(離散化流)

Discretized StreamorDStream是 Spark Streaming 提供的基本抽象. 它表明了一個連續的數據流, 不管是從 source(數據源)接收到的輸入數據流, 仍是經過轉換輸入流所產生的處理過的數據流. 在內部, 一個 DStream 被表示爲一系列連續的 RDDs, 它是 Spark 中一個不可改變的抽象, distributed dataset (的更多細節請看Spark 編程指南. 在一個 DStream 中的每一個 RDD 包含來自必定的時間間隔的數據,以下圖所示.

應用於 DStream 的任何操做轉化爲對於底層的 RDDs 的操做. 例如,在先前的示例,轉換一個行(lines)流成爲單詞(words)中,flatMap 操做被應用於在行離散流(lines DStream)中的每一個 RDD 來生成單詞離散流(words DStream)的 RDDs . 以下所示.

 

這些底層的 RDD 變換由 Spark 引擎(engine)計算。 DStream 操做隱藏了大多數這些細節併爲了方便起見,提供給了開發者一個更高級別的 API 。這些操做細節會在後邊的章節中討論。

Input DStreams 和 Receivers(接收器)

輸入 DStreams 是表明輸入數據是從流的源數據(streaming sources)接收到的流的 DStream. 在一個入門示例中,lines是一個 input DStream, 由於它表明着從 netcat 服務器接收到的數據的流. 每個 input DStream(除了 file stream 以外, 會在本章的後面來討論)與一個Receiver(Scala doc,Java doc) 對象關聯, 它從 source(數據源)中獲取數據,而且存儲它到 Sparl 的內存中用於處理.

Spark Streaming 提供了兩種內置的 streaming source(流的數據源).

Basic sources(基礎的數據源): 在 StreamingContext API 中直接可使用的數據源. 例如: file systems 和 socket connections.

Advanced sources(高級的數據源): 像 Kafka, Flume, Kinesis, 等等這樣的數據源. 能夠經過額外的 utility classes 來使用. 像在依賴中討論的同樣, 這些都須要額外的外部依賴.

在本節的後邊,咱們將討論每種類別中的現有的一些數據源.

請注意, 若是你想要在你的流處理程序中並行的接收多個數據流, 你能夠建立多個 input DStreams(在性能優化部分進一步討論). 這將建立同時接收多個數據流的多個 receivers(接收器). 但須要注意,一個 Spark 的 worker/executor 是一個長期運行的任務(task),所以它將佔用分配給 Spark Streaming 的應用程序的全部核中的一個核(core). 所以,要記住,一個 Spark Streaming 應用須要分配足夠的核(core)(或線程(threads),若是本地運行的話)來處理所接收的數據,以及來運行接收器(receiver(s)).

要記住的幾點

當在本地運行一個 Spark Streaming 程序的時候,不要使用 「local」 或者 「local[1]」 做爲 master 的 URL. 這兩種方法中的任何一個都意味着只有一個線程將用於運行本地任務. 若是你正在使用一個基於接收器(receiver)的輸入離散流(input DStream)(例如, sockets ,Kafka ,Flume 等),則該單獨的線程將用於運行接收器(receiver),而沒有留下任何的線程用於處理接收到的數據. 所以,在本地運行時,老是用 「local[n]」 做爲 master URL ,其中的 n > 運行接收器的數量(查看Spark 屬性來了解怎樣去設置 master 的信息).

將邏輯擴展到集羣上去運行,分配給 Spark Streaming 應用程序的內核(core)的內核數必須大於接收器(receiver)的數量。不然系統將接收數據,可是沒法處理它.

基礎的 Sources(數據源)

咱們已經簡單地瞭解過了在入門示例中ssc.socketTextStream(...)的例子,例子中是經過從一個 TCP socket 鏈接接收到的文本數據來建立了一個離散流(DStream). 除了 sockets 以外,StreamingContext API 也提供了根據文件做爲輸入來源建立離散流(DStreams)的方法。

File Streams:用於從文件中讀取數據,在任何與 HDFS API 兼容的文件系統中(即,HDFS,S3,NFS 等),一個 DStream 能夠像下面這樣建立:

Scala

Java

Python

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming 將監控dataDirectory目錄而且該目錄中任何新建的文件 (寫在嵌套目錄中的文件是不支持的). 注意

文件必須具備相同的數據格式.

文件必須被建立在dataDirectory目錄中, 經過 atomically(院子的)moving(移動)或renaming(重命名)它們到數據目錄.

一旦移動,這些文件必須不能再更改,所以若是文件被連續地追加,新的數據將不會被讀取.

對於簡單的文本文件,還有一個更加簡單的方法streamingContext.textFileStream(dataDirectory). 而且文件流(file streams)不須要運行一個接收器(receiver),所以,不須要分配內核(core)。

Python API在 Python API 中fileStream是不可用的, 只有textFileStream是可用的.

Streams based on Custom Receivers(基於自定義的接收器的流):DStreams 可使用經過自定義的 receiver(接收器)接收到的數據來建立. 更多細節請參閱自定義 Receiver 指南.

Queue of RDDs as a Stream(RDDs 隊列做爲一個流):爲了使用測試數據測試 Spark Streaming 應用程序,還可使用streamingContext.queueStream(queueOfRDDs)建立一個基於 RDDs 隊列的 DStream,每一個進入隊列的 RDD 都將被視爲 DStream 中的一個批次數據,而且就像一個流進行處理.

想要了解更多的關於從 sockets 和文件(files)建立的流的細節, 請參閱相關函數的 API文檔, 它們在StreamingContextfor Scala,JavaStreamingContextfor Java 以及StreamingContextfor Python 中.

高級 Sources(數據源)

Python API從 Spark 2.2.0 開始, 在 Python API 中的 Kafka, Kinesis 和 Flume 這樣的外部數據源都是可用的.

這一類別的 sources(數據源)須要使用非 Spark 庫中的外部接口,它們中的其中一些還須要比較複雜的依賴關係(例如, Kafka 和 Flume). 所以,爲了最小化有關的依賴關係的版本衝突的問題,這些資源自己不能建立 DStream 的功能,它是經過依賴單獨的類庫實現建立 DStream 的功能.

請注意, 這些高級 sources(數據源)不能再 Spark shell 中使用, 所以,基於這些高級 sources(數據源)的應用程序不能在 shell 中被測試. 若是你真的想要在 Spark shell 中使用它們,你必須下載帶有它的依賴的相應的 Maven 組件的 JAR ,而且將其添加到 classpath.

一些高級的 sources(數據源)以下.

Kafka:Spark Streaming 2.2.0 與 Kafka broker 版本 0.8.2.1 或更高是兼容的. 更多細節請參閱Kafka 集成指南.

Flume:Spark Streaming 2.2.0 與 Flume 1.6.0 相兼容. 更多細節請參閱Flume 集成指南.

Kinesis:Spark Streaming 2.2.0 與 Kinesis Client Library 1.2.1 相兼容. 更多細節請參閱Kinesis 集成指南.

自定義 Sources(數據源)

Python API在 Python 中還不支持這一功能.

Input DStreams 也能夠從自定義數據源中建立. 若是您想這樣作, 須要實現一個用戶自定義的receiver(看下一節以瞭解它是什麼), 它能夠從自定義的 sources(數據源)中接收數據而且推送它到 Spark. 更多細節請參閱自定義 Receiver 指南.

Receiver Reliability(接收器的可靠性)

能夠有兩種基於他們的reliability可靠性的數據源. 數據源(如 Kafka 和 Flume)容許傳輸的數據被確認. 若是系統從這些可靠的數據來源接收數據,而且被確認(acknowledges)正確地接收數據,它能夠確保數據不會由於任何類型的失敗而致使數據丟失. 這樣就出現了 2 種接收器(receivers):

Reliable Receiver(可靠的接收器)- 當數據被接收並存儲在 Spark 中並帶有備份副本時,一個可靠的接收器(reliable receiver)正確地發送確認(acknowledgment)給一個可靠的數據源(reliable source).

Unreliable Receiver(不可靠的接收器)- 一個不可靠的接收器( unreliable receiver )不發送確認(acknowledgment)到數據源。這能夠用於不支持確認的數據源,或者甚至是可靠的數據源當你不想或者不須要進行復雜的確認的時候.

自定義 Receiver 指南中描述了關於如何去編寫一個 reliable receiver(可靠的接收器)的細節.

DStreams 上的 Transformations(轉換)

與 RDD 相似,transformation 容許從 input DStream 輸入的數據作修改. DStreams 支持不少在 RDD 中可用的 transformation 算子。一些經常使用的以下所示 :

與RDD相似,相似,transformation 容許修改來自 input DStream 的數據. DStreams 支持標準的 Spark RDD 上可用的許多轉換. 一些常見的以下.

Transformation(轉換)Meaning(含義)

map(func)利用函數func處理原 DStream 的每一個元素,返回一個新的 DStream.

flatMap(func)與 map 類似,可是每一個輸入項可用被映射爲 0 個或者多個輸出項。.

filter(func)返回一個新的 DStream,它僅僅包含原 DStream 中函數func返回值爲 true 的項.

repartition(numPartitions)經過建立更多或者更少的 partition 以改變這個 DStream 的並行級別(level of parallelism).

union(otherStream)返回一個新的 DStream,它包含源 DStream 和otherDStream的全部元素.

count()經過 count 源 DStream 中每一個 RDD 的元素數量,返回一個包含單元素(single-element)RDDs 的新 DStream.

reduce(func)利用函數func彙集源 DStream 中每一個 RDD 的元素,返回一個包含單元素(single-element)RDDs 的新 DStream。函數應該是相關聯的,以使計算能夠並行化.

countByValue()在元素類型爲 K 的 DStream上,返回一個(K,long)pair 的新的 DStream,每一個 key 的值是在原 DStream 的每一個 RDD 中的次數.

reduceByKey(func, [numTasks])當在一個由 (K,V) pairs 組成的 DStream 上調用這個算子時,返回一個新的, 由 (K,V) pairs 組成的 DStream,每個 key 的值均由給定的 reduce 函數聚合起來.注意:在默認狀況下,這個算子利用了 Spark 默認的併發任務數去分組。你能夠用 numTasks 參數設置不一樣的任務數。

join(otherStream, [numTasks])當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, (V, W)) 對的新 DStream.

cogroup(otherStream, [numTasks])當應用於兩個 DStream(一個包含(K,V)對,一個包含 (K,W) 對),返回一個包含 (K, Seq[V], Seq[W]) 的 tuples(元組).

transform(func)經過對源 DStream 的每一個 RDD 應用 RDD-to-RDD 函數,建立一個新的 DStream. 這個能夠在 DStream 中的任何 RDD 操做中使用.

updateStateByKey(func)返回一個新的 "狀態" 的 DStream,其中每一個 key 的狀態經過在 key 的先前狀態應用給定的函數和 key 的新 valyes 來更新. 這能夠用於維護每一個 key 的任意狀態數據.

其中一些轉換值得深刻討論.

UpdateStateByKey 操做

該updateStateByKey操做容許您維護任意狀態,同時不斷更新新信息. 你須要經過兩步來使用它.

定義 state - state 能夠是任何的數據類型.

定義 state update function(狀態更新函數) - 使用函數指定如何使用先前狀態來更新狀態,並從輸入流中指定新值.

在每一個 batch 中,Spark 會使用狀態更新函數爲全部已有的 key 更新狀態,無論在 batch 中是否含有新的數據。若是這個更新函數返回一個 none,這個 key-value pair 也會被消除.

讓咱們舉個例子來講明. 在例子中,假設你想保持在文本數據流中看到的每一個單詞的運行計數,運行次數用一個 state 表示,它的類型是整數, 咱們可使用以下方式來定義 update 函數:

Scala

Java

Python

defupdateFunction(newValues:Seq[Int],runningCount:Option[Int]):Option[Int]={valnewCount=...// add the new values with the previous running count to get the new countSome(newCount)}

這裏是一個應用於包含 words(單詞)的 DStream 上(也就是說,在先前的示例中,該pairsDStream 包含了 (word, 1) pair).

valrunningCounts=pairs.updateStateByKey[Int](updateFunction_)

update 函數將會被每一個單詞調用,newValues擁有一系列的 1(來自 (word, 1) pairs),runningCount 擁有以前的次數.

請注意, 使用updateStateByKey須要配置的checkpoint(檢查點)的目錄,這裏是更詳細關於討論checkpointing的部分.

Transform Operation*(轉換操做)

transform 操做(以及它的變化形式如transformWith)容許在 DStream 運行任何 RDD-to-RDD 函數. 它可以被用來應用任何沒在 DStream API 中提供的 RDD 操做. 例如,鏈接數據流中的每一個批(batch)和另一個數據集的功能並無在 DStream API 中提供,然而你能夠簡單的利用transform方法作到. 這使得有很是強大的可能性. 例如,能夠經過將輸入數據流與預先計算的垃圾郵件信息(也可使用 Spark 一塊兒生成)進行實時數據清理,而後根據它進行過濾.

Scala

Java

Python

valspamInfoRDD=ssc.sparkContext.newAPIHadoopRDD(...)// RDD containing spam informationvalcleanedDStream=wordCounts.transform{rdd=>rdd.join(spamInfoRDD).filter(...)// join data stream with spam information to do data cleaning...}

請注意,每一個 batch interval(批間隔)提供的函數被調用. 這容許你作隨時間變更的 RDD 操做, 即 RDD 操做, 分區的數量,廣播變量,等等. batch 之間等能夠改變。

Window Operations(窗口操做)

Spark Streaming 也支持windowed computations(窗口計算),它容許你在數據的一個滑動窗口上應用 transformation(轉換). 下圖說明了這個滑動窗口.

 

 

如上圖顯示,窗口在源 DStream 上slides(滑動),合併和操做落入窗內的源 RDDs,產生窗口化的 DStream 的 RDDs。在這個具體的例子中,程序在三個時間單元的數據上進行窗口操做,而且每兩個時間單元滑動一次。 這說明,任何一個窗口操做都須要指定兩個參數.

window length(窗口長度)- 窗口的持續時間(圖 3).

sliding interval(滑動間隔)- 執行窗口操做的間隔(圖 2).

這兩個參數必須是 source DStream 的 batch interval(批間隔)的倍數(圖 1).

讓咱們舉例以說明窗口操做. 例如,你想擴展前面的例子用來計算過去 30 秒的詞頻,間隔時間是 10 秒. 爲了達到這個目的,咱們必須在過去 30 秒的(wrod, 1)pairs 的pairsDStream 上應用reduceByKey操做. 用方法reduceByKeyAndWindow實現.

Scala

Java

Python

// Reduce last 30 seconds of data, every 10 secondsvalwindowedWordCounts=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))

一些經常使用的窗口操做以下所示,這些操做都須要用到上文提到的兩個參數 -windowLength(窗口長度)和slideInterval(滑動的時間間隔).

Transformation(轉換)Meaning(含義)

window(windowLength,slideInterval)返回一個新的 DStream, 它是基於 source DStream 的窗口 batch 進行計算的.

countByWindow(windowLength,slideInterval)返回 stream(流)中滑動窗口元素的數

reduceByWindow(func,windowLength,slideInterval)返回一個新的單元素 stream(流),它經過在一個滑動間隔的 stream 中使用func來聚合以建立. 該函數應該是 associative(關聯的)且 commutative(可交換的),以便它能夠並行計算

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])在一個 (K, V) pairs 的 DStream 上調用時, 返回一個新的 (K, V) pairs 的 Stream, 其中的每一個 key 的 values 是在滑動窗口上的 batch 使用給定的函數func來聚合產生的.Note(注意):默認狀況下, 該操做使用 Spark 的默認並行任務數量(local model 是 2, 在 cluster mode 中的數量經過spark.default.parallelism來肯定)來作 grouping. 您能夠經過一個可選的numTasks參數來設置一個不一樣的 tasks(任務)數量.

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval, [numTasks])上述reduceByKeyAndWindow()的更有效的一個版本,其中使用前一窗口的 reduce 值逐漸計算每一個窗口的 reduce值. 這是經過減小進入滑動窗口的新數據,以及 「inverse reducing(逆減)」 離開窗口的舊數據來完成的. 一個例子是當窗口滑動時」添加」 和 「減」 keys 的數量. 然而,它僅適用於 「invertible reduce functions(可逆減小函數)」,即具備相應 「inverse reduce(反向減小)」 函數的 reduce 函數(做爲參數invFunc ). 像在reduceByKeyAndWindow中的那樣, reduce 任務的數量能夠經過可選參數進行配置. 請注意, 針對該操做的使用必須啓用checkpointing.

countByValueAndWindow(windowLength,slideInterval, [numTasks])在一個 (K, V) pairs 的 DStream 上調用時, 返回一個新的 (K, Long) pairs 的 DStream, 其中每一個 key 的 value 是它在一個滑動窗口以內的頻次. 像 code>reduceByKeyAndWindow 中的那樣, reduce 任務的數量能夠經過可選參數進行配置.

Join 操做

最後,它值得強調的是,您能夠輕鬆地在 Spark Streaming 中執行不一樣類型的 join.

Stream-stream joins

Streams(流)能夠很是容易地與其餘流進行 join.

Scala

Java

Python

valstream1:DStream[String,String]=...valstream2:DStream[String,String]=...valjoinedStream=stream1.join(stream2)

這裏,在每一個 batch interval(批間隔)中,由stream1生成的 RDD 將與stream2生成的 RDD 進行 jion. 你也能夠作leftOuterJoin,rightOuterJoin,fullOuterJoin. 此外,在 stream(流)的窗口上進行 join 一般是很是有用的. 這也很容易作到.

Scala

Java

Python

valwindowedStream1=stream1.window(Seconds(20))valwindowedStream2=stream2.window(Minutes(1))valjoinedStream=windowedStream1.join(windowedStream2)

Stream-dataset joins

這在解釋DStream.transform操做時已經在前面演示過了. 這是另外一個 join window stream(窗口流)與 dataset 的例子.

Scala

Java

Python

valdataset:RDD[String,String]=...valwindowedStream=stream.window(Seconds(20))...valjoinedStream=windowedStream.transform{rdd=>rdd.join(dataset)}

實際上,您也能夠動態更改要加入的 dataset. 提供給transform的函數是每一個 batch interval(批次間隔)進行評估,所以將使用dataset引用指向當前的 dataset.

DStream 轉換的完整列表可在 API 文檔中找到. 針對 Scala API,請看DStreamPairDStreamFunctions. 針對 Java API,請看JavaDStreamJavaPairDStream. 針對 Python API,請看DStream.

DStreams 上的輸出操做

輸出操做容許將 DStream 的數據推送到外部系統, 如數據庫或文件系統. 因爲輸出操做實際上容許外部系統使用變換後的數據, 因此它們觸發全部 DStream 變換的實際執行(相似於RDD的動做). 目前, 定義瞭如下輸出操做:

Output OperationMeaning

print()在運行流應用程序的 driver 節點上的DStream中打印每批數據的前十個元素. 這對於開發和調試頗有用.

Python API這在 Python API 中稱爲pprint().

saveAsTextFiles(prefix, [suffix])將此 DStream 的內容另存爲文本文件. 每一個批處理間隔的文件名是根據前綴和後綴:"prefix-TIME_IN_MS[.suffix]"生成的.

saveAsObjectFiles(prefix, [suffix])將此 DStream 的內容另存爲序列化 Java 對象的SequenceFiles. 每一個批處理間隔的文件名是根據前綴和後綴:"prefix-TIME_IN_MS[.suffix]"生成的.

Python API這在Python API中是不可用的.

saveAsHadoopFiles(prefix, [suffix])將此 DStream 的內容另存爲 Hadoop 文件. 每一個批處理間隔的文件名是根據前綴和後綴:"prefix-TIME_IN_MS[.suffix]"生成的.

Python API這在Python API中是不可用的.

foreachRDD(func)對從流中生成的每一個 RDD 應用函數func的最通用的輸出運算符. 此功能應將每一個 RDD 中的數據推送到外部系統, 例如將 RDD 保存到文件, 或將其經過網絡寫入數據庫. 請注意, 函數func在運行流應用程序的 driver 進程中執行, 一般會在其中具備 RDD 動做, 這將強制流式傳輸 RDD 的計算.

foreachRDD 設計模式的使用

dstream.foreachRDD是一個強大的原語, 容許將數據發送到外部系統.可是, 瞭解如何正確有效地使用這個原語很重要. 避免一些常見的錯誤以下.

一般向外部系統寫入數據須要建立鏈接對象(例如與遠程服務器的 TCP 鏈接), 並使用它將數據發送到遠程系統.爲此, 開發人員可能會無心中嘗試在Spark driver 中建立鏈接對象, 而後嘗試在Spark工做人員中使用它來在RDD中保存記錄.例如(在 Scala 中):

Scala

Java

Python

dstream.foreachRDD{rdd=>valconnection=createNewConnection()// executed at the driverrdd.foreach{record=>connection.send(record)// executed at the worker}}

這是不正確的, 由於這須要將鏈接對象序列化並從 driver 發送到 worker. 這種鏈接對象不多能跨機器轉移. 此錯誤可能會顯示爲序列化錯誤(鏈接對象不可序列化), 初始化錯誤(鏈接對象須要在 worker 初始化)等. 正確的解決方案是在 worker 建立鏈接對象.

可是, 這可能會致使另外一個常見的錯誤 - 爲每一個記錄建立一個新的鏈接. 例如:

Scala

Java

Python

dstream.foreachRDD{rdd=>rdd.foreach{record=>valconnection=createNewConnection()connection.send(record)connection.close()}}

一般, 建立鏈接對象具備時間和資源開銷. 所以, 建立和銷燬每一個記錄的鏈接對象可能會引發沒必要要的高開銷, 並可顯着下降系統的整體吞吐量. 一個更好的解決方案是使用rdd.foreachPartition- 建立一個鏈接對象, 並使用該鏈接在 RDD 分區中發送全部記錄.

Scala

Java

Python

dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>valconnection=createNewConnection()partitionOfRecords.foreach(record=>connection.send(record))connection.close()}}

這樣能夠在多個記錄上分攤鏈接建立開銷.

最後, 能夠經過跨多個RDD /批次重用鏈接對象來進一步優化. 能夠維護鏈接對象的靜態池, 而不是將多個批次的 RDD 推送到外部系統時從新使用, 從而進一步減小開銷.

Scala

Java

Python

dstream.foreachRDD{rdd=>rdd.foreachPartition{partitionOfRecords=>// ConnectionPool is a static, lazily initialized pool of connectionsvalconnection=ConnectionPool.getConnection()partitionOfRecords.foreach(record=>connection.send(record))ConnectionPool.returnConnection(connection)// return to the pool for future reuse}}

請注意, 池中的鏈接應根據須要懶惰建立, 若是不使用一段時間, 則會超時. 這實現了最有效地將數據發送到外部系統.

其餘要記住的要點:

DStreams 經過輸出操做進行延遲執行, 就像 RDD 由 RDD 操做懶惰地執行. 具體來講, DStream 輸出操做中的 RDD 動做強制處理接收到的數據.所以, 若是您的應用程序沒有任何輸出操做, 或者具備dstream.foreachRDD()等輸出操做, 而在其中沒有任何 RDD 操做, 則不會執行任何操做.系統將簡單地接收數據並將其丟棄.

默認狀況下, 輸出操做是 one-at-a-time 執行的. 它們按照它們在應用程序中定義的順序執行.

DataFrame 和 SQL 操做

您能夠輕鬆地在流數據上使用DataFrames and SQL和 SQL 操做. 您必須使用 StreamingContext 正在使用的 SparkContext 建立一個 SparkSession.此外, 必須這樣作, 以即可以在 driver 故障時從新啓動. 這是經過建立一個簡單實例化的 SparkSession 單例實例來實現的.這在下面的示例中顯示.它使用 DataFrames 和 SQL 來修改早期的字數示例以生成單詞計數.將每一個 RDD 轉換爲 DataFrame, 註冊爲臨時表, 而後使用 SQL 進行查詢.

Scala

Java

Python

/** DataFrame operations inside your streaming program */valwords:DStream[String]=...words.foreachRDD{rdd=>// Get the singleton instance of SparkSessionvalspark=SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()importspark.implicits._// Convert RDD[String] to DataFramevalwordsDataFrame=rdd.toDF("word")// Create a temporary viewwordsDataFrame.createOrReplaceTempView("words")// Do word count on DataFrame using SQL and print itvalwordCountsDataFrame=spark.sql("select word, count(*) as total from words group by word")wordCountsDataFrame.show()}

請參閱完整的源代碼.

您還能夠對來自不一樣線程的流數據(即異步運行的 StreamingContext )上定義的表運行 SQL 查詢. 只需確保您將 StreamingContext 設置爲記住足夠數量的流數據, 以便查詢能夠運行. 不然, 不知道任何異步 SQL 查詢的 StreamingContext 將在查詢完成以前刪除舊的流數據. 例如, 若是要查詢最後一個批次, 可是您的查詢可能須要5分鐘才能運行, 則能夠調用streamingContext.remember(Minutes(5))(以 Scala 或其餘語言的等價物).

有關DataFrames的更多信息, 請參閱DataFrames 和 SQL 指南.

MLlib 操做

您還能夠輕鬆使用MLlib提供的機器學習算法. 首先, 有 streaming 機器學習算法(例如:Streaming 線性迴歸,Streaming KMeans等), 其能夠同時從 streaming 數據中學習, 並將該模型應用於 streaming 數據. 除此以外, 對於更大類的機器學習算法, 您能夠離線學習一個學習模型(即便用歷史數據), 而後將該模型在線應用於流數據.有關詳細信息, 請參閱MLlib指南.

緩存 / 持久性

與 RDD 相似, DStreams 還容許開發人員將流的數據保留在內存中. 也就是說, 在 DStream 上使用persist()方法會自動將該 DStream 的每一個 RDD 保留在內存中. 若是 DStream 中的數據將被屢次計算(例如, 相同數據上的多個操做), 這將很是有用. 對於基於窗口的操做, 如reduceByWindow和reduceByKeyAndWindow以及基於狀態的操做, 如updateStateByKey, 這是隱含的.所以, 基於窗口的操做生成的 DStream 會自動保存在內存中, 而不須要開發人員調用persist().

對於經過網絡接收數據(例如: Kafka, Flume, sockets 等)的輸入流, 默認持久性級別被設置爲將數據複製到兩個節點進行容錯.

請注意, 與 RDD 不一樣, DStreams 的默認持久性級別將數據序列化在內存中. 這在性能調優部分進一步討論. 有關不一樣持久性級別的更多信息, 請參見Spark編程指南.

Checkpointing

streaming 應用程序必須 24/7 運行, 所以必須對應用邏輯無關的故障(例如, 系統故障, JVM 崩潰等)具備彈性. 爲了能夠這樣作, Spark Streaming 須要checkpoint足夠的信息到容錯存儲系統, 以即可以從故障中恢復.checkpoint有兩種類型的數據.

Metadata checkpointing- 將定義 streaming 計算的信息保存到容錯存儲(如 HDFS)中.這用於從運行 streaming 應用程序的 driver 的節點的故障中恢復(稍後詳細討論). 元數據包括:

Configuration- 用於建立流應用程序的配置.

DStream operations- 定義 streaming 應用程序的 DStream 操做集.

Incomplete batches- 批量的job 排隊但還沒有完成.

Data checkpointing- 將生成的 RDD 保存到可靠的存儲.這在一些將多個批次之間的數據進行組合的狀態變換中是必需的.在這種轉換中, 生成的 RDD 依賴於先前批次的 RDD, 這致使依賴鏈的長度隨時間而增長.爲了不恢復時間的這種無限增長(與依賴關係鏈成比例), 有狀態轉換的中間 RDD 會按期checkpoint到可靠的存儲(例如 HDFS)以切斷依賴關係鏈.

總而言之, 元數據 checkpoint 主要用於從 driver 故障中恢復, 而數據或 RDD checkpoint 對於基本功能(若是使用有狀態轉換)則是必需的.

什麼時候啓用 checkpoint

對於具備如下任一要求的應用程序, 必須啓用 checkpoint:

使用狀態轉換- 若是在應用程序中使用updateStateByKey或reduceByKeyAndWindow(具備反向功能), 則必須提供 checkpoint 目錄以容許按期的 RDD checkpoint.

從運行應用程序的 driver 的故障中恢復- 元數據 checkpoint 用於使用進度信息進行恢復.

請注意, 無需進行上述有狀態轉換的簡單 streaming 應用程序便可運行, 無需啓用 checkpoint. 在這種狀況下, 驅動器故障的恢復也將是部分的(一些接收但未處理的數據可能會丟失). 這一般是能夠接受的, 許多運行 Spark Streaming 應用程序. 將來對非 Hadoop 環境的支持預計會有所改善.

如何配置 checkpoint

能夠經過在保存 checkpoint 信息的容錯, 可靠的文件系統(例如, HDFS, S3等)中設置目錄來啓用 checkpoint. 這是經過使用streamingContext.checkpoint(checkpointDirectory)完成的. 這將容許您使用上述有狀態轉換. 另外, 若是要使應用程序從 driver 故障中恢復, 您應該重寫 streaming 應用程序以具備如下行爲.

當程序第一次啓動時, 它將建立一個新的 StreamingContext, 設置全部流, 而後調用 start().

當程序在失敗後從新啓動時, 它將從 checkpoint 目錄中的 checkpoint 數據從新建立一個 StreamingContext.

Scala

Java

Python

使用StreamingContext.getOrCreate能夠簡化此行爲. 這樣使用以下.

// Function to create and setup a new StreamingContextdeffunctionToCreateContext():StreamingContext={valssc=newStreamingContext(...)// new contextvallines=ssc.socketTextStream(...)// create DStreams...ssc.checkpoint(checkpointDirectory)// set checkpoint directoryssc}// Get StreamingContext from checkpoint data or create a new onevalcontext=StreamingContext.getOrCreate(checkpointDirectory,functionToCreateContext_)// Do additional setup on context that needs to be done,// irrespective of whether it is being started or restartedcontext....// Start the contextcontext.start()context.awaitTermination()

If thecheckpointDirectoryexists, then the context will be recreated from the checkpoint data. If the directory does not exist (i.e., running for the first time), then the functionfunctionToCreateContextwill be called to create a new context and set up the DStreams. See the Scala exampleRecoverableNetworkWordCount. This example appends the word counts of network data into a file.

除了使用getOrCreate以外, 還須要確保在失敗時自動從新啓動 driver 進程. 這隻能由用於運行應用程序的部署基礎架構完成. 這在部署部分進一步討論.

請注意, RDD 的 checkpoint 會致使保存到可靠存儲的成本. 這可能會致使 RDD 獲得 checkpoint 的批次的處理時間增長. 所以, 須要仔細設置 checkpoint 的間隔. 在小批量大小(例如: 1秒), 檢查每一個批次可能會顯着下降操做吞吐量. 相反, checkpoint 太少會致使譜系和任務大小增加, 這可能會產生不利影響. 對於須要 RDD checkpoint 的狀態轉換, 默認間隔是至少10秒的批間隔的倍數. 它能夠經過使用dstream.checkpoint(checkpointInterval)進行設置. 一般, DStream 的5到10個滑動間隔的 checkpoint 間隔是一個很好的設置.

Accumulators, Broadcast 變量, 和 Checkpoint

在Spark Streaming中, 沒法從 checkpoint 恢復AccumulatorsBroadcast 變量. 若是啓用 checkpoint 並使用AccumulatorsBroadcast 變量, 則必須爲AccumulatorsBroadcast 變量建立延遲實例化的單例實例, 以便在 driver 從新啓動失敗後從新實例化. 這在下面的示例中顯示:

Scala

Java

Python

objectWordBlacklist{@volatileprivatevarinstance:Broadcast[Seq[String]]=nulldefgetInstance(sc:SparkContext):Broadcast[Seq[String]]={if(instance==null){synchronized{if(instance==null){valwordBlacklist=Seq("a","b","c")instance=sc.broadcast(wordBlacklist)}}}instance}}objectDroppedWordsCounter{@volatileprivatevarinstance:LongAccumulator=nulldefgetInstance(sc:SparkContext):LongAccumulator={if(instance==null){synchronized{if(instance==null){instance=sc.longAccumulator("WordsInBlacklistCounter")}}}instance}}wordCounts.foreachRDD{(rdd:RDD[(String,Int)],time:Time)=>// Get or register the blacklist Broadcastvalblacklist=WordBlacklist.getInstance(rdd.sparkContext)// Get or register the droppedWordsCounter AccumulatorvaldroppedWordsCounter=DroppedWordsCounter.getInstance(rdd.sparkContext)// Use blacklist to drop words and use droppedWordsCounter to count themvalcounts=rdd.filter{case(word,count)=>if(blacklist.value.contains(word)){droppedWordsCounter.add(count)false}else{true}}.collect().mkString("[",", ","]")valoutput="Counts at time "+time+" "+counts})

請參閱完整的源代碼.

應用程序部署

本節討論部署 Spark Streaming 應用程序的步驟.

要求

要運行 Spark Streaming 應用程序, 您須要具有如下功能.

集羣管理器集羣- 這是任何 Spark 應用程序的通常要求, 並在部署指南中詳細討論.

打包應用程序 JAR- 您必須將 streaming 應用程序編譯爲 JAR. 若是您正在使用spark-submit啓動應用程序, 則不須要在 JAR 中提供 Spark 和 Spark Streaming.可是, 若是您的應用程序使用高級資源(例如: Kafka, Flume), 那麼您將必須將他們連接的額外工件及其依賴項打包在用於部署應用程序的 JAR 中.例如, 使用KafkaUtils的應用程序必須在應用程序 JAR 中包含spark-streaming-kafka-0-8_2.11及其全部傳遞依賴關係.

爲 executor 配置足夠的內存- 因爲接收到的數據必須存儲在內存中, 因此 executor 必須配置足夠的內存來保存接收到的數據. 請注意, 若是您正在進行10分鐘的窗口操做, 系統必須至少保留最近10分鐘的內存中的數據. 所以, 應用程序的內存要求取決於其中使用的操做.

配置 checkpoint- 若是 streaming 應用程序須要它, 則 Hadoop API 兼容容錯存儲(例如:HDFS, S3等)中的目錄必須配置爲 checkpoint 目錄, 而且流程應用程序以 checkpoint 信息的方式編寫 用於故障恢復. 有關詳細信息, 請參閱checkpoint部分.

配置應用程序 driver 的自動從新啓動- 要從 driver 故障自動恢復, 用於運行流應用程序的部署基礎架構必須監視 driver 進程, 並在 driver 發生故障時從新啓動 driver.不一樣的集羣管理者有不一樣的工具來實現這一點.

Spark Standalone- 能夠提交 Spark 應用程序 driver 以在Spark Standalone集羣中運行(請參閱集羣部署模式), 即應用程序 driver 自己在其中一個工做節點上運行. 此外, 能夠指示獨立的羣集管理器來監督 driver, 若是因爲非零退出代碼而致使 driver 發生故障, 或因爲運行 driver 的節點發生故障, 則能夠從新啓動它. 有關詳細信息, 請參閱 [Spark Standalone 指南]](spark-standalone.html) 中的羣集模式和監督.

YARN- Yarn 支持相似的機制來自動從新啓動應用程序.有關詳細信息, 請參閱 YARN文檔.

Mesos-Marathon已被用來實現這一點與Mesos.

配置預寫日誌- 自 Spark 1.2 以來, 咱們引入了寫入日誌來實現強大的容錯保證.若是啓用, 則從 receiver 接收的全部數據都將寫入配置 checkpoint 目錄中的寫入日誌.這能夠防止 driver 恢復時的數據丟失, 從而確保零數據丟失(在容錯語義部分中詳細討論).能夠經過將配置參數spark.streaming.receiver.writeAheadLog.enable設置爲true來啓用此功能.然而, 這些更強的語義可能以單個 receiver 的接收吞吐量爲代價.經過並行運行更多的 receiver能夠糾正這一點, 以增長總吞吐量.另外, 建議在啓用寫入日誌時, 在日誌已經存儲在複製的存儲系統中時, 禁用在 Spark 中接收到的數據的複製.這能夠經過將輸入流的存儲級別設置爲StorageLevel.MEMORY_AND_DISK_SER來完成.使用 S3(或任何不支持刷新的文件系統)寫入日誌時, 請記住啓用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.有關詳細信息, 請參閱Spark Streaming配.請注意, 啓用 I/O 加密時, Spark 不會將寫入寫入日誌的數據加密.若是須要對提早記錄數據進行加密, 則應將其存儲在本地支持加密的文件系統中.

設置最大接收速率- 若是集羣資源不夠大, streaming 應用程序可以像接收到的那樣快速處理數據, 則能夠經過設置 記錄/秒 的最大速率限制來對 receiver 進行速率限制. 請參閱 receiver 的spark.streaming.receiver.maxRate和用於 Direct Kafka 方法的spark.streaming.kafka.maxRatePerPartition的配置參數. 在Spark 1.5中, 咱們引入了一個稱爲背壓的功能, 無需設置此速率限制, 由於Spark Streaming會自動計算速率限制, 並在處理條件發生變化時動態調整速率限制. 能夠經過將配置參數spark.streaming.backpressure.enabled設置爲true來啓用此 backpressure.

升級應用程序代碼

若是運行的 Spark Streaming 應用程序須要使用新的應用程序代碼進行升級, 則有兩種可能的機制.

升級後的 Spark Streaming 應用程序與現有應用程序並行啓動並運行.一旦新的(接收與舊的數據相同的數據)已經升溫並準備好黃金時段, 舊的能夠被關掉.請注意, 這能夠用於支持將數據發送到兩個目的地(即較早和已升級的應用程序)的數據源.

現有應用程序正常關閉(請參閱StreamingContext.stop(...)JavaStreamingContext.stop(...)以獲取正常的關閉選項), 以確保已關閉的數據在關閉以前被徹底處理.而後能夠啓動升級的應用程序, 這將從較早的應用程序中止的同一點開始處理.請注意, 只有在支持源端緩衝的輸入源(如: Kafka 和 Flume)時才能夠進行此操做, 由於數據須要在先前的應用程序關閉而且升級的應用程序還沒有啓動時進行緩衝.從升級前代碼的早期 checkpoint 信息從新啓動不能完成.checkpoint 信息基本上包含序列化的 Scala/Java/Python 對象, 並嘗試使用新的修改的類反序列化對象可能會致使錯誤.在這種狀況下, 可使用不一樣的 checkpoint 目錄啓動升級的應用程序, 也能夠刪除之前的 checkpoint 目錄.

Monitoring Applications (監控應用程序)

除了 Spark 的monitoring capabilities(監控功能), 還有其餘功能特定於 Spark Streaming .當使用 StreamingContext 時,Spark web UI顯示一個額外的Streaming選項卡, 顯示 running receivers (運行接收器)的統計信息(不管是 receivers (接收器)是否處於 active (活動狀態), 接收到的 records (記錄)數, receiver error (接收器錯誤)等)並完成 batches (批次)(batch processing times (批處理時間), queueing delays (排隊延遲)等).這能夠用來監視 streaming application (流應用程序)的進度.

web UI 中的如下兩個 metrics (指標)特別重要:

Processing Time (處理時間)- 處理每 batch (批)數據的時間 .

Scheduling Delay (調度延遲)- batch (批處理)在 queue (隊列)中等待處理 previous batches (之前批次)完成的時間.

若是 batch processing time (批處理時間)始終 more than (超過) batch interval (批間隔) and/or queueing delay (排隊延遲)不斷增長, 表示系統是沒法快速 process the batches (處理批次), 而且正在 falling behind (落後). 在這種狀況下, 請考慮reducing (減小)batch processing time (批處理時間).

Spark Streaming 程序的進展也可使用StreamingListener接口, 這容許您得到 receiver status (接收器狀態)和 processing times (處理時間).請注意, 這是一個開發人員 API 而且未來可能會改善(即, 更多的信息報告).

Performance Tuning (性能調優)

在集羣上的 Spark Streaming application 中得到最佳性能須要一些調整.本節介紹了可調整的多個 parameters (參數)和 configurations (配置)提升你的應用程序性能.在高層次上, 你須要考慮兩件事情:

經過有效利用集羣資源, Reducing the processing time of each batch of data (減小每批數據的處理時間).

設置正確的 batch size (批量大小), 以便 batches of data (批量的數據)能夠像 received (被接收)處理同樣快(即 data processing (數據處理)與 data ingestion (數據攝取)保持一致).

Reducing the Batch Processing Times (減小批處理時間)

在 Spark 中能夠進行一些優化, 以 minimize the processing time of each batch (最小化每批處理時間).這些已在Tuning Guide (調優指南)中詳細討論過.本節突出了一些最重要的.

Level of Parallelism in Data Receiving (數據接收中的並行級別)

經過網絡接收數據(如Kafka, Flume, socket 等)須要 deserialized (反序列化)數據並存儲在 Spark 中.若是數據接收成爲系統的瓶頸, 那麼考慮一下 parallelizing the data receiving (並行化數據接收).注意每一個 input DStream 建立接收 single stream of data (單個數據流)的 single receiver (單個接收器)(在 work machine 上運行). 所以, 能夠經過建立多個 input DStreams 來實現 Receiving multiple data streams (接收多個數據流)並配置它們以從 source(s) 接收 data stream (數據流)的 different partitions (不一樣分區).例如, 接收 two topics of data (兩個數據主題)的單個Kafka input DStream 能夠分爲兩個 Kafka input streams (輸入流), 每一個只接收一個 topic (主題).這將運行兩個 receivers (接收器), 容許 in parallel (並行)接收數據, 從而提升 overall throughput (整體吞吐量).這些 multiple DStreams 能夠 unioned (聯合起來)建立一個 single DStream .而後 transformations (轉化)爲應用於 single input DStream 能夠應用於 unified stream .以下這樣作.

Scala

Java

Python

valnumStreams=5valkafkaStreams=(1tonumStreams).map{i=>KafkaUtils.createStream(...)}valunifiedStream=streamingContext.union(kafkaStreams)unifiedStream.print()

應考慮的另外一個參數是 receiver’s block interval (接收器的塊間隔), 這由configuration parameter (配置參數)的spark.streaming.blockInterval決定.對於大多數 receivers (接收器), 接收到的數據 coalesced (合併)在一塊兒存儲在 Spark 內存以前的 blocks of data (數據塊).每一個 batch (批次)中的 blocks (塊)數肯定將用於處理接收到的數據以 map-like (相似與 map 形式的) transformation (轉換)的 task (任務)的數量.每一個 receiver (接收器)每 batch (批次)的任務數量將是大約( batch interval (批間隔)/ block interval (塊間隔)).例如, 200 ms的 block interval (塊間隔)每 2 秒 batches (批次)建立 10 個 tasks (任務).若是 tasks (任務)數量太少(即少於每一個機器的內核數量), 那麼它將無效, 由於全部可用的內核都不會被使用處理數據.要增長 given batch interval (給定批間隔)的 tasks (任務)數量, 請減小 block interval (塊間​​隔).可是, 推薦的 block interval (塊間隔)最小值約爲 50ms , 低於此任務啓動開銷多是一個問題.

使用 multiple input streams (多個輸入流)/ receivers (接收器)接收數據的替代方法是明確 repartition (從新分配) input data stream (輸入數據流)(使用inputStream.repartition()). 這會在 further processing (進一步處理)以前將 received batches of data (收到的批次數據) distributes (分發)到集羣中指定數量的計算機.

Level of Parallelism in Data Processing (數據處理中的並行度水平)

若是在任何 computation (計算)階段中使用 number of parallel tasks (並行任務的數量), 則 Cluster resources (集羣資源)可能未獲得充分利用. 例如, 對於 distributed reduce (分佈式 reduce)操做, 如reduceByKey和reduceByKeyAndWindow, 默認並行任務的數量由spark.default.parallelismconfiguration property控制. 您 能夠經過 parallelism (並行度)做爲參數(見PairDStreamFunctions文檔 ), 或設置spark.default.parallelismconfiguration property更改默認值.

Data Serialization (數據序列化)

能夠經過調優 serialization formats (序列化格式)來減小數據 serialization (序列化)的開銷.在 streaming 的狀況下, 有兩種類型的數據被 serialized (序列化).

Input data (輸入數據): 默認狀況下, 經過 Receivers 接收的 input data (輸入數據)經過StorageLevel.MEMORY_AND_DISK_SER_2存儲在 executors 的內存中.也就是說, 將數據 serialized (序列化)爲 bytes (字節)以減小 GC 開銷, 並複製以容忍 executor failures (執行器故障).此外, 數據首先保留在內存中, 而且只有在內存不足以容納 streaming computation (流計算)所需的全部輸入數據時纔會 spilled over (溢出)到磁盤.這個 serialization (序列化)顯然具備開銷 - receiver (接收器)必須使接收的數據 deserialize (反序列化), 並使用 Spark 的 serialization format (序列化格式)從新序列化它.

Persisted RDDs generated by Streaming Operations (流式操做生成的持久 RDDs): 經過 streaming computations (流式計算)生成的 RDD 可能會持久存儲在內存中.例如, window operations (窗口操做)會將數據保留在內存中, 由於它們將被處理屢次.可是, 與StorageLevel.MEMORY_ONLY的 Spark Core 默認狀況不一樣, 經過流式計算生成的持久化 RDD 將以StorageLevel.MEMORY_ONLY_SER(即序列化), 以最小化 GC 開銷.

在這兩種狀況下, 使用 Kryo serialization (Kryo 序列化)能夠減小 CPU 和內存開銷.有關詳細信息, 請參閱Spark Tuning Guide.對於 Kryo , 請考慮 registering custom classes , 並禁用對象引用跟蹤(請參閱Configuration Guide中的 Kryo 相關配置).

在 streaming application 須要保留的數據量不大的特定狀況下, 能夠將數據(兩種類型)做爲 deserialized objects (反序列化對象)持久化, 而不會致使過多的 GC 開銷.例如, 若是您使用幾秒鐘的 batch intervals (批次間隔)而且沒有 window operations (窗口操做), 那麼能夠經過明確地相應地設置 storage level (存儲級別)來嘗試禁用 serialization in persisted data (持久化數據中的序列化).這將減小因爲序列化形成的 CPU 開銷, 潛在地提升性能, 而不須要太多的 GC 開銷.

Task Launching Overheads (任務啓動開銷)

若是每秒啓動的任務數量很高(好比每秒 50 個或更多), 那麼這個開銷向 slaves 發送任務多是重要的, 而且將難以實現 sub-second latencies (次要的延遲).能夠經過如下更改減小開銷:

Execution mode (執行模式): 以 Standalone mode (獨立模式)或 coarse-grained Mesos 模式運行 Spark 比 fine-grained Mesos 模式更好的任務啓動時間.有關詳細信息, 請參閱Running on Mesos guide.

這些更改可能會將 batch processing time (批處理時間)縮短 100 毫秒, 從而容許 sub-second batch size (次秒批次大小)是可行的.

Setting the Right Batch Interval (設置正確的批次間隔)

對於在集羣上穩定地運行的 Spark Streaming application, 該系統應該可以處理數據儘量快地被接收.換句話說, 應該處理批次的數據就像生成它們同樣快.這是否適用於 application 能夠在monitoringstreaming web UI 中的 processing times 中被找到, processing time (批處理處理時間)應小於 batch interval (批間隔).

取決於 streaming computation (流式計算)的性質, 使用的 batch interval (批次間隔)可能對處理由應用程序持續一組固定的 cluster resources (集羣資源)的數據速率有重大的影響.例如, 讓咱們考慮早期的 WordCountNetwork 示例.對於特定的 data rate (數據速率), 系統可能可以跟蹤每 2 秒報告 word counts (即 2 秒的 batch interval (批次間隔)), 但不能每 500 毫秒.所以, 須要設置 batch interval (批次間隔), 使預期的數據速率在生產能夠持續.

爲您的應用程序找出正確的 batch size (批量大小)的一個好方法是使用進行測試 conservative batch interval (保守的批次間隔)(例如 5-10 秒)和 low data rate (低數據速率).驗證是否系統可以跟上 data rate (數據速率), 能夠檢查遇到的 end-to-end delay (端到端延遲)的值經過每一個 processed batch (處理的批次)(在 Spark driver log4j 日誌中查找 「Total delay」 , 或使用StreamingListener接口). 若是 delay (延遲)保持與 batch size (批量大小)至關, 那麼系統是穩定的.除此之外, 若是延遲不斷增長, 則意味着系統沒法跟上, 所以不穩定.一旦你有一個 stable configuration (穩定的配置)的想法, 你能夠嘗試增長 data rate and/or 減小 batch size .請注意, momentary increase (瞬時增長)因爲延遲暫時增長只要延遲下降到 low value (低值), 臨時數據速率增長就能夠很好(即, 小於 batch size (批量大小)).

Memory Tuning (內存調優)

調整 Spark 應用程序的內存使用狀況和 GC behavior 已經有不少的討論在Tuning Guide中.咱們強烈建議您閱讀一下.在本節中, 咱們將在 Spark Streaming applications 的上下文中討論一些 tuning parameters (調優參數).

Spark Streaming application 所需的集羣內存量在很大程度上取決於所使用的 transformations 類型.例如, 若是要在最近 10 分鐘的數據中使用 window operation (窗口操做), 那麼您的集羣應該有足夠的內存來容納內存中 10 分鐘的數據.或者若是要使用大量 keys 的updateStateByKey, 那麼必要的內存將會很高.相反, 若是你想作一個簡單的 map-filter-store 操做, 那麼所需的內存就會很低.

通常來講, 因爲經過 receivers (接收器)接收的數據與 StorageLevel.MEMORY_AND_DISK_SER_2 一塊兒存儲, 因此不適合內存的數據將會 spill over (溢出)到磁盤上.這可能會下降 streaming application (流式應用程序)的性能, 所以建議您提供足夠的 streaming application (流量應用程序)所需的內存.最好仔細查看內存使用量並相應地進行估算.

memory tuning (內存調優)的另外一個方面是 garbage collection (垃圾收集).對於須要低延遲的 streaming application , 由 JVM Garbage Collection 引發的大量暫停是不但願的.

有幾個 parameters (參數)能夠幫助您調整 memory usage (內存使用量)和 GC 開銷:

Persistence Level of DStreams (DStreams 的持久性級別): 如前面在Data Serialization部分中所述, input data 和 RDD 默認保持爲 serialized bytes (序列化字節).與 deserialized persistence (反序列化持久性)相比, 這減小了內存使用量和 GC 開銷.啓用 Kryo serialization 進一步減小了 serialized sizes (序列化大小)和 memory usage (內存使用).能夠經過 compression (壓縮)來實現內存使用的進一步減小(參見Spark配置spark.rdd.compress), 代價是 CPU 時間.

Clearing old data (清除舊數據): 默認狀況下, DStream 轉換生成的全部 input data 和 persisted RDDs 將自動清除. Spark Streaming 決定什麼時候根據所使用的 transformations (轉換)來清除數據.例如, 若是您使用 10 分鐘的 window operation (窗口操做), 則 Spark Streaming 將保留最近 10 分鐘的數據, 並主動丟棄舊數據. 數據能夠經過設置streamingContext.remember保持更長的持續時間(例如交互式查詢舊數據).

CMS Garbage Collector (CMS垃圾收集器): 強烈建議使用 concurrent mark-and-sweep GC , 以保持 GC 相關的暫停始終如一.即便 concurrent GC 已知能夠減小 系統的總體處理吞吐量, 其使用仍然建議實現更多一致的 batch processing times (批處理時間).確保在 driver (使用--driver-java-options在spark-submit中 )和 executors (使用Spark configurationspark.executor.extraJavaOptions)中設置 CMS GC.

Other tips (其餘提示): 爲了進一步下降 GC 開銷, 如下是一些更多的提示.

使用OFF_HEAP存儲級別的保持 RDDs .在Spark Programming Guide中查看更多詳細信息.

使用更小的 heap sizes 的 executors.這將下降每一個 JVM heap 內的 GC 壓力.

Important points to remember(要記住的要點):

DStream 與 single receiver (單個接收器)相關聯.爲了得到讀取並行性, 須要建立多個 receivers , 即 multiple DStreams .receiver 在一個 executor 中運行.它佔據一個 core (內核).確保在 receiver slots are booked 後有足夠的內核進行處理, 即spark.cores.max應該考慮 receiver slots . receivers 以循環方式分配給 executors .

當從 stream source 接收到數據時, receiver 建立數據 blocks (塊).每一個 blockInterval 毫秒生成一個新的數據塊.在 N = batchInterval/blockInterval 的 batchInterval 期間建立 N 個數據塊.這些塊由當前 executor 的 BlockManager 分發給其餘執行程序的 block managers .以後, 在驅動程序上運行的 Network Input Tracker (網絡輸入跟蹤器)通知有關進一步處理的塊位置

在驅動程序中爲在 batchInterval 期間建立的塊建立一個 RDD .在 batchInterval 期間生成的塊是 RDD 的 partitions .每一個分區都是一個 spark 中的 task. blockInterval == batchinterval 意味着建立 single partition (單個分區), 而且可能在本地進行處理.

除非 non-local scheduling (非本地調度)進行, 不然塊上的 map tasks (映射任務)將在 executors (接收 block, 複製塊的另外一個塊)中進行處理.具備更大的 block interval (塊間隔)意味着更大的塊.spark.locality.wait的高值增長了處理 local node (本地節點)上的塊的機會.須要在這兩個參數之間找到平衡, 以確保在本地處理較大的塊.

而不是依賴於 batchInterval 和 blockInterval , 您能夠經過調用inputDstream.repartition(n)來定義 number of partitions (分區數).這樣能夠隨機從新組合 RDD 中的數據, 建立 n 個分區.是的, 爲了更大的 parallelism (並行性).雖然是 shuffle 的代價. RDD 的處理由 driver’s jobscheduler 做爲一項工做安排.在給定的時間點, 只有一個 job 是 active 的.所以, 若是一個做業正在執行, 則其餘做業將排隊.

若是您有兩個 dstream , 將會有兩個 RDD 造成, 而且將建立兩個將被安排在另外一個以後的做業.爲了不這種狀況, 你能夠聯合兩個 dstream .這將確保爲 dstream 的兩個 RDD 造成一個 unionRDD .這個 unionRDD 而後被認爲是一個 single job (單一的工做).但 RDD 的 partitioning (分區)不受影響.

若是 batch processing time (批處理時間)超過 batchinterval (批次間隔), 那麼顯然 receiver 的內存將會開始填滿, 最終會拋出 exceptions (最多是 BlockNotFoundException ).目前沒有辦法暫停 receiver .使用 SparkConf 配置spark.streaming.receiver.maxRate, receiver 的 rate 能夠受到限制.

Fault-tolerance Semantics (容錯語義)

在本節中, 咱們將討論 Spark Streaming applications 在該 event 中的行爲的失敗.

Background(背景)

要了解 Spark Streaming 提供的語義, 請記住 Spark 的 RDD 的基本 fault-tolerance semantics (容錯語義).

RDD 是一個不可變的, 肯定性地可從新計算的分佈式數據集.每一個RDD 記住在容錯輸入中使用的肯定性操做的 lineage 數據集建立它.

若是 RDD 的任何 partition 因爲工做節點故障而丟失, 則該分區能夠是 從 original fault-tolerant dataset (原始容錯數據集)中使用業務流程從新計算.

假設全部的 RDD transformations 都是肯定性的, 最後的數據被轉換, 不管 Spark 集羣中的故障如何, RDD 始終是同樣的.

Spark 運行在容錯文件系統(如 HDFS 或 S3 )中的數據上.所以, 從容錯數據生成的全部 RDD 也都是容錯的.可是, 這不是在大多數狀況下, Spark Streaming 做爲數據的狀況經過網絡接收(除非fileStream被使用).爲了爲全部生成的 RDD 實現相同的 fault-tolerance properties (容錯屬性), 接收的數據在集羣中的工做節點中的多個 Spark executors 之間進行復制(默認 replication factor (備份因子)爲 2).這致使了發生故障時須要恢復的系統中的兩種數據:

Data received and replicated (數據接收和複製)- 這個數據在單個工做節點做爲副本的故障中倖存下來, 它存在於其餘節點之一上.

Data received but buffered for replication (接收數據但緩衝進行復制)- 因爲不復制, 恢復此數據的惟一方法是從 source 從新獲取.

此外, 咱們應該關注的有兩種 failures:

Failure of a Worker Node (工做節點的故障)- 運行 executors 的任何工做節點均可能會故障, 而且這些節點上的全部內存中數據將丟失.若是任何 receivers 運行在失敗節點, 則它們的 buffered (緩衝)數據將丟失.

Failure of the Driver Node (Driver 節點的故障)- 若是運行 Spark Streaming application 的 driver node 發生了故障, 那麼顯然 SparkContext 丟失了, 全部的 executors 和其內存中的數據也一塊兒丟失了.

有了這個基礎知識, 讓咱們瞭解 Spark Streaming 的 fault-tolerance semantics (容錯語義).

Definitions (定義)

streaming systems (流系統)的語義一般是經過系統能夠處理每一個記錄的次數來捕獲的.系統能夠在全部可能的操做條件下提供三種類型的保證(儘管有故障等).

At most once (最多一次): 每一個 record (記錄)將被處理一次或根本不處理.

At least once (至少一次): 每一個 record (記錄)將被處理一次或屢次.這比at-most once, 由於它確保沒有數據將丟失.但可能有重複.

Exactly once(有且僅一次): 每一個 record (記錄) 將被精確處理一次 - 沒有數據丟失, 數據不會被屢次處理.這顯然是三者的最強保證.

Basic Semantics (基本語義)

在任何 stream processing system (流處理系統)中, 廣義上說, 處理數據有三個步驟.

Receiving the data (接收數據): 使用 Receivers 或其餘方式從數據源接收數據.

Transforming the data (轉換數據): 使用 DStream 和 RDD transformations 來 transformed (轉換)接收到的數據.

Pushing out the data (推出數據): 最終的轉換數據被推出到 external systems (外部系統), 如 file systems (文件系統), databases (數據庫), dashboards (儀表板)等.

若是 streaming application 必須實現 end-to-end exactly-once guarantees (端到端的一次且僅一次性保證), 那麼每一個步驟都必須提供 exactly-once guarantee .也就是說, 每一個記錄必須被精確地接收一次, 轉換完成一次, 並被推送到下游系統一次.讓咱們在 Spark Streaming 的上下文中瞭解這些步驟的語義.

Receiving the data (接收數據): 不一樣的 input sources 提供不一樣的保證.這將在下一小節中詳細討論.

Transforming the data (轉換數據): 全部已收到的數據都將被處理exactly once, 這得益於 RDD 提供的保證.即便存在故障, 只要接收到的輸入數據可訪問, 最終變換的 RDD 將始終具備相同的內容.

Pushing out the data (推出數據): 默認狀況下的輸出操做確保at-least once語義, 由於它取決於輸出操做的類型( idempotent (冪等))或 downstream system (下游系統)的語義(是否支持 transactions (事務)).但用戶能夠實現本身的事務機制來實現exactly-once語義.這將在本節後面的更多細節中討論.

Semantics of Received Data (接收數據的語義)

不一樣的 input sources (輸入源)提供不一樣的保證, 範圍從at-least once到exactly once.

With Files

若是全部的 input data (輸入數據)都已經存在於 fault-tolerant file system (容錯文件系統)中 HDFS , Spark Streaming 能夠隨時從任何故障中恢復並處理全部數據.這給了exactly-once語義, 意味着不管什麼故障, 全部的數據將被精確處理一次.

With Receiver-based Sources (使用基於接收器的數據源)

對於基於 receivers (接收器)的 input sources (輸入源), 容錯語義取決於故障場景和接收器的類型. 正如咱們earlier討論的, 有兩種類型的 receivers (接收器):

Reliable Receiver (可靠的接收器)- 這些 receivers (接收機)只有在確認收到的數據已被複制以後確認 reliable sources (可靠的源).若是這樣的接收器出現故障, source 將不會被接收對於 buffered (unreplicated) data (緩衝(未複製)數據)的確認.所以, 若是 receiver 是從新啓動, source 將從新發送數據, 而且不會因爲故障而丟失數據.

Unreliable Receiver (不可靠的接收器)- 這樣的接收器不會發送確認, 所以可能丟失數據, 因爲 worker 或 driver 故障.

根據使用的 receivers 類型, 咱們實現如下語義. 若是 worker node 出現故障, 則 reliable receivers 沒有數據丟失.unreliable receivers , 收到但未複製的數據可能會丟失.若是 driver node 失敗, 那麼除了這些損失以外, 在內存中接收和複製的全部過去的數據將丟失.這將影響 stateful transformations (有狀態轉換)的結果.

爲避免過去收到的數據丟失, Spark 1.2 引入了_write ahead logs_ 將接收到的數據保存到 fault-tolerant storage (容錯存儲).用write ahead logs enabled和 reliable receivers, 數據沒有丟失.在語義方面, 它提供 at-least once guarantee (至少一次保證).

下表總結了失敗的語義:

Deployment Scenario (部署場景)Worker Failure (Worker 故障)Driver Failure (Driver 故障)

Spark 1.1 或更早版本,或者

Spark 1.2 或者沒有 write ahead logs 的更高的版本Buffered data lost with unreliable receivers(unreliable receivers 的緩衝數據丟失)

Zero data loss with reliable receivers (reliable receivers 的零數據丟失)

At-least once semantics (至少一次性語義)Buffered data lost with unreliable receivers (unreliable receivers 的緩衝數據丟失)

Past data lost with all receivers (全部的 receivers 的過去的數據丟失)

Undefined semantics (未定義語義)

Spark 1.2 或者帶有 write ahead logs 的更高版本Zero data loss with reliable receivers(reliable receivers 的零數據丟失)

At-least once semantics (至少一次性語義)Zero data loss with reliable receivers and files (reliable receivers 和 files 的零數據丟失)

At-least once semantics (至少一次性語義)

With Kafka Direct API (使用 Kafka Direct API)

在 Spark 1.3 中, 咱們引入了一個新的 Kafka Direct API , 能夠確保全部的 Kafka 數據都被 Spark Streaming exactly once (一次)接收.與此同時, 若是您實現 exactly-once output operation (一次性輸出操做), 您能夠實現 end-to-end exactly-once guarantees (端到端的一次且僅一次性保證).在Kafka Integration Guide中進一步討論了這種方法.

Semantics of output operations (輸出操做的語義)

Output operations (輸出操做)(如foreachRDD)具備at-least once語義, 也就是說, transformed data (變換後的數據)可能會不止一次寫入 external entity (外部實體)在一個 worker 故障事件中.雖然這是能夠接受的使用saveAs***Files操做(由於文件將被相同的數據簡單地覆蓋) 保存到文件系統, 可能須要額外的努力來實現 exactly-once (一次且僅一次)語義.有兩種方法.

Idempotent updates (冪等更新): 屢次嘗試老是寫入相同的數據.例如,saveAs***Files老是將相同的數據寫入生成的文件.

Transactional updates (事務更新): 全部更新都是事務性的, 以便更新徹底按原子進行.這樣作的一個方法以下.

使用批處理時間(在foreachRDD中可用)和 RDD 的 partition index (分區索引)來建立 identifier (標識符).該標識符惟一地標識 streaming application 中的 blob 數據.

使用該 identifier (標識符)blob transactionally (blob 事務地)更新 external system (外部系統)(即, exactly once, atomically (一次且僅一次, 原子性地)).也就是說, 若是 identifier (標識符)還沒有提交, 則以 atomically (原子方式)提交 partition data (分區數據)和 identifier (標識符).不然, 若是已經提交, 請跳過更新.

dstream.foreachRDD { (rdd, time) =>

rdd.foreachPartition { partitionIterator =>

val partitionId = TaskContext.get.partitionId()

val uniqueId = generateUniqueId(time.milliseconds, partitionId)

// use this uniqueId to transactionally commit the data in partitionIterator

}

}

快速連接

附加指南

Kafka 集成指南

Kinesis 集成指南

自定義 Receiver(接收器)指南

第三方 DStream 數據源能夠在第三方項目上查看.

API 文檔

Scala 文檔

StreamingContextDStream

KafkaUtils,FlumeUtils,KinesisUtils,

Java 文檔

JavaStreamingContext,JavaDStreamJavaPairDStream

KafkaUtils,FlumeUtils,KinesisUtils

Python 文檔

StreamingContextDStream

KafkaUtils

更多的示例在ScalaJavaPython



原文地址: http://spark.apachecn.org/docs/cn/2.2.0/streaming-programming-guide.html

網頁地址: http://spark.apachecn.org/

相關文章
相關標籤/搜索