前段時間跟一個朋友聊起kafka,flint,spark這些是否是某種分佈式運算框架。我自認爲的分佈式運算框架最基礎條件是可以把多個集羣節點看成一個完整的系統,而後程序好像是在同一臺機器的內存裏運行同樣。固然,這種集成實現方式有賴於底層的一套消息系統。這套消息系統能夠把消息隨意在集羣各節點之間自由傳遞。因此若是可以經過消息來驅動某段程序的運行,那麼這段程序就有可能在集羣中任何一個節點上運行了。好了,akka-cluster是經過對每一個集羣節點上的中介發送消息使之調動該節點上某段程序運行來實現分佈式運算的。那麼,kafka也能夠實現消息在集羣節點間的自由流通,是否是也是一個分佈式運算框架呢?實際上,kafka設計強調的重點是消息的接收,或者叫消息消費機制。至於接收消息後怎麼去應對,用什麼方式處理,都是kafka用戶本身的事了。與分佈式運算框架像akka-cluster對比,kafka還缺了個在每一個集羣節點上的」運算調度中介「,因此kafka應該不算我所指的分佈式運算框架,充其量是一種分佈式的消息傳遞系統。實際上kafka是一種高吞吐量、高可用性、安全穩定、有良好口碑的分佈式消息系統。算法
kafka的本質是一種commit-log,或者「事件記錄系統」:上游產生的數據(即事件)會按發生時間順序存入kafka,而後下游能夠對任什麼時候間段內事件按序進行讀取,重演運算產生那段時間內的某種狀態。這不就是妥妥的CQRS模式嗎?固然kafka也可使用在其它一些場景如:消息隊列,數據存儲等,不過這些都是commit-log的具體應用。編程
經常看到網上有朋友抱怨akka-cluster的一些處理方式太底層或太基礎了。用戶每每須要本身來增長一些方法來確保使用安全。我想做爲一種消息驅動系統,如何保證akka消息的正確產生和安全使用應該是最基本的要求。而偏偏akka是沒有提供對消息遺漏和重複消息的保障機制。我想這也是形成akka用戶擔憂的主要緣由。上面提到kafka是一種高吞吐量、高可用性、安全穩定的分佈式消息系統,特別是它提供了對exactly-once,「保證一次」的消息使用支持。那麼經過kafka實現一套CQRS模式的實時交易處理系統應該是可行的。這也是我使用kafka的主要目的。安全
上面提到,但願能充分利用kafka commit-log特性來開發一個基於CQRS的實時交易系統,好比支付系統、庫存管理系統,從實踐中瞭解kafka。kafka支持多種語言終端,怪異的是沒有scala終端。kafka是用scala開發的,不提供scala終端實在是說不通啊。不過akka在alpakka社區提供了alpakka-kafka:這個東西是個基於akka-streams的kafka scala終端編程工具,稍微過了一下,感受功能比較全面,那就是它了。不過在開始前先把kafka的原理和基本狀況作個介紹:框架
從表面上看kafka就是一個簡單的消息存儲和傳遞工具。不過由於其特殊分佈式的消息發佈、存儲、讀取處理機制,使其成爲一種高吞吐量、高可用性、安全穩定的分佈式消息處理工具。從應用角度來說,kafka應用包括三個方面,kafka自己,就叫kafka引擎吧,發佈終端、訂閱終端,即:kafka,writer,reader三部分,其中:全部複雜的功能實現是包嵌在kafka內部的,writer,reader應該整合到用戶應用裏。kafka的做業是圍繞着消息的發佈訂閱/讀寫進行的。所謂消息即CQRS模式裏的事件。那麼kafka的工做原理直白點就是writer向kafka寫事件,kafka把事件按發生時間順序保存,reader再按順序從kafka讀取事件並進行處理以產生新的業務狀態,如在某個庫位的一個商品數量獲得了更新。固然原理看似簡單,但具體的實現纔是真正複雜的地方。分佈式
首先,writer和reader是以事件關聯的,即:write發佈某種類型的事件,而reader則是訂閱相同類型的事件。 這裏的事件也就是topic,或一項業務,如:圖書類當前庫存。爲了提升數據吞吐量,每一個topic又能夠細分爲多個partition。每一個partition分擔所屬topic消息類型下的一些指定的細分類消息或者事件,如"圖書庫房101"。若是把這些partition再分佈到一個集羣的節點上,就能夠實現高吞吐量的分佈式讀寫,而後經過集羣partition的複本同步又能夠達到數據安全及系統高可用性的目的。這些集羣節點就是所謂的broker了。發佈消息內容由topic,key,value所組成。其中key值指定該消息應該寫入那個partition,即經過對key進行hash計算得出partition id。hash算法能夠保證相同的key值永遠指定同一個partition。值得注意的是kafka保證每一個partition上的事件確定按照發生時間排序,因此要保證一種事件只能寫入同一個partition。固然,一個partition能夠承載多種事件。要注意的是建立topic和partition都是嚴格的管理工做admin,不是在某些程序中任意進行增減的。通常來說,在建立一個新topic時就要肯定它下面的partition數量了。這個partition數量要按照對數據吞吐量需求設定。但通常是集羣節點的倍數,這樣partition能夠均勻分佈在各broker上。工具
好了,該到reader這頭了:reader做業從訂閱某個topic開始。上面提過:一個topic下面可能有多個partition,但每一個partition都會包含topic的其中幾個子業務的所有事件,並且這些事件是嚴格按發生時間排序的。kafka有個reader group這麼個概念:針對同一個topic,允許有一組多個reader對這個topic下的partition進行讀取。但每一個partition只允許組內一個reader讀取。至於goup內reader是如何分配partition的徹底由kafka內部解決。若是發現新partition或者組內reader有增減變化,kafka會自動進行再分配rebalance。因此總的來講訂閱某個topic的一個組內reader應該負責那個partition是不肯定的,加上隨時可能發生動態再分配的狀況,好比組內某個reader出問題倒了。換言之組內全部reader都必須具有處理整個topic全部類型業務的能力,如此才能解決組內reader-partition關係不肯定的難題。kafka最重要的特色就是能夠允許不一樣的應用經過不一樣的reader-group對同一個partition上的事件進行任意讀取,本意應該是不一樣的應用能夠利用同一個業務事件序列進行不一樣的業務處理。具體實現方式應該是每一個組對某個partition上事件最後讀取的位置分別進行了登記,offset-commit。這樣,即便發生了從新分配rebalance組內任何一個reader對分配到的partition應從那個位置開始讀仍是肯定的。這個offset-commit方式描述了幾種事件讀取模式:atom
一、at-most-once, 最多一次:若是剛讀取事件,在進行業務處理以前就登記位置commit-offset,那麼commit-offset後位置已經登記,即便業務處理失敗也不再可能二次讀取了。 spa
二、at-least-once,最少一次:讀取事件、完成業務處理後才commit-offset。若是處理業務中系統故障,只能從上次登記的位置從新讀取了,那麼就會出現重複讀取的狀況。scala
三、exactly-once, 保證只一次:控制commit-offset的時間節點是取得at-most-once, at-least-once之間安全係數的一種方式。但exactly-once不允許有模糊地帶。具體作法是把業務處理和commit-offset做爲一個完整事物單元來處理(atomic-transaction)。兩樣操做同時成功或失敗。設計
我覺着kafka的exactly-once能力最值得推介。由於在akka或者其它消息隊列工具裏不容易獲得保證。而在一個消息驅動的實時交易系統裏,保證事件重演能正確反映當時狀態是關鍵。任何事件遺失或重複都會形成不可逆轉的偏差。那麼下面的一系列討論我就會嘗試用alpakka-kafka來構建一個基於CQRS模式的實時交易系統,並和你們進行交流分享。