Spark Streaming處理的數據流圖:html
Spark Streaming在內部的處理機制是,接收實時流的數據,並根據必定的時間間隔拆分紅一批批的數據,而後經過Spark Engine處理這些批數據,最終獲得處理後的一批批結果數據。python
對應的批數據,在Spark內核對應一個RDD實例,所以,對應流數據的DStream能夠當作是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分紅一批一批後,經過一個先進先出的隊列,而後 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,而後進行處理,這是一個典型的生產者消費者模型。git
l離散流(discretized stream)或DStream:Spark Streaming對內部持續的實時數據流的抽象描述,即咱們處理的一個實時數據流,在Spark Streaming中對應於一個DStream 實例。github
l批數據(batch data):這是化整爲零的第一步,將實時流數據以時間片爲單位進行分批,將流處理轉化爲時間片數據的批處理。隨着持續時間的推移,這些處理結果就造成了對應的結果數據流了。數據庫
l時間片或批處理時間間隔( batch interval):人爲地對流數據進行定量的標準,以時間片做爲咱們拆分流數據的依據。一個時間片的數據對應一個RDD實例。apache
l窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,編程
l滑動時間間隔:前一個窗口到後一個窗口所通過的時間長度。必須是批處理時間間隔的倍數網絡
lInput DStream :一個input DStream是一個特殊的DStream,將Spark Streaming鏈接到一個外部數據源來讀取數據。數據結構
SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,能夠對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行相似Map、Reduce和Join等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。架構
l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。
圖Spark Streaming構架
DStream(Discretized Stream)做爲Spark Streaming的基礎抽象,它表明持續性的數據流。這些數據流既能夠經過外部輸入源賴獲取,也能夠經過現有的Dstream的transformation操做來得到。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每一個RDD都包含了本身特定時間間隔內的數據流。如圖7-3所示。
圖7-3 DStream中在時間軸下生成離散的RDD序列
對DStream中數據的各類操做也是映射到內部的RDD上來進行的,如圖7-4所示,對Dtream的操做能夠經過RDD的transformation生成新的DStream。這裏的執行引擎是Spark。
""" Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds. Usage: direct_kafka_wordcount.py <broker_list> <topic> To run this on your local machine, you need to setup Kafka and create a producer first, see http://kafka.apache.org/documentation.html#quickstart
and then run the example `$ bin/spark-submit --jars \ external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \ examples/src/main/python/streaming/direct_kafka_wordcount.py \ localhost:9092 test` """ from __future__ import print_function import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr) exit(-1) sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") ssc = StreamingContext(sc, 2) brokers, topic = sys.argv[1:] kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
#這裏kafka產生的是一個map, key是null, value是實際發送的數據,因此取x[1] lines = kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()
1.建立StreamingContext對象 同Spark初始化須要建立SparkContext對象同樣,使用Spark Streaming就須要建立StreamingContext對象。建立StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱。Spark Streaming須要指定處理數據的時間間隔,如上例所示的2s,那麼Spark Streaming會以2s爲時間窗口進行數據處理。此參數須要根據用戶的需求和集羣的處理能力進行適當的設置;
2.建立InputDStream Spark Streaming須要指明數據源。如socketTextStream,Spark Streaming以socket鏈接做爲數據源讀取數據。固然Spark Streaming支持多種不一樣的數據源,包括Kafka、 Flume、HDFS/S三、Kinesis和Twitter等數據源;
3.操做DStream 對於從數據源獲得的DStream,用戶能夠在其基礎上進行各類操做,如上例所示的操做就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源獲得的數據首先進行分割,而後利用Map和ReduceByKey方法進行計算,固然最後還有使用print()方法輸出結果;
4.啓動Spark Streaming 以前所做的全部步驟只是建立了執行流程,程序沒有真正鏈接上數據源,也沒有對數據進行任何操做,只是設定好了全部的執行計劃,當ssc.start()啓動後程序才真正進行全部預期的操做。
至此對於Spark Streaming的如何使用有了一個大概的印象,在後面的章節咱們會經過源代碼深刻探究一下Spark Streaming的執行流程
與RDD相似,DStream也提供了本身的一系列操做方法,這些操做能夠分紅三類:普通的轉換操做、窗口轉換操做和輸出操做。
普通的轉換操做以下表所示:
轉換 |
描述 |
map(func) |
源 DStream的每一個元素經過函數func返回一個新的DStream。 |
flatMap(func) |
相似與map操做,不一樣的是每一個輸入元素能夠被映射出0或者更多的輸出元素。 |
filter(func) |
在源DSTREAM上選擇Func函數返回僅爲true的元素,最終返回一個新的DSTREAM 。 |
repartition(numPartitions) |
經過輸入的參數numPartitions的值來改變DStream的分區大小。 |
union(otherStream) |
返回一個包含源DStream與其餘 DStream的元素合併後的新DSTREAM。 |
count() |
對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。 |
reduce(func) |
使用函數func(有兩個參數並返回一個結果)將源DStream 中每一個RDD的元素進行聚 合操做,返回一個內部所包含的RDD只有一個元素的新DStream。 |
countByValue() |
計算DStream中每一個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。 |
reduceByKey(func, [numTasks]) |
當一個類型爲(K,V)鍵值對的DStream被調用的時候,返回類型爲類型爲(K,V)鍵值對的新 DStream,其中每一個鍵的值V都是使用聚合函數func彙總。注意:默認狀況下,使用 Spark的默認並行度提交任務(本地模式下並行度爲2,集羣模式下位8),能夠經過配置numTasks設置不一樣的並行任務數。 |
join(otherStream, [numTasks]) |
當被調用類型分別爲(K,V)和(K,W)鍵值對的2個DStream時,返回類型爲(K,(V,W))鍵值對的一個新 DSTREAM。 |
cogroup(otherStream, [numTasks]) |
當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。 |
transform(func) |
經過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這能夠用來在DStream作任意RDD操做。 |
updateStateByKey(func) |
返回一個新狀態的DStream,其中每一個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func後的更新。這個方法能夠被用來維持每一個鍵的任何狀態數據。 |
在上面列出的這些操做中,transform()方法和updateStateByKey()方法值得咱們深刻的探討一下:
l transform(func)操做
該transform操做(轉換操做)連同其其相似的 transformWith操做容許DStream 上應用任意RDD-to-RDD函數。它能夠被應用於未在DStream API 中暴露任何的RDD操做。例如,在每批次的數據流與另外一數據集的鏈接功能不直接暴露在DStream API 中,但能夠輕鬆地使用transform操做來作到這一點,這使得DStream的功能很是強大。
l updateStateByKey操做
該 updateStateByKey 操做可讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態能夠是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
讓咱們用一個例子來講明,假設你要進行文本數據流中單詞計數。在這裏,正在運行的計數是狀態並且它是一個整數。咱們定義了更新功能以下:
詳細案例參考:
http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#transformations-on-dstreams
此函數應用於含有鍵值對的DStream中(如前面的示例中,在DStream中含有(word,1)鍵值對)。它會針對裏面的每一個元素(如wordCount中的word)調用一下更新函數,newValues是最新的值,runningCount是以前的值。
Spark Streaming 還提供了窗口的計算,它容許你經過滑動窗口對數據進行轉換,窗口轉換操做以下:
轉換 |
描述 |
window(windowLength, slideInterval) |
返回一個基於源DStream的窗口批次計算後獲得新的DStream。 |
countByWindow(windowLength,slideInterval) |
返回基於滑動窗口的DStream中的元素的數量。 |
reduceByWindow(func, windowLength,slideInterval) |
基於滑動窗口對源DStream中的元素進行聚合操做,獲得一個新的DStream。 |
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) |
基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操做,獲得一個新的DStream。 |
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) |
一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最先的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼咱們能夠將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法能夠複用中間三秒的統計量,提升統計的效率。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
基於滑動窗口計算源DStream中每一個RDD內每一個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue同樣,reduce任務的數量能夠經過一個可選參數進行配置。
|
Spark Streaming容許DStream的數據被輸出到外部系統,如數據庫或文件系統。因爲輸出操做實際上使transformation操做後的數據能夠經過外部系統被使用,同時輸出操做觸發全部DStream的transformation操做的實際執行(相似於RDD操做)。如下表列出了目前主要的輸出操做:
轉換 |
描述 |
print() |
在Driver中打印出DStream中數據的前10個元素。 |
saveAsTextFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存爲文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化而且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存爲Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) |
最基本的輸出操做,將func函數應用於DStream中的RDD上,這個操做會輸出數據到外部系統,好比保存RDD到文件或者網絡數據庫等。須要注意的是func函數是在運行該streaming應用的Driver進程裏執行的。 |
dstream.foreachRDD是一個很是強大的輸出操做,它允將許數據輸出到外部系統。詳細案例請參考:
http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#output-operations-on-dstreams
用spark streaming流式處理kafka中的數據,第一步固然是先把數據接收過來,轉換爲spark streaming中的數據結構Dstream。接收數據的方式有兩種:1.利用Receiver接收數據,2.直接從kafka讀取數據。
這種方式利用接收器(Receiver)來接收kafka中的數據,其最基本是使用Kafka高階用戶API接口。對於全部的接收器,從kafka接收來的數據會存儲在spark的executor中,以後spark streaming提交的job會處理這些數據。以下圖:
還有幾個須要注意的點:
KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
構造函數爲KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
對於全部的receivers接收到的數據將會保存在spark executors中,而後經過Spark Streaming啓動job來處理這些數據,默認會丟失,可啓用WAL日誌,該日誌存儲在HDFS上
在spark1.3以後,引入了Direct方式。不一樣於Receiver的方式,Direct方式沒有receiver這一層,其會週期性的獲取Kafka中每一個topic的每一個partition中的最新offsets,以後根據設定的maxRatePerPartition偏移量範圍來處理每一個batch。其形式以下圖:
這種方法相較於Receiver方式的優點在於:
以上主要是對官方文檔[1]的一個簡單翻譯,詳細內容你們能夠直接看下官方文檔這裏再也不贅述。
http://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html
不一樣於Receiver的方式,是從Zookeeper中讀取offset值,那麼天然zookeeper就保存了當前消費的offset值,那麼若是從新啓動開始消費就會接着上一次offset值繼續消費。
而在Direct的方式中,咱們是直接從kafka來讀數據,那麼offset須要本身記錄,能夠利用checkpoint、數據庫或文件記錄或者回寫到zookeeper中進行記錄。這裏咱們給出利用Kafka底層API接口,將offset及時同步到zookeeper中的通用類,我將其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一個通用類,而KafkaCluster是kafka源碼中的一個類,因爲包名權限的緣由我把它單獨提出來,ComsumerMain簡單展現了通用類的使用方法,在每次建立KafkaStream時,都會先從zooker中查看上次的消費記錄offsets,而每一個batch處理完成後,會同步offsets到zookeeper中。
refer:http://blog.csdn.net/zhong_han_jun/article/details/50814038
Spark入門實戰系列--7.Spark Streaming(上)--實時流計算Spark Streaming原理介紹
http://blog.selfup.cn/1665.html#comments