1、實驗目的
經過本實驗對基於Spark Streaming流式計算框架有所全面瞭解,掌握對DStreams抽象的操做。html
2、實驗內容
實驗內容針對新浪股票數據接口,基於Spark Streaming實時接收並處理數據,並對特定的某一支股票走勢進行簡單預測。算法
3、實驗要求
以小組爲單元進行實驗,每小組5人,小組自協商一位組長,由組長安排和分配實驗任務,具體參加試驗內容中實驗過程。小組成員須要對流式計算有所瞭解,特別是對Spark Streaming流式計算框架有所瞭解和學習。 數據庫
4、準備知識
4.1 Spark Streaming流式計算框架apache
在Spark Streaming上實現DStream,它是基於Spark處理引擎的一個修改版本。Spark Streaming由三部分組成,如圖4.1所示編程
圖4.1 Spark Streaming組件windows
圖4.1顯示了Spark Streaming在Spark Core上所作的修改:服務器
(1)master跟蹤DStream lineage,並調度任務來計算新的RDD分區。網絡
(2)工做節點接收數據,保存輸入分區和已計算的RDD,並執行任務。數據結構
(3)客戶端用於發送數據給系統。架構
如圖4.1所示,如圖中所示,Spark Streaming 重用了 Spark 的許多組件,但仍然須要修改和添加多個組件來支持流處理。
從架構角度來看,Spark Streaming 和傳統的流系統之間區別在於,Spark Streaming 將計算過程分解爲小的、無狀態的、肯定的任務。每一個任務均可以在集羣中的任何節點或同時在多個節點運行。在傳統系統的固定拓撲結構中,將部分計算過程轉移到另外一臺機器是一個很大的動做。Spark Streaming 的作法,能夠很是直接地在集羣上進行負載均衡,應對故障或啓動慢節點恢復。同理也能用於批處理系統——如 MapReduce。然而,因爲 RDD 運行於內存中,Spark
Streaming 的任務執行時間會短得多,通常只有 50-200 毫秒。
不一樣於之前系統將狀態存儲在長時間運行的處理過程當中,Spark Streaming 中的全部狀態都以容錯數據結構(RDD)來保存。因爲 RDD 分區被肯定性地計算出來,它能夠駐留在任何節點上,甚至能夠在多個節點上進行計算。這套系統試圖最大限度地提升數據局部性,同時這種底層的靈活性使得推測執行和並行恢復成爲可能。
這些優點在批處理平臺(Spark)上運行時能夠很天然地得到。但依然須要進行顯著的修改來支持流處理。
應用程序執行:
Spark Streaming 的應用從一個或多個輸入流開始執行。系統加載數據流的方式,要麼是經過直接從客戶端接收記錄數據,要麼是經過週期性的從外部存儲系統中加載數據,如 HDFS,外部的存儲系統也能夠被日誌收集系統[9]所代替。在前一種方式下,因爲 D-Streams 須要輸入的數據被可靠地進行存儲來從新計算結果,所以咱們須要確保新的數據在向客戶端程序發送確認以前,在兩個工做節點間被複制。若是一個工做節點發生故障,客戶端程序向另外一個工做節點
發送未經確認的數據。
全部的數據在每個工做節點上被一個塊存儲進行管理,同時利用主服務器上的跟蹤器來讓各個節點找到數據塊的位置。因爲咱們的輸入數據塊和咱們從數據塊計算獲得的 RDD 的分區是不可變的,所以對塊存儲的跟蹤是相對簡單的:每個數據塊只是簡單的給定一個惟一 ID,並全部擁有這個 ID 的節點都可以對其進行操做(例如,若是多個節點同時計算它)。塊存儲將新的數據塊存儲在內存中,但會以 LRU 策略將這些數據塊丟棄, 這在後面會進行描述。
爲了肯定什麼時候開始一個新的時間週期,咱們假設各個節點經過 NTP 進行了時鐘同步,而且在每個週期結束時每個節點都會向主服務器報告它所接收到的數據塊 IDs。主服務器以後會啓動任務來計算這個週期內的輸出 RDDs, 不須要其餘任何同步。和其餘的批處理調度器同樣,一旦完成上個週期任務,它就簡單地開始每一個後續任務。
Spark Streaming 依賴於每個時間間隔內 Spark 現有的批處理調度器,並加入了像DryadLINQ系統中的大量優化:(1)它對一個單獨任務中的多個操做進行了管道式執行,如一個 map 操做後緊跟着另外一個 map操做。(2)它根據數據的本地性對各個任務進行調度。(3)它對 RDD 的各個劃分進行了控制,以免在網絡中數據的 shuffle。例如,在一個reduceByWindow 的操做中,每個週期內的任務須要從當前的週期內「增長」 新的部分結果(例如,每個頁面的點擊數),和「刪除」 多個週期之前的結果。調度器使用相同的方式對不一樣週期內的狀態 RDD 進行切分,以使在同一個節點的每個 key 的數據(例如,一個頁面) 在各時間分片間保持一致。
流處理優化:
儘管 Spark Streaming 創建在 Spark 之上,咱們仍然必須優化這個批處理引擎以使其支持流處理。這些優化包括如下幾個方面:
(1)網絡通訊:咱們重寫了 Spark 的數據層,經過使用異步 I/O 使得帶有遠程輸入的任務,好比說reduce 任務,可以更快地獲取它們。
(2)時間間隔流水線化:由於每個時間間隔內的任務均可能沒有充分地使用集羣的資源(好比說,在每個時間間隔的末端, 可能只有不多的幾個任務還在運行),因此,咱們修改了 Spark的調度器,使它容許在當前的時間間隔尚未結束的時候調用下一個時間間隔的任務。例如,考慮咱們在表 4.3 提到的 map + runningReduce 做業。咱們之因此可以在時間間隔 1 的 reduce 操做結束以前就能夠執行時間間隔 2 的 map 操做,這是由於每一步的 map操做都是獨立的。
(3)任務調度:咱們對 Spark 的任務調度器作了大量的優化,好比說手工調整控制消息的大小,使得每隔幾百毫秒就能夠啓動上百個任務的並行做業。
(4)存儲層:爲了支持 RDDs 的異步檢查點和性能提高,咱們重寫了 Spark 的存儲層。由於 RDDs 是不可變的,因此能夠在不阻塞計算和減慢做業的狀況下經過網絡對 RDDs 設置檢查點。在可能的狀況下,新的數據層還會使用零拷貝。
(5)lineage 截斷:由於在 D-Streams 中 RDDs 之間的 lineage 能夠無限增加,咱們修改了調度器使之在一個 RDD 被設置檢查點以後刪除本身的 lineage,修改以後 RDDs 之間的 lineage 不能任意生長。相似地,對於 Spark 中的其餘無限增加的數據結構來講,將會按期調用一個清理進程來清理它們。
(6)Master 的恢復:由於流應用須要不間斷運行 7 天 24 小時,咱們給 Spark 加入對 master 狀態恢復的支持。
針對流處理所作的優化還提升了 Spark 在批處理標準測試上的性能,大概是以前的 2 倍. Spark 的引擎可以同時應對流處理和批處理,這是其強大之處。
內存管理:
在當前的 Spark Streaming 實現中,每一個結點的塊存儲管理 RDD 的分片是以 LRU(最近最少使用)的方式,若是內存不夠會依 LRU 算法將數據調換到磁盤。另外,用戶能夠設置最大的超時時間,當達到這個時間以後系統會直接將舊的數據塊丟棄而不進行磁盤 I/O 操做(這個超時時間必須大於檢查點間隔的時間)。咱們發如今不少應用中,Spark Streaming 須要的內存並非不少,這是由於一個計算中的狀態一般比輸入數據少不少(不少應用是計算聚合統計),而且任何可靠的流式處理系統都須要像咱們這樣經過網絡來複制數據到多個結點。可是,Spark團隊仍是會探索優化內存使用的方式。
並行恢復:
DStreams的肯定性使得可使用兩種有效卻不適合常規流式系統的恢復技術來恢復工做節點狀態:並行恢復和推測執行。此外,它也簡化了主節點的恢復。
當集羣中一個Worker失敗,該節點上的RDD分片狀態、Running中的任務,DStreams都會在其餘Worker上從新並行計算。經過異步的複製RDD狀態到其它的工做節點,系統能夠週期性地設置RDDs狀態地檢查點。例如,在運行時統計頁面瀏覽數的程序中,系統可能對於該計算每分鐘選擇一個檢查點。而後,若是一個節點失敗了,系統會檢查全部丟失的RDD分片,而後啓動一個任務從上次的檢查點開始從新計算。多個任務能夠同時啓動去計算不一樣的分片,使得整個集羣參與恢復。DStream在每一個時間片中並行的計算RDDs的分區以及並行處理每一個時間片中相互獨立的操做(例如開始的map操做),由於能夠從lineage中細粒度地得到依賴關係。
在上行流備份中,單個閒置機器執行了全部的恢復,而後開始處理新的紀錄。在高負荷地系統中這須要很長時間才能跟上進度,這是由於在重建舊的狀態過程當中新的紀錄會持續到達。事實上,假設在失敗以前地工做量是,而後在恢復的每分鐘中備份節點只能作一分鐘地工做,可是會同時收到 分鐘的新任務。所以,要在的時間內從上次失敗節點中徹底恢復 個單元的任務,則能夠獲得:。
(4.1)
在其餘線路中,全部的機器參與恢復,同時也處理新的紀錄。假定在任務失敗以前分佈式集羣中有臺機器,剩餘的臺機器,如今每一個機器須要恢復個工做,同時接收數據的速率是。它們追趕到來的數據流時間知足
。
(4.2)
所以,擁有更多的節點,並行恢復可以跟上到來的數據流,這比上行留備份要快得多。
除了節點故障,在大型集羣中另外一個值得關注的問題是運行較慢的節點。幸運的是,DStreams一樣也可讓咱們像批處理系統那樣減小較慢節點的影響,這是經過推測性(speculative)地運行較慢任務地備份副本實現的。這種推測執行在連續的處理系統中可能很難實現,由於它須要啓動一個節點的新副本,填充新副本的狀態,並追遇上較慢的副本。事實上,流式處理中的複製算法,好比Flux和DPC,主要在於研究兩個副本之間的同步。
在Spark Streaming的實現中,使用了一個簡單的閾值來檢測較慢的節點:若是一個任務的運行時長比它所處的工做階段中的平均值高1.4倍以上,那麼就標記它爲慢節點。
7*24運行Spark Streaming的一個最終要求是可以容忍Spark master的故障[51]。Spark經過兩個步驟來作到這些,第一步是當開始每一個時序時可靠的記錄計算的狀態,第二步是當舊的master失敗時,讓計算節點鏈接到一個新的master而且報告它們的RDD分區。DStreams簡化恢復的一個關鍵方面是,若是一個給定的RDD被計算兩次是沒有問題的。由於操做是肯定的,這一結果與從故障中進行恢復相似。由於任務能夠從新計算,這意味着當master從新鏈接時丟掉一些運行中的任務也是能夠的。
Spark Sreaming目前的實現方式是將DStreams元數據存儲在HDFS,記錄(1)用戶的DStreams圖以及代表用戶代碼的Scala的函數對象,(2)最後的檢查點的時間,還有(3)自檢查點開始的RDD的ID號,其中檢查點經過在每一個時序進行重命名(原子操做)來更新HDFS文件。恢復後,新master會讀取這個文件找到它斷開的地方,並從新鏈接到計算節點,以便肯定哪些RDD分區是在內存中。而後再繼續處理每個漏掉的時序。雖然Spark Streaming尚未優化恢復處理,但它是至關快了,100個節點的集羣能夠在12秒內恢復。
4.2 DStreams API
由於DStreams是主要的執行策略(描述如何將一個計算分解成多個步驟),所以它們被用在流式系統中實現了多個標準的操做,好比滑動窗口和增量式處理,以簡單的對它們的執行批處理到各個小的時間間隔中。
在Spark Streaming中,用戶使用函數API來註冊一個或多個數據流。程序能夠將輸入數據流定義爲從外部系統中讀取數據,該系統經過從對節點端口監聽或週期性地從一個存儲系統(例如,HDFS)加載來獲取數據。它能夠適用於兩種類型對這些數據流的操做:轉換操做,從一個或多個父數據流建立一個新的DStreams。這些操做多是無狀態的,在這個時間週期內對RDD分別進行處理,或它們可能跨越週期來建立狀態。輸出操做,使得程序將數據寫入外部系統。例如,save操做將DStreams中的每個RDD輸出到數據庫。
DStreams支持在典型批處理框架中所擁有的無狀態的轉換操做,包括map,reduce,groupBy和join。咱們在Spark中提供了全部操做。例如,一個程序使用如下的代碼能夠在DStreams的每個時間週期內,運行一個規範的MapReduce WC程序實例。
pairs=words.map(w=>(w,1)) //構造鍵值對RDD,並統計數量
counts=pairs.reduceByKey((a,b)=>a+b) //聚合
此外,爲了支持跨越多個週期的計算,DStreams提供了多個有狀態的轉換操做,這些操做包括:窗口、增量式聚合、狀態跟蹤等。這些操做是在基於標準的數據流處理技術的基礎上例如滑動窗口。圖4.2 用於單一關聯和關聯+可逆版本的操做執行的reduceByWindow。這兩個版本爲每一個時間間隔只進行一次計數的計算,可是第二個版本的操做避免了對每個窗口進行從新求和。方框表示RDDs,箭頭表示用來計算窗口的操做[t,t+5)。
圖4.2 用於單一關聯和關聯+可逆版本的操做執行的reduceByWindow
(1)窗口操做:window操做將每個過去的時間週期的滑動窗口裏的全部記錄組合到一個RDD。例如,代碼words.window(「5s」),會產生一個包含週期內單詞的RDDs的DStream[0,5),[1,6),[2,7)等。
(2)增量式聚合操做:對於經常使用的聚合計算的用例,就像在一個滑動窗口上進行count或max操做,DStreams有增量reduceByWindow操做的幾個變種操做。最簡單的一個是僅僅用一個關聯的合併函數來對值進行合併。例如,在上述代碼中,用戶能夠寫:pairs.ReduceByWindow(「5s」,(a,b)=>a+b)。對於每個時間週期只對該週期的計數進行一次計算,但不得不反覆地對過去的5秒去添加計數,如圖4.2(a)所示。若是聚合函數也是可逆的,一個更加高效的版本還須要「減值和增量式維護狀態的一個函數(圖4.2(b)):paris.reduceByWindow(「5s」,
(a,b)=>a+b,(a,b)=>a-b))。
(3)狀態跟蹤:一般,應用程序爲了對錶示狀態變化的事件流進行響應,須要對各種對象進行狀態跟蹤。
在Spark中可使用批處理的操做來實現這些操做,經過將批處理操做應用到來自父數據流中不一樣時間的RDDs,例如,能夠由updateStateByKey構建的RDDs,經過對舊的狀態和每一個週期的新事件進行分組來實現。最後,用戶調用輸出操做符將Spark Streaming的結果發送到外部系統(例如,展現在dashboard上)。咱們提供了兩個這樣的操做:save操做,將Dstream中的每個RDD寫入到一個存儲系統(例如,HDFS或HBase),和foreachRDD,在每個RDD上執行一段用戶代碼段(任意的Spark代碼)。例如,用戶能夠用counts.foreachRDD(rdd=>print(rdd.top(K)))來打印top K的計數。
4.3 Window Operations
Spark Streaming還提供了窗口計算,容許您在數據的滑動窗口上應用轉換。下圖說明了這個滑動窗口。
圖4.3 Window Operations
如圖4.3所示,每當窗口滑過源DStream時,落在窗口內的源RDD被組合並進行操做以產生窗口DStream的RDD。在這種具體狀況下,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。
- windowLength窗口長度 - 窗口的持續時間(4.3圖中的3)
- slideInterval滑動間隔 - 執行窗口操做的間隔(4.3圖中的2)
這兩個參數必須是源DStream的批間隔的倍數(圖中的1)。
咱們以一個例子來講明窗口操做。假設但願經過在過去30秒的數據中每10秒產生一個字數來擴展早期的示例。爲此,必須在最近30秒的數據中對(word,1)對的對DStream應用reduceByKey操做。這是使用reduceByKeyAndWindow操做完成的。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常見的窗口操做以下。全部這些操做都採用上述兩個參數 - windowLength和slideInterval。
5、實驗步驟
5.1 數據源
實驗採用Sina股票數據接口。以字符串數據的形式範圍,簡單易用且直觀。
5.2 測試數據源
針對股票的數據接口,一下代碼提供簡單的測試,以解析返回的數據。
解析數據
伴生對象:
5.3 Spark Streaming編程
數據接口調試完畢,股票數據也解析好了,下面就開始Streaming。Spark Streaming必定會涉及數據源,且該數據源是一個主動推送的過程,即spark被動接受該數據源的數據進行分析。但Sina的接口是一個很簡單的HttpResponse,沒法主動推送數據,因此咱們須要實現一個Custom Receiver,可參考 http://spark.apache.org/docs/latest/streaming-custom-receivers.html
下面是具體的代碼,其實定製化一個Receiver簡單來講就是實現onStart/onStop。onStart用來初始化資源,給獲取數據作準備,獲取到的數據用store發送給SparkStreaming便可;onStop用來釋放資源
Receiver搞定以後就能夠開始編寫股票預測的main函數了,股票預測的方法之一,就是統計一段時間內股票上漲的次數,並展現上漲次數TopN的股票信息,但本文一切從簡,並無實現所有的功能,只是統計了股票上漲的次數,也就是對上漲與否進行WordCount。
5.4 運行結果分析
因爲ssc的時間間隔爲1,因此每秒都會查詢大同煤業的股票數據,這就是下面每一個Time打印的第一行數據(由於stockState先進行print,因此每次查詢的股票數據是第一行);又由於slide設置爲3,因此每隔3秒會進行reduceFunc計算,該函數處理windowsize個RDD(此處設置爲6),對這6個RDD按照時間前後順序進行reduce。
須要特別說明的是spark的reduce默認從左到右進行fold(摺疊),從最左邊取兩個數進行reduce計算產生臨時結果,再與後面的數據進行reduce,以此類推動行計算,其實就是foldLeft。
6、總結
實驗針對新浪股票數據接口,基於Spark Streaming實時接收並處理數據,並對特定的某一支股票走勢進行簡單預測。使學生可以對Spark Streaming流式計算框架有所全面的瞭解,並掌握DStreams抽象的操做。