許多應用須要即時處理收到的數據,例如用來實時追蹤頁面訪問統計的應用、訓練機器學習模型的應用,還有自動檢測異常的應用。Spark Streaming 是 Spark 爲這些應用而設計的模型。它容許用戶使用一套和批處理很是接近的 API 來編寫流式計算應用,這樣就能夠大量重用批處理應用的技術甚至代碼。數據庫
和 Spark 基於 RDD 的概念很類似,Spark Streaming 使用離散化流(discretized stream)做爲抽象表示,叫做 DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(所以得名「離散化」)。DStream 能夠從各類輸入源建立,好比 Flume、Kafka 或者 HDFS。建立出來的 DStream 支持兩種操做,一種是轉化操做(transformation),會生成一個新的DStream,另外一種是輸出操做(output operation),能夠把數據寫入外部系統中。DStream提供了許多與 RDD 所支持的操做相相似的操做支持,還增長了與時間相關的新操做,好比滑動窗口。apache
和批處理程序不一樣,Spark Streaming 應用須要進行額外配置來保證 24/7 不間斷工做。Spark Streaming 的檢查點(checkpointing)機制,也就是把數據存儲到可靠文件系統(好比 HDFS)上的機制,這也是 Spark Streaming 用來實現不間斷工做的主要方式。服務器
咱們會從一臺服務器的 9999 端口上實時輸入數據,並在控制檯打印出來。網絡
首先,你得有一個nc軟件,由於我是在window下運行程序的,可是在Linux系統裏面就不須要,Linux裏面有內置的nc命令。架構
nc軟件的用法:less
開一個命令行窗口(這裏要切換到nc軟件的路徑下): 服務端:nc –lp 9999 //客戶端:nc localhost 9999
nc軟件啓動成功的界面: 機器學習
而後就是一個簡單的Spark Streaming的代碼:socket
import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds object Test { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local[4]") // 從SparkConf建立StreamingContex並指定4秒鐘的批處理大小 // 用來指定多長時間處理一次新數據的批次間隔(batch interval)做爲輸入 val ssc = new StreamingContext(conf,Seconds(4)) // 鏈接到本地機器9999端口 val lines = ssc.socketTextStream("localhost", 9999) lines.print() // 啓動流式計算環境StreamingContext並等待它"完成" ssc.start() // 等待做業完成 ssc.awaitTermination() } }
鏈接成功的界面:ide
而後我在剛纔的界面輸入"Hello world",而後就會在控制檯界面打印出來。函數
Spark Streaming 使用「微批次」的架構,把流式計算看成一系列連續的小規模批處理來對待。Spark Streaming 從各類輸入源中讀取數據,並把數據分組爲小的批次。新的批次按均勻的時間間隔建立出來。在每一個時間區間開始的時候,一個新的批次就建立出來,在該區間內收到的數據都會被添加到這個批次中。在時間區間結束時,批次中止增加。時間區間的大小是由批次間隔這個參數決定的。批次間隔通常設在 500 毫秒到幾秒之間,由應用開發者配置。每一個輸入批次都造成一個 RDD,以 Spark 做業的方式處理並生成其餘的 RDD。處理的結果能夠以批處理的方式傳給外部系統。
Spark Streaming 對 DStream 提供的容錯性與 Spark 爲 RDD 所提供的容錯性一致:只要輸入數據還在,它就可使用 RDD 譜系重算出任意狀態(好比從新執行處理輸入數據的操做)。默認狀況下,收到的數據分別存在於兩個節點上,這樣 Spark 能夠容忍一個工做節點的故障。不過,若是隻用譜系圖來恢復的話,重算有可能會花很長時間,由於須要處理從程序啓動以來的全部數據。所以,Spark Streaming 也提供了檢查點機制,能夠把狀態階段性地存儲到可靠文件系統中(例如 HDFS 或者 S3)。通常來講,你須要每處理 5-10 個批次的數據就保存一次。在恢復數據時,Spark Streaming 只須要回溯到上一個檢查點便可。
若是流計算應用中的驅動器程序崩潰了,還能夠重啓驅動器程序並讓驅動器程序從檢查點恢復,這樣 Spark Streaming 就能夠讀取以前運行的程序處理數據的進度,並從那裏繼續。
ssc.checkpoint("hdfs://...")
DStream 的轉化操做能夠分爲無狀態(stateless)和有狀態(stateful)兩種。
• 在無狀態轉化操做中,每一個批次的處理不依賴於以前批次的數據。常見的RDD轉化操做,例如 map() 、 filter() 、 reduceByKey() 等,都是無狀態轉化操做,無狀態轉化操做是分別應用到每一個 RDD 上的。
• 相對地,有狀態轉化操做須要使用以前批次的數據或者是中間結果來計算當前批次的數據。有狀態轉化操做包括基於滑動窗口的轉化操做和追蹤狀態變化的轉化操做。
DStream 的有狀態轉化操做是跨時間區間跟蹤數據的操做;也就是說,一些先前批次的數據也被用來在新的批次中計算結果。主要的兩種類型是滑動窗口和 updateStateByKey() ,前者以一個時間階段爲滑動窗口進行操做,後者則用來跟蹤每一個鍵的狀態變化(例如構建一個表明用戶會話的對象)。有狀態轉化操做須要在你的 StreamingContext 中打開檢查點機制來確保容錯性。
全部基於窗口的操做都須要兩個參數,分別爲窗口時長以及滑動步長,二者都必須是StreamContext 的批次間隔的整數倍。窗口時長控制每次計算最近的多少個批次的數據,其實就是最近的 windowDuration/batchInterval 個批次。若是有一個以 10 秒爲批次間隔的源DStream,要建立一個最近 30 秒的時間窗口(即最近 3 個批次),就應當把 windowDuration設爲 30 秒。而滑動步長的默認值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。若是源 DStream 批次間隔爲 10 秒,而且咱們只但願每兩個批次計算一次窗口結果,就應該把滑動步長設置爲 20 秒。
對 DStream 能夠用的最簡單窗口操做是 window() ,它返回一個新的 DStream 來表示所請求的窗口操做的結果數據。換句話說, window() 生成的 DStream 中的每一個 RDD 會包含多個批次中的數據,能夠對這些數據進行 count() 、 transform() 等操做。
lines.window(windowDuration, slideDuration) lines.reduceByWindow(reduceFunc, windowDuration, slideDuration)
有時,咱們須要在 DStream 中跨批次維護狀態(例如跟蹤用戶訪問網站的會話)。針對這種狀況, updateStateByKey() 爲咱們提供了對一個狀態變量的訪問,用於鍵值對形式的DStream。給定一個由(鍵,事件)對構成的 DStream,並傳遞一個指定如何根據新的事件更新每一個鍵對應狀態的函數,它能夠構建出一個新的 DStream,其內部數據爲(鍵,狀態)對。例如,在網絡服務器日誌中,事件多是對網站的訪問,此時鍵是用戶的 ID。使用updateStateByKey() 能夠跟蹤每一個用戶最近訪問的 10 個頁面。這個列表就是「狀態」對象,咱們會在每一個事件到來時更新這個狀態。
輸出操做指定了對流數據經轉化操做獲得的數據所要執行的操做(例如把結果推入外部數據庫或輸出到屏幕上)。與 RDD 中的惰性求值相似,若是一個 DStream 及其派生出的 DStream都沒有被執行輸出操做,那麼這些 DStream 就都不會被求值。若是StreamingContext 中沒有設定輸出操做,整個 context 就都不會啓動。
在 Scala 中將 DStream 保存爲文本文件
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
由於 Spark 支持從任意 Hadoop 兼容的文件系統中讀取數據,因此 Spark Streaming 也就支持從任意 Hadoop 兼容的文件系統目錄中的文件建立數據流。
val line = ssc.textFileStream("directory")
這篇博文主要來自《Spark快速大數據分析》這本書裏面的第十章,內容有刪減,還有本書的一些代碼的實驗結果。