DStream編程數據模型java
DStream(Discretized Stream)做爲Spark Streaming的基礎抽象,它表明持續性的數據流。算法
這些數據流既能夠經過外部輸入源賴獲取,也能夠經過現有的Dstream的transformation操做來得到。shell
在內部實現上,DStream由一組時間序列上連續的RDD來表示。每一個RDD都包含了本身特定時間間隔內的數據流。數據庫
對DStream中數據的各類操做也是映射到內部的RDD上來進行的apache
對Dtream的操做能夠經過RDD的transformation生成新的DStream。編程
咱們把RDD加上一個時間屬性來區分。網絡
咱們能夠把DStream看成一連串用時間分段的RDD來看待,而且這串是RDD像流水同樣源源不絕的。機器學習
當咱們對DStream採起一些操做的時候,其中每段時間的RDD之間相互對應轉化成新的DStream. socket
SparkStreaming的基本步驟ide
1.經過建立輸入DStream來定義輸入源
2.經過對DStream應用轉換操做和輸出操做來定義流計算,用戶本身定義處理邏輯
3.經過streamingContext.start()來開始接收數據和處理流程
4.經過streamingContext.awaitTermination()方法來等待處理結果
5.經過streamingContext.stop()來手動結束流計算流程
具體步驟
1.建立StreamingContext對象
(1)經過 new StreamingContext(SparkConf,Interval)創建
建立StreamingContext對象所需的參數有兩個一個是 SparkConf 配置參數,一個是時間參數。
與SparkContext基本一致,SparkConf 配置參數須要指明Master,任務名稱(如NetworkWordCount)。
時間參數咱們若是以秒來定義的話格式爲Seconds(n),這個參數定義了Spark Streaming須要指定處理數據的時間間隔,
時間參數須要根據用戶的需求和集羣的處理能力進行適當的設置。
例如
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1))
這裏的那麼Spark Streaming會以1s爲時間窗口進行數據處理。
(2)經過 new StreamingContext(SparkContext,Interval)創建
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1))
這種方式通常用於spark-shell中創建,
spark-shell中給咱們定義好了sc,可是spark-shell並無爲咱們創建好ssc
因此咱們須要本身創建ssc
在創建ssc 以前咱們須要導入 import org.apache.spark.streaming._
在編碼以前咱們須要設置一下日誌等級,以便咱們以後的程序調試。
要麼日誌會把全部東西都顯示出來,你根本找不到哪條是錯誤信息。
//設置日誌等級的單例對象 import org.apache.log4j.{Logger,Level} import org.apache.spark.internal.Logging object StreamingLoggingExample extends Logging{ def setStreamingLogLevels(): Unit ={ val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if(!log4jInitialized) logInfo("Setting log level to [WARN] for streaming example" + "To override add a custom log4j.properties to the classpath" ) Logger.getRootLogger.setLevel(Level.WARN) } } //使用單例對象修改日誌等級 StreamingLoggingExample.setsetStreamingLogLevels() //注意在編碼以前設置
2.建立InputDStream
咱們經過設置InputDStream來設置數據的來源
Spark Streaming支持的數據源有文件流、套接字流、RDD隊列流、Kafka、 Flume、HDFS/S三、Kinesis和Twitter等數據源。
(1)文件流
val lines = ssc.textFileStream("file:///")
文件流的加載的是系統中的文件,能夠是HDFS中的也能夠是本地的,跟建立RDD是同樣的。
(2)套接字流
val lines = ssc.socketTextStream("hostname", port.toInt)
(3)RDD隊列流
建立RDD隊列
val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()//創建一個整型RDD的隊列流,初始化爲空
建立RDD隊列流的spark流進行監聽
val lines = ssc.queueStream(rddQueue)
rdd隊列流中添加數據
for(i <- 1 to 100){ rddQueue += ssc.sparkContext.makeRDD(1 to 100,2) //添加數據到RDD隊列 }
(4)Kafka
3.操做DStream
對於從數據源獲得的DStream,用戶能夠在其基礎上進行各類操做。
與RDD相似,DStream也提供了本身的一系列操做方法,這些操做能夠分紅三類:普通的轉換操做、窗口轉換操做和輸出操做。
(1)普通的轉換操做
轉換 |
描述 |
map(func) |
源 DStream的每一個元素經過函數func返回一個新的DStream。 |
flatMap(func) |
相似與map操做,不一樣的是每一個輸入元素能夠被映射出0或者更多的輸出元素。 |
filter(func) |
在源DSTREAM上選擇Func函數返回僅爲true的元素,最終返回一個新的DSTREAM 。 |
repartition(numPartitions) |
經過輸入的參數numPartitions的值來改變DStream的分區大小。 |
union(otherStream) |
返回一個包含源DStream與其餘 DStream的元素合併後的新DSTREAM。 |
count() |
對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。 |
reduce(func) |
使用函數func(有兩個參數並返回一個結果)將源DStream 中每一個RDD的元素進行聚 合操做,返回一個內部所包含的RDD只有一個元素的新DStream。 |
countByValue() |
計算DStream中每一個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。 |
reduceByKey(func, [numTasks]) |
當一個類型爲(K,V)鍵值對的DStream被調用的時候,返回類型爲類型爲(K,V)鍵值對的新 DStream,其中每一個鍵的值V都是使用聚合函數func彙總。注意:默認狀況下,使用 Spark的默認並行度提交任務(本地模式下並行度爲2,集羣模式下位8),能夠經過配置numTasks設置不一樣的並行任務數。 |
join(otherStream, [numTasks]) |
當被調用類型分別爲(K,V)和(K,W)鍵值對的2個DStream 時,返回類型爲(K,(V,W))鍵值對的一個新 DSTREAM。 |
cogroup(otherStream, [numTasks]) |
當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。 |
transform(func) |
經過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這能夠用來在DStream作任意RDD操做。 |
updateStateByKey(func) |
返回一個新狀態的DStream,其中每一個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func後的更新。這個方法能夠被用來維持每一個鍵的任何狀態數據。 |
注意:
transform(func)
該transform操做(轉換操做)連同其其相似的 transformWith操做容許DStream 上應用任意RDD-to-RDD函數。
它能夠被應用於未在 DStream API 中暴露任何的RDD操做。
例如,在每批次的數據流與另外一數據集的鏈接功能不直接暴露在DStream API 中,但能夠輕鬆地使用transform操做來作到這一點,這使得DStream的功能很是強大。
例如,你能夠經過鏈接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),而後基於此作實時數據清理的篩選,以下面官方提供的僞代碼所示。
事實上,也能夠在transform方法中使用機器學習和圖形計算的算法。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
updateStateByKey操做
咱們使用的通常操做都是不記錄歷史數據的,也就說只記錄當前定義時間段內的數據,跟先後時間段無關。
若是咱們想統計歷史時間內的總共數據而且實時更新呢?
該 updateStateByKey 操做可讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態能夠是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
對DStream經過updateStateByKey(updateFunction)來實現實時更新。
更新函數有兩個參數 1.newValues 是當前新進入的數據 2.runningCount 是歷史數據,被封裝到了Option中。
爲何歷史數據要封裝到Option中呢?有可能咱們沒有歷史數據,這個時候就能夠用None,有數據能夠用Some(x)。
固然咱們的當前結果也要封裝到Some()中,以便做爲以後的歷史數據。
咱們並不用關心新進入的數據和歷史數據,系統會自動幫咱們產生和維護,咱們只須要專心寫處理方法就行。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {//定義的更新函數 val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) } val runningCounts = pairs.updateStateByKey[Int](updateFunction)//應用
示例:
(1)首先咱們須要瞭解數據的類型
(2)編寫處理方法
(3)封裝結果
//定義更新函數 //咱們這裏使用的Int類型的數據,由於要作統計個數 def updateFunc(newValues : Seq[Int],state :Option[Int]) :Some[Int] = { //傳入的newVaules將當前的時間段的數據所有保存到Seq中 //調用foldLeft(0)(_+_) 從0位置開始累加到結束 val currentCount = newValues.foldLeft(0)(_+_) //獲取歷史值,沒有歷史數據時爲None,有數據的時候爲Some //getOrElse(x)方法,若是獲取值爲None則用x代替 val previousCount = state.getOrElse(0) //計算結果,封裝成Some返回 Some(currentCount+previousCount) } //使用 val stateDStream = DStream.updateStateByKey[Int](updateFunc)
(2)窗口轉換函數
Spark Streaming 還提供了窗口的計算,它容許你經過滑動窗口對數據進行轉換,窗口轉換操做以下:
轉換 |
描述 |
window(windowLength, slideInterval) |
返回一個基於源DStream的窗口批次計算後獲得新的DStream。 |
countByWindow(windowLength,slideInterval) |
返回基於滑動窗口的DStream中的元素的數量。 |
reduceByWindow(func, windowLength,slideInterval) |
基於滑動窗口對源DStream中的元素進行聚合操做,獲得一個新的DStream。 |
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) |
基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操做,獲得一個新的DStream。 |
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks]) |
一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最先的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼咱們能夠將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法能夠複用中間三秒的統計量,提升統計的效率。 |
countByValueAndWindow(windowLength,slideInterval, [numTasks]) |
基於滑動窗口計算源DStream中每一個RDD內每一個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue同樣,reduce任務的數量能夠經過一個可選參數進行配置。 |
在Spark Streaming中,數據處理是按批進行的,而數據採集是逐條進行的。
所以在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的數據彙總起來成爲一批數據交給系統去處理。
對於窗口操做而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定
而窗口間隔指的就是窗口的持續時間,在窗口操做中,只有窗口的長度知足了纔會觸發批數據的處理。
除了窗口的長度,窗口操做還有另外一個重要的參數就是滑動間隔(slide duration)
它指的是通過多長時間窗口滑動一次造成新的窗口,滑動窗口默認狀況下和批次間隔的相同,而窗口間隔通常設置的要比它們兩個大。
在這裏必須注意的一點是滑動間隔和窗口間隔的大小必定得設置爲批處理間隔的整數倍。
如圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。
對於初始的窗口time 1-time 3,只有窗口間隔知足了定義的長度也就是3才觸發數據的處理,不夠3繼續等待。
當間隔知足3以後進行計算後而後進行窗口滑動,滑動2個單位,會有新的數據流入窗口。
而後重複等待知足窗口間隔執行計算。
(3)輸出操做
Spark Streaming容許DStream的數據被輸出到外部系統,如數據庫或文件系統。
因爲輸出操做實際上使transformation操做後的數據能夠經過外部系統被使用,同時輸出操做觸發全部DStream的transformation操做的實際執行(相似於RDD操做)。
如下表列出了目前主要的輸出操做:
轉換 |
描述 |
print() |
在Driver中打印出DStream中數據的前10個元素。 |
saveAsTextFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存爲文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsObjectFiles(prefix, [suffix]) |
將DStream中的內容按對象序列化而且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
saveAsHadoopFiles(prefix, [suffix]) |
將DStream中的內容以文本的形式保存爲Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。 |
foreachRDD(func) |
最基本的輸出操做,將func函數應用於DStream中的RDD上,這個操做會輸出數據到外部系統,好比保存RDD到文件或者網絡數據庫等。須要注意的是func函數是在運行該streaming應用的Driver進程裏執行的。 |
一樣DStream也支持持久化
與RDD同樣,DStream一樣也能經過persist()方法將數據流存放在內存中,
4.啓動Spark Streaming
經過streamingContext.start()來開始接收數據和處理流程
經過streamingContext.awaitTermination()方法來等待處理結果
經過streamingContext.stop()來手動結束流計算流程
示例
package SparkDemo import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamWordCount { def main(args:Array[String]): Unit ={ //建立StreamingContext val conf = new SparkConf().setMaster("local[*]").setAppName("StreamTest") val ssc = new StreamingContext(conf,Seconds(20)) //創建文件流數據源通道 val lines = ssc.textFileStream("file:///") lines.cache()//持久化 //處理,word count val words = lines.flatMap(_.split(" ")) val wordPair = words.map((_,1)) val count = wordPair.reduceByKey(_+_) count.print() //啓動StreamingContext ssc.start() ssc.awaitTermination() } }
而後咱們將程序打包提交到spark集羣中運行
當程序運行ssc.start()後,就開始自動循環進入監聽狀態,屏幕上會顯示
這是正確的,若是咱們在創建ssc的文件中再添加一個文件file3.txt
就能夠在監聽窗口中顯示詞頻的統計了。
最後咱們能夠經過ssc.stop()中止程序,不過注意咱們不能省略這裏的圓括號。