在本文發出以後不久,老外就寫了一篇相似內容的。人家比我寫得好,推薦你們讀這篇
http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101....html
流式統計聽着挺容易的一個事情,說到底不就是數數嘛,每一個告警系統裏基本上都有一個簡單的流式統計模塊。可是當時基於storm作的時候,這幾個問題仍是困擾了我很長時間的。沒有用過spark streaming/flink,不知道下面這些問題在spark streaming/flink裏是否是都已經解決得很好了。git
作流式統計首要的問題是把一個時間窗口內的數據統計到一塊兒。問題是,什麼是時間窗口?有兩種選擇github
最簡單的時間窗口統計的是基於「牆上時間」的,每過1分鐘就切分出一個新窗口出來。好比statsd,它的窗口切分就是這樣的。這種基於「牆上時間」的統計有一個很是嚴重的問題是不能回放數據流。當數據流是實時產生的時候,「牆上時間」的一分鐘也就只會有一分鐘的event被產生出來。可是若是統計的數據流是基於歷史event的,那麼一分鐘能夠產生消費的event數量只受限於數據處理速度。另外event在分佈式採集的時候也遇到有快有慢的問題,一分鐘內產生的event未必能夠在一分鐘內精確到達統計端,這樣就會由於採集的延遲波動影響統計數據的準確性。實際上基於「牆上時間」統計須要redis
collection latency = wall clock - event timestamp
數據庫
基於「牆上時間」的統計須要採集延遲很是小,波動也很小才能夠工做良好。大部分時候更現實的選擇是須要基於「日誌時間」來進行窗口統計的。緩存
使用「日誌時間」就會引入數據亂序的問題,對於一個實時event stream流,其每一個event的timestamp未必是嚴格遞增的。這種亂序有兩種因素引入:網絡
咱們但願的流式統計是這樣的:框架
可是實際上數據只是基本有序的,也就是在時間窗口的邊緣會有一些event須要跨到另一個窗口去:異步
最簡單的分發event到時間窗口代碼是這樣的分佈式
window index = event timestamp / window size
對1分鐘的時間窗口 window size 就是60,timestamp除以60爲相同window index的event就是在同一個時間窗口的。問題的關鍵是,何時我能夠確信這個時間窗口內的event都已經到齊了。若是到齊了,就能夠開始統計出這個時間窗口內的指標了。而後忽然又有一個落後於大夥的event落到這個已經被計算過的時間窗口如何處理?
因此對於來晚了的數據就兩種策略:要麼再統計一條結果出來,要麼直接丟棄。要肯定何時一個時間窗口內的event已經到齊了,有幾種策略:
三種策略其實都是「等」,只是等的依據不一樣。實踐中,第二種策略也就是根據「日誌時間」的等待是最容易實現的。若是對於過時的event不是丟棄,而是要再次統計一條結果出來,那麼過時的窗口要從新打開,又要通過一輪「等待」去判斷這個過去的窗口何時再被關閉。
在spark上已經有人作相似的嘗試了:Building Big Data Operational Intelligence platform with Apache Spark - Eric Carr (Guavus)
一個kafka的partition就是一個流,一個kafka topic的多個partition就是多個獨立的流(offset彼此獨立增加)。多個kafka topic顯然是多個獨立的流。流式統計常常須要把多個流合併統計到一塊兒。這種裏會遇到兩個難題
舉一個具體的例子:
spout 1 emit 12:05 spout 1 emit 12:06 spout 2 emit 12:04 spout 1 emit 12:07 spout 2 emit 12:05 // this is when 12:05 is ready
要想知道12:05這個時間窗的event都到齊了,首先要知道相關的流有幾個(在這例子裏是spout1和spout2兩個流),而後要知道何時spout1產生了12:05的數據,何時spout2產生了12:05的數據,最後才能夠判斷出來12:05的數據是到齊了的。在某個地方要存一份這樣的流速的數據去跟蹤,在窗口內數據到齊以後發出信號讓相關的下游往前推進時間窗口。考慮到一個分佈式的系統,這個跟蹤要放在哪一個地方作,怎麼去通知全部的相關方。
極端一些的例子
spout 1 emit 13:05 spout 2 emit 12:31 spout 1 emit 13:06 spout 2 emit 12:32
多個流的流速可能會相差到半個小時以上。考慮到若是用歷史的數據匯入到實時統計系統裏時,很容易由於計算速度不一樣致使不一樣節點之間的處理進度不一致。要計算出正確的結果,下游須要緩存這些差別的半個小時內的全部數據,這樣很容易爆內存。可是上游如何感知到下游要處理不過來了呢?多個上游之間又如何感知彼此之間的速度差別呢?又有誰來仲裁誰應該流慢一些呢?
一個相對簡單的作法是在整個流式統計的分佈式系統裏引入一個coordinator的角色。它負責跟蹤不一樣流的流速,在時間窗口的數據到齊以後通知下游flush,在一些上游流速過快的時候(好比最快的流相比最慢的流差距大於10分鐘)由coordinator發送backoff指令給流速過快的上游,而後接到指令以後sleep一段時間。一段基本堪用的跟蹤不一樣流流速的代碼:https://gist.github.com/taowen/2d0b3bcc0a4bfaecd404
低檔一些的說法是這樣的。假設統計出來的曲線是這樣的:
若是中間,好比08:35左右重啓了統計程序,那麼曲線可否仍是連續的?
高檔一些的說法是,能夠把流式統計理解爲主數據庫與分析數據庫之間經過kafka消息隊列進行異步同步。主數據庫與分析數據庫之間應該保持eventual consistency。
要保證數據不重不丟,就要作到生產到kafka的時候,在主數據庫和kafka消息隊列之間保持一個事務一致性。舉一個簡單的例子:
用戶下了一個訂單 主數據庫裏插入了一條訂單的數據記錄 kafka消息隊列裏多了一條OrderPlaced的event
這個流程中一個問題就是,主數據插入成功了以後,可能往kafka消息隊列裏enqueue event失敗。若是把這個操做反過來
用戶下了一個訂單 kafka消息隊列裏多了一條OrderPlaced的event 主數據庫裏插入了一條訂單的數據記錄
又可能出現kafka消息隊列裏enqueue了,可是主數據庫插入失敗的狀況。就kafka隊列的目前的設計而言,對這個問題是無解的。一旦enqueue的event,除非過時是沒法刪除的。
在消費端,當咱們從kafka裏取出數據以後,去更新分析數據庫的過程也要保持一個分佈式事務的一致性。
取出下一條OrderPlaced evnet(指向的offset+1) 當前時間窗的統計值+1 重複以上過程,直到窗口被關閉,數據寫入到分析數據庫
kafka的數據是能夠重放的,只要指定offset就能夠把這個offset以及以後的數據讀取出來。所謂消費的過程就是把客戶端保存的offset值加1的過程。問題是,這個offset指針保存在哪裏的問題。常規的作法是把消費的offset保存到zookeeper裏。那麼這就有一個分佈式的一致性問題了,zookeeper裏offset+1了,可是分析數據庫並無實際把值統計進去。考慮到統計通常不是每條輸入的event都會更新分析數據庫,而是把中間狀態緩存在內存中的。那麼就有可能消費了成千上萬個event,狀態都在內存裏,而後「啪」的一下機器掉電了。若是每次讀取event都移動offset的話,這些event就丟掉了。若是不是每次都移動offset的話,又可能在重啓的時候致使重複統計。
搞統計的人在意這麼一兩條數據嗎?其實大部分人是不在意的。很多團隊壓根連offset都不保存,每次開始統計直接seek到隊列的尾部開始。實時計算嘛,實時最重要了。準確計算?重放歷史?這個讓hadoop搞定就行了。可是若是就是要較這個真呢?或者咱們不追求嚴格的強一致,只要求重啓以後曲線不斷開那麼難看就行了。
別的流式計算框架不清楚,storm的ack機制是毫無幫助的。
storm的ack機制是基於每一個message來作的。這就要求若是作一個每分鐘100萬個event的統計,一分鐘就要跟蹤100萬個message id。就算是100萬個int,也是一筆至關可觀的內存開銷。要知道,從kafka裏讀出來的event都是順序offset的,處理也是順序,只要記錄一個offset就能夠跟蹤整個流的消費進度了。1個int,相比100萬個int,storm的per message ack的機制對於流式處理的進度跟蹤來講,沒有利用消息處理的有序性(storm根本上假設message之間是彼此獨立處理的),而變得效率低下。
要作到強一致是很困難的,它須要把
變成一個原子事務來完成。大部分分析數據庫都沒有原子性事務的能力,連插入三條數據都不能保持同時變爲可見,且不說還要用它來記錄offset了。考慮到kafka在生產端都沒法提供分佈式事務,event從生產出來就不是徹底一致的(多產生了或者少產生了),真正高一致的計費場景仍是用其餘的技術棧。因此值得解決的問題是,如何在重啓以後,把以前重啓的時候丟棄掉的內存狀態從新恢復出來,使得統計出來的曲線仍然是連續的。
解決思路有三點:
作流式統計的有兩種作法:
基於外部存儲會把整個壓力所有壓到數據庫上。通常來講流式統計的流速是很快的,遠大於普通的關係型數據庫,甚至可能會超過單臺redis的承載。這就使得基於純內存的統計很是有吸引力。大部分的時候都是在更新時間窗口內的內存狀態,只有當時間窗口關閉的時候才把數據刷到分析數據庫裏去。刷數據出去的同時記錄一下當前流消費到的位置(offset)。
這種純內存的狀態相對來講容易管理一些。計算直接是基於這個內存狀態作的。若是重啓丟失了,重放一段歷史數據就能夠重建出來。
可是內存的問題是它老是不夠用的。當統計的維度組合特別多的時候,好比其中某個字段是用戶的id,那麼很快這個內存狀態就會超過單機的內存上限。這種狀況有兩種辦法:
簡單地在流式統計程序裏開關數據庫鏈接是能夠解決這個容量問題的:
可是這種對外部數據庫使用不當心就會致使兩個問題:
可是這種把窗口統計的中間狀態落地的好處也是顯而易見的。重啓以後不用經過重算來恢復內存狀態。若是一個時間窗口有24小時,重算24小時的歷史數據多是很昂貴的操做。
版本跟蹤,批量等都不該該是具體的統計邏輯的實現者的責任。理論上框架應該負責把冷熱數據分離,自動把冷數據下沉到外部的存儲,以把本地內存空閒出來。同時每次小批量處理event的時候都要記錄處理的offset,而不是要等到窗口關閉等待時候。
數據庫狀態和內存狀態要變成一個緊密結合的總體。能夠把二者的關係想象成操做系統的filesystem page cache。用mmap把狀態映射到內存裏,由框架負責何時把內存裏的變動持久化到外部存儲裏。
基於storm作流式統計缺少對如下四個基本問題的成熟解決方案。其trident框架可能能夠提供一些答案,可是實踐中好像使用的人並很少,資料也太少了。能夠比較自信的說,不只僅是storm,對於大多數流式計算平臺都是如此。
這些問題要好好解決,仍是須要一番功夫的。新一代的流式計算框架好比spark streaming/flink應該有不少改進。即使底層框架提供了支持,從這四個角度去考察一下它們是如何支持的也是很是有裨益的事情。