基於Spark 2.0 Preview的材料翻譯,原[英]文地址: html
http://spark.apache.org/docs/2.0.0-preview/streaming-programming-guide.htmljava
Streaming應用實戰,參考:http://my.oschina.net/u/2306127/blog/635518python
Spark Streaming 是基於Spark 核心API的擴展,使高伸縮性、高帶寬、容錯的流式數據處理成爲可能。數據能夠來自於多種源,如Kafka、Flume、Kinesis、或者TCP sockets等,並且可使用map、reduce
、join
和 window等高級接口實現複雜算法的處理。最終,處理的數據能夠被推送到數據庫、文件系統以及動態佈告板。實際上,
你還能夠將Spark的機器學習( machine learning) 和圖計算 (graph processing )算法用於數據流的處理。git
內部工做流程以下。Spark Streaming接收數據流的動態輸入,而後將數據分批,每一批數據經過Spark建立一個結果數據集而後進行處理。github
Spark Streaming提供一個高級別的抽象-離散數據流(DStream),表明一個連續的數據流。DStreams能夠從Kafka, Flume, and Kinesis等源中建立,或者在其它的DStream上執行高級操做。在內部,DStream表明一系列的 RDDs。web
本指南將岩石如何經過DStreams開始編寫一個Spark Streaming程序。你可使用Scala、Java或者Python。能夠經過相應的鏈接切換去查看相應語言的代碼。算法
注意:這在Python裏有一些不一樣,不多部分API暫時沒有,本指南進行了Python API標註。sql
在開始Spark Streaming編程以前咱們先看看一個簡單的Spark Streaming程序將長什麼樣子。咱們從基於TCP socket的數據服務器接收一個文本數據,而後對單詞進行計數。看起來像下面這個樣子。shell
首先,咱們導入Spark Streaming的類命名空間和一些StreamingContext的轉換工具。 StreamingContext 是全部的Spark Streaming功能的主入口點。咱們建立StreamingContext,指定兩個執行線程和分批間隔爲1秒鐘。數據庫
import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
使用這個context,咱們能夠建立一個DStream,這是來自於TCP數據源 的流數據,咱們經過hostname (e.g. localhost
) 和端口 (e.g. 9999
)來指定這個數據源。
// Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
這裏line是一個DStream對象,表明從服務器收到的流數據。每個DStream中的記錄是一個文本行。下一步,咱們將每一行中以空格分開的單詞分離出來。
// Split each line into words val words = lines.flatMap(_.split(" "))
flatMap是「一對多」的DStream操做,經過對源DStream的每個記錄產生多個新的記錄建立新DStream。這裏,每一行將被分解多個單詞,而且單詞流表明了words DStream。下一步,咱們對這些單詞進行計數統計。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print()
words
DStream而後映射爲(word, 1)的鍵值對的Dstream,而後用於統計單詞出現的頻度。最後,wordCounts.print()打印出每秒鐘建立出的計數值。
注意,上面這些代碼行執行的時候,僅僅是設定了計算執行的邏輯,並無真正的處理數據。在全部的設定完成後,爲了啓動處理,須要調用:
ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate
完整的代碼能夠Spark Streaming 的例程 NetworkWordCount 中找到。
若是已經下載和構建了Spark,你能夠按照下面的方法運行這個例子。首先運行Netcat(一個Unix風格的小工具)做爲數據服務器,以下所示:
$ nc -lk 9999
而後,到一個控制檯窗口,啓動例程:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
而後,任何在netcat服務器運行控制檯鍵入的行都會被計數而後每隔一秒鐘在屏幕上打印出來,以下所示:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ... |
# TERMINAL 2: RUNNING NetworkWordCount $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ... |
下一步,咱們將離開這個簡單的例子,詳細闡述Spark Streaming的基本概念和功能。
與Spark相似,Spark Streaming也能夠經過Maven中心庫訪問。爲了編寫你本身的Spark Streaming程序,您將加入下面的依賴到你的SBT或者Maven工程文件。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.0.0-preview</version> </dependency>
爲了從Kafka/Flume/Kinesis等非Spark Streaming核心API等數據源注入數據,咱們須要添加對應的spark-streaming-xyz_2.11到依賴中。例如,像下面的這樣:
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
對於最新的列表,參考Maven repository 得到全面的數據源河訪問部件的列表。
爲了初始化Spark Streaming程序,StreamingContext 對象必須首先建立做爲總入口。
StreamingContext 對象能夠經過 SparkConf 對象建立,以下所示。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
這裏 appName參數是應用在集羣中的名稱。 master
是 Spark, Mesos 或 YARN cluster URL, 或者「local[*]」 字符串指示運行在 local 模式下。實踐中,當運行一個集羣, 您不該該硬編碼 master
參數在集羣中, 而是經過 launch the application with spark-submit
接收其參數。可是, 對於本地測試和單元測試, 你能夠傳遞「local[*]」 來運行 Spark Streaming 在進程內運行(自動檢測本地系統的CPU內核數量)。 注意,這裏內部建立了 SparkContext (全部的Spark 功能的入口點) ,能夠經過 ssc.sparkContext
進行存取。
分批間隔時間基於應用延遲需求和可用的集羣資源進行設定(譯註:設定間隔要大於應用數據的最小延遲需求,同時不能設置過小以致於系統沒法在給定的週期內處理完畢),參考 Performance Tuning 部分得到更多信息。
StreamingContext
對象也能夠從已有的 SparkContext
對象中建立。
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
在context建立以後,能夠接着開始以下的工做:
streamingContext.start()
。streamingContext.awaitTermination()
.streamingContext.stop()
。
記住:
stopSparkContext的
Stop時設置選項爲false。離散數據流(DStream)是Spark Streaming最基本的抽象。它表明了一種連續的數據流,要麼從某種數據源提取數據,要麼從其餘數據流映射轉換而來。DStream內部是由一系列連 續的RDD組成的,每一個RDD都是不可變、分佈式的數據集(詳見Spark編程指南 – Spark Programming Guide)。每一個RDD都包含了特定時間間隔內的一批數據,以下圖所示:
任何做用於DStream的算子,其實都會被轉化爲對其內部RDD的操做。例如,在前面的例子中,咱們將 lines 這個DStream轉成words DStream對象,其實做用於lines上的flatMap算子,會施加於lines中的每一個RDD上,並生成新的對應的RDD,而這些新生成的RDD 對象就組成了words這個DStream對象。其過程以下圖所示:
底層的RDD轉換仍然是由Spark引擎來計算。DStream的算子將這些細節隱藏了起來,併爲開發者提供了更爲方便的高級API。後續會詳細討論這些高級算子。
輸入DStream表明從某種流式數據源流入的數據流。在以前的例子裏,lines 對象就是輸入DStream,它表明從netcat server收到的數據流。每一個輸入DStream(除文件數據流外)都和一個接收器(Receiver – Scala doc, Java doc)相關聯,而接收器則是專門從數據源拉取數據到內存中的對象。
Spark Streaming主要提供兩種內建的流式數據源:
本節中,咱們將會從每種數據源中挑幾個繼續深刻討論。
注意,若是你須要同時從多個數據源拉取數據,那麼你就須要建立多個DStream對象(詳見後續的性能調優這一小節)。多個DStream對象其實也就同 時建立了多個數據流接收器。可是請注意,Spark的worker/executor 都是長期運行的,所以它們都會各自佔用一個分配給Spark Streaming應用的CPU。因此,在運行Spark Streaming應用的時候,須要注意分配足夠的CPU core(本地運行時,須要足夠的線程)來處理接收到的數據,同時還要足夠的CPU core來運行這些接收器。
前面的快速入門例子中,咱們已經看到,使用ssc.socketTextStream(…) 能夠從一個TCP鏈接中接收文本數據。而除了TCP套接字外,StreamingContext API 還支持從文件或者Akka actor中拉取數據。
文件數據流(File Streams): 能夠從任何兼容HDFS API(包括:HDFS、S三、NFS等)的文件系統,建立方式以下:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming將監視該dataDirectory目錄,並處理該目錄下任何新建的文件(目前還不支持嵌套目錄)。注意:
另外,文件數據流不是基於接收器的,因此不須要爲其單獨分配一個CPU core。
Python API fileStream目前暫時不可用,Python目前只支持textFileStream。
對於簡單的文本文件,更簡單的方式是調用 streamingContext.textFileStream(dataDirectory)。
基於自定義Actor的數據流(Streams based on Custom Actors): DStream能夠由Akka actor建立獲得,只需調用 streamingContext.actorStream(actorProps, actor-name)。詳見自定義接收器(Custom Receiver Guide)。actorStream暫時不支持Python API。
關於套接字、文件以及Akka actor數據流更詳細信息,請參考相關文檔:StreamingContext for Scala,JavaStreamingContext for Java, and StreamingContext for Python。
Python API自 Spark 2.0.0(譯註:1.6.1就已經支持了) 起,Kafka、Kinesis、Flume和MQTT這些數據源將支持Python。
使用這類數據源須要依賴一些額外的代碼庫,有些依賴還挺複雜的(如:Kafka、Flume)。所以爲了減小依賴項版本衝突問題,各個數據源 DStream的相關功能被分割到不一樣的代碼包中,只有用到的時候才須要連接打包進來。
例如,若是你須要使用Twitter的tweets做爲數據源,你 須要如下步驟:
注意,高級數據源在spark-shell中不可用,所以不能用spark-shell來測試基於高級數據源的應用。若是真有須要的話,你須要自行下載相應數據源的Maven工件及其依賴項,並將這些Jar包部署到spark-shell的classpath中。
下面列舉了一些高級數據源:
Python API自定義數據源目前還不支持Python。
輸入DStream也能夠用自定義的方式建立。你須要作的只是實現一個自定義的接收器(receiver),以便從自定義的數據源接收數據,而後將數據推入Spark中。詳情請參考自定義接收器指南(Custom Receiver Guide)。
從可靠性角度來劃分,大體有兩種數據源。其中,像Kafka、Flume這樣的數據源,它們支持對所傳輸的數據進行確認。系統收到這類可靠數據源過來的數據,而後發出確認信息,這樣就可以確保任何失敗狀況下,都不會丟數據。所以咱們能夠將接收器也相應地分爲兩類:
自定義接收器指南(Custom Receiver Guide)中詳細討論瞭如何寫一個可靠接收器。
和RDD相似,DStream也支持從輸入DStream通過各類transformation算子映射成新的DStream。DStream支持不少RDD上常見的transformation算子,一些經常使用的見下表:
Transformation算子 | 用途 |
---|---|
map(func) | 返回會一個新的DStream,並將源DStream中每一個元素經過func映射爲新的元素 |
flatMap(func) | 和map相似,不過每一個輸入元素再也不是映射爲一個輸出,而是映射爲0到多個輸出 |
filter(func) | 返回一個新的DStream,幷包含源DStream中被func選中(func返回true)的元素 |
repartition(numPartitions) | 更改DStream的並行度(增長或減小分區數) |
union(otherStream) | 返回新的DStream,包含源DStream和otherDStream元素的並集 |
count() | 返回一個包含單元素RDDs的DStream,其中每一個元素是源DStream中各個RDD中的元素個數 |
reduce(func) | 返回一個包含單元素RDDs的DStream,其中每一個元素是經過源RDD中各個RDD的元素經func(func輸入兩個參數並返回一個同類型結果數據)聚合獲得的結果。func必須知足結合律,以便支持並行計算。 |
countByValue() | 若是源DStream包含的元素類型爲K,那麼該算子返回新的DStream包含元素爲(K, Long)鍵值對,其中K爲源DStream各個元素,而Long爲該元素出現的次數。 |
reduceByKey(func, [numTasks]) | 若是源DStream 包含的元素爲 (K, V) 鍵值對,則該算子返回一個新的也包含(K, V)鍵值對的DStream,其中V是由func聚合獲得的。注意:默認狀況下,該算子使用Spark的默認併發任務數(本地模式爲2,集羣模式下由 spark.default.parallelism 決定)。你能夠經過可選參數numTasks來指定併發任務個數。 |
join(otherStream, [numTasks]) | 若是源DStream包含元素爲(K, V),同時otherDStream包含元素爲(K, W)鍵值對,則該算子返回一個新的DStream,其中源DStream和otherDStream中每一個K都對應一個 (K, (V, W))鍵值對元素。 |
cogroup(otherStream, [numTasks]) | 若是源DStream包含元素爲(K, V),同時otherDStream包含元素爲(K, W)鍵值對,則該算子返回一個新的DStream,其中每一個元素類型爲包含(K, Seq[V], Seq[W])的tuple。 |
transform(func) | 返回一個新的DStream,其包含的RDD爲源RDD通過func操做後獲得的結果。利用該算子能夠對DStream施加任意的操做。 |
updateStateByKey(func) | 返回一個包含新」狀態」的DStream。源DStream中每一個key及其對應的values會做爲func的輸入,而func能夠用於對每一個key的「狀態」數據做任意的更新操做。 |
下面咱們會挑幾個transformation算子深刻討論一下。
updateStateByKey 算子支持維護一個任意的狀態。要實現這一點,只須要兩步:
在每個批次數據到達後,Spark都會調用狀態更新函數,來更新全部已有key(無論key是否存在於本批次中)的狀態。若是狀態更新函數返回None,則對應的鍵值對會被刪除。
舉例以下。假設你須要維護一個流式應用,統計數據流中每一個單詞的出現次數。這裏將各個單詞的出現次數這個整型數定義爲狀態。咱們接下來定義狀態更新函數以下:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) }
該狀態更新函數能夠做用於一個包括(word, 1) 鍵值對的DStream上(見本文開頭的例子)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
該狀態更新函數會爲每一個單詞調用一次,且相應的newValues是一個包含不少個」1″的數組(這些1來自於(word,1)鍵值對),而runningCount包含以前該單詞的計數。本例的完整代碼請參考 StatefulNetworkWordCount.scala。
注意,調用updateStateByKey前須要配置檢查點目錄,後續對此有詳細的討論,見檢查點(checkpointing)這節。
transform算子(及其變體transformWith)能夠支持任意的RDD到RDD的映射操做。也就是說,你能夠用tranform算子來包裝 任何DStream API所不支持的RDD算子。例如,將DStream每一個批次中的RDD和另外一個Dataset進行關聯(join)操做,這個功能DStream API並無直接支持。不過你能夠用transform來實現這個功能,可見transform其實爲DStream提供了很是強大的功能支持。好比說, 你能夠用事先算好的垃圾信息,對DStream進行實時過濾。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... })
注意,這裏transform包含的算子,其調用時間間隔和批次間隔是相同的。因此你能夠基於時間改變對RDD的操做,如:在不一樣批次,調用不一樣的RDD算子,設置不一樣的RDD分區或者廣播變量等。
Spark Streaming一樣也提供基於時間窗口的計算,也就是說,你能夠對某一個滑動時間窗內的數據施加特定tranformation算子。以下圖所示:
如上圖所示,每次窗口滑動時,源DStream中落入窗口的RDDs就會被合併成新的windowed DStream。在上圖的例子中,這個操做會施加於3個RDD單元,而滑動距離是2個RDD單元。由此能夠得出任何窗口相關操做都須要指定一下兩個參數:
注意,這兩個參數都必須是DStream批次間隔(上圖中爲1)的整數倍.
下面我們舉個例子。假設,你須要擴展前面的那個小栗子,你須要每隔10秒統計一下前30秒內的單詞計數。爲此,咱們須要在包含(word, 1)鍵值對的DStream上,對最近30秒的數據調用reduceByKey算子。不過這些均可以簡單地用一個 reduceByKeyAndWindow搞定。
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
如下列出了經常使用的窗口算子。全部這些算子都有前面提到的那兩個參數 – 窗口長度 和 滑動距離。
Transformation窗口算子 | 用途 |
---|---|
window(windowLength, slideInterval) | 將源DStream窗口化,並返回轉化後的DStream |
countByWindow(windowLength,slideInterval) | 返回數據流在一個滑動窗口內的元素個數 |
reduceByWindow(func, windowLength,slideInterval) | 基於數據流在一個滑動窗口內的元素,用func作聚合,返回一個單元素數據流。func必須知足結合律,以便支持並行計算。 |
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) | 基於(K, V)鍵值對DStream,將一個滑動窗口內的數據進行聚合,返回一個新的包含(K,V)鍵值對的DStream,其中每一個value都是各個key通過func聚合後的結果。 注意:若是不指定numTasks,其值將使用Spark的默認並行任務數(本地模式下爲2,集羣模式下由 spark.default.parallelism決定)。固然,你也能夠經過numTasks來指定任務個數。 |
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) | 和前面的reduceByKeyAndWindow() 相似,只是這個版本會用以前滑動窗口計算結果,遞增地計算每一個窗口的歸約結果。當新的數據進入窗口時,這些values會被輸入func作歸約計算,而這 些數據離開窗口時,對應的這些values又會被輸入 invFunc 作」反歸約」計算。舉個簡單的例子,就是把新進入窗口數據中各個單詞個數「增長」到各個單詞統計結果上,同時把離開窗口數據中各個單詞的統計個數從相應的 統計結果中「減掉」。不過,你的本身定義好」反歸約」函數,即:該算子不只有歸約函數(見參數func),還得有一個對應的」反歸約」函數(見參數中的 invFunc)。和前面的reduceByKeyAndWindow() 相似,該算子也有一個可選參數numTasks來指定並行任務數。注意,這個算子須要配置好檢查點(checkpointing)才能用。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) | 基於包含(K, V)鍵值對的DStream,返回新的包含(K, Long)鍵值對的DStream。其中的Long value都是滑動窗口內key出現次數的計數。 和前面的reduceByKeyAndWindow() 相似,該算子也有一個可選參數numTasks來指定並行任務數。 |
最後,值得一提的是,你在Spark Streaming中作各類關聯(join)操做很是簡單。
一個數據流能夠和另外一個數據流直接關聯。
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2)
上面代碼中,stream1的每一個批次中的RDD會和stream2相應批次中的RDD進行join。一樣,你能夠相似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你還能夠基於窗口來join不一樣的數據流,其實現也很簡單,以下;)
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
其實這種狀況已經在前面的DStream.transform算子中介紹過了,這裏再舉個基於滑動窗口的例子。
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
實際上,在上面代碼裏,你能夠動態地該表join的數據集(dataset)。傳給tranform算子的操做函數會在每一個批次從新求值,因此每次該函數都會用最新的dataset值,因此不一樣批次間你能夠改變dataset的值。
完整的DStream transformation算子列表見API文檔。Scala請參考 DStream 和 PairDStreamFunctions. Java請參考 JavaDStream 和 JavaPairDStream. Python見 DStream。
輸出算子能夠將DStream的數據推送到外部系統,如:數據庫或者文件系統。由於輸出算子會將最終完成轉換的數據輸出到外部系統,所以只有輸出算 子調用時,纔會真正觸發DStream transformation算子的真正執行(這一點相似於RDD 的action算子)。目前所支持的輸出算子以下表:
輸出算子 | 用途 |
---|---|
print() | 在驅動器(driver)節點上打印DStream每一個批次中的頭十個元素。 Python API 對應的Python API爲 pprint() |
saveAsTextFiles(prefix, [suffix]) | 將DStream的內容保存到文本文件。 每一個批次一個文件,各文件命名規則爲 「prefix-TIME_IN_MS[.suffix]」 |
saveAsObjectFiles(prefix, [suffix]) | 將DStream內容以序列化Java對象的形式保存到順序文件中。 每一個批次一個文件,各文件命名規則爲 「prefix-TIME_IN_MS[.suffix]」Python API 暫不支持Python |
saveAsHadoopFiles(prefix, [suffix]) | 將DStream內容保存到Hadoop文件中。 每一個批次一個文件,各文件命名規則爲 「prefix-TIME_IN_MS[.suffix]」Python API 暫不支持Python |
foreachRDD(func) | 這是最通用的輸出算子了,該算子接收一個函數func,func將做用於DStream的每一個RDD上。 func應該實現將每一個RDD的數據推到外部系統中,好比:保存到文件或者寫到數據庫中。 注意,func函數是在streaming應用的驅動器進程中執行的,因此若是其中包含RDD的action算子,就會觸發對DStream中RDDs的實際計算過程。 |
DStream.foreachRDD是一個很是強大的原生工具函數,用戶能夠基於此算子將DStream數據推送到外部系統中。不過用戶須要瞭解如何正確而高效地使用這個工具。如下列舉了一些常見的錯誤。
一般,對外部系統寫入數據須要一些鏈接對象(如:遠程server的TCP鏈接),以便發送數據給遠程系統。所以,開發人員可能會不經意地在Spark驅動器(driver)進程中建立一個鏈接對象,而後又試圖在Spark worker節點上使用這個鏈接。以下例所示:
dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }
這段代碼是錯誤的,由於它須要把鏈接對象序列化,再從驅動器節點發送到worker節點。而這些鏈接對象一般都是不能跨節點(機器)傳遞的。好比,鏈接對 象一般都不能序列化,或者在另外一個進程中反序列化後再次初始化(鏈接對象一般都須要初始化,所以從驅動節點發到worker節點後可能須要從新初始化) 等。解決此類錯誤的辦法就是在worker節點上建立鏈接對象。
然而,有些開發人員可能會走到另外一個極端 – 爲每條記錄都建立一個鏈接對象,例如:
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
通常來講,鏈接對象是有時間和資源開銷限制的。所以,對每條記錄都進行一次鏈接對象的建立和銷燬會增長不少沒必要要的開銷,同時也大大減少了系統的吞吐量。 一個比較好的解決方案是使用 rdd.foreachPartition – 爲RDD的每一個分區建立一個單獨的鏈接對象,示例以下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
這樣一來,鏈接對象的建立開銷就攤到不少條記錄上了。
最後,還有一個更優化的辦法,就是在多個RDD批次之間複用鏈接對象。開發者能夠維護一個靜態鏈接池來保存鏈接對象,以便在不一樣批次的多個RDD之間共享同一組鏈接對象,示例以下:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
注意,鏈接池中的鏈接應該是懶惰建立的,而且有肯定的超時時間,超時後自動銷燬。這個實現應該是目前發送數據最高效的實現方式。
其餘要點:
首先須要注意的是,累加器(Accumulators)和廣播變量(Broadcast variables)是沒法從Spark Streaming的檢查點中恢復回來的。因此若是你開啓了檢查點功能,並同時在使用累加器和廣播變量,那麼你最好是使用懶惰實例化的單例模式,由於這樣累加器和廣播變量才能在驅動器(driver)故障恢復後從新實例化。代碼示例以下:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts })
這裏有完整代碼:source code。
在Streaming應用中能夠調用DataFrames and SQL來 處理流式數據。開發者能夠用經過StreamingContext中的SparkContext對象來建立一個SQLContext,而且,開發者須要確保一旦驅動器(driver)故障恢復後,該SQLContext對象能從新建立出來。一樣,你仍是可使用懶惰建立的單例模式來實例化 SQLContext,以下面的代碼所示,這裏咱們將最開始的那個例子作了一些修改,使用DataFrame和SQL來統計單詞計數。其實就是,將每一個 RDD都轉化成一個DataFrame,而後註冊成臨時表,再用SQL查詢這些臨時表。
/** DataFrame operations inside your streaming program */ val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SQLContext val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
這裏有完整代碼:source code。
你也能夠在其餘線程裏執行SQL查詢(異步查詢,即:執行SQL查詢的線程和運行StreamingContext的線程不一樣)。不過這種狀況下, 你須要確保查詢的時候 StreamingContext 沒有把所需的數據丟棄掉,不然StreamingContext有可能已將老的RDD數據丟棄掉了,那麼異步查詢的SQL語句也可能沒法獲得查詢結果。舉 個栗子,若是你須要查詢上一個批次的數據,可是你的SQL查詢可能要執行5分鐘,那麼你就須要StreamingContext至少保留最近5分鐘的數 據:streamingContext.remember(Minutes(5)) (這是Scala爲例,其餘語言差很少)
更多DataFrame和SQL的文檔見這裏: DataFrames and SQL
MLlib 提供了不少機器學習算法。首先,你須要關注的是流式計算相關的機器學習算法(如:Streaming Linear Regression, Streaming KMeans),這些流式算法能夠在流式數據上一邊學習訓練模型,一邊用最新的模型處理數據。除此之外,對更多的機器學習算法而言,你須要離線訓練這些模型,而後將訓練好的模型用於在線的流式數據。詳見MLlib。
和RDD相似,DStream也支持將數據持久化到內存中。只須要調用 DStream的persist() 方法,該方法內部會自動調用DStream中每一個RDD的persist方法進而將數據持久化到內存中。這對於可能須要計算不少次的DStream很是有 用(例如:對於同一個批數據調用多個算子)。對於基於滑動窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或 者有狀態的算子,如:updateStateByKey,數據持久化就更重要了。所以,滑動窗口算子產生的DStream對象默認會自動持久化到內存中 (不須要開發者調用persist)。
對於從網絡接收數據的輸入數據流(如:Kafka、Flume、socket等),默認的持久化級別會將數據持久化到兩個不一樣的節點上互爲備份副本,以便支持容錯。
注意,與RDD不一樣的是,DStream的默認持久化級別是將數據序列化到內存中。進一步的討論見性能調優這一小節。關於持久化級別(或者存儲級別)的更詳細說明見Spark編程指南(Spark Programming Guide)。
通常來講Streaming 應用都須要7*24小時長期運行,因此必須對一些與業務邏輯無關的故障有很好的容錯(如:系統故障、JVM崩潰等)。對於這些可能性,Spark Streaming 必須在檢查點保存足夠的信息到一些可容錯的外部存儲系統中,以便可以隨時從故障中恢復回來。因此,檢查點須要保存如下兩種數據:
總之,元數據檢查點主要是爲了恢復驅動器節點上的故障,而數據或RDD檢查點是爲了支持對有狀態轉換操做的恢復。
什麼時候啓用檢查點
若是有如下狀況出現,你就必須啓用檢查點了:
注意,一些簡單的流式應用,若是沒有用到前面所說的有狀態轉換算子,則徹底能夠不開啓檢查點。不過這樣的話,驅動器(driver)故障恢復後,有 可能會丟失部分數據(有些已經接收但還未處理的數據可能會丟失)。不過一般這點丟失時可接受的,不少Spark Streaming應用也是這樣運行的。對非Hadoop環境的支持將來還會繼續改進。
如何配置檢查點
檢查點的啓用,只須要設置好保存檢查點信息的檢查點目錄便可,通常會會將這個目錄設爲一些可容錯的、可靠性較高的文件系統(如:HDFS、S3 等)。開發者只須要調用 streamingContext.checkpoint(checkpointDirectory)。設置好檢查點,你就可使用前面提到的有狀態轉換 算子了。另外,若是你須要你的應用可以支持從驅動器故障中恢復,你可能須要重寫部分代碼,實現如下行爲:
不過這個行爲能夠用StreamingContext.getOrCreate來實現,示例以下:
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
若是 checkpointDirectory 目錄存在,則context對象會從檢查點數據從新構建出來。若是該目錄不存在(如:首次運行),則 functionToCreateContext 函數會被調用,建立一個新的StreamingContext對象並定義好DStream數據流。完整的示例請參見RecoverableNetworkWordCount,這個例子會將網絡數據中的單詞計數統計結果添加到一個文件中。
除了使用getOrCreate以外,開發者還須要確保驅動器進程能在故障後重啓。這一點只能由應用的部署環境基礎設施來保證。進一步的討論見部署(Deployment)這一節。
另外須要注意的是,RDD檢查點會增長額外的保存數據的開銷。這可能會致使數據流的處理時間變長。所以,你必須仔細的調整檢查點間隔時間。若是批次 間隔過小(好比:1秒),那麼對每一個批次保存檢查點數據將大大減少吞吐量。另外一方面,檢查點保存過於頻繁又會致使血統信息和任務個數的增長,這一樣會影響 系統性能。對於須要RDD檢查點的有狀態轉換算子,默認的間隔是批次間隔的整數倍,且最小10秒。開發人員能夠這樣來自定義這個間 隔:dstream.checkpoint(checkpointInterval)。通常推薦設爲批次間隔時間的5~10倍。
本節中將主要討論一下如何部署Spark Streaming應用。
要運行一個Spark Streaming 應用,你首先須要具有如下條件:
spark-submit
提交應用,那麼你不須要提供Spark和Spark Streaming的相關JAR包。可是,若是你使用了高級數據源(advanced sources – 如:Kafka、Flume、Twitter等),那麼你須要將這些高級數據源相關的JAR包及其依賴一塊兒打包並部署。例如,若是你使用了 TwitterUtils,那麼就必須將spark-streaming-twitter_2.10及其相關依賴都打到應用的JAR包中。升級Spark Streaming應用程序代碼,可使用如下兩種方式:
StreamingContext.stop(...)
or JavaStreamingContext.stop(...)
), 即:確保所收到的數據都已經處理完畢後再退出。而後再啓動新的Streaming程序,而新程序將接着在老程序退出點上繼續拉取數據。注意,這種方式須要 數據源支持數據緩存(或者叫數據堆積,如:Kafka、Flume),由於在新舊程序交接的這個空檔時間,數據須要在數據源處緩存。目前還不能支持從檢查 點重啓,由於檢查點存儲的信息包含老程序中的序列化對象信息,在新程序中將其反序列化可能會出錯。這種狀況下,只能要麼指定一個新的檢查點目錄,要麼刪除 老的檢查點目錄。除了Spark自身的監控能力(monitoring capabilities)以外,對Spark Streaming還有一些額外的監控功能可用。若是實例化了StreamingContext,那麼你能夠在Spark web UI上看到多出了一個Streaming tab頁,上面顯示了正在運行的接收器(是否活躍,接收記錄的條數,失敗信息等)和處理完的批次信息(批次處理時間,查詢延時等)。這些信息均可以用來監控streaming應用。
web UI上有兩個度量特別重要:
若是批次處理耗時一直比批次間隔時間大,或者批次調度延時持續上升,就意味着系統處理速度跟不上數據接收速度。這時候你就得考慮一下怎麼把批次處理時間降下來(reducing)。
Spark Streaming程序的處理進度能夠用StreamingListener接口來監聽,這個接口能夠監聽到接收器的狀態和處理時間。不過須要注意的是,這是一個developer API接口,換句話說這個接口將來極可能會變更(可能會增長更多度量信息)。
要得到Spark Streaming應用的最佳性能須要一點點調優工做。本節將深刻解釋一些可以改進Streaming應用性能的配置和參數。整體上來講,你須要考慮這兩方面的事情:
有很多優化手段均可以減小Spark對每一個批次的處理時間。細節將在優化指南(Tuning Guide)中詳談。這裏僅列舉一些最重要的。
跨網絡接收數據(如:從Kafka、Flume、socket等接收數據)須要在Spark中序列化並存儲數據。
若是接收數據的過程是系統瓶頸,那麼能夠考慮增長數據接收的並行度。注意,每一個輸入DStream只包含一個單獨的接收器(receiver,運行 約worker節點),每一個接收器單獨接收一路數據流。因此,配置多個輸入DStream就能從數據源的不一樣分區分別接收多個數據流。例如,能夠將從 Kafka拉取兩個topic的數據流分紅兩個Kafka輸入數據流,每一個數據流拉取其中一個topic的數據,這樣一來會同時有兩個接收器並行地接收數 據,於是增長了整體的吞吐量。同時,另外一方面咱們又能夠把這些DStream數據流合併成一個,而後能夠在合併後的DStream上使用任何可用的 transformation算子。示例代碼以下:
val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) unifiedStream.print()
另外一個能夠考慮優化的參數就是接收器的阻塞間隔,該參數由配置參數(configuration parameter)spark.streaming.blockInterval 決定。大多數接收器都會將數據合併成一個個數據塊,而後再保存到spark內存中。對於map類算子來講,每一個批次中數據塊的個數將會決定處理這批數據並 行任務的個數,每一個接收器每批次數據處理任務數約等於 (批次間隔 / 數據塊間隔)。例如,對於2秒的批次間隔,若是數據塊間隔爲200ms,則建立的併發任務數爲10。若是任務數太少(少於單機cpu core個數),則資源利用不夠充分。如需增長這個任務數,對於給定的批次間隔來講,只須要減小數據塊間隔便可。不過,咱們仍是建議數據塊間隔至少要 50ms,不然任務的啓動開銷佔比就過高了。
另外一個切分接收數據流的方法是,顯示地將輸入數據流劃分爲多個分區(使用 inputStream.repartition(<number of partitions>))。該操做會在處理前,將數據散開從新分發到集羣中多個節點上。
在計算各個階段(stage)中,任何一個階段的併發任務數不足都有可能形成集羣資源利用率低。例如,對於reduce類的算子, 如:reduceByKey 和 reduceByKeyAndWindow,其默認的併發任務數是由 spark.default.parallelism 決定的。你既能夠修改這個默認值(spark.default.parallelism),也能夠經過參數指定這個併發數量(見PairDStreamFunctions)。
調整數據的序列化格式能夠大大減小數據序列化的開銷。在spark Streaming中主要有兩種類型的數據須要序列化:
無論是上面哪種數據,均可以使用Kryo序列化來減小CPU和內存開銷,詳見Spark Tuning Guide。另,對於Kryo,你能夠考慮這些優化:註冊自定義類型,禁用對象引用跟蹤(詳見Configuration Guide)。
在一些特定的場景下,若是數據量不是很大,那麼你能夠考慮不用序列化格式,不過你須要注意的是取消序列化是否會致使大量的GC開銷。例如,若是你的 批次間隔比較短(幾秒)而且沒有使用基於窗口的算子,這種狀況下你能夠考慮禁用序列化格式。這樣能夠減小序列化的CPU開銷以優化性能,同時GC的增加也 很少。
若是每秒啓動的任務數過多(好比每秒50個以上),那麼將任務發送給slave節點的開銷會明顯增長,那麼你也就很難達到亞秒級(sub-second)的延遲。不過如下兩個方法能夠減小任務的啓動開銷:
這些調整有可能可以減小100ms的批次處理時間,這也使得亞秒級的批次間隔成爲可能。
要想streaming應用在集羣上穩定運行,那麼系統處理數據的速度必須能跟上其接收數據的速度。換句話說,批次數據的處理速度應該和其生成速度同樣快。對於特定的應用來講,能夠從其對應的監控(monitoring)頁面上觀察驗證,頁面上顯示的處理耗時應該要小於批次間隔時間。
根據spark streaming計算的性質,在必定的集羣資源限制下,批次間隔的值會極大地影響系統的數據處理能力。例如,在WordCountNetwork示例 中,對於特定的數據速率,一個系統可能可以在批次間隔爲2秒時跟上數據接收速度,但若是把批次間隔改成500毫秒系統可能就處理不過來了。因此,批次間隔 須要謹慎設置,以確保生產系統可以處理得過來。
要找出適合的批次間隔,你能夠從一個比較保守的批次間隔值(如5~10秒)開始測試。要驗證系統是否能跟上當前的數據接收速率,你可能須要檢查一下端到端的批次處理延遲(能夠看看Spark驅動器log4j日誌中的Total delay,也能夠用StreamingListener接 口來檢測)。若是這個延遲能保持和批次間隔差很少,那麼系統基本就是穩定的。不然,若是這個延遲持久在增加,也就是說系統跟不上數據接收速度,那也就意味 着系統不穩定。一旦系統文檔下來後,你就能夠嘗試提升數據接收速度,或者減小批次間隔值。不過須要注意,瞬間的延遲增加能夠只是暫時的,只要這個延遲後續 會自動降下來就沒有問題(如:降到小於批次間隔值)
Spark應用內存佔用和GC調優已經在調優指南(Tuning Guide)中有詳細的討論。牆裂建議你讀一讀那篇文檔。本節中,咱們只是討論一下幾個專門用於Spark Streaming的調優參數。
Spark Streaming應用在集羣中佔用的內存量嚴重依賴於具體所使用的tranformation算子。例如,若是想要用一個窗口算子操縱最近10分鐘的數 據,那麼你的集羣至少須要在內存裏保留10分鐘的數據;另外一個例子是updateStateByKey,若是key不少的話,相對應的保存的key的 state也會不少,而這些都須要佔用內存。而若是你的應用只是作一個簡單的 「映射-過濾-存儲」(map-filter-store)操做的話,那須要的內存就不多了。
通常狀況下,streaming接收器接收到的數據會以 StorageLevel.MEMORY_AND_DISK_SER_2 這個存儲級別存到spark中,也就是說,若是內存裝不下,數據將被吐到磁盤上。數據吐到磁盤上會大大下降streaming應用的性能,所以仍是建議根 據你的應用處理的數據量,提供充足的內存。最好就是,一邊小規模地放大內存,再觀察評估,而後再放大,再評估。
另外一個內存調優的方向就是垃圾回收。由於streaming應用每每都須要低延遲,因此確定不但願出現大量的或耗時較長的JVM垃圾回收暫停。
如下是一些可以幫助你減小內存佔用和GC開銷的參數或手段:
本節中,咱們將討論Spark Streaming應用在出現失敗時的具體行爲。
要理解Spark Streaming所提供的容錯語義,咱們首先須要回憶一下Spark RDD所提供的基本容錯語義。
Spark主要操做一些可容錯文件系統的數據,如:HDFS或S3。所以,全部從這些可容錯數據源產生的RDD也是可容錯的。然而,對於Spark Streaming並不是如此,由於多數狀況下Streaming須要從網絡遠端接收數據,這回致使Streaming的數據源並不可靠(尤爲是對於使用了 fileStream的應用)。要實現RDD相同的容錯屬性,數據接收就必須用多個不一樣worker節點上的Spark執行器來實現(默認副本因子是 2)。所以一旦出現故障,系統須要恢復兩種數據:
此外,還有兩種可能的故障類型須要考慮:
有了以上這些基本知識,下面咱們就進一步瞭解一下Spark Streaming的容錯語義。
流式系統的可靠度語義能夠據此來分類:單條記錄在系統中被處理的次數保證。一個流式系統可能提供保證一定是如下三種之一(無論系統是否出現故障):
任何流式處理系統通常都會包含如下三個數據處理步驟:
若是Streaming應用須要作到端到端的「精確一次」的保證,那麼就必須在以上三個步驟中各自都保證精確一次:即,每條記錄必須,只接收一次、處理一次、推送一次。下面讓咱們在Spark Streaming的上下文環境中來理解一下這三個步驟的語義:
不一樣的輸入源提供不一樣的數據可靠性級別,從「至少一次」到「精確一次」。
若是全部的輸入數據都來源於可容錯的文件系統,如HDFS,那麼Spark Streaming就能在任何故障中恢復並處理全部的數據。這種狀況下就能保證精確一次語義,也就是說無論出現什麼故障,全部的數據老是精確地只處理一次,很少也很多。
對於基於接收器的輸入源,容錯語義將同時依賴於故障場景和接收器類型。前面也已經提到過,spark Streaming主要有兩種類型的接收器:
對於不一樣的接收器,咱們能夠得到以下不一樣的語義。若是一個worker節點故障了,對於可靠接收器來書,不會有數據丟失。而對於不可靠接收器,緩存 的(接收但還沒有保存副本)數據可能會丟失。若是driver節點故障了,除了接收到的數據以外,其餘的已經接收且已經保存了內存副本的數據都會丟失,這將 會影響有狀態算子的計算結果。
爲了不丟失已經收到且保存副本的數,從 spark 1.2 開始引入了WAL(write ahead logs),以便將這些數據寫入到可容錯的存儲中。只要你使用可靠接收器,同時啓用WAL(write ahead logs enabled),那麼久不再用爲數據丟失而擔憂了。而且這時候,還能提供「至少一次」的語義保證。
下表總結了故障狀況下的各類語義:
部署場景 | Worker 故障 | Driver 故障 |
---|---|---|
Spark 1.1及之前版本 或者 Spark 1.2及之後版本,且未開啓WAL |
若使用不可靠接收器,則可能丟失緩存(已接收但還沒有保存副本)數據; 若使用可靠接收器,則沒有數據丟失,且提供至少一次處理語義 |
若使用不可靠接收器,則緩存數據和已保存數據均可能丟失; 若使用可靠接收器,則沒有緩存數據丟失,但已保存數據可能丟失,且不提供語義保證 |
Spark 1.2及之後版本,並啓用WAL | 若使用可靠接收器,則沒有數據丟失,且提供至少一次語義保證 | 若使用可靠接收器和文件,則無數據丟失,且提供至少一次語義保證 |
從Spark 1.3開始,咱們引入Kafka Direct API,該API能爲Kafka數據源提供「精確一次」語義保證。有了這個輸入API,再加上輸出算子的「精確一次」保證,你就能真正實現端到端的「精確 一次」語義保證。(改功能截止Spark 1.6.1仍是實驗性的)更詳細的說明見:Kafka Integration Guide。
輸出算子(如 foreachRDD)提供「至少一次」語義保證,也就是說,若是worker故障,單條輸出數據可能會被屢次寫入外部實體中。不過這對於文件系統來講是 能夠接受的(使用saveAs***Files 屢次保存文件會覆蓋以前的),因此咱們須要一些額外的工做來實現「精確一次」語義。主要有兩種實現方式:
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用uniqueId做爲事務的惟一標識,基於uniqueId實現partitionIterator所指向數據的原子事務提交 } }
在Spark 0.9.1和Spark 1.0之間,有一些API接口變動,變動目的是爲了保障將來版本API的穩定。本節將詳細說明一下從已有版本遷移升級到1.0所需的工做。
輸入DStream(Input DStreams): 全部建立輸入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值再也不是DStream(對Java來講是JavaDStream),而是 InputDStream / ReceiverInputDStream(對Java來講是JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream)。這樣才能確保特定輸入流的功能可以在將來持續增長到這些class中,而不會打破二進制兼容性。注意,已有的Spark Streaming應用應該不須要任何代碼修改(新的返回類型都是DStream的子類),只不過須要基於Spark 1.0從新編譯一把。
定製網絡接收器(Custom Network Receivers): 自從Spark Streaming發佈以來,Scala就能基於NetworkReceiver來定製網絡接收器。但因爲錯誤處理和彙報API方便的限制,該類型不能在Java中使用。因此Spark 1.0開始,用 Receiver 來替換掉這個NetworkReceiver,主要的好處以下:
爲了將已有的基於NetworkReceiver的自定義接收器遷移到Receiver上來,你須要以下工做:
org.apache.spark.streaming.receiver.Receiver繼承,而再也不是
org.apache.spark.streaming.dstream.NetworkReceiver。
基於Actor的接收器(Actor-based Receivers): 從actor class繼承後,並實現了
org.apache.spark.streaming.receiver.Receiver 後,
便可從Akka Actors中獲取數據。獲取數據的類被重命名爲 org.apache.spark.streaming.receiver.ActorHelper
,而保存數據的pushBlocks(…)方法也被重命名爲 store(…)。其餘org.apache.spark.streaming.receivers包中的工具類也被移到 org.apache.spark.streaming.receiver
包下並重命名,新的類名應該比以前更加清晰。