Spark Streaming是核心Spark API的擴展,可實現實時數據流的可擴展,高吞吐量,容錯流處理。數據能夠從許多來源(如Kafka,Flume,Kinesis或TCP套接字)中獲取,而且可使用以高級函數表示的複雜算法進行處理map
,例如reduce
,join
和window
。最後,處理後的數據能夠推送到文件系統,數據庫和實時儀表板。實際上,您能夠在數據流上應用Spark的 機器學習和 圖形處理算法。html
在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流並將數據分紅批處理,而後由Spark引擎處理,以批量生成最終結果流。java
Spark Streaming提供稱爲離散流或DStream的高級抽象,表示連續的數據流。DStream能夠歷來自Kafka,Flume和Kinesis等源的輸入數據流建立,也能夠經過在其餘DStream上應用高級操做來建立。在內部,DStream表示爲一系列 RDD。node
本指南向您展現如何使用DStreams開始編寫Spark Streaming程序。您可使用Scala,Java或Python編寫Spark Streaming程序(在Spark 1.2中引入),全部這些都在本指南中介紹。您能夠在本指南中找到標籤,讓您在不一樣語言的代碼段之間進行選擇。python
注意:有一些API在Python中不一樣或不可用。在本指南中,您將找到標記Python API,突出顯示這些差別。git
在咱們詳細介紹如何編寫本身的Spark Streaming程序以前,讓咱們快速瞭解一下簡單的Spark Streaming程序是什麼樣的。假設咱們想要計算從TCP套接字上偵聽的數據服務器接收的文本數據中的字數。您須要作的就是以下。github
首先,咱們將Spark Streaming類的名稱和StreamingContext中的一些隱式轉換導入到咱們的環境中,以便將有用的方法添加到咱們須要的其餘類(如DStream)。StreamingContext是全部流功能的主要入口點。咱們使用兩個執行線程建立一個本地StreamingContext,批處理間隔爲1秒。web
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
使用此上下文,咱們能夠建立一個DStream來表示來自TCP源的流數據,指定爲主機名(例如localhost
)和端口(例如9999
)。算法
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
此lines
DStream表示將從數據服務器接收的數據流。此DStream中的每條記錄都是一行文本。接下來,咱們但願將空格字符分割爲單詞。sql
// Split each line into words val words = lines.flatMap(_.split(" "))
flatMap
是一對多DStream操做,它經過從源DStream中的每一個記錄生成多個新記錄來建立新的DStream。在這種狀況下,每行將被分紅多個單詞,單詞流表示爲 words
DStream。接下來,咱們要計算這些單詞。shell
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
所述words
DSTREAM被進一步映射(一到一個變換)到一個DSTREAM (word, 1)
對,而後將其還原獲得的單詞的頻率數據中的每一批。最後,wordCounts.print()
將打印每秒生成的一些計數。
請注意,執行這些行時,Spark Streaming僅設置啓動時將執行的計算,而且還沒有啓動實際處理。要在設置完全部轉換後開始處理,咱們最終調用
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
完整的代碼能夠在Spark Streaming示例 NetworkWordCount中找到。
若是您已經下載並構建了 Spark,則能夠按以下方式運行此示例。您首先須要使用Netcat(在大多數類Unix系統中找到的小實用程序)做爲數據服務器運行
$ nc -lk 9999
而後,在不一樣的終端中,您可使用啓動示例
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
而後,在運行netcat服務器的終端中鍵入的任何行將被計數並每秒在屏幕上打印。它看起來像下面這樣。
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... |
# TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... |
接下來,咱們將超越簡單的示例,詳細介紹Spark Streaming的基礎知識。
與Spark相似,Spark Streaming可經過Maven Central得到。要編寫本身的Spark Streaming程序,必須將如下依賴項添加到SBT或Maven項目中。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.4.0</version> </dependency>
要從Spark Streaming核心API中不存在的Kafka,Flume和Kinesis等源中提取數據,您必須將相應的工件添加spark-streaming-xyz_2.11
到依賴項中。例如,一些常見的以下。
資源 | 神器 |
---|---|
卡夫卡 | 火花流 - 卡夫卡0-10_2.11 |
水槽 | 火花流,flume_2.11 |
室壁運動 | spark-streaming-kinesis-asl_2.11 [亞馬遜軟件許可證] |
有關最新列表,請參閱 Maven存儲庫 以獲取受支持的源和工件的完整列表。
要初始化Spark Streaming程序,必須建立一個StreamingContext對象,它是全部Spark Streaming功能的主要入口點。
甲的StreamingContext對象能夠從被建立SparkConf對象。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
該appName
參數是應用程序在羣集UI上顯示的名稱。 master
是Spark,Mesos或YARN羣集URL,或在本地模式下運行的特殊「local [*]」字符串。實際上,當在羣集上運行時,您不但願master
在程序中進行硬編碼,而是啓動應用程序spark-submit
並在那裏接收它。可是,對於本地測試和單元測試,您能夠傳遞「local [*]」以在進程中運行Spark Streaming(檢測本地系統中的核心數)。請注意,這會在內部建立一個SparkContext(全部Spark功能的起點),能夠訪問它ssc.sparkContext
。
必須根據應用程序的延遲要求和可用的羣集資源設置批處理間隔。有關 更多詳細信息,請參見性能調整部分。
甲StreamingContext
目的還能夠從現有的建立的SparkContext
對象。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
定義上下文後,您必須執行如下操做。
streamingContext.start()
。streamingContext.awaitTermination()
。streamingContext.stop()
。要記住的要點:
stop()
called 的可選參數設置stopSparkContext
爲false。Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示連續的數據流,能夠是從源接收的輸入數據流,也能夠是經過轉換輸入流生成的已處理數據流。在內部,DStream由一系列連續的RDD表示,這是Spark對不可變分佈式數據集的抽象(有關更多詳細信息,請參閱Spark編程指南)。DStream中的每一個RDD都包含來自特定時間間隔的數據,以下圖所示。
應用於DStream的任何操做都轉換爲底層RDD上的操做。例如,在先前將行流轉換爲字的示例中,flatMap
操做應用於lines
DStream中的每一個RDD 以生成DStream的 words
RDD。以下圖所示。
這些底層RDD轉換由Spark引擎計算。DStream操做隱藏了大部分細節,併爲開發人員提供了更高級別的API以方便使用。這些操做將在後面的章節中詳細討論。
輸入DStream是表示從流源接收的輸入數據流的DStream。在快速示例中,lines
輸入DStream是表示從netcat服務器接收的數據流。每一個輸入DStream(文件流除外,本節稍後討論)都與Receiver (Scala doc, Java doc)對象相關聯,該對象從源接收數據並將其存儲在Spark的內存中進行處理。
Spark Streaming提供兩類內置流媒體源。
咱們將在本節後面討論每一個類別中的一些來源。
請注意,若是要在流應用程序中並行接收多個數據流,能夠建立多個輸入DStream(在「 性能調整」部分中進一步討論)。這將建立多個接收器,這些接收器將同時接收多個數據流。但請注意,Spark worker / executor是一個長期運行的任務,所以它佔用了分配給Spark Streaming應用程序的其中一個核心。所以,重要的是要記住,Spark Streaming應用程序須要分配足夠的內核(或線程,若是在本地運行)來處理接收的數據,以及運行接收器。
要記住的要點
在本地運行Spark Streaming程序時,請勿使用「local」或「local [1]」做爲主URL。這兩種方法都意味着只有一個線程將用於本地運行任務。若是您正在使用基於接收器的輸入DStream(例如套接字,Kafka,Flume等),則單線程將用於運行接收器,不會留下任何線程來處理接收到的數據。所以,在本地運行時,始終使用「local [ n ]」做爲主URL,其中n >要運行的接收器數量(有關如何設置主服務器的信息,請參閱Spark屬性)。
將邏輯擴展到在集羣上運行,分配給Spark Streaming應用程序的核心數必須大於接收器數。不然系統將接收數據,但沒法處理數據。
咱們已經看過快速示例中的內容ssc.socketTextStream(...)
,該示例 根據經過TCP套接字鏈接接收的文本數據建立DStream。除了套接字以外,StreamingContext API還提供了從文件建立DStream做爲輸入源的方法。
對於從與HDFS API兼容的任何文件系統(即HDFS,S3,NFS等)上的文件讀取數據,能夠建立DStream做爲via StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
。
文件流不須要運行接收器,所以不須要分配任何內核來接收文件數據。
對於簡單的文本文件,最簡單的方法是StreamingContext.textFileStream(dataDirectory)
。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
對於文本文件
streamingContext.textFileStream(dataDirectory)
如何監控目錄
Spark Streaming將監視目錄dataDirectory
並處理在該目錄中建立的任何文件。
"hdfs://namenode:8040/logs/"
。直接在這種路徑下的全部文件將在發現時進行處理。"hdfs://namenode:8040/logs/2017/*"
。這裏,DStream將包含與模式匹配的目錄中的全部文件。那就是:它是目錄的模式,而不是目錄中的文件。"hdfs://namenode:8040/logs/2016-*"
,重命名整個目錄以匹配路徑,則會將目錄添加到受監視目錄列表中。只有修改時間在當前窗口內的目錄中的文件纔會包含在流中。FileSystem.setTimes()
時間戳是一種在稍後的窗口中拾取文件的方法,即便其內容未更改。使用對象存儲做爲數據源
「完整」文件系統(如HDFS)會在建立輸出流後當即在其文件上設置修改時間。打開文件時,即便在數據徹底寫入以前,它也可能包含在DStream
- 以後 - 將忽略同一窗口中文件的更新。即:可能會遺漏更改,並從流中省略數據。
要保證在窗口中選擇更改,請將文件寫入不受監視的目錄,而後在關閉輸出流後當即將其重命名爲目標目錄。若是重命名的文件在其建立窗口期間出如今掃描的目標目錄中,則將拾取新數據。
相比之下,Amazon S3和Azure Storage等對象存儲一般具備較慢的重命名操做,由於其實是複製了數據。此外,重命名的對象可能將rename()
操做的時間做爲其修改時間,所以可能不被視爲原始建立時間所暗示的窗口的一部分。
須要對目標對象存儲進行仔細測試,以驗證存儲的時間戳行爲是否與Spark Streaming所指望的一致。多是直接寫入目標目錄是經過所選對象庫流式傳輸數據的適當策略。
有關此主題的更多詳細信息,請參閱Hadoop文件系統規範。
可使用經過自定義接收器接收的數據流建立DStream。有關詳細信息,請參閱自定義接收器指南。
爲了測試帶有測試數據的Spark Streaming應用程序,還可使用基於RDD隊列建立DStream streamingContext.queueStream(queueOfRDDs)
。推入隊列的每一個RDD將被視爲DStream中的一批數據,並像流同樣處理。
有關從套接字和文件流的詳細信息,請參閱在相關函數的API單證 的StreamingContext斯卡拉,JavaStreamingContext 對Java和的StreamingContext爲Python。
Python API從Spark 2.4.0開始,在這些來源中,Kafka,Kinesis和Flume在Python API中可用。
此類源須要與外部非Spark庫鏈接,其中一些庫具備複雜的依賴性(例如,Kafka和Flume)。所以,爲了最大限度地減小與依賴項版本衝突相關的問題,從這些源建立DStream的功能已移至可在必要時顯式連接的單獨庫。
請注意,Spark shell中不提供這些高級源,所以沒法在shell中測試基於這些高級源的應用程序。若是您真的想在Spark shell中使用它們,則必須下載相應的Maven工件JAR及其依賴項,並將其添加到類路徑中。
其中一些高級資源以下。
Kafka: Spark Streaming 2.4.0與Kafka經紀人版本0.8.2.1或更高版本兼容。有關更多詳細信息,請參閱Kafka集成指南。
Flume: Spark Streaming 2.4.0與Flume 1.6.0兼容。有關詳細信息,請參閱Flume集成指南。
Kinesis: Spark Streaming 2.4.0與Kinesis Client Library 1.2.1兼容。有關詳細信息,請參閱Kinesis集成指南。
Python API Python尚不支持此功能。
輸入DStream也能夠從自定義數據源建立。您所要作的就是實現一個用戶定義的接收器(參見下一節以瞭解它是什麼),它能夠從自定義源接收數據並將其推送到Spark。有關詳細信息,請參閱自定義接收器指南
根據其可靠性,能夠有兩種數據源。來源(如Kafka和Flume)容許傳輸數據獲得確認。若是從這些可靠來源接收數據的系統正確地確認接收到的數據,則能夠確保不會因任何類型的故障而丟失數據。這致使兩種接收器:
「 自定義接收器指南」中討論瞭如何編寫可靠接收器的詳細信息 。
與RDD相似,轉換容許修改來自輸入DStream的數據。DStreams支持普通Spark RDD上可用的許多轉換。一些常見的以下。
轉型 | 含義 |
---|---|
地圖(功能) | 經過將源DStream的每一個元素傳遞給函數func來返回一個新的DStream 。 |
flatMap(func) | 與map相似,但每一個輸入項能夠映射到0個或更多輸出項。 |
過濾器(功能) | 經過僅選擇func返回true 的源DStream的記錄來返回新的DStream 。 |
從新分區(numPartitions) | 經過建立更多或更少的分區來更改此DStream中的並行度級別。 |
union(otherStream) | 返回一個新的DStream,它包含源DStream和otherDStream中元素的並 集。 |
count() | 經過計算源DStream的每一個RDD中的元素數量,返回單元素RDD的新DStream。 |
減小(功能) | 經過使用函數func(它接受兩個參數並返回一個)聚合源DStream的每一個RDD中的元素,返回單元素RDD的新DStream 。該函數應該是關聯的和可交換的,以即可以並行計算。 |
countByValue() | 當在類型K的元素的DStream上調用時,返回(K,Long)對的新DStream,其中每一個鍵的值是其在源DStream的每一個RDD中的頻率。 |
reduceByKey(func,[numTasks ]) | 當在(K,V)對的DStream上調用時,返回(K,V)對的新DStream,其中使用給定的reduce函數聚合每一個鍵的值。注意:默認狀況下,這使用Spark的默認並行任務數(本地模式爲2,在羣集模式下,數量由config屬性肯定spark.default.parallelism )進行分組。您能夠傳遞可選numTasks 參數來設置不一樣數量的任務。 |
join(otherStream,[ numTasks]) | 當在(K,V)和(K,W)對的兩個DStream上調用時,返回(K,(V,W))對的新DStream與每一個鍵的全部元素對。 |
協同組(otherStream,[numTasks ]) | 當在(K,V)和(K,W)對的DStream上調用時,返回(K,Seq [V],Seq [W])元組的新DStream。 |
變換(功能) | 經過將RDD-to-RDD函數應用於源DStream的每一個RDD來返回新的DStream。這能夠用於在DStream上執行任意RDD操做。 |
updateStateByKey(func) | 返回一個新的「狀態」DStream,其中經過在鍵的先前狀態和鍵的新值上應用給定函數來更新每一個鍵的狀態。這可用於維護每一個密鑰的任意狀態數據。 |
其中一些轉換值得更詳細地討論。
該updateStateByKey
操做容許您在使用新信息持續更新時保持任意狀態。要使用它,您必須執行兩個步驟。
在每一個批處理中,Spark都會對全部現有密鑰應用狀態更新功能,不管它們是否在批處理中都有新數據。若是更新函數返回,None
則將刪除鍵值對。
讓咱們舉一個例子來講明這一點。假設您要維護文本數據流中看到的每一個單詞的運行計數。這裏,運行計數是狀態,它是一個整數。咱們將更新函數定義爲:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
這適用於包含單詞的DStream(例如,前面示例中pairs
包含DStream (word, 1)
的對象)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
將爲每一個單詞調用更新函數,newValues
其序列爲1(來自(word, 1)
成對)並runningCount
具備前一個計數。
請注意,使用updateStateByKey
須要配置檢查點目錄,這將在檢查點部分中詳細討論。
該transform
操做(及其變體transformWith
)容許在DStream上應用任意RDD到RDD功能。它可用於應用未在DStream API中公開的任何RDD操做。例如,將數據流中的每一個批次與另外一個數據集鏈接的功能不會直接在DStream API中公開。可是,您能夠輕鬆地使用它transform
來執行此操做。這使得很是強大的可能性。例如,能夠經過將輸入數據流與預先計算的垃圾郵件信息(也可使用Spark生成)鏈接,而後根據它進行過濾來進行實時數據清理。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
請注意,在每一個批處理間隔中都會調用提供的函數。這容許您進行時變RDD操做,即RDD操做,分區數,廣播變量等能夠在批次之間進行更改。
Spark Streaming還提供窗口計算,容許您在滑動數據窗口上應用轉換。下圖說明了此滑動窗口。
如該圖所示,每個窗口時間的幻燈片在源DSTREAM,落入窗口內的源RDDS被組合及操做,以產生加窗DSTREAM的RDDS。在這種特定狀況下,操做應用於最後3個時間單位的數據,並按2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。
這兩個參數必須是源DStream的批處理間隔的倍數(圖中的1)。
讓咱們舉一個例子來講明窗口操做。好比說,您但願經過每隔10秒在最後30秒的數據中生成字數來擴展 前面的示例。爲此,咱們必須在最後30秒的數據reduceByKey
上對pairs
DStream (word, 1)
對應用操做。這是使用該操做完成的reduceByKeyAndWindow
。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常見的窗口操做以下。全部這些操做都採用上述兩個參數 - windowLength和slideInterval。
轉型 | 含義 |
---|---|
window(windowLength,slideInterval) | 返回一個新的DStream,它是根據源DStream的窗口批次計算的。 |
countByWindow(windowLength,slideInterval) | 返回流中元素的滑動窗口數。 |
reduceByWindow(func,windowLength,slideInterval) | 返回一個新的單元素流,經過使用func在滑動間隔內聚合流中的元素而建立。該函數應該是關聯的和可交換的,以即可以並行正確計算。 |
reduceByKeyAndWindow(func,windowLength,slideInterval,[ numTasks ]) | 當在(K,V)對的DStream上調用時,返回(K,V)對的新DStream,其中使用給定的reduce函數func 在滑動窗口中的批次聚合每一個鍵的值。注意:默認狀況下,這使用Spark的默認並行任務數(本地模式爲2,在羣集模式下,數量由config屬性肯定spark.default.parallelism )進行分組。您能夠傳遞可選 numTasks 參數來設置不一樣數量的任務。 |
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[ numTasks ]) | 上述更有效的版本, |
countByValueAndWindow(windowLength, slideInterval,[numTasks ]) | 當在(K,V)對的DStream上調用時,返回(K,Long)對的新DStream,其中每一個鍵的值是其在滑動窗口內的頻率。一樣reduceByKeyAndWindow ,reduce任務的數量可經過可選參數進行配置。 |
最後,值得強調的是,您能夠輕鬆地在Spark Streaming中執行不一樣類型的鏈接。
流鏈接
Streams能夠很容易地與其餘流鏈接。
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
這裏,在每一個批處理間隔中,生成的RDD stream1
將與生成的RDD鏈接stream2
。你也能夠作leftOuterJoin
,rightOuterJoin
,fullOuterJoin
。此外,在流的窗口上進行鏈接一般很是有用。這也很容易。
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
流數據集鏈接
在解釋DStream.transform
操做時已經顯示了這一點。這是將窗口流與數據集鏈接的另外一個示例。
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
實際上,您還能夠動態更改要加入的數據集。提供給的函數在transform
每一個批處理間隔進行評估,所以將使用dataset
引用指向的當前數據集。
API文檔中提供了完整的DStream轉換列表。對於Scala API,請參閱DStream 和PairDStreamFunctions。對於Java API,請參閱JavaDStream 和JavaPairDStream。對於Python API,請參閱DStream。
輸出操做容許將DStream的數據推送到外部系統,如數據庫或文件系統。因爲輸出操做實際上容許外部系統使用轉換後的數據,所以它們會觸發全部DStream轉換的實際執行(相似於RDD的操做)。目前,定義瞭如下輸出操做:
輸出操做 | 含義 |
---|---|
打印() | 在運行流應用程序的驅動程序節點上打印DStream中每批數據的前十個元素。這對開發和調試頗有用。 Python API這在Python API中稱爲 pprint()。 |
saveAsTextFiles(前綴,[ 後綴 ]) | 將此DStream的內容保存爲文本文件。每一個批處理間隔的文件名基於前綴和後綴生成:「prefix-TIME_IN_MS [.suffix]」。 |
saveAsObjectFiles(前綴,[ 後綴 ]) | 將此DStream的內容保存爲SequenceFiles 序列化Java對象。每一個批處理間隔的文件名基於前綴和後綴生成:「prefix-TIME_IN_MS [.suffix]」。 Python API這在Python API中不可用。 |
saveAsHadoopFiles(前綴,[ 後綴 ]) | 將此DStream的內容保存爲Hadoop文件。每一個批處理間隔的文件名基於前綴和後綴生成:「prefix-TIME_IN_MS [.suffix]」。 Python API這在Python API中不可用。 |
foreachRDD(func) | 最通用的輸出運算符,它將函數func應用於從流生成的每一個RDD。此函數應將每一個RDD中的數據推送到外部系統,例如將RDD保存到文件,或經過網絡將其寫入數據庫。請注意,函數func在運行流應用程序的驅動程序進程中執行,而且一般會在其中執行RDD操做,這將強制計算流式RDD。 |
dstream.foreachRDD
是一個功能強大的原語,容許將數據發送到外部系統。可是,瞭解如何正確有效地使用此原語很是重要。一些常見的錯誤要避免以下。
一般將數據寫入外部系統須要建立鏈接對象(例如,與遠程服務器的TCP鏈接)並使用它將數據發送到遠程系統。爲此,開發人員可能無心中嘗試在Spark驅動程序中建立鏈接對象,而後嘗試在Spark工做程序中使用它來保存RDD中的記錄。例如(在Scala中),
dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }
這是不正確的,由於這須要鏈接對象被序列化並從驅動程序發送到worker。這種鏈接對象不多能夠跨機器轉移。此錯誤可能表現爲序列化錯誤(鏈接對象不可序列化),初始化錯誤(須要在worker處初始化鏈接對象)等。正確的解決方案是在worker處建立鏈接對象。
可是,這可能會致使另外一個常見錯誤 - 爲每條記錄建立一個新鏈接。例如,
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
一般,建立鏈接對象會產生時間和資源開銷。所以,爲每一個記錄建立和銷燬鏈接對象可能會產生沒必要要的高開銷,而且可能顯着下降系統的總吞吐量。更好的解決方案是使用 rdd.foreachPartition
- 建立單個鏈接對象並使用該鏈接發送RDD分區中的全部記錄。
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
這會在許多記錄上分攤鏈接建立開銷。
最後,經過跨多個RDD /批處理重用鏈接對象,能夠進一步優化這一點。因爲多個批次的RDD被推送到外部系統,所以能夠維護鏈接對象的靜態池,而不是能夠重用的鏈接對象,從而進一步減小了開銷。
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
請注意,池中的鏈接應根據須要延遲建立,若是暫時不使用,則會超時。這實現了最有效的數據發送到外部系統。
其餘要記住的要點:
DStreams由輸出操做延遲執行,就像RDD由RDD操做延遲執行同樣。具體而言,DStream輸出操做中的RDD操做會強制處理接收到的數據。所以,若是您的應用程序沒有任何輸出操做,或者輸出操做dstream.foreachRDD()
沒有任何RDD操做,那麼就不會執行任何操做。系統將簡單地接收數據並將其丟棄。
默認狀況下,輸出操做一次執行一次。它們按照應用程序中定義的順序執行。
您能夠輕鬆地對流數據使用DataFrames和SQL操做。您必須使用StreamingContext正在使用的SparkContext建立SparkSession。此外,必須這樣作以即可以在驅動器故障時從新啓動。這是經過建立一個延遲實例化的SparkSession單例實例來完成的。這在如下示例中顯示。它修改了早期的單詞計數示例,以使用DataFrames和SQL生成單詞計數。每一個RDD都轉換爲DataFrame,註冊爲臨時表,而後使用SQL進行查詢。
/** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
查看完整的源代碼。
您還能夠對從不一樣線程(即,與正在運行的StreamingContext異步)的流數據上定義的表運行SQL查詢。只需確保將StreamingContext設置爲記住足夠數量的流數據,以便查詢能夠運行。不然,不知道任何異步SQL查詢的StreamingContext將在查詢完成以前刪除舊的流數據。例如,若是要查詢最後一批,但查詢可能須要5分鐘才能運行,則調用streamingContext.remember(Minutes(5))
(在Scala中,或在其餘語言中等效)。
有關DataFrame的詳細信息,請參閱DataFrames和SQL指南。
您還能夠輕鬆使用MLlib提供的機器學習算法。首先,有流媒體機器學習算法(例如流媒體線性迴歸,流媒體KMeans等),它們能夠同時學習流數據以及將模型應用於流數據。除此以外,對於更大類的機器學習算法,您能夠離線學習學習模型(即便用歷史數據),而後在線將數據應用於流數據。有關詳細信息,請參閱MLlib指南。
與RDD相似,DStreams還容許開發人員將流的數據保存在內存中。也就是說,persist()
在DStream上使用該方法將自動將該DStream的每一個RDD保留在內存中。若是DStream中的數據將被屢次計算(例如,對相同數據進行屢次操做),這將很是有用。對於像reduceByWindow
和這樣的基於窗口的操做和 reduceByKeyAndWindow
基於狀態的操做updateStateByKey
,這是隱含的。所以,基於窗口的操做生成的DStream會自動保留在內存中,而無需開發人員調用persist()
。
對於經過網絡接收數據的輸入流(例如,Kafka,Flume,套接字等),默認持久性級別設置爲將數據複製到兩個節點以實現容錯。
請注意,與RDD不一樣,DStreams的默認持久性級別使數據在內存中保持序列化。「 性能調整」部分對此進行了進一步討論。有關不一樣持久性級別的更多信息,請參閱「 Spark編程指南」。
流應用程序必須全天候運行,所以必須可以適應與應用程序邏輯無關的故障(例如,系統故障,JVM崩潰等)。爲了實現這一點,Spark Streaming須要將足夠的信息檢查到容錯存儲系統,以便它能夠從故障中恢復。檢查點有兩種類型的數據。
總而言之,元數據檢查點主要用於從驅動程序故障中恢復,而若是使用狀態轉換,即便對於基本功能也須要數據或RDD檢查點。
必須爲具備如下任何要求的應用程序啓用檢查點:
updateStateByKey
或reduceByKeyAndWindow
使用反函數),則必須提供檢查點目錄以容許按期RDD檢查點。請注意,能夠在不啓用檢查點的狀況下運行沒有上述有狀態轉換的簡單流應用程序。在這種狀況下,驅動程序故障的恢復也將是部分的(某些已接收但未處理的數據可能會丟失)。這一般是能夠接受的,而且許多以這種方式運行Spark Streaming應用程序。預計對非Hadoop環境的支持將在將來獲得改善。
能夠經過在容錯,可靠的文件系統(例如,HDFS,S3等)中設置目錄來啓用檢查點,以便保存檢查點信息。這是經過使用完成的streamingContext.checkpoint(checkpointDirectory)
。這將容許您使用上述有狀態轉換。此外,若是要使應用程序從驅動程序故障中恢復,則應重寫流應用程序以使其具備如下行爲。
使用此行爲變得簡單StreamingContext.getOrCreate
。其用法以下。
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
若是checkpointDirectory
存在,則將從檢查點數據從新建立上下文。若是目錄不存在(即,第一次運行),則將functionToCreateContext
調用該函數以建立新上下文並設置DStream。請參閱Scala示例 RecoverableNetworkWordCount。此示例將網絡數據的字數附加到文件中。
除了使用以外getOrCreate
還須要確保驅動程序進程在失敗時自動重啓。這隻能經過用於運行應用程序的部署基礎結構來完成。這在「 部署」部分中進一步討論 。
請注意,RDD的檢查點會致使節省可靠存儲的成本。這可能致使RDD被檢查點的那些批次的處理時間增長。所以,須要仔細設置檢查點的間隔。在小批量(例如1秒)下,每批次檢查點可能會顯着下降操做吞吐量。相反,檢查點過於頻繁會致使譜系和任務大小增長,這可能會產生不利影響。對於須要RDD檢查點的有狀態轉換,默認時間間隔是批處理間隔的倍數,至少爲10秒。它能夠經過使用來設置dstream.checkpoint(checkpointInterval)
。一般,DStream的5-10個滑動間隔的檢查點間隔是一個很好的設置。
沒法從Spark Streaming中的檢查點恢復累加器和廣播變量。若是啓用了檢查點並使用 累加器或廣播變量 ,則必須爲累加器和廣播變量建立延遲實例化的單例實例, 以便在驅動程序從新啓動失敗後從新實例化它們。這在如下示例中顯示。
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts })
查看完整的源代碼。
本節討論部署Spark Streaming應用程序的步驟。
要運行Spark Streaming應用程序,您須要具有如下條件。
具備集羣管理器的集羣 - 這是任何Spark應用程序的通常要求,並在部署指南中進行了詳細討論。
打包應用程序JAR - 您必須將流應用程序編譯爲JAR。若是您正在使用spark-submit
啓動應用程序,那麼您將不須要在JAR中提供Spark和Spark Streaming。可是,若是您的應用程序使用高級源(例如Kafka,Flume),則必須將它們連接的額外工件及其依賴項打包在用於部署應用程序的JAR中。例如,使用的應用程序KafkaUtils
必須包含spark-streaming-kafka-0-10_2.11
應用程序JAR中的全部傳遞依賴項。
爲執行程序配置足夠的內存 - 因爲接收的數據必須存儲在內存中,所以必須爲執行程序配置足夠的內存來保存接收的數據。請注意,若是您正在進行10分鐘的窗口操做,則系統必須至少將最後10分鐘的數據保留在內存中。所以,應用程序的內存要求取決於其中使用的操做。
配置檢查點 - 若是流應用程序須要它,則必須將Hadoop API兼容容錯存儲中的目錄(例如HDFS,S3等)配置爲檢查點目錄,並以檢查點信息能夠寫入的方式編寫流應用程序用於故障恢復。有關詳細信息,請參閱檢查點部分。
配置預寫日誌 - 自Spark 1.2起,咱們引入了預寫日誌以實現強大的容錯保證。若是啓用,則從接收器接收的全部數據都將寫入配置檢查點目錄中的預寫日誌。這能夠防止驅動程序恢復時的數據丟失,從而確保零數據丟失(在容錯語義部分中詳細討論 )。這能夠經過設置來啓用配置參數 spark.streaming.receiver.writeAheadLog.enable
來true
。然而,這些更強的語義可能以單個接收器的接收吞吐量爲代價。這能夠經過並行運行更多接收器來糾正 增長總吞吐量。此外,建議在啓用預寫日誌時禁用Spark中接收數據的複製,由於日誌已存儲在複製存儲系統中。這能夠經過將輸入流的存儲級別設置爲來完成StorageLevel.MEMORY_AND_DISK_SER
。使用S3(或任何不支持刷新的文件系統)進行預寫日誌時,請記得啓用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
。有關詳細信息,請參閱 Spark Streaming配置。請注意,啓用I / O加密時,Spark不會加密寫入預寫日誌的數據。若是須要加密預寫日誌數據,則應將其存儲在本機支持加密的文件系統中。
spark.streaming.receiver.maxRate
spark.streaming.kafka.maxRatePerPartition
spark.streaming.backpressure.enabled
true
若是須要使用新的應用程序代碼升級正在運行的Spark Streaming應用程序,則有兩種可能的機制。
升級的Spark Streaming應用程序啓動並與現有應用程序並行運行。一旦新的(接收與舊的數據相同的數據)已經預熱並準備好黃金時間,舊的能夠被放下。請注意,這能夠用於支持將數據發送到兩個目標(即早期和升級的應用程序)的數據源。
現有應用程序正常關閉(請參閱 StreamingContext.stop(...)
或JavaStreamingContext.stop(...)
用於正常關閉選項),確保在關閉以前徹底處理已接收的數據。而後能夠啓動升級的應用程序,該應用程序將從早期應用程序中止的同一點開始處理。請注意,這隻能經過支持源端緩衝的輸入源(如Kafka和Flume)來完成,由於在前一個應用程序關閉且升級的應用程序還沒有啓動時須要緩衝數據。而且沒法從早期檢查點從新啓動升級前代碼的信息。檢查點信息基本上包含序列化的Scala / Java / Python對象,並嘗試使用新的修改類反序列化對象可能會致使錯誤。在這種狀況下,要麼使用不一樣的檢查點目錄啓動升級的應用程序,要麼刪除之前的檢查點目錄。
除了Spark的監控功能外,還有一些特定於Spark Streaming的功能。使用StreamingContext時, Spark Web UI會顯示一個附加Streaming
選項卡,其中顯示有關運行接收器的統計信息(接收器是否處於活動狀態,接收的記錄數,接收器錯誤等)和已完成的批處理(批處理時間,排隊延遲等)。 )。這可用於監視流應用程序的進度。
Web UI中的如下兩個指標尤其重要:
若是批處理時間始終大於批處理間隔和/或排隊延遲不斷增長,則代表系統沒法以最快的速度處理批處理而且落後。在這種狀況下,請考慮 減小批處理時間。
還可使用StreamingListener接口監視Spark Streaming程序的進度,該 接口容許您獲取接收器狀態和處理時間。請注意,這是一個開發人員API,將來可能會對其進行改進(即報告更多信息)。
從羣集上的Spark Streaming應用程序中得到最佳性能須要進行一些調整。本節介紹了許多能夠調整以提升應用程序性能的參數和配置。在高層次上,您須要考慮兩件事:
經過有效使用羣集資源減小每批數據的處理時間。
設置正確的批量大小,以即可以像接收到的那樣快速處理批量數據(即,數據處理與數據攝取保持同步)。
能夠在Spark中進行許多優化,以最大限度地縮短每一個批處理的處理時間。這些已在「 調整指南」中詳細討論過。本節重點介紹一些最重要的內容。
經過網絡接收數據(如Kafka,Flume,socket等)須要將數據反序列化並存儲在Spark中。若是數據接收成爲系統中的瓶頸,則考慮並行化數據接收。請注意,每一個輸入DStream都會建立一個接收單個數據流的接收器(在工做機器上運行)。所以,能夠經過建立多個輸入DStream並將它們配置爲從源接收數據流的不一樣分區來實現接收多個數據流。例如,接收兩個數據主題的單個Kafka輸入DStream能夠分紅兩個Kafka輸入流,每一個輸入流只接收一個主題。這將運行兩個接收器,容許並行接收數據,從而提升總體吞吐量。這些多個DStream能夠組合在一塊兒以建立單個DStream。而後,能夠在統一流上應用在單個輸入DStream上應用的轉換。這樣作以下。
val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print()
應考慮的另外一個參數是接收器的塊間隔,它由配置參數決定 spark.streaming.blockInterval
。對於大多數接收器,接收的數據在存儲在Spark的內存中以前合併爲數據塊。每一個批次中的塊數決定了在相似地圖的轉換中用於處理接收數據的任務數。每批每一個接收器的任務數量大約是(批處理間隔/塊間隔)。例如,200 ms的塊間隔將每2秒批次建立10個任務。若是任務數量太少(即,少於每臺機器的核心數),那麼效率將會很低,由於全部可用的核心都不會用於處理數據。要增長給定批處理間隔的任務數,請減小塊間隔。可是,塊間隔的建議最小值約爲50 ms,低於該值時,任務啓動開銷可能會成爲問題。
使用多個輸入流/接收器接收數據的替代方案是顯式地從新分區輸入數據流(使用inputStream.repartition(<number of partitions>)
)。這會在進一步處理以前將收到的批量數據分佈到羣集中指定數量的計算機上。
有關直接流的信息,請參閱Spark Streaming + Kafka集成指南
若是在計算的任何階段中使用的並行任務的數量不夠高,則羣集資源可能未被充分利用。例如,對於像reduceByKey
和這樣的分佈式reduce操做reduceByKeyAndWindow
,默認的並行任務數由spark.default.parallelism
配置屬性控制。您能夠將並行級別做爲參數傳遞(請參閱PairDStreamFunctions
文檔),或者設置spark.default.parallelism
配置屬性以更改默認值。
經過調整序列化格式能夠減小數據序列化的開銷。在流式傳輸的狀況下,有兩種類型的數據被序列化。
輸入數據:默認狀況下,經過Receiver接收的輸入數據經過StorageLevel.MEMORY_AND_DISK_SER_2存儲在執行程序的內存中。也就是說,數據被序列化爲字節以減小GC開銷,而且複製以容忍執行程序失敗。此外,數據首先保存在內存中,而且僅在內存不足以保存流式計算所需的全部輸入數據時才溢出到磁盤。這種序列化顯然有開銷 - 接收器必須反序列化接收的數據並使用Spark的序列化格式從新序列化。
流式傳輸操做生成的持久RDD:流式計算生成的RDD能夠保留在內存中。例如,窗口操做將數據保留在內存中,由於它們將被屢次處理。可是,與StorageLevel.MEMORY_ONLY的Spark Core默認值不一樣,默認狀況下,經過流量計算生成的持久RDD與StorageLevel.MEMORY_ONLY_SER(即序列化)一塊兒保留,以最大限度地減小GC開銷。
在這兩種狀況下,使用Kryo序列化能夠減小CPU和內存開銷。有關詳細信息,請參閱Spark Tuning Guide。對於Kryo,請考慮註冊自定義類,並禁用對象引用跟蹤(請參閱「 配置指南」中的Kryo相關配置)。
在須要爲流應用程序保留的數據量不大的特定狀況下,將數據(兩種類型)保存爲反序列化對象多是可行的,而不會產生過多的GC開銷。例如,若是您使用的是幾秒鐘的批處理間隔而沒有窗口操做,則能夠嘗試經過相應地顯式設置存儲級別來禁用持久數據中的序列化。這將減小因爲序列化致使的CPU開銷,可能在沒有太多GC開銷的狀況下提升性能。
若是每秒啓動的任務數量很高(例如,每秒50或更多),則向從屬設備發送任務的開銷可能很大,而且將難以實現亞秒級延遲。經過如下更改能夠減小開銷:
這些更改能夠將批處理時間減小100毫秒,從而容許亞秒級批量大小可行。
要使羣集上運行的Spark Streaming應用程序保持穩定,系統應該可以以接收數據的速度處理數據。換句話說,批處理數據應該在生成時儘快處理。經過監視流式Web UI中的處理時間能夠找到是否適用於應用程序 ,其中批處理時間應小於批處理間隔。
根據流式計算的性質,所使用的批處理間隔可能對應用程序在固定的一組集羣資源上能夠維持的數據速率產生重大影響。例如,讓咱們考慮一下早期的WordCountNetwork示例。對於特定數據速率,系統可能可以每2秒(即,2秒的批處理間隔)跟上報告字數,但不是每500毫秒。所以須要設置批處理間隔,以即可以維持生產中的預期數據速率。
肯定應用程序正確批量大小的好方法是使用保守的批處理間隔(例如,5-10秒)和低數據速率進行測試。要驗證系統是否可以跟上數據速率,您能夠檢查每一個已處理批處理所遇到的端到端延遲的值(在Spark驅動程序log4j日誌中查找「總延遲」,或使用 StreamingListener 接口)。若是延遲保持與批量大小至關,則系統穩定。不然,若是延遲不斷增長,則意味着系統沒法跟上,所以不穩定。一旦瞭解了穩定配置,就能夠嘗試提升數據速率和/或減少批量大小。注意,只要延遲減少到低值(即,小於批量大小),因爲臨時數據速率增長而致使的延遲的瞬時增長多是正常的。
「調優指南」中詳細討論了調整 Spark應用程序的內存使用狀況和GC行爲。強烈建議您閱讀。在本節中,咱們將特別在Spark Streaming應用程序的上下文中討論一些調優參數。
Spark Streaming應用程序所需的集羣內存量在很大程度上取決於所使用的轉換類型。例如,若是要在最後10分鐘的數據上使用窗口操做,那麼您的羣集應該有足夠的內存來在內存中保存10分鐘的數據。或者若是你想使用updateStateByKey
大量的鍵,那麼必要的內存將很高。相反,若是你想作一個簡單的map-filter-store操做,那麼必要的內存就會不多。
一般,因爲經過接收器接收的數據與StorageLevel.MEMORY_AND_DISK_SER_2一塊兒存儲,所以不適合內存的數據將溢出到磁盤。這可能會下降流應用程序的性能,所以建議您根據流應用程序的須要提供足夠的內存。最好嘗試小規模地查看內存使用狀況並進行相應估算。
內存調整的另外一個方面是垃圾收集。對於須要低延遲的流應用程序,不但願由JVM垃圾收集致使大的暫停。
有一些參數能夠幫助您調整內存使用和GC開銷:
DStream的持久性級別:如前面數據序列化部分所述,輸入數據和RDD默認持久化爲序列化字節。與反序列化持久性相比,這減小了內存使用和GC開銷。啓用Kryo序列化可進一步減小序列化大小和內存使用量。經過壓縮(參見Spark配置spark.rdd.compress
)能夠實現內存使用的進一步減小,但代價是CPU時間。
清除舊數據:默認狀況下,DStream轉換生成的全部輸入數據和持久RDD都會自動清除。Spark Streaming根據使用的轉換決定什麼時候清除數據。例如,若是您使用10分鐘的窗口操做,那麼Spark Streaming將保留最後10分鐘的數據,並主動丟棄舊數據。經過設置,能夠將數據保留更長的持續時間(例如,交互式查詢舊數據)streamingContext.remember
。
CMS垃圾收集器:強烈建議使用併發標記和清除GC,以保持GC相關的暫停始終較低。儘管已知併發GC會下降系統的總體處理吞吐量,但仍建議使用它來實現更一致的批處理時間。確保在驅動程序(使用--driver-java-options
輸入spark-submit
)和執行程序(使用Spark配置 spark.executor.extraJavaOptions
)上設置CMS GC 。
其餘提示:爲了進一步下降GC開銷,這裏有一些嘗試的提示。
OFF_HEAP
存儲級別保留RDD 。請參閱Spark編程指南中的更多詳細信息。要記住的要點:
DStream與單個接收器相關聯。爲了得到讀取並行性,須要建立多個接收器,即多個DStream。接收器在執行器內運行。它佔據一個核心。確保在預訂接收器插槽後有足夠的核心進行處理,即spark.cores.max
應考慮接收器插槽。接收器以循環方式分配給執行器。
當從流源接收數據時,接收器建立數據塊。每隔blockInterval毫秒生成一個新的數據塊。在batchInterval期間建立N個數據塊,其中N = batchInterval / blockInterval。這些塊由當前執行程序的BlockManager分發給其餘執行程序的塊管理器。以後,在驅動程序上運行的網絡輸入跟蹤器將被告知有關進一步處理的塊位置。
在驅動程序上爲batchInterval期間建立的塊建立RDD。batchInterval期間生成的塊是RDD的分區。每一個分區都是spark中的任務。blockInterval == batchinterval意味着建立了一個分區,而且可能在本地處理它。
塊中的映射任務在執行器中處理(一個接收塊,另外一個塊複製塊),具備塊而無論塊間隔,除非非本地調度啓動。具備更大的blockinterval意味着更大的塊。較高的值會spark.locality.wait
增長在本地節點上處理塊的機會。須要在這兩個參數之間找到平衡,以確保在本地處理更大的塊。
您能夠經過調用來定義分區數,而不是依賴於batchInterval和blockInterval inputDstream.repartition(n)
。這會隨機從新調整RDD中的數據以建立n個分區。是的,爲了更大的並行性。雖然以洗牌爲代價。RDD的處理由駕駛員的jobcheduler做爲工做安排。在給定的時間點,只有一個做業處於活動狀態。所以,若是一個做業正在執行,則其餘做業將排隊。
若是你有兩個dstream,那麼將造成兩個RDD,而且將建立兩個將一個接一個地安排的做業。爲了不這種狀況,你能夠結合兩個dstreams。這將確保爲dstream的兩個RDD造成單個unionRDD。而後,此unionRDD被視爲單個做業。可是,RDD的分區不受影響。
若是批處理時間超過批處理間隔,那麼顯然接收方的內存將開始填滿,最終會拋出異常(最有多是BlockNotFoundException)。目前,沒有辦法暫停接收器。使用SparkConf配置spark.streaming.receiver.maxRate
,能夠限制接收器的速率。
在本節中,咱們將討論Spark Streaming應用程序在發生故障時的行爲。
要理解Spark Streaming提供的語義,讓咱們記住Spark的RDD的基本容錯語義。
Spark對容錯文件系統(如HDFS或S3)中的數據進行操做。所以,從容錯數據生成的全部RDD也是容錯的。可是,Spark Streaming不是這種狀況,由於大多數狀況下的數據是經過網絡接收的(除非 fileStream
使用時)。要爲全部生成的RDD實現相同的容錯屬性,接收的數據將在羣集中的工做節點中的多個Spark執行程序之間進行復制(默認複製因子爲2)。這致使系統中須要在發生故障時恢復的兩種數據:
此外,咱們應該關注兩種失敗:
有了這些基礎知識,讓咱們瞭解Spark Streaming的容錯語義。
流系統的語義一般根據系統處理每條記錄的次數來捕獲。系統能夠在全部可能的操做條件下提供三種類型的保證(儘管出現故障等)
在任何流處理系統中,從廣義上講,處理數據有三個步驟。
接收數據:使用接收器或其餘方式從數據源接收數據。
轉換數據:使用DStream和RDD轉換轉換接收的數據。
推出數據:最終轉換的數據被推送到外部系統,如文件系統,數據庫,儀表板等。
若是流應用程序必須實現端到端的一次性保證,那麼每一個步驟都必須提供一次性保證。也就是說,每條記錄必須只接收一次,轉換一次,而後推送到下游系統一次。讓咱們理解Spark Streaming上下文中這些步驟的語義。
接收數據:不一樣的輸入源提供不一樣的保證。這將在下一小節中詳細討論。
轉換數據:因爲RDD提供的保證,全部已接收的數據將只處理一次。即便存在故障,只要能夠訪問接收的輸入數據,最終變換的RDD將始終具備相同的內容。
推出數據:默認狀況下輸出操做至少確保一次語義,由於它取決於輸出操做的類型(冪等或不等)和下游系統的語義(支持或不支持事務)。可是用戶能夠實現本身的事務機制來實現一次性語義。本節稍後將對此進行更詳細的討論。
不一樣的輸入源提供不一樣的保證,範圍從至少一次到剛好一次。閱讀更多詳情。
若是全部輸入數據都已存在於HDFS等容錯文件系統中,則Spark Streaming始終能夠從任何故障中恢復並處理全部數據。這給出 了一次性語義,這意味着不管失敗什麼,全部數據都將被處理一次。
對於基於接收器的輸入源,容錯語義取決於故障情形和接收器類型。正如咱們所討論的前面,有兩種類型的接收器:
根據使用的接收器類型,咱們實現如下語義。若是工做節點發生故障,則可靠接收器不會丟失數據。對於不可靠的接收器,接收但未複製的數據可能會丟失。若是驅動程序節點出現故障,那麼除了這些丟失以外,在內存中接收和複製的全部過去數據都將丟失。這將影響有狀態轉換的結果。
爲了不丟失過去收到的數據,Spark 1.2引入了預寫日誌,將接收到的數據保存到容錯存儲中。經過啓用預寫日誌和可靠的接收器,數據丟失爲零。在語義方面,它提供至少一次保證。
下表總結了失敗時的語義:
部署方案 | 工人失敗 | 司機失敗 |
---|---|---|
Spark 1.1或更早版本,或 Spark 1.2或更高版本,沒有預寫日誌 |
使用不可靠的接收器丟失緩衝數據使用可靠的接收器實現 零數據丟失 至少一次語義 |
使用不可靠的接收器丟失緩衝數據 過去的數據在全部接收器中丟失 未定義的語義 |
Spark 1.2或更高版本,具備預寫日誌 | 使用可靠的接收器實現零數據丟失 至少一次語義 |
使用可靠的接收器和文件實現零數據丟失 至少一次語義 |
在Spark 1.3中,咱們引入了一個新的Kafka Direct API,它能夠確保Spark Streaming只接收一次全部Kafka數據。除此以外,若是您實現一次性輸出操做,您能夠實現端到端的一次性保證。「 Kafka集成指南」中進一步討論了這種方法。
輸出操做(例如foreachRDD
)具備至少一次語義,即,在工做者失敗的狀況下,轉換後的數據可能被屢次寫入外部實體。雖然這對於使用saveAs***Files
操做保存到文件系統是能夠接受的 (由於文件將被簡單地用相同的數據覆蓋),可是可能須要額外的努力來實現精確一次的語義。有兩種方法。
冪等更新:屢次嘗試始終寫入相同的數據。例如,saveAs***Files
始終將相同的數據寫入生成的文件。
事務性更新:全部更新都是以事務方式進行的,以便以原子方式完成更新。一種方法是:
foreachRDD
)和RDD的分區索引來建立標識符。該標識符惟一地標識流應用程序中的blob數據。使用標識符以事務方式(即,一次,原子地)使用此blob更新外部系統。也就是說,若是標識符還沒有提交,則以原子方式提交分區數據和標識符。不然,若是已經提交,請跳過更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }