數據的時效性node
平常工做中,咱們通常會先把數據儲存在一張表中,而後對這張表的數據進行加工、分析。既然數據要儲存在表中,就有時效性這個概念。
若是咱們處理的是年級別的數據,好比人口分析、宏觀經濟分析,那麼數據最新日期距今晚個一兩週、甚至一兩個月都沒什麼關係。
若是咱們處理的是天級別的數據,好比各大網站的用戶偏好分析、零售供銷分析,通常晚個幾天也是能夠的,即 T+N 更新。
若是是小時級別的數據,對時效性要求就更高了,好比金融風控,涉及到資金的安全,必須有一張小時級別的數據。
那麼還有沒有要求更高的?固然有了,好比風險監測,網站必須有實時監測系統,一旦有攻擊,就必須馬上採起措施,雙十一或者週年慶的時候,各大電商平臺都經歷着嚴峻的流量考驗,也必須對系統進行實時的監測。此外,網站的實時個性化推薦、搜索引擎中也對實時性有極高的要求。
在這種場景下,傳統的數據處理流程——先收集數據,而後放到DB中,再取出來分析——就沒法知足這麼高的實時要求。
流式計算,在實時或者準實時的場景下,應運而生。算法
(1)與批量計算那樣慢慢積累數據不一樣,流式計算將大量數據平攤到每一個時間點上,連續地進行小批量的進行傳輸,數據持續流動,計算完以後就丟棄。
(2) 批量計算是維護一張表,對錶進行實施各類計算邏輯。流式計算相反,是必須先定義好計算邏輯,提交到流式計算系統,這個計算做業邏輯在整個運行期間是不可更改的。
(3) 計算結果上,批量計算對所有數據進行計算後傳輸結果,流式計算是每次小批量計算後,結果能夠馬上投遞到在線系統,作到實時化展示。數據庫
(1) 流式計算流程
① 提交流計算做業。
② 等待流式數據觸發流計算做業。
③ 計算結果持續不斷對外寫出。編程
(2) 流式計算特色
① 實時、低延遲
② 無界,數據是不斷無終止的
③ 連續,計算持續進行,計算完以後數據即丟棄數組
Apache Storm 安全
在Storm中,先要設計一個用於實時計算的圖狀結構,咱們稱之爲拓撲(topology)。這個拓撲將會被提交給集羣,由集羣中的主控節點(master node)分發代碼,將任務分配給工做節點(worker node)執行。一個拓撲中包括spout和bolt兩種角色,其中spout發送消息,負責將數據流以tuple元組的形式發送出去;而bolt則負責轉換這些數據流,在bolt中能夠完成計算、過濾等操做,bolt自身也能夠隨機將數據發送給其餘bolt。由spout發射出的tuple是不可變數組,對應着固定的鍵值對。性能優化
Apache Flink 網絡
Flink 是一個針對流數據和批數據的分佈式處理引擎。它主要是由 Java 代碼實現。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把全部任務當成流來處理,這也是其最大的特色。Flink 能夠支持本地的快速迭代,以及一些環形的迭代任務。而且 Flink 能夠定製化內存管理。在這點,若是要對比 Flink 和 Spark 的話,Flink 並無將內存徹底交給應用層。這也是爲何 Spark 相對於 Flink,更容易出現 OOM 的緣由(out of memory)。就框架自己與應用場景來講,Flink 更類似與 Storm。架構
Apache Spark Streaming 框架
Spark Streaming是核心Spark API的一個擴展,它並不會像Storm那樣一次一個地處理數據流,而是在處理前按時間間隔預先將其切分爲一段一段的批處理做業。Spark針對持續性數據流的抽象稱爲DStream(DiscretizedStream),一個DStream是一個微批處理(micro-batching)的RDD(彈性分佈式數據集);而RDD則是一種分佈式數據集,可以以兩種方式並行運做,分別是任意函數和滑動窗口數據的轉換。
Storm, Flink, Spark Streaming的對比圖
Storm, Flink, Spark Streaming的選擇
若是你想要的是一個容許增量計算的高速事件處理系統,Storm會是最佳選擇。
若是你必須有狀態的計算,剛好一次的遞送,而且不介意高延遲的話,那麼能夠考慮Spark Streaming,特別若是你還計劃圖形操做、機器學習或者訪問SQL的話,Apache Spark的stack容許你將一些library與數據流相結合(Spark SQL,Mllib,GraphX),它們會提供便捷的一體化編程模型。尤爲是數據流算法(例如:K均值流媒體)容許Spark實時決策的促進。
Flink支持增量迭代,具備對迭代自動優化的功能,在迭代式數據處理上,比Spark更突出,Flink基於每一個事件一行一行地流式處理,真正的流式計算,流式計算跟Storm性能差很少,支持毫秒級計算,而Spark則只能支持秒級計算。
Spark Streaming 是Spark 核心API的一個擴展,能夠實現高吞吐量的、具有容錯機制的實時流數據的處理。支持多種數據源獲取數據,包括Kafka、Flume、Zero MQ,Kinesis以及TCP Sockets,從數據源獲取數據以後,可使用諸如map、reduce、join和window等高級函數進行復雜算法的處理。最後還能夠將處理結果存儲到文件系統,數據庫和現場儀表盤。
在」One Stack rule them all」的基礎上,可使用Spark的其餘子框架,如集羣學習、圖計算等,對流數據進行處理。
Spark的各個子框架都是基於Spark Core的,Spark Streaming在內部的處理機制是,接收實時流的數據,並根據必定的時間間隔拆分紅一批批的數據,而後經過Spark Enging處理這些批數據,最終獲得處理後的一批批結果數據。 對應的批數據,在Spark內核對應一個RDD實例,所以,對應流數據的DStream能夠當作是一組RDDS,即RDD的一個序列。通俗點理解的話,在流數據分紅一批一批後,經過一個先進先出的隊列,而後Spark Enging從該隊列中依次取出一個個批數據,把批數據封裝成一個個RDD,而後進行處理,這是一個典型的生產者/消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。
離散流(discretized stream)或DStream
這是SparkStraming對內部持續的實時數據流的抽象描述,即咱們處理的一個實時數據流,在Spark Streaming中對應於一個DStream實例。
批數據(batch data)
這是化整爲零的第一步,將實時流數據以時間片爲單位進行分批,將流處理轉化爲時間片數據的批處理。隨着持續時間的推移,這些處理結果就造成了對應的結果數據流了。
時間片或批處理時間間隔(batch interval)
這是人爲地對數據流進行定量的標準,以時間片做爲咱們拆分數據流的依據。一個時間片的數據對應一個RDD實例。
窗口長度(window length)
一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數。
滑動時間間隔
前一個窗口到後一個窗口所通過的時間長度。必須是批處理時間間隔的倍數。
Input DStream
一個input DStream是一個特殊的DStream,將Spark Streaming鏈接到一個外部數據源來讀取數據。
在Spark Streaming中,數據處理是按批進行的,而數據採集是逐條進行的,所以在Spark Streaming中會事先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的數據彙總起來稱爲一批數據交個系統區處理。
對於窗口操做而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續時間,在窗口操做中,只有窗口的長度知足了纔會觸發批處理的處理。除了窗口的長度,窗口操做還有另外一個重要的參數就是滑動間隔(slide duration),它指的是通過多長時間窗口滑動一次造成新的窗口,滑動窗口默認狀況下和批次間隔的相同,而窗口間隔通常設置的要比它們兩個大。在這裏必須注意的一點是滑動間隔和窗口間隔的大小必定得設置爲批處理間隔的整數倍。
Spark Streaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,能夠對多種數據源(如Kafka、Flume、Zero MQ和TCP套接字)進行相似Map、Reduce和Join等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。
計算流程
Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distrbute Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備。
容錯性
對於流式計算來講,容錯性相當重要。首先咱們要明確一下Spark中RDD的容錯性機制。每個RDD都是一個不可變的分佈式可重算的數據集,其記錄着肯定性的操做繼承關係(lineage),因此只要輸入數據是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。
對於Spark Streaming來講,其RDD的傳承關係以下圖所示,圖中的每個橢圓形表示一個RDD,橢圓形中的每一個圓形表明一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每個Batch Size鎖產生的中間結果RDD。咱們能夠看到圖中的每個RDD都是經過lineage相鏈接的,因爲Spark Streaming輸入數據能夠來自磁盤,例如HDFS(多份拷貝)或是來自與網絡的數據流(Spark Streaming會將網絡輸入數據的每個數據流拷貝兩份到其餘的機器)都能保證容錯性,因此RDD中任意的Partition出錯,均可以並行地在其餘機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。
實時性
對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark DAG圖分解以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5 ~ 2秒之間(Stom目前最小的延遲在100ms左右),因此Spark Streaming可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景。
擴展性與吞吐量
Spark目前在EC2上已經可以線性擴展到100個節點(每一個節點4Core),能夠以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,如下是Berkeley利用WordCount和Grep兩個用例所作的測試。
與RDD同樣,DStream一樣也能經過persist()方法將數據流存放在內存中,默認的持久化方法是MEMORY_ONLY_SER,也就是在內存中存放數據同時序列化的方式,這樣作的好處是遇到須要屢次迭代計算的程序時,速度優點十分的明顯。而對於一些基於窗口的操做,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操做,如updateStateByKey,其默認的持久化策略就是保存在內存中。
對於來自網絡的數據源(Kafka、Flume、sockets等),默認的持久化策略是將數據保存在兩臺機器上,這也是爲了容錯性而設計的。
另外,對於窗口和有狀態的操做必須checkpont,經過StreamingContext的checkpoint來指定目錄,經過DStream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。
1,優化運行時間
增長並行度
確保使用整個集羣的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操做,增長其並行度以確保更爲充分的使用集羣資源。
減小數據序列化,反序列化的負擔
Spark Streaming默認將接受到的數據序列化後存儲,以減小內存的使用。可是序列化和反序列化須要更多的CPU時間,所以更加高效的序列化方式和自定義的序列化接口以更高效的使用CPU。
設置合理的batch duration(批處理時間)
在Spark Streaming中,Job之間有可能存在依賴關係,後面的Job必須確保前面的做業執行結束後才能提交。若前面的Job執行的時間超出了批處理時間間隔,那麼後面的Job就沒法按時提交,這樣就會進一步拖延接下來的Job,形成後續Job的阻塞。所以設置一個合理的批處理間隔以確保做業可以在這個批處理間隔內結束是必須的。
2,優化內存使用
控制batch size(批處理間隔內的數據量)
Spark Streaming會把批處理間隔內接收到的全部數據存放在Spark內部的可用內存區域中,所以必須確保當前節點Spark的可用內存中至少能容納這個批處理時間間隔內的全部數據,不然必須增長新的資源以提升集羣的處理能力。
及時清理再也不使用的數據
前面講到Spark Streaming會將接受的數據應及時清理,以確保Spark Streaming有富餘的可用內存空間。經過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數須要當心設置以避免後續操做中所須要的數據被超時錯誤處理。
觀察及適當調整GC策略 GC會影響Job的正常運行,可能延長Job的執行時間,引發一系列不可預料的問題。觀察GC的運行狀況,採用不一樣的GC策略以進一步減少內存回收對Job運行的影響。