Spark Streaming是一套優秀的實時計算框架。其良好的可擴展性、高吞吐量以及容錯機制可以知足咱們不少的場景應用。本篇文單在《Spark Streaming場景應用-Kafka數據讀取方式》基礎上,結合咱們的應用場景,介結咱們在使用Spark Streaming方面的技術架構,並着重講解Spark Streaming兩種計算模型,無狀態和狀態計算模型以及該兩種模型的注意事項;接着介紹了Spark Streaming在監控方面所作的一些事情,最後總結了Spark Streaming的優缺點。數據庫
數據是很是寶貴的資源,對各級企事業單均有很是高的價值。可是數據的爆炸,致使原先單機的數據處理已經沒法知足業務的場景需求。所以在此基礎上出現了一些優秀的分佈式計算框架,諸如Hadoop、Spark等。離線分佈式處理框架雖然可以處理很是大量的數據,可是其遲滯性很難知足一些特定的需求場景,好比push反饋、實時推薦、實時用戶行爲等。爲了知足這些場景,使數據處理可以達到實時的響應和反饋,又隨之出現了實時計算框架。目前的實時處理框架有Apache Storm、Apache Flink以及Spark Streaming等。其中Spark Streaming因爲其自己的擴展性、高吞吐量以及容錯能力等特性,而且可以和離線各類框架有效結合起來,於是是當下是比較受歡迎的一種流式處理框架。編程
根據其官方文檔介紹,Spark Streaming 有高擴展性、高吞吐量和容錯能力強的特色。Spark Streaming 支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後能夠用 Spark 的高度抽象原語如:map、reduce、join、window 等進行運算。而結果也能保存在不少地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。其架構見下圖:
設計模式
Spark Streaming 其優秀的特色給咱們帶來不少的應用場景,如網站監控和網絡監控、異常監測、網頁點擊、用戶行爲、用戶遷移等。本文中,將爲你們詳細介紹,咱們的應用場景中,Spark Streaming的技術架構、兩種狀態模型以及Spark Streaming監控等。網絡
在 Spark Streaming 中,處理數據的單位是一批而不是單條,而數據採集倒是逐條進行的,所以 Spark Streaming 系統須要設置間隔使得數據彙總到必定的量後再一併操做,這個間隔就是批處理間隔。批處理間隔是 Spark Streaming 的核心概念和關鍵參數,它決定了 Spark Streaming 提交做業的頻率和數據處理的延遲,同時也影響着數據處理的吞吐量和性能。 架構
目前咱們Spark Streaming的業務應用場景包括異常監測、網頁點擊、用戶行爲以及用戶地圖遷徙等場景。按計算模型來看大致可分爲無狀態的計算模型以及狀態計算模型兩種。在實際的應用場景中,咱們採用Kafka做爲實時輸入源,Spark Streaming做爲計算引擎處理完數據以後,再持久化到存儲中,包括MySQL、HDFS、ElasticSearch以及MongoDB等;同時Spark Streaming 數據清洗後也會寫入Kafka,而後經由Flume持久化到HDFS;接着基於持久化的內容作一些UI的展示。架構見下圖:app
無狀態模型只關注當前新生成的DStream數據,因此的計算邏輯均基於該批次的數據進行處理。無狀態模型可以很好地適應一些應用場景,好比網站點擊實時排行榜、指定batch時間段的用戶訪問以及點擊狀況等。該模型因爲沒有狀態,並不須要考慮有狀態的狀況,只須要根據業務場景保證數據不丟就行。此種狀況通常採用Direct方式讀取Kafka數據,並採用監聽器方式持久化Offsets便可。具體流程以下: 框架
其上模型框架包含如下幾個處理步驟:機器學習
受網絡、集羣等一些因素的影響,實時程序出現長時失敗,致使數據出現堆積。此種狀況下是丟掉堆積的數據從Kafka largest處消費仍是從以前的Kafka offsets處消費,這個取決具體的業務場景。異步
有狀態模型是指DStreams在指定的時間範圍內有依賴關係,具體的時間範圍由業務場景來指定,能夠是2個及以上的多個batch time RDD組成。Spark Streaming提供了updateStateByKey方法來知足此類的業務場景。因涉及狀態的問題,因此在實際的計算過程當中須要保存計算的狀態,Spark Streaming中經過checkpoint來保存計算的元數據以及計算的進度。該狀態模型的應用場景有網站具體模塊的累計訪問統計、最近N batch time 的網站訪問狀況以及app新增累計統計等等。具體流程以下: 分佈式
上述流程中,每batch time計算時,須要依賴最近2個batch time內的數據,通過轉換及相關統計,最終持久化到MySQL中去。不過爲了確保每一個計算僅計算2個batch time內的數據,須要維護數據的狀態,清除過時的數據。咱們先來看下updateStateByKey的實現,其代碼以下:
def updateStateByKey[S: ClassTag]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, rememberPartitioner: Boolean ): DStream[(K, S)] = ssc.withScope { new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None) }
def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)] ): DStream[(K, S)] = ssc.withScope { val cleanedUpdateF = sparkContext.clean(updateFunc) val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s))) } updateStateByKey(newUpdateFunc, partitioner, true, initialRDD) }
以上兩種方法分別給咱們提供清理過時數據的思路:
同Spark同樣,Spark Streaming也提供了Jobs、Stages、Storage、Enviorment、Executors以及Streaming的監控,其中Streaming監控頁的內容以下圖:
上圖是Spark UI中提供一些數據監控,包括實時輸入數據、Scheduling Delay、處理時間以及總延遲的相關監控數據的趨勢展示。另外除了提供上述數據監控外,Spark UI還提供了Active Batches以及Completed Batches相關信息。Active Batches包含當前正在處理的batch信息以及堆積的batch相關信息,而Completed Batches剛提供每一個batch處理的明細數據,具體包括batch time、input size、scheduling delay、processing Time、Total Delay等,具體信息見下圖:
Spark Streaming可以提供如此優雅的數據監控,是因在對監聽器設計模式的使用。如若Spark UI沒法知足你所需的監控須要,用戶能夠定製個性化監控信息。Spark Streaming提供了StreamingListener特質,經過繼承此方法,就能夠定製所需的監控,其代碼以下:
@DeveloperApi trait StreamingListener { /** Called when a receiver has been started */ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } /** Called when a receiver has reported an error */ def onReceiverError(receiverError: StreamingListenerReceiverError) { } /** Called when a receiver has been stopped */ def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { } /** Called when a batch of jobs has been submitted for processing. */ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } /** Called when processing of a batch of jobs has started. */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } /** Called when processing of a batch of jobs has completed. */ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } /** Called when processing of a job of a batch has started. */ def onOutputOperationStarted( outputOperationStarted: StreamingListenerOutputOperationStarted) { } /** Called when processing of a job of a batch has completed. */ def onOutputOperationCompleted( outputOperationCompleted: StreamingListenerOutputOperationCompleted) { } }
目前,咱們保存Offsets時,採用繼承StreamingListener方式,此是一種應用場景。固然也能夠監控實時計算程序的堆積狀況,並在達到一閾值後發送報警郵件。具體監聽器的定製還得依據應用場景而定。
Spark Streaming並不是是Storm那樣,其並不是是真正的流式處理框架,而是一次處理一批次數據。也正是這種方式,可以較好地集成Spark 其餘計算模塊,包括MLlib(機器學習)、Graphx以及Spark SQL。這給實時計算帶來很大的便利,與此帶來便利的同時,也犧牲做爲流式的實時性等性能。
本篇文章主要介紹了Spark Streaming在實際應用場景中的兩種計算模型,包括無狀態模型以及狀態模型;而且重點關注了下Spark Streaming在監控方面所做的努力。首先本文介紹了Spark Streaming應用場景以及在咱們的實際應用中所採起的技術架構。在此基礎上,引入無狀態計算模型以及有狀態模型兩種計算模型;接着經過監聽器模式介紹Spark UI相關監控信息等;最後對Spark Streaming的優缺點進行歸納。但願本篇文章可以給各位帶來幫助,後續咱們會介紹Spark Streaming在場景應用中咱們所作的優化方面的努力,敬請期待!
徐勝國,大連理工大學碩士畢業,360大數據中心數據研發工程師,主要負責基於Spark Streaming的項目架構及研發工做。郵箱 : xshguo_better@yeah.net。若有問題,可郵件聯繫,歡迎交流。