Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from sources such as Kafka, Flume, Twitter, ZeroMQ, or Kinesis, and processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and live dashboards.
1. Project Creation
About Java: use 1.7 or 1.8. For broad compatibility, this chapter is written against 1.7.
About Scala: the project does not need the Scala nature added (do not use "Add Scala Nature"). If it is added, calling the Scala library from Java code can throw exceptions.
About the Spark version: this chapter is written against 1.6.3.
Maven dependencies:
```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
</dependencies>
```
Example:
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class NetworkWordCount {

  public static void main(String[] args) {
    networkWC();
  }

  public static void networkWC() {
    // Create a local StreamingContext with two working threads and a batch interval of 5 seconds
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

    // Create a DStream that will connect to hostname:port, like localhost:9999
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
      }
    });

    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print();

    jssc.start();              // Start the computation
    jssc.awaitTermination();   // Wait for the computation to terminate
  }
}
```
To initialize a Spark Streaming program, a JavaStreamingContext object must be created; it is the main entry point for all Spark Streaming functionality. A JavaStreamingContext can be created from a SparkConf object. Note that it internally creates a JavaSparkContext, which you can access via jssc.sparkContext.
```java
// Create a local StreamingContext with two working threads and a batch interval of 5 seconds
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
```
After a context is defined, you have to do the following:

1. Define the input sources by creating input DStreams.
2. Define the streaming computations by applying transformation and output operations to the DStreams.
3. Start receiving data and processing it with streamingContext.start().
4. Wait for the processing to be stopped (manually or due to an error) with streamingContext.awaitTermination().
5. The processing can be stopped manually with streamingContext.stop().
A few points to remember:

- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on the StreamingContext also stops the SparkContext; to stop only the StreamingContext, set the optional stopSparkContext parameter of stop() to false.
- A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created.
DStreams are the basic abstraction provided by Spark Streaming; a DStream represents a continuous stream of data, either the input stream received from a source or the processed stream produced by transforming an input stream. Internally, a DStream is represented by a continuous series of RDDs, where each RDD contains the data of a certain interval, as shown in the figure below:
任何對DStreams的操做都轉換成了對DStreams隱含的RDD的操做。在前面的例子中, flatMap 操做應用於 lines 這個DStreams的每一個RDD,生成 words 這個DStreams的RDD。過程以下圖所示:
These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. The following sections discuss these operations in detail.
Input DStreams are DStreams representing the stream of input data received from a source. In the quick example, lines is an input DStream representing the data stream received from the netcat server. Every input DStream is associated with a Receiver object, which receives the data from the source and stores it in Spark's memory for processing.
Input DStreams represent the raw data streams received from sources. Spark Streaming provides two categories of sources:

- Basic sources: sources directly available in the StreamingContext API, such as file systems, socket connections, and Akka actors.
- Advanced sources: sources such as Kafka, Flume, Kinesis, and Twitter, which are available through extra utility classes and require additional dependencies (see the sketch below).
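For illustration, a hedged sketch of one advanced source, the receiver-based Kafka stream. It assumes the spark-streaming-kafka_2.10 artifact (at the same 1.6.3 version) has been added as a dependency; the ZooKeeper quorum, consumer group, and topic name are placeholders, and jssc is the JavaStreamingContext from the earlier example.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// topic name -> number of receiver threads for that topic (placeholder values)
Map<String, Integer> topics = Collections.singletonMap("my-topic", 1);

// Receiver-based Kafka input DStream of (message key, message value) pairs
JavaPairReceiverInputDStream<String, String> kafkaStream =
    KafkaUtils.createStream(jssc, "zk-host:2181", "my-consumer-group", topics);
```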
Note that if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This will create multiple receivers that simultaneously receive multiple data streams. However, a receiver runs as a long-running task on a Spark worker/executor and therefore occupies one of the cores allocated to the Spark Streaming application. It is therefore very important to allocate enough cores (or threads, if running locally) to the Spark Streaming application to process the received data as well as to run the receivers.
幾點須要注意的地方:
We have already seen in the quick example that ssc.socketTextStream(...) creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API also provides methods for creating DStreams from files and Akka actors as input sources.
Sockets: listen on a socket at a given ip:port.
File Streams: for reading data from files on any file system compatible with the HDFS API, a DStream can be created as follows:
```java
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
```
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note the following three points:

- The files must all have the same data format.
- The files must be created in dataDirectory by atomically moving or renaming them into the directory.
- Once moved, the files must not be changed; if a file is continuously appended to, the new data will not be read.
For simple text files, there is an easier method, streamingContext.textFileStream(dataDirectory). File streams do not require running a receiver, so no cores need to be allocated to receiving file data.
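As a minimal sketch (the directory path is a placeholder), the text-file variant returns a DStream of lines from files atomically moved into the monitored directory:

```java
// Each new file dropped into the directory becomes part of the next batch of lines
JavaDStream<String> fileLines = jssc.textFileStream("hdfs://namenode:8020/user/stream/in");
```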
(To be added)
(To be added)
Similar to RDDs, transformations allow the data from an input DStream to be modified. DStreams support many of the transformations available on normal Spark RDDs. Some of the common ones are listed below:
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
updateStateByKey操做容許不斷用新信息更新它的同時保持任意狀態。你須要經過兩步來使用它
Let's illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state, and it is an Integer.
```java
import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        Integer newSum = ... // add the new values with the previous running count to get the new count
        return Optional.of(newSum);
      }
    };
```
This is applied on a DStream containing words (the pairs DStream holding (word, 1) pairs from the earlier example).
```java
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
```
The update function is called for each word, with values holding a sequence of 1's (from the (word, 1) pairs) and state holding the previous count of that word. For the complete code, see the example JavaStatefulNetworkWordCount.java. Note that using updateStateByKey requires a checkpoint directory to be configured; this is discussed in detail in the checkpointing section, and a short sketch follows below.
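As a hedged sketch (the HDFS path is a placeholder), enabling the required checkpoint directory before using the stateful stream might look like:

```java
// A checkpoint directory is mandatory for stateful operations such as updateStateByKey
jssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint");
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
```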
(Omitted)
Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window.
As shown in the figure, every time the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this specific case, the operation is applied over the last three time units of data and slides by two time units. This shows that any window operation needs to specify two parameters:

- window length: the duration of the window.
- sliding interval: the interval at which the window operation is performed.
Both of these parameters must be multiples of the batch interval of the source DStream.
下面舉例說明窗口操做。例如,你想擴展前面的例子用來計算過去30秒的詞頻,間隔時間是10秒。爲了達到這個目的,咱們必須在過去30秒的 pairs DStream上應用 reduceByKey 操做。用方法 reduceByKeyAndWindow 實現。
```java
// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
  @Override
  public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts =
    pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
```
一些經常使用的窗口操做以下所示,這些操做都須要用到上文提到的兩個參數:窗口長度和滑動的時間間隔
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally, by reducing the new data that enters the sliding window and "inverse reducing" the old data that leaves the window (using invFunc). This only applies to reduce functions that have a corresponding inverse, and checkpointing must be enabled to use this operation. A sketch follows the table. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
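For illustration, a hedged sketch of the incremental reduceByKeyAndWindow variant from the table above, reusing reduceFunc from the earlier example; the inverse (subtraction) function and the checkpoint path are introduced here as placeholders. Checkpointing must be enabled for this variant.

```java
// Inverse of the addition reduce function: remove counts of the batch sliding out of the window
Function2<Integer, Integer, Integer> invReduceFunc = new Function2<Integer, Integer, Integer>() {
  @Override
  public Integer call(Integer i1, Integer i2) {
    return i1 - i2;
  }
};

jssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint"); // placeholder path

// Same 30-second window sliding every 10 seconds, computed incrementally
JavaPairDStream<String, Integer> windowedWordCounts =
    pairs.reduceByKeyAndWindow(reduceFunc, invReduceFunc, Durations.seconds(30), Durations.seconds(10));
```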
Finally, it is worth highlighting that you can easily perform different kinds of joins in Spark Streaming.
Stream-stream joins
Streams can be very easily joined with other streams.
```java
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
```
Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, and fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is also quite easy.
```java
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
```
Stream-dataset joins
Here is another example of joining a windowed stream with a dataset.
```java
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
// transformToPair applies a pair-RDD-to-pair-RDD function to every windowed RDD
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, Tuple2<String, String>>>() {
      @Override
      public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> rdd) {
        return rdd.join(dataset);
      }
    }
);
```
In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that the dataset reference points to.
The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.
輸出操做容許DStream的操做推到如數據庫、文件系統等外部系統中。由於輸出操做其實是容許外部系統消費轉換後的數據,它們觸發的實際操做是DStream轉換。目前,定義了下面幾種輸出操做:
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. (In the Python API this is called pprint().) |
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". (Not available in the Python API.) |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". (Not available in the Python API.) |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
Design patterns for using foreachRDD
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use it correctly and efficiently. Some common mistakes to avoid are as follows. Writing data to an external system often requires creating a connection object (e.g. a TCP connection to a remote server) and using it to send data to the remote system. For this purpose, a developer may inadvertently create a connection object at the Spark driver and then try to use it in a Spark worker to save records in the RDDs, as in the following (Scala) example:
```scala
dstream.foreachRDD(rdd => {
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach(record => {
    connection.send(record)  // executed at the worker
  })
})
```
This is incorrect, as it requires the connection object to be serialized and sent from the driver to the worker. Such connection objects can rarely be transferred between machines. This may show up as serialization errors (the connection object is not serializable), initialization errors (the connection object needs to be initialized at the workers), and so on. The correct solution is to create the connection object at the worker.
However, this can lead to another common mistake: creating a new connection for every record. For example:
```scala
dstream.foreachRDD(rdd => {
  rdd.foreach(record => {
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  })
})
```
Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition: create a single connection object per partition of the RDD and use it to send all the records in that partition.
```scala
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  })
})
```
This amortizes the connection creation overhead over all the records in the partition.
Finally, this can be optimized further by reusing connection objects across multiple RDDs/batches. One can maintain a static pool of connection objects that can be reused as the RDDs of multiple batches are pushed to the external system, further reducing the overheads.
```scala
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  })
})
```
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient way of sending data to external systems.
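The same pattern in this document's Java API, as a hedged sketch: ConnectionPool and its Connection class stand in for whatever pooling/client library is actually used and are not Spark APIs; wordCounts is the pair DStream from the earlier example.

```java
wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
  @Override
  public void call(JavaPairRDD<String, Integer> rdd) {
    rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
      @Override
      public void call(Iterator<Tuple2<String, Integer>> partitionOfRecords) {
        // ConnectionPool / Connection are hypothetical, lazily initialized helpers
        Connection connection = ConnectionPool.getConnection();
        while (partitionOfRecords.hasNext()) {
          connection.send(partitionOfRecords.next().toString());
        }
        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
      }
    });
  }
});
```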
Other points to note:

- DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside DStream output operations force the processing of the received data; if your application has no output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, nothing will be executed. The system will simply receive the data and discard it.
- By default, output operations are executed one at a time, in the order they are defined in the application.
You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in such a way that it can be restarted on driver failures; this is achieved by lazily instantiating a singleton instance of SQLContext. The following example demonstrates this. It modifies the earlier word count example: each RDD is converted to a DataFrame, registered as a temporary table, and then queried using SQL.
```java
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ...

words.foreachRDD(
  new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {
      // Get the singleton instance of SQLContext
      SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

      // Convert RDD[String] to RDD[case class] to DataFrame
      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
        public JavaRow call(String word) {
          JavaRow record = new JavaRow();
          record.setWord(word);
          return record;
        }
      });
      DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

      // Register as table
      wordsDataFrame.registerTempTable("words");

      // Do word count on table using SQL and print it
      DataFrame wordCountsDataFrame =
          sqlContext.sql("select word, count(*) as total from words group by word");
      wordCountsDataFrame.show();
      return null;
    }
  }
);
```
See the complete source code for the full example.
You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or the equivalent in other languages).
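In the Java API, the equivalent call on the JavaStreamingContext would look roughly like the following sketch (the 5-minute value simply mirrors the example above):

```java
// Keep the generated RDDs of the last 5 minutes around so asynchronous queries can still see them
jssc.remember(Durations.minutes(5));
```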
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. Using the persist() method on a DStream automatically persists every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times. For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, persistence is the default, so the developer does not need to call persist(). For input streams that receive data over the network (such as Kafka or Flume), the default persistence level replicates the data to two nodes for fault tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory; this is discussed further in the Performance Tuning section. More information on RDD persistence can be found in the Spark Programming Guide.
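As a minimal sketch of making this caching explicit (MEMORY_ONLY_SER matches the serialized in-memory default described above; words is the DStream from the earlier example):

```java
import org.apache.spark.storage.StorageLevel;

// Cache the DStream's RDDs as serialized objects in memory
words.persist(StorageLevel.MEMORY_ONLY_SER());
```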
A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g. system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system so that it can recover from failures.
Metadata checkpointing: saving the information defining the streaming computation to fault-tolerant storage such as HDFS. This is used to recover from failure of the node running the driver of the streaming application. Metadata includes:

- Configuration: the configuration used to create the streaming application.
- DStream operations: the set of DStream operations that define the streaming application.
- Incomplete batches: batches whose jobs are queued but have not completed yet.
Data checkpointing: saving the generated RDDs to reliable storage. This is necessary in stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep growing over time. To avoid such unbounded growth during recovery, the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage to cut off the dependency chain. To summarize, metadata checkpointing is primarily needed for recovery from driver failures, whereas data (RDD) checkpointing is necessary even for basic functioning if stateful transformations are used.
Checkpointing must be enabled for applications in either of the following two situations:

- Use of stateful transformations: if updateStateByKey or reduceByKeyAndWindow (with an inverse function) is used in the application, a checkpoint directory must be provided to allow periodic RDD checkpointing.
- Recovering from failures of the driver running the application: metadata checkpoints are used to recover with progress information.
Note that simple streaming applications without the aforementioned stateful transformations can run without enabling checkpointing. In that case, recovery from driver failures will only be partial (data that was received but not yet processed may be lost). This is often acceptable, and many Spark Streaming applications are run this way.
Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (e.g. HDFS, S3, etc.) to which the checkpoint information will be saved. This is done with streamingContext.checkpoint(checkpointDirectory) and allows you to use the stateful transformations described earlier. Additionally, if you want the application to recover from driver failures, you should rewrite your streaming application as follows.
```java
// Create a factory object that can create and set up a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override
  public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();
```
If checkpointDirectory exists, the context will be recreated from the checkpoint data. If the directory does not exist (i.e. the application is running for the first time), the provided contextFactory is called to create a new context and set up the DStreams. See the RecoverableNetworkWordCount example.
In addition to using getOrCreate, you also need to ensure that the driver process is restarted automatically on failure. This can only be done by the deployment infrastructure used to run the application, and is discussed further in the Deployment section.
Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This can increase the processing time of those batches whose RDDs get checkpointed, so the checkpoint interval needs to be set carefully. At small batch sizes (say, 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which can have detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds, and it can be set with dstream.checkpoint. Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
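As a minimal sketch (the value is illustrative): with the 5-second batch interval from the quick example, 5 - 10 intervals works out to roughly 25 - 50 seconds.

```java
// Checkpoint the stateful DStream's RDDs every 50 seconds instead of at the default interval
runningCounts.checkpoint(Durations.seconds(50));
```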