前言html
spark streaming在2.2.1版本以後出現一個相似的實時計算框架Structured Streaming。java
引用一句spark streaming structured streaming區別博客的原話,建議擴展讀下:Structured Streaming 經過提供一套 high-level 的 declarative api 使得流式計算的編寫相比 Spark Streaming 簡單容易很多,同時經過提供 end-to-end 的 exactly-once 語義。node
核心優點有如下幾點:用流式計算代替batch計算,declarative api能夠減小代碼編寫難度,能夠保證exactly-once。apache
一:StreamingContext詳解api
兩種建立方式:app
一:sparkConf方式 val conf = new SparkConf().setAppName(appName).setMaster(master); val ssc = new StreamingContext(conf, Seconds(1)); 二:sparkContext方式 val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1));
一個StreamingContext定義以後,必須作如下幾件事情:
一、經過建立輸入DStream來建立輸入數據源。
二、經過對DStream定義transformation和output算子操做,來定義實時計算邏輯。
三、調用StreamingContext的start()方法,來開始實時處理數據。
四、調用StreamingContext的awaitTermination()方法,來等待應用程序的終止。可使用CTRL+C手動中止,或者就是讓它持續不斷的運行進行計算。
五、也能夠經過調用StreamingContext的stop()方法,來中止應用程序。
須要注意的要點:
一、只要一個StreamingContext啓動以後,就不能再往其中添加任何計算邏輯了。好比執行start()方法以後,還給某個DStream執行一個算子。
二、一個StreamingContext中止以後,是確定不可以重啓的。調用stop()以後,不能再調用start()
三、一個JVM同時只能有一個StreamingContext啓動。在你的應用程序中,不能建立兩個StreamingContext。
四、調用stop()方法時,會同時中止內部的SparkContext,若是不但願如此,還但願後面繼續使用SparkContext建立其餘類型的Context,好比SQLContext,那麼就用stop(false)。
五、一個SparkContext能夠建立多個StreamingContext,只要上一個先用stop(false)中止,再建立下一個便可。
六、一個Spark Streaming Application的Executor,是一個長時間運行的任務,所以,它會獨佔分配給Spark Streaming Application的cpu core。從而只要Spark Streaming運行起來之後,這個節點上的cpu core,就無法給其餘應用使用了。因此單線程是不能正常接收數據而且處理數據的。必須只要一個core用於接收數據(receive),一個core用於處理數據。可是基於hdfs是不須要receive的。
實例代碼以下:框架
public class HDFSWordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("HDFSWordCountJava").setMaster("local[2]"); JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(10)); JavaDStream<String> lines = javaStreamingContext.textFileStream("hdfs://hadoop-100:9000/stream/"); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }); JavaPairDStream<String, Integer> wordsNumber = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1); } }); JavaPairDStream<String, Integer> result = wordsNumber.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); result.print(); javaStreamingContext.start(); javaStreamingContext.awaitTermination(); javaStreamingContext.close(); } }
二:kafka direct 跟receiver 方式接收數據的區別分佈式
Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,而後Spark Streaming啓動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會由於底層的失敗而丟失數據。若是要啓用高可靠機制,讓數據零丟失,就必須啓用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分佈式文件系統(好比HDFS)上的預寫日誌中。因此,即便底層節點出現了失敗,也可使用預寫日誌中的數據進行恢復,可是效率會降低。ide
direct 這種方式會週期性地查詢Kafka,來得到每一個topic+partition的最新的offset,從而定義每一個batch的offset的範圍。當處理數據的job啓動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。這種方式有以下優勢:
一、簡化並行讀取:若是要讀取多個partition,不須要建立多個輸入DStream而後對它們進行union操做。Spark會建立跟Kafka partition同樣多的RDD partition,而且會並行從Kafka中讀取數據。因此在Kafka partition和RDD partition之間,有一個一對一的映射關係。
二、高性能:若是要保證零數據丟失,在基於receiver的方式中,須要開啓WAL機制。這種方式其實效率低下,由於數據實際上被複制了兩份,Kafka本身自己就有高可靠的機制,會對數據複製一份,而這裏又會複製一份到WAL中。而基於direct的方式,不依賴Receiver,不須要開啓WAL機制,只要Kafka中做了數據的複製,那麼就能夠經過Kafka的副本進行恢復。oop
三、一次且僅一次的事務機制:
基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。
基於direct的方式,使用kafka的簡單api,Spark Streaming本身就負責追蹤消費的offset,並保存在checkpoint中。Spark本身必定是同步的,所以能夠保證數據是消費一次且僅消費一次。
三:DStream操做
轉化操做:
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
output
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API This is called pprint() in the Python API. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".Python API This is not available in the Python API. |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |