本文基於Spark Streaming Programming Guide原文翻譯, 加上一些本身的理解和小實驗的結果。
php
Spark Streaming是基於Core Spark API的可擴展,高吞吐量,並具備容錯能力的用於處理實時數據流的一個組件。Spark Streaming能夠接收各類數據源傳遞來的數據,好比Kafka, Flume, Kinesis或者TCP等,對接收到的數據還可使用一些用高階函數(好比map, reduce, join
及window
)進行封裝的複雜算法作進一步的處理。最後,處理好的數據能夠寫入到文件系統,數據庫,或者直接用於實時展現。除此以外,還能夠在數據流上應用一些機器學習或者圖計算等算法。
html
上圖展現了Spark Streaming的總體數據流轉狀況。在Spark Streaming中的處理過程能夠參考下圖,Spark Streaming接收實時數據,而後把這些數據分割成一個個batch,而後經過Spark Engine分別處理每個batch並輸出。
java
Spark Streaming中一個最重要的概念是DStream,即離散化數據流(discretized stream),DStream由一系列連續的數據集組成。DStream的建立有兩種辦法,一種是從數據源接收數據生成初始DStream,另外一種是由DStream A經過轉換生成DStream B。一個DStream實質上是由一系列的RDDs組成。
本文介紹瞭如何基於DStream
寫出Spark Streaming程序。Spark Streaming提供了Scala, Java以及Python接口,在官方文檔中對這三種語言都有示例程序的實現,在這裏只分析Scala寫的程序。python
在深刻分析Spark Streaming的特性和原理以前,以寫一個簡單的Spark Streaming程序並運行起來爲入口先了解一些相關的基礎知識。這個示例程序從TCP socket中接收數據,進行Word Count操做。git
首先須要導入Spark Streaming相關的類,其中StreamingContext是全部Streaming程序的主要入口。接下來的代碼中建立一個local StreamingContext
,batch時間爲1秒,execution線程數爲2。github
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 建立一個local StreamingContext batch時間爲1秒,execution線程數爲2
// master的線程數數最少爲2,後面會詳細解釋
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, econds(1))
使用上面這個ssc
對象,就能夠建立一個lines
變量用來表示從TCP接收的數據流了,指定機器名爲localhost
端口號爲9999
web
// 建立一個鏈接到hostname:port的DStream, 下面代碼中使用的是localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
lines
中的每一條記錄都是TCP中的一行文本信息。接下來,使用空格將每一行語句進行分割。算法
// 將每一行分割成單詞
val words = lines.flatMap(_.split(" "))
上面使用的flatMap
操做是一個一對多的DStream
操做,在這裏表示的是每輸入一行記錄,會根據空格生成多個單詞,這些單詞造成一個新的DStream words
。接下來統計單詞個數。sql
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 統計每一個batch中的不一樣單詞個數
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 打印出其中前10個單詞出現的次數
wordCounts.print()
上面代碼中,將每個單詞使用map
方法映射成(word, 1)
的形式,即paris變量。而後調用reduceByKey
方法,將相同單詞出現的次數進行疊加,最終打印出統計的結果。數據庫
寫完上面的代碼,Spark Streaming程序尚未運行起來,須要寫入如下兩行代碼使Spark Streaming程序可以真正的開始執行。
ssc.start() // 開始計算
ssc.awaitTermination() // 等待計算結束
(1)運行Netcat
使用如下命令啓動一個Netcat
nc -lk 9999
接下來就能夠在命令行中輸入任意語句了。
(2)運行Spark Streaming程序
./bin/run-example streaming.NetworkWordCount localhost 9999
程序運行起來後Netcat中輸入的任何語句,都會被統計每一個單詞出現的次數,例如
這一部分詳細介紹Spark Streaming中的基本概念。
Spark Streaming相關jar包的依賴也可使用Maven來管理,寫一個Spark Streaming程序的時候,須要將下面的內容寫入到Maven項目中
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
對於從Kafka,Flume,Kinesis這些數據源接收數據的狀況,Spark Streaming core API中不提供這些類和接口,須要添加下面這些依賴。
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
Spark Streaming程序的主要入口是一個StreamingContext
對象,在程序的開始,須要初始化該對象,代碼以下
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
其中的參數appName
是當前應用的名稱,能夠在Cluster UI上進行顯示。master
是Spark的運行模式,能夠參考 Spark, Mesos or YARN cluster URL,或者設置成local[*]
的形式在本地模式下運行。在生產環境中運行Streaming應用時,通常不會將master參數寫死在代碼中,而是在使用spark-submit
命令提交時動態傳入--master
參數,具體能夠參考 launch the application with spark-submit 。
至於batch時間間隔的設置,須要綜合考慮程序的性能要求以及集羣可提供的資源狀況。
也能夠基於SparkContext
對象,生成一個StreamingContext
對象,使用以下代碼
import org.apache.spark.streaming._
val sc = ... // 已有的SparkContext對象
val ssc = new StreamingContext(sc, Seconds(1))
當context初始化後,還須要作的工做有:
DStreams
transformation
以及輸出操做處理輸入的DStreams
streamingContext.start()
啓動程序,開始接收並處理數據streamingContext.awaitTermination()
等待程序運行終止(包括手動中止,或者遇到Error
後退出應用)streamingContext.stop()
手動中止應用須要注意的點:
context
開始運行後,不能再往其中添加新的計算邏輯context
被中止後,不能restart
StreamingContext
對象處於運行狀態StreamingContext
中的stop()
方法一樣會終止SparkContext
。若是隻須要中止StreamingContext
,將stop()
方法的可選參數設置成false
,避免SparkContext
被終止SparkContext
對象,能夠用於構造多個StreamingContext
對象,只要在新的StreamingContext
對象被建立前,舊的StreamingContext
對象被中止便可。 DStream
是Spark Streaming中最基本最重要的一個抽象概念。DStream
由一系列的數據組成,這些數據既能夠是從數據源接收到的數據,也能夠是從數據源接收到的數據通過transform
操做轉換後的數據。從本質上來講一個DStream
是由一系列連續的RDDs
組成,DStream
中的每個RDD
包含了一個batch的數據。
DStream
上的每個操做,最終都反應到了底層的RDDs
上。好比,在前面那個Word Count代碼中將lines
轉化成words
的邏輯,lines
上的flatMap
操做就如下圖中所示的形式,做用到了每個底層的RDD
上。
這些底層RDDs
上的轉換操做會有Spark Engine進行計算。對於開發者來講,DStream
提供了一個更方便使用的高階API,從而開發者無需過多的關注每個轉換操做的細節。
DStream
上能夠執行的操做後續文章中會有進一步的介紹。
(1)基本數據源
在前面Word Count的示例程序中,已經使用到了ssc.socketTextStream(...)
,這個會根據TCP socket中接收到的數據建立一個DStream
。除了sockets以外,StreamingContext API還支持以文件爲數據源生成DStream
。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming程序會監控用戶輸入的dataDirectory
路徑,接收並處理該路徑中的全部文件,不過不支持子文件夾中的文件。
須要注意的地方有:
a、全部的文件數據格式必須相同
b、該路徑下的文件應該是原子性的移動到該路徑,或者重命名到該路徑
c、文件進入該路徑後不可再發生變化,因此這種數據源不支持數據連續寫入的形式
對於簡單的text文件,有一個簡單的StreamingContext.textFileStream(dataDirectory)
方法來進行處理。而且文件數據源的形式不須要運行一個receiver進程,因此對Execution的核數沒有要求。
streamingContext.queueStream(queueOfRDDs)
,能夠將一系列的RDDs轉化成一個DStream。該queue中的每個RDD會被當作DStream
中的一個batcn,而後以Streaming的形式處理這些數據。(2)高階數據源
(3)自定義數據源
除了上面兩類數據源以外,也能夠支持自定義數據源。自定義數據源時,須要實現一個能夠從自定義數據源接收數據併發送到Spark中的用戶自定義receiver。具體能夠參考 Custom Receiver Guide。
(4)數據接收的可靠性
相似於RDDs,transformations
可使輸入DStream
中的數據內容根據特定邏輯發生轉換。DStreams
上支持不少RDDs
上相同的一些transformations
。
具體含義和使用方法可參考另外一篇博客:Spark Streaming中的操做函數分析
在上面這些transformations
中,有一些須要進行進一步的分析
(1)UpdateStateByKey操做
(2)Transform操做
transform
操做及其相似的一些transformwith
操做,可使DStream
中的元素可以調用任意的RDD-to-RDD的操做。可使DStream
調用一些只有RDD纔有而DStream API沒有提供的算子。例如,DStream API就不支持一個data DStream中的每個batch數據能夠直接和另外的一個數據集作join
操做,可是使用transform
就能夠實現這一功能。這個操做能夠說進一步豐富了DStream
的操做功能。
再列舉一個這個操做的使用場景,將某處計算到的重複信息與實時數據流中的記錄進行join,而後進行filter操做,能夠當作一種數據清理的方法。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 一個包含重複信息的RDD
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // 將重複信息與實時數據作join,而後根據指定規則filter,用於數據清洗
...})
這裏須要注意的是,transform
傳入的方法是被每個batch調用的。這樣能夠支持在RDD上作一些時變的操做,即RDD,分區數以及廣播變量能夠在不一樣的batch之間發生變化。
(3)Window操做
Spark Streaming提供一類基於窗口的操做,這類操做能夠在一個滑動窗口中的數據集上進行一些transformations
操做。下圖展現了窗口操做的示例
上圖中,窗口在一個DStream
源上滑動,DStream
源中暴露在該窗口中的RDDs
可讓這個窗口進行相關的一些操做。在上圖中能夠看到,該窗口中任一時刻都只能看到3個RDD,而且這個窗口每2秒中往前滑動一次。這裏提到的兩個參數,正好是任意一個窗口操做都必須指定的。
滑動間隔:指窗口多長時間往前滑動一次,上圖中爲2。
須要注意的一點是,上面這兩個參數,必須是batch時間的整數倍,上圖中的batch時間爲1。
接下來展現一個簡單的窗口操做示例。好比說,在前面那個word count示例程序的基礎上,我但願每隔10秒鐘就統計一下當前30秒時間內的每一個單詞出現的次數。這一功能的簡單描述是,在paris DStream
的當前30秒的數據集上,調用reduceByKey
操做進行統計。爲了實現這一功能,可使用窗口操做reduceByKeyAndWindow
。
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
更多的窗口操做能夠參考:Spark Streaming中的操做函數分析
DStream
上的輸出操做,可使DStream
中的數據發送到外部系統,好比數據庫或文件系統中。DStream
只有通過輸出操做,其中的數據才能被外部系統使用。而且下面這些輸出操做才真正的觸發DStream對象上調用的transformations
操做。這一點相似於RDDs上的Actions
算子。
輸出操做的使用和功能請參考:Spark Streaming中的操做函數分析
下面主要進一步分析foreachRDD
操做往外部數據庫寫入數據的一些注意事項。
dstream.foreachRDD
是DStream輸出操做中最經常使用也最重要的一個操做。關於這個操做如何正確高效的使用,下面會列舉出一些使用方法和案例,能夠幫助讀者在使用過程當中避免踩到一些坑。
一般狀況下,若是想把數據寫入到某個外部系統中時,須要爲之建立一個鏈接對象(好比提供一個TCP鏈接工具用於鏈接遠程服務器),使用這個鏈接工具才能將數據發送到遠程系統。在Spark Streaming中,開發者極可能會在Driver端建立這個對象,而後又去Worker
端使用這個對象處理記錄。好比下面這個例子
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // 在driver端執行
rdd.foreach { record =>
connection.send(record) // 在wroker端執行
}}
上面這個使用方法實際上是錯誤的,當在driver端建立這個鏈接對象後,須要將這個鏈接對象序列化併發送到wroker端。一般狀況下,鏈接對象都是不可傳輸的,即wroker端沒法獲取該鏈接對象,固然也就沒法將記錄經過這個鏈接對象發送出去了。這種狀況下,應用系統的報錯提示多是序列化錯誤(鏈接對象沒法序列化),或者初始化錯誤(鏈接對象須要在wroker端完成初始化),等等。
正確的作法是在worker端建立這個鏈接對象。
可是,即便是在worker建立這個對象,又可能會犯如下錯誤。
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}}
上面代碼會爲每一條記錄建立一個鏈接對象,致使鏈接對象太多。 鏈接對象的建立個數會受到時間和系統資源狀況的限制,所以爲每一條記錄都建立一個鏈接對象會致使系統出現沒必要要的高負載,進一步致使系統吞吐量下降。
一個好的辦法是使用rdd.foreachPartition
操做,而後爲RDD的每個partition
,使一個partition
中的記錄使用同一個鏈接對象。以下面代碼所示
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}}
最後,能夠經過使用鏈接對象池進一步對上面的代碼進行優化。使用鏈接對象池能夠進一步提升鏈接對象的使用效率,使得多個RDDs/batches
之間能夠重複使用鏈接對象。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// 鏈接對象池是靜態的,而且創建對象只有在真正使用時才被建立
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // 使用完以後,將鏈接對象歸還到池中以便下一次使用
}}
須要注意的是,鏈接對象池中的對象最好設置成懶生成模式,即在真正使用時纔去建立鏈接對象,而且給鏈接對象設置一個生命週期,必定時間不使用則註銷該鏈接對象。
總結一下關鍵點:
DStreams
的transformations
操做是由輸出操做觸發的,相似於RDDs
中的actions
操做。上面列舉出某些DStream
的輸出操做中能夠將其中的元素轉化成RDD
,進而能夠調用RDD提供的一些API操做,這時若是對RDD
調用actions
操做會當即強制對接收到的數據進行處理。所以,若是用戶應用程序中DStream不須要任何的輸出操做,或者僅僅對DStream
使用一些相似於dstream.foreachRDD
操做可是在這個操做中不調用任何的RDD action
操做時,程序是不會進行任何實際運算的。系統只會簡單的接收數據,任何丟棄數據。 Spark Streaming的累加器和廣播變量沒法從checkpoint
恢復。若是在應用中既使用到checkpoint
又使用了累加器和廣播變量的話,最好對累加器和廣播變量作懶實例化操做,這樣纔可使累加器和廣播變量在driver失敗重啓時可以從新實例化。參考下面這段代碼
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}}
object DroppedWordsCounter {
@volatile private var instance: Accumulator[Long] = null
def getInstance(sc: SparkContext): Accumulator[Long] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.accumulator(0L, "WordsInBlacklistCounter")
}
}
}
instance
}}
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter += count
false
} else {
true
}
}.collect()
val output = "Counts at time " + time + " " + counts})
查看完整代碼請移步 source code
在streaming數據上也能夠很方便的使用到DataFrames
和SQL操做。爲了支持這種操做,須要用StreamingContext對象使用的SparkContext
對象初始化一個SQLContext
對象出來,SQLContext
對象設置成一個懶初始化的單例對象。下面代碼對前面的Word Count進行一些修改,經過使用DataFrames
和SQL
來實現Word Count的功能。每個RDD都被轉化成一個DataFrame
對象,而後註冊成一個臨時表,最後就能夠在這個臨時表上進行SQL查詢了。
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// 獲取單例SQLContext對象
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
// 將RDD[String]轉化成DataFrame
val wordsDataFrame = rdd.toDF("word")
// 註冊表
wordsDataFrame.registerTempTable("words")
// 在該臨時表上執行sql語句操做
val wordCountsDataFrame =
sqlContext.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()}
查看完整代碼請移步 source code.
也能夠在另外一線程獲取到的Streaming
數據上進行SQL操做(這裏涉及到異步運行StreamingContext
)。StreamingContext
對象沒法感知到異步SQL查詢的存在,所以有StreamingContext
對象有可能在SQL查詢完成以前把歷史數據刪除掉。爲了保證StreamingContext
不刪除須要用到的歷史數據,須要告訴StreamingContext
保留必定量的歷史數據。例如,若是你想在某一個batch的數據上執行SQL查詢操做,可是你這個SQL須要執行5分鐘的時間,那麼,須要執行streamingContext.remember(Minutes(5))
語句告訴StreamingContext
將歷史數據保留5分鐘。
有關DataFrames的更多介紹,參考另外一篇博客:Spark-SQL之DataFrame操做大全
相似於RDDs
,DStreams
也容許開發者將stream
中的數據持久化到內存中。在DStream
對象上使用persist()
方法會將DStream
對象中的每個RDD
自動持久化到內存中。這個功能在某個DStream的數據須要進行屢次計算時特別有用。對於窗口操做好比reduceByWindow
,以及涉及到狀態的操做好比updateStateByKey
,默認會對DStream
對象執行持久化。所以,程序在運行時會自動將窗口操做和涉及到狀態的這些操做生成的DStream對象持久化到內存中,不須要開發者顯示的執行persist()
操做。
對那些經過網絡接收到的streams
數據(好比Kafka, Flume, Socket等),默認的持久化等級是將數據持久化到兩個節點上,以保證其容錯能力。
注意,不一樣於RDDs
,默認狀況下DStream
的持久化等級是將數據序列化保存在內存中。這一特性會在後面的性能調優中進一步分析。有關持久化級別的介紹,能夠參考rdd-persistence
當Streaming
應用運行起來時,基本上須要7 * 24的處於運行狀態,因此須要有必定的容錯能力。檢查點的設置就是可以支持Streaming
應用程序快速的從失敗狀態進行恢復的。檢查點保存的數據主要有兩種:
1 . 元數據(Metadata
)檢查點:保存Streaming
應用程序的定義信息。主要用於恢復運行Streaming
應用程序的driver節點上的應用。元數據包括:
a、配置信息:建立Streaming應用程序的配置信息
b、DStream
操做:在DStream
上進行的一系列操做方法
c、未處理的batch:記錄進入等待隊列可是還未處理完成的批次
2 . 數據(Data)檢查點:將計算獲得的RDD保存起來。在一些跨批次計算並保存狀態的操做時,必須設置檢查點。由於在這些操做中基於其餘批次數據計算獲得的RDDs,隨着時間的推移,計算鏈路會愈來愈長,若是發生錯誤重算的代價會特別高。
元數據檢查點信息主要用於恢復driver端的失敗,數據檢查點主要用於計算的恢復。
(1)何時須要使用檢查點
當應用程序出現如下兩種狀況時,須要配置檢查點。
- 使用到狀態相關的操做算子-好比updateStateByKey
或者reduceByKeyAndWindow
等,這種狀況下必須爲應用程序設置檢查點,用於按期的對RDD進行檢查點設置。
- Driver端應用程序恢復-當應用程序從失敗狀態恢復時,須要從檢查點中讀取相關元數據信息。
(2)檢查點設置
通常是在具備容錯能力,高可靠的文件系統上(好比HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據。設置檢查點能夠在應用程序中使用streamingContext.checkpoint(checkpointDirectory)
來指定路徑。
若是想要應用程序在失敗重啓時使用到檢查點存儲的元數據信息,須要應用程序具備如下兩個特性,須要使用StreamingContext.getOrCreate
代碼在失敗時從新建立StreamingContext
對象:
StreamingContext
對象,而後開始執行程序處理DStream。當應用程序失敗重啓時,能夠從設置的檢查點路徑獲取元數據信息,建立一個StreamingContext
對象,並恢復到失敗前的狀態。
下面用Scala代碼實現上面的要求。
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // 建立一個新的StreamingContext對象
val lines = ssc.socketTextStream(...) // 獲得DStreams
...
ssc.checkpoint(checkpointDirectory) // 設置checkpoint路徑
ssc
}
// 用checkpoint元數據建立StreamingContext對象或根據上面的函數建立新的對象
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 設置context的其餘參數
context. ...
// 啓動context
context.start()
context.awaitTermination()
若是checkpointDirectory
路徑存在,會使用檢查點元數據恢復一個StreamingContext
對象。若是路徑不存在,或者程序是第一次運行,則會使用functionToCreateContext
來建立一個新的StreamingContext
對象。
RecoverableNetWorkWordCount示例代碼演示了一個從檢查點恢復應用程序的示例。
須要注意的是,想要用到上面的getOrCreate
功能,須要在應用程序運行時使其支持失敗自動重跑的功能。這一功能,在接下來一節中有分析。
另外,在往檢查點寫入數據這一過程,是會增長系統負荷的。所以,須要合理的設置寫入檢查點數據的時間間隔。對於小批量時間間隔(好比1秒)的應用,若是每個batch都執行檢查點寫入操做,會顯著的下降系統的吞吐性能。相反的,若是寫入檢查點數據間隔過久,會致使lineage過長。對那些狀態相關的須要對RDD進行檢查點寫入的算子,檢查點寫入時間間隔最好設置成batch時間間隔的整數倍。好比對於1秒的batch間隔,設置成10秒。有關檢查點時間間隔,可使用dstream.checkpoint(checkpointInterval)
。通常來講,檢查點時間間隔設置成5~10
倍滑動時間間隔是比較合理的。
這一節主要討論如何將一個Spark Streaming應用程序部署起來。
(1)需求
運行一個Spark Streaming應用程序,須要知足一下要求。
KafkaUtils
的話,須要將spark-streaming-kafka-0.8_2.11
以及其依賴都打入到應用程序JAR包中。spark.streaming.receiver.writeAheadLog.enable=true
來開啓這一功能。然而,這一功能的開啓會下降數據接收的吞吐量。這是能夠經過同時並行運行多個接收進程(這一點在後面的性能調優部分會有介紹)進行來抵消該負面影響。另外,若是已經設置了輸入數據流的存儲級別爲Storagelevel.MEMORY_AND_DISK_SET
,因爲接收到的數據已經會在文件系統上保存一份,這樣就能夠關閉WAL功能了。當使用S3以及其餘任何不支持flushng功能的文件系統來write ahead logs時,要記得設置spark.streaming.driver.writeAheadLog.closeFileAfterWrite
以及spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
兩個參數。spark.streaming.receiver.maxRate
,對於Direct Kafka模式,設置spark.streaming.kafka.maxRatePerPartition
限制從每一個Kafka的分區讀取數據的速率。假如某個Topic有8個分區,spark.streaming.kafka.maxRatePerpartition=100
,那麼每一個batch最大接收記錄爲800
。從Spark-1.5版本開始,引入了一個backpressure
的機制來避免設置這個限制閾值。Spark Streaming會自動算出當前的速率限制,而且動態調整這個閾值。經過將spark.streaming.backpressure.enabled
爲true
開啓backpressure
功能。(2)升級應用代碼
若是運行中的應用程序有更新,須要運行更新後的代碼,有如下兩種機制。
SparkStreamingContext
對象再也不合適,由於檢查點中的信息可能不包含更新的代碼邏輯,這樣會致使程序出現錯誤。在這種狀況下,要麼從新指定一個檢查點,要麼刪除以前的檢查點。 在Spark Streaming應用程序運行時,Spark Web UI頁面上會多出一個Streaming
的選項卡,在這裏面能夠顯示一些Streaming相關的參數,好比Receiver是否在運行,接收了多少記錄,處理了多少記錄等。以及Batch相關的信息,包括batch的執行時間,等待時間,完成的batch數,運行中的batch數等等。這裏面有兩個時間參數須要注意理解一些:
Scheduling Delay - 當前batch從進入隊列到開始執行的延遲時間
若是處理時間一直比batch時間跨度要長,或者延遲時間逐漸增加,表示系統已經沒法處理當前的數據量了,這時候就須要考慮如何去下降每個batch的處理時間。如何下降batch處理時間,能夠參考第四節。
除了監控頁面以外,Spark還提供了StreamingListener接口,經過這個接口能夠獲取到receiver以及batch的處理時間等信息。
爲了使Spark Streaming應用可以更好的運行,須要進行一些調優設置,這一節會分析一些性能調優中的參數和設置規則。在性能調優方面,主要須要考慮如下兩個問題:
接下來的內容在Spark性能調優中已有介紹,這裏再次強調一下在Streaming中須要注意的一些地方。
(1)接收數據進程的並行度
經過網絡(好比Kafka, Flume, socket等)接收到的數據,首先須要反序列化而後保存在Spark中。當數據接收成爲系統的瓶頸時,就須要考慮如何提升系統接收數據的能力了。每個輸入的DStream會在一個Worker節點上運行一個接收數據流的進程。若是建立了多個接收數據流進程,就能夠生成多個輸入DStream了。好比說,對於Kafka數據源,若是使用的是一個DStream接收來自兩個Topic中的數據的話,就能夠將這兩個Topic拆開,由兩個數據接收進程分開接收。當用兩個receiver接收到DStream後,能夠在應用中將這兩個DStream再進行合併。好比下面代碼中所示
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
須要注意一個參數spark.streaming.blockInterval
。對於Receiver來講,接收到的數據在保存到Spark內存中以前,會以block的形式匯聚到一塊兒。每一個Batch中block的個數決定了程序運行時處理這些數據的task的個數。每個receiver的每個batck對應的task個數大體爲(batch時間間隔 / block時間間隔)。好比說對於一個2m
的batch,若是block時間間隔爲200ms
那麼,將會有10個task。若是task的數量太少,對數據的處理就不會很高效。在batch時間固定的狀況下,若是想增大task個數,那麼就須要下降blockInterval
參數了,這個參數默認值爲200ms
,官方建議的該參數下限爲50ms
,若是低於50ms
可能會引發其餘問題。
另外一個提升數據併發處理能力的方法是顯式的對接收數據從新分區,inputStream.repartition(<number of partitions>)
。
(2)數據處理的並行度
對於reduceByKey
和reduceByKeyAndWindow
操做來講,並行task個數由參數spark.default.parallelism
來控制。若是想要提升數據處理的並行度,能夠在調用這類方法時,指定並行參數,或者將spark.default.parallelism
參數根據集羣實際狀況進行調整。
(3)數據序列化
能夠經過調整序列化相關的參數,來提升數據序列化性能。在Streaming應用中,有兩類數據須要序列化操做。
StorageLevel.MEMORY_AND_DIS_SER_2
的形式保存在Executor的內存中。也就是說,爲了下降GC開銷,這些數據會被序列化成bytes形式,而且還考慮到executor失敗的容錯。這些數據首先會保存在內存中,當內存不足時會spill到磁盤上。使用這種方式的一個明顯問題是,Spark接收到數據後,首先須要反序列化這些數據,而後再按照Spark的方式對這些數據從新序列化。Streaming操做中持久化的RDD:Streaming計算產生的RDD可能也會持久化到內存中。好比窗口操做函數會將數據緩存起來以便後續屢次使用。而且Streaming應用中,這些數據的存儲級別是StorageLevel.MEMORY_ONLY_SET
(Spark Core的默認方式是StorageLevel.MEMORY_ONLY
)。Streaming對這些數據多了一個序列化操做,這主要也是爲了下降GC開銷。
在上面這兩種狀況中,可使用Kyro
方式對數據進行序列化,同時下降CPU和內存的開銷。有關序列化能夠進一步參考Spark調優。對於Kyro
方式的參數設置,請參考Spark Kyro參數設置。
通常狀況下,若是須要緩存的數據量不大,能夠直接將數據以非序列化的形式進行存儲,這樣不會明顯的帶來GC的開銷。好比說,batch時間只有若干秒,而且沒有使用到窗口函數操做,那麼能夠在持久化時顯示的指定存儲級別,避免持久化數據時對數據的序列化操做。
(4)提升task啓動性能
若是每秒啓動的task個數太多(通常指50個以上),那麼對task的頻繁啓動也是一個不容忽視的損耗。遇到這種狀況時,須要考慮一下Execution模式了。通常來講,在Spark的Standalone模式以及coarse-grained Mesos模式下task的啓動時間會比fine-grained Mesos模式要低。
爲了使一個Spark Streaming應用在集羣上穩定運行,須要保證應用在接收到數據時可以及時處理。若是處理速率不匹配,隨着時間的積累,等待處理的數據將會愈來愈多,最終致使應用沒法正常運行。最好的狀況是batch的處理時間小於batch的間隔時間。因此,正確合理的設置Batch時間間隔是很重要的。
有關Spark內存的使用以及Spark應用的GC性能調節的更多細節在Spark調優中已經有了更加詳細的描述。這裏簡單分析一些Spark Streaming應用程序會用到的參數。
一個Spark Streaming應用程序須要使用集羣多少內存資源,很大程度上是由該應用中的具體邏輯來決定的,即須要看應用程序中的transformations
的類型。好比代碼中使用到長達10分鐘的窗口操做時,就須要使用到可以把10分鐘的數據都保存到內存中的內存量。若是使用updateStateByKey
這種操做,而數據中不一樣key
特別多,也會使用更多的內存。若是應用的邏輯比較簡單,僅僅是接收-過濾-存儲等一系列操做時,消耗的內存量會明顯減小。
默認狀況下,receivers接收到的數據會以StorageLevel.MEMORY_AND_DISK_SER_2
級別進程存儲,當內存中容納不下時會spill到磁盤上,可是這樣會下降應用的處理性能,因此爲了應用可以更高效的運行,最好仍是多分配一些內存以供使用。通常能夠經過在少許數據的狀況下,評估一下數據使用的內存量,繼而計算出應用正式部署時須要分配的總內存量大小。
內存調節的另外一方面是垃圾回收的設置。對一個低延遲的應用系統來講,JVM在垃圾回收時致使應用長時間暫停運行是一個很討厭的場景。
下面有一些可用於調節內存使用量和GC性能的方面:
Kryo
方式進行序列化可以進一步下降序列化後數據大小和內存的使用。想要進一步下降內存的使用量,能夠在數據上再增長一個壓縮功能,經過參數spark.rdd.compress
來設置。transformations
持久化的數據都會自動進行清理。Spark Streaming根據transformations
的不一樣來決定哪些數據須要被清理掉。例如,當使用10分鐘的窗口函數時,Spark Streaming會保存最少10分鐘的數據。想要數據保存更長時間,能夠設置streamingContext.remenber
參數。spark-submit
命令的--driver-java-options
參數來指定,executor上經過設置spark.executor.extraJavaOptions
參數來指定。OFF_HEAP
存儲級別來持久化RDDs,能夠參考RDD Persistence本節主要討論Spark Streaming應用程序失敗後的處理辦法。
(1)Files輸入
(2)基於Receiverd 數據源
(3)Kafka Direct輸入方式