本文講解Spark流數據處理之Spark Streaming。本文的寫做時值Spark 1.6.2發佈之際,Spark 2.0預覽版也已發佈,Spark發展如此迅速,請隨時關注Spark Streaming官方文檔以瞭解最新信息。html
文中對Spark Streaming的講解主要使用到Scala語言,其餘語言請參考官方文檔,這點請知曉。node
Spark Streaming是Spark核心API的擴展,用於可伸縮、高吞吐量、可容錯地處理在線流數據。Spark Streaming能夠從不少數據源獲取數據,好比:Kafka、Flume、Twitter、ZeroMQ、Kinesis或TCP鏈接等,並能夠用不少高層算子(map/reduce/join/window等)來方便地處理這些數據。最後處理過的數據還能夠推送到文件系統、數據庫和在線監控頁面等。實際上,你也能夠在數據流上使用Spark的機器學習和圖計算算法。linux
Spark Streaming內部工做機制概圖以下所示。Spark Streaming接收在線數據流並將其劃分紅批(batch),而後經過Spark引擎處理並最終獲得由一批一批數據構成的結果流。算法
Spark Streaming將流數據抽象爲離散化流(discretized stream),即DStream
。DStream能夠從輸入數據流建立也能夠從其餘的DStream轉換而來。DStream在內部被表示爲一個連續的RDD序列。sql
首先以一個簡單的示例開始:用Spark Streaming對從TCP鏈接中接收的文本進行單詞計數。shell
/** |
爲了測試程序,咱們得有TCP數據源做爲輸入,這可使用Netcat(通常linux系統中都有,若是是windows系統,則推薦你使用Ncat,Ncat是一個改進版的Netcat)。以下使用Netcat監聽指定本地端口:數據庫
nc -lk 9999 |
若是是使用Ncat,則對應命令以下:apache
ncat -lk 9999 |
在IntelliJ IDEA或Eclipse中能夠本地運行測試上述Spark Streaming程序,該程序會鏈接到Netcat(或Ncat)監聽的端口,你能夠在運行Netcat(或Ncat)的終端中輸入東東並回車,而後就能夠看到該Spark Streaming程序會立刻輸出處理結果,而且這個處理是不停的、流式的。windows
注意:上述示例只是對數據流中的每一批數據進行單獨的計數,而沒有進行增量計數。緩存
StreamingContext
是Spark Streaming程序的入口點,正如SparkContext是Spark程序的入口點同樣。
StreamingContext中維護了一個SparkContext實例,你能夠經過ssc.sparkContext
來訪問它。該SparkContext實例要麼在建立StreamingContext時被傳入,要麼在StreamingContext內部根據傳入的SparkConf進行建立,這取決於你所使用的StreamingContext構造函數,請觀看API文檔。
Spark Streaming將流數據抽象爲離散化流(discretized stream),即DStream
。DStream在內部被表示爲一個連續的RDD序列,每個RDD包含了一個固定時間間隔內數據源所產生的數據,以下圖所示。
對DStream所進行的操做將被轉換爲對底層RDD的操做。例如,在前面的流數據單詞計數示例程序中,lines.flatMap(_.split(" "))
語句中的flatMap
算子就被應用到lines DStream中的RDD以生成words DStream中的RDD,以下圖所示。
InputDStream表明輸入數據流,除了file stream和queue RDD stream,其餘的輸入流都和一個Receiver相關聯(具體地是對應ReceiverInputDStream類,其內部會啓動一個Receiver),Receiver工做在一個worker節點上,用於接收相應數據源的流數據並將其存儲在內存中(取決於建立StreamingContext時指定的存儲級別)以供處理。
咱們也能夠建立多個InputDStream來鏈接多個數據源,其中的ReceiverInputDStream都將啓動Receiver來接收數據。一個Spark Streaming應用程序應該分配足夠多的核心(local模式下是線程)去運行receiver(s)並處理其接收的數據。當咱們以本地模式運行Spark Streaming程序時,master URL不能指定爲local
或者local[1]
(Spark Streaming會啓動一個線程運行receiver,只有一個線程將致使沒有線程來處理數據),而應該是local[n]
,這個n應該大於receiver的個數。在集羣中運行Spark Streaming程序時,一樣道理,也須要分配大於receiver的個數的核心數。
Spark Streaming提供了從不少數據源獲取流數據的方法,一些基本的數據源能夠經過StreamingContext API直接使用,主要包括:文件系統、網絡鏈接、Akka actors等。
StreamingContext提供了從兼容於HDFS API的全部文件系統中建立文件數據輸入流的方法,以下:
ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) |
文件流沒有receiver。Spark Streaming將監控對應目錄(但不支持嵌套目錄),並處理在該目錄中建立的任何文件(以.
開頭的將被忽略)。監控目錄中的文件必須有相同的數據格式。監控目錄中的文件若是被修改(好比以append方式寫入),這些修改將不會被讀取,所以正確的方式應該是先在其餘目錄中寫好這些文件並將其移動或者重命名到該監控目錄。
對於簡單的文本文件,可使用更簡單的方法,以下:
ssc.textFileStream(dataDirectory) |
網絡鏈接流可使用ssc.socketStream()
或ssc.socketTextStream()
建立,詳情參見API文檔。
能夠經過ssc.actorStream()
建立一個從Akka actor接收數據流的ReceiverInputDStream。更多參見API文檔和自定義Receiver指南。
咱們也能夠用ssc.queueStream()
建立一個基於RDD序列的InputDStream。序列中的每個RDD將被做爲DStream中的一個數據批,這一般在測試你的Spark Streaming程序時很是有用。
對於Kafka、Flume、Kinesis、Twitter等這些高級數據源,則須要添加外部依賴,關於依賴參見這裏。
下面給出一些關於高級數據源集成方法的參考連接:
你也能夠自定義數據源,只須要實現一個本身的receiver從自定義數據源接收數據並將其推送到Spark。詳情參見:Custom Receiver Guide。
依據可靠性可將Receiver分爲兩類。可靠Receiver帶有傳輸確認機制(ACK機制),能夠確保數據在傳輸過程當中不會丟失,Kafka和Flume等在ACK機制開啓的狀況下就是可靠的。不可靠Receiver不帶有傳輸確認機制,包括不支持ACK機制和支持ACK但關閉的情形。
DStream支持不少和RDD相似的轉換算子(transformation)(這些轉換算子和RDD中的同樣,都是lazy的),完整的算子列表參見API文檔中的DStream
和PairDStreamFunctions
,下面列出一些經常使用的:
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which funcreturns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new 「state」 DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
下面對其中一些操做進行更詳細的說明。
updateStateByKey操做容許你用持續不斷的新信息來更新你所維護的某些狀態信息。爲了使用這個算子,一般你須要以下兩個步驟:
在每一批數據中,Spark都將在全部已經存在的key上應用狀態更新函數,即便在這批數據中沒有某些key對應的數據。若是狀態更新函數返回None
,那麼對應的key-value對將被移除。
在快速示例中,咱們只是對每一批數據進行單獨的單詞計數,在這裏咱們就能夠經過updateStateByKey算子進行增量計數了。
須要注意的是使用updateStateByKey算子要求已經配置了檢查點目錄,參見檢查點部分。
transform操做容許在DStream上應用任意RDD-to-RDD函數,這樣你就能夠方便地使用在DStream API中沒有的卻在RDD API中存在的算子來轉換DStream中的每個RDD了。例如,在DStream API中不存在將數據流中的每一批數據(一個RDD)與其餘數據集進行join的操做,此時就能夠經過transform算子+RDD的join算子來實現。
須要注意的是傳入transform的函數每次應用在一批數據(一個RDD)上,這意味着你能夠根據時間變化在不一樣的RDD上作不一樣的處理,也就是說RDD操做、RDD分區數、廣播變量等在不一樣的批之間均可以改變。
Spark Streaming也提供了基於窗口的計算,它容許你在一個滑動窗口上使用轉換操做,滑動窗口以下圖所示。
窗口是基於時間滑動的,窗口操做新造成的DStream中的每個RDD包含了某一滑動窗口中的全部數據。任何窗口操做都須要指定以下兩個參數:
一些經常使用的窗口操做算子以下:
Transformation | Meaning |
---|---|
window( windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow( windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow( func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow( func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config propertyspark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow( func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and 「inverse reducing」 the old data that leaves the window. An example would be that of 「adding」 and 「subtracting」 counts of keys as the window slides. However, it is applicable only to 「invertible reduce functions」, that is, those reduce functions which have a corresponding 「inverse reduce」 function (taken as parameter invFunc). Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation. |
countByValueAndWindow( windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
須要強調的是,上述某些操做(如reduceByWindow
和reduceByKeyAndWindow
等)有一些特殊形式,經過只考慮新進入窗口的數據和離開窗口的數據,讓Spark增量計算歸約結果。這種特殊形式須要額外提供一個規約函數的逆函數,好比+
對應的逆函數爲-
。對於較大的窗口,提供逆函數能夠大大提升執行效率。
Stream-stream joins:
val stream1: DStream[String, String] = ... |
上述代碼對兩個DStream進行join
操做,在每個批處理間隔,stream1產生的一個RDD將和stream2產生的一個RDD進行join操做。另外,還有其餘一些join操做:leftOuterJoin
、rightOuterJoin
和fullOuterJoin
。也能夠進行基於窗口的join操做,以下:
val windowedStream1 = stream1.window(Seconds(20)) |
Stream-dataset joins:
這種類型的join能夠經過transform()
來實現。以下代碼將一個分窗的stream和一個數據集進行join:
val dataset: RDD[String, String] = ... |
輸出操做容許將DStream中的數據推送到外部系統,好比數據庫和文件系統。和RDD的action算子相似,DStream的輸出操做用來觸發全部轉換操做的執行。下面列出主要的輸出操做:
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. |
saveAsTextFiles( prefix, [suffix]) | Save this DStream’s contents as text files. The file name at each batch interval is generated based on prefix and suffix:「prefix-TIME_IN_MS[.suffix]」. |
saveAsObjectFiles( prefix, [suffix]) | Save this DStream’s contents asSequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix:「prefix-TIME_IN_MS[.suffix]」. |
saveAsHadoopFiles( prefix, [suffix]) | Save this DStream’s contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix:「prefix-TIME_IN_MS[.suffix]」. |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
一般,將數據寫到外部系統須要建立一個網絡鏈接。不經意間,你極可能在driver節點建立一個鏈接對象,而後在試着在executor節點使用這個鏈接,以下:
dstream.foreachRDD { rdd => |
上述代碼是錯誤的,由於建立的鏈接對象將被序列化而後傳輸到worker節點,而鏈接一般是不能在機器之間傳遞的。這個錯誤可能顯示爲序列化錯誤(鏈接對象不可序列化)、初始化錯誤(鏈接對象須要在worker節點初始化)等等。
改進的辦法是在worker節點建立鏈接對象。然而,這可能致使另外一個常見錯誤——爲每一條記錄建立一個鏈接,以下:
dstream.foreachRDD { rdd => |
爲每一條記錄建立一個鏈接將消耗掉大量系統資源,極大地下降了系統效率。一個更好的方法是使用rdd.foreachPartition()
爲每個RDD分區建立一個鏈接,以下:
dstream.foreachRDD { rdd => |
最後,還能夠進一步優化——在多個批數據(RDD)之間重用鏈接對象。咱們能夠經過一個靜態的lazy的鏈接池來保存鏈接,以供在多批數據之間共用鏈接對象,以下:
dstream.foreachRDD { rdd => |
注意,上述鏈接池中的鏈接應該是按需建立的(lazy的),而且最好將長期不用的鏈接關閉(超時機制)。
- DStream的轉換操做是lazy的,輸出操做觸發實質的計算。具體地說是DStream輸出操做內部的RDD行動操做強制處理接收到的數據。所以,若是一個程序沒有任何輸出操做,或者有像foreachRDD()這樣的輸出操做但其中沒有任何RDD行動操做,那麼該程序就不會執行任何計算,它將簡單地接收數據而後丟棄掉。
- 缺省狀況下,輸出操做一次執行一個,而且是按照應用程序定義的先後順序執行的。
在Spark Streaming中,累加器(Accumulator)和廣播變量(Broadcast)不能從檢查點(checkpoint)中恢復。若是你採用檢查點機制(檢查點將切斷RDD依賴)而且也用了累加器或廣播變量,爲了在突發異常並重啓driver節點以後累加器和廣播變量能夠被從新實例化,你應該爲它們建立lazy實例化的單例對象。示例以下:
object WordBlacklist { |
在流數據處理中也可使用DataFrame和SQL。此時你必須用StreamingContext正在使用的SparkContext實例來建立一個SQLContext。爲了使程序能夠在driver故障重啓以後能夠繼續運行,咱們應該建立一個lazy實例化的SQLContext的單例對象。示例以下:
/** 在Spark Streaming程序中進行DataFrame操做 */ |
你也能夠在流數據中使用由MLlib提供的機器學習算法。首先你要知道的是,有一些流式機器學習算法(例如Streaming Linear Regression、Streaming KMeans等),它們能夠從流數據中學習獲得模型,也能夠將學到的模型應用到流數據中。除此以外,對於大量的機器學習算法,你能夠經過歷史數據離線地學習獲得一個模型,並將模型應用到在線的流數據中。
和RDD類似,DStream也容許將流數據持久化,簡單地在DStream上調用persist()
將自動地將其表明的每個RDD緩存下來。若是同一個DStream中的數據要被使用屢次,將DStream緩存下來將是很是有益的。
對於window-based操做(如reduceByWindow、reduceByKeyAndWindow等)和state-based操做(如updateStateByKey),DStream將被隱式地持久化,所以你就沒必要本身手動調用persist()
了哦。
對於從網絡獲取數據的狀況(如TCP鏈接、Kafka、Flume等),出於容錯的考慮,缺省的持久化級別是將數據複製到兩個節點。
注意:和RDD不一樣的是,DStream的缺省持久化級別是將數據序列化並存儲到內存中。
流處理程序一般是7*24小時不間斷運行的,所以必須是能夠從故障中恢復的。爲了能夠從故障中恢復,Spark Streaming須要在可容錯的存儲系統中checkpoint足夠的信息。有兩類數據被checkpoint。
什麼時候啓用Checkpointing:
怎樣配置Checkpointing:
可使用ssc.checkpoint()
來設置checkpoint目錄,這樣你就能夠在程序中使用stateful的轉換操做了,若是你想使程序能夠從driver節點的故障中恢復,你應該重寫你的程序以支持如下行爲:
start()
。上述行爲能夠經過ssc.getOrCreate()
來輔助實現,示例以下:
// 建立並設置一個新的StreamingContext對象 |
須要注意的是,checkpoint的時間間隔須要仔細考慮,太小或過大的時間間隔均可能致使問題。一般,checkpoint的時間間隔最好是DStream的批處理時間間隔的5-10倍。dstream.checkpoint()
可用來設置checkpoint的時間間隔,同時對於那些沒有默認地進行checkpointing的DStream(非stateful轉換操做生成的DStream),這也將引發週期性地checkpoint該DStream中的RDD。
除了Spark提供的一些監控能力外,Spark Streaming還提供了一些額外的監控能力。當一個Spark Streaming程序運行時,Spark應用程序監控頁面(一般是 http://master:4040 )將多出一個Streaming
選項卡,其中展現了receiver和已完成的batch的統計信息。其中有兩個信息很是有用:
若是處理一批數據的時間持續高出批處理間隔,或者等待時間持續增長,一般意味着你的系統的處理速度跟不上數據產生的速度。此時,你能夠考慮削減批數據的處理時間,參見性能調優部分。
你也能夠經過StreamingListener
接口來監聽Spark Streaming程序的執行情況,包括:receiver狀態、處理時間等等。
經過網絡接收的數據(Kafka、Flume、socket等)須要通過解序列化而後存儲在Spark中。若是數據接收成爲瓶頸,你就須要考慮增長數據接收的並行度。注意每個Input DStream只會建立一個receiver(運行在worker節點)用於接收一個數據流。你能夠建立多個Input DStream並配置它們來接收數據源的不一樣部分以增長數據接收並行度。
例如,若是一個Kafka input DStream接收兩個主題的數據致使系統瓶頸的話,能夠將Kafka輸入流劃分爲兩個,而後建立兩個Input DStream,每個接收一個主題的數據流。這樣的話數據接收就能夠並行進行了,從而增長了系統的吞吐量。這兩個DStream能夠被union成爲一個單一的DStream,後續的轉換操做將做用在統一的數據流之上。示例以下:
val numStreams = 5 |
另外一個須要考慮的是receiver的數據塊劃分間隔,這能夠經過spark.streaming.blockInterval
進行設置。receiver會將接收到的數據合併爲數據塊而後存儲到Spark內存中。每一批數據中數據塊的數量決定了Task的數量(一般大約是:批處理間隔/塊間隔)。過少的Task數將致使集羣資源利用率下降,若是出現這種狀況,你應該試着去減少塊劃分間隔,咱們推薦的塊劃分間隔的最小值是大約50ms,太小的話也將致使一些問題。
另外一個增長並行度的方法是在處理數據以前,使用inputStream.repartition()
明確地將Input DStream從新分區,這樣新造成的DStream中的RDD都將有指定數量的分區。
對於一些操做,如reduceByKey()
和reduceByKeyAndWindow()
等,也能夠傳遞一個分區數參數來控制計算的並行度。
在Spark Streaming程序中,輸入數據和持久化的數據默認都通過序列化處理並緩存在內存中。
序列化優化能夠明顯提升程序的運行效率,參見個人Spark使用總結一文的序列化部分。
在一些特殊狀況下,須要保存在內存中的流數據可能不是很大,這時能夠設置存儲級別以非序列化的形式存儲在內存中,若是這不會引發過大的GC開銷,那麼將提升程序的性能。
批處理間隔的設置對流處理程序是很是關鍵的,這可能影響到輸入流可否被迅速地流暢地持續地處理。
恰當的批處理間隔一般和數據產生速度以及集羣計算能力相關。一般來講,若是咱們想了解一個流處理程序的處理速度可否跟得上數據產生的速度,能夠查看Spark應用監控頁面( http://master:4040 )。對於一個穩定的流處理程序來講,批處理時間(Processing Time)應該小於設置的批處理間隔時間(Batch Interval),而且Batch的調度延遲時間(Scheduling Delay)是相對平穩的(持續增長就意味着跟不上數據產生速度了,但瞬時的增長並不意味着什麼)。
已經證明,對於大多數應用來講,500ms是比較好的最小批處理間隔。
內存優化對於一個Spark Streaming程序來講也很重要,這主要包括:內存使用優化以及GC優化。關於內存調優參見Spark Streaming官方文檔內存調優部分。