最近打算用C#實現一個基於文件的EventStore。算法
什麼是EventStore
關於什麼是EventStore,若是還不清楚的朋友能夠去了解下CQRS/Event Sourcing這種架構,我博客中也有大量介紹。EventStore是在Event Sourcing(下面簡稱ES)模式中,用於存儲事件用的。從DDD的角度來講,每一個聚合根在本身的狀態發生變化時都會產生一個或多個領域事件,咱們須要把這些事件持久化起來。而後當咱們須要恢復聚合根的最新狀態到內存時,能夠經過ES這種技術,從EventStore獲取該聚合根的全部事件,而後重演這些事件,就能將該聚合根恢復到最新狀態了。這種技術和MySQL的Redo日誌以及Redis的AOF日誌或者leveldb的WAL日誌的原理是相似的。可是區別是,redo/AOF/WAL日誌是Command Sourcing,而咱們這裏說的是Event Sourcing。關於這兩個概念的區別,我很少展開了,有興趣的朋友能夠去了解下。數據庫
爲何要寫一個EventStore
目前ENode使用的EventStore,是基於關係型數據庫SqlServer的。雖然功能上徹底知足要求,可是性能上和數據容量上,離個人預期還有一些距離。好比:數組
- 關於性能,雖然能夠經過SqlBulkCopy方法,實現較大的寫入吞吐,可是我對EventStore的要求是,須要支持兩個惟一索引:1)聚合根ID+事件版本號惟一;2)聚合根ID+命令ID惟一;當添加這兩個惟一索引後,會很大影響SqlBulkCopy寫入數據的性能;並且SqlBulkCopy只有SqlServer纔有,其餘數據庫如MySQL沒有,這樣也無形之中限制了ENode的使用場景;
- 關於使用場景,DB是基於SQL的,他不是簡單的幫咱們保存數據,每次寫入數據都要解析SQL,執行SQL,寫入RedoLOG,等;另外,DB還要支持修改數據、經過SQL查詢數據等場景。因此,這就要求DB內部在設計存儲結構時,要兼顧各類場景。而咱們如今要實現的EventStore,針對的場景比較簡單:1)追求高吞吐的寫入,沒有修改和刪除;2)查詢很是少,不須要支持複雜的關係型查詢,只須要能支持查詢某個聚合根的全部事件便可;因此,針對這種特定的使用場景,若是有針對性的實現一個EventStore,我相信性能上能夠有更大的提高空間;
- 關於數據量,一個EventStore可能須要存儲大量的事件,百億或千億級別。若是採用DB,那咱們只能進行分庫分表,由於單表能存儲的記錄數是有限的,好比1000W,超過這個數量,對寫入性能也會有必定的影響。假設咱們如今要存儲100億事件記錄,單表存儲1000W,那就須要1000個表,若是單個物理庫中分100個表,那就須要10個物理庫;若是未來數據量再增長,則須要進一步擴容,那就須要牽涉到數據庫的數據遷移(全量同步、增量同步)這種麻煩的事情。而若是是基於文件版本的EventStore,因爲沒有表的概念了,因此單機只要硬盤夠大,就能存儲很是多的數據。而且,最重要的,性能不會由於數據量的增長而降低。固然,EventStore也一樣須要支持擴容,可是因爲EventStore中的數據只會Append寫入,不會修改,也不會刪除,因此擴容方案相對於DB來講,要容易作不少。
- 那爲什麼不使用NoSQL?NoSQL通常都是爲大數據、可伸縮、高性能而設計的。由於一般NoSQL不支持上面第一點中所說的二級索引,固然一些文檔型數據庫如MongoDB是支持的,可是對我來講是一個黑盒,我沒法駕馭,也沒有使用經驗,因此沒有考慮。
- 從長遠來看,若是可以本身根據本身的場景實現一個有針對性的EventStore,那將來若是出現性能瓶頸的問題,本身就有足夠的能力去解決。另外,對本身的技術能力的提升也是一個很大的鍛鍊機會。並且這個作好了,說不定又是本身的一個很好的做品,呵呵。因此,爲什麼不嘗試一下呢?
EventStore的設計目標
- 要求高性能順序寫入事件;
- 要求嚴格判斷聚合根的事件是否按版本號順序遞增寫入;
- 支持命令ID的惟一性判斷;
- 支持大量事件的存儲;
- 支持按照聚合根ID查詢該聚合根的全部事件;
- 支持動態擴容;
- 高可用(HA),須要支持集羣和主備,二期再作;
EventStore核心問題描述、問題分析、設計思路
核心問題描述
一個EventStore須要解決的核心問題有兩點:1)持久化事件;2)持久化事件以前判斷事件版本號是否合法、事件對應的命令是否重複。一個事件包含的信息以下:緩存
- 聚合根ID
- 事件版本號
- 命令ID
- 事件內容
- 事件發生時間
爲何是這些信息?
本文所提到的事件是CQRS架構中,由C端的某個命令操做某個聚合根後,致使該聚合根的狀態發生變化,而後每次變化都會產生一個對應的事件。因此,針對聚合根的每一個事件,咱們關注的信息就是:哪一個命令操做哪一個聚合根,產生了什麼版本號的一個事件,事件的內容和產生的時間分別是什麼。服務器
事件的版本號是什麼意思?
因爲一個聚合根在生命週期內常常會被修改,也就是說常常會有命令去修改聚合根的狀態,而每次狀態的變化都會產生一個對應的事件,也就是說一個聚合根在生命週期內會產生多個事件。聚合根是領域驅動設計(DDD)中的一個概念,聚合根是一個具備全局惟一ID的實體,具備獨立的生命週期,是數據強一致性的最小邊界。爲了保證聚合根內的數據的強一致性,針對單個聚合根的任何修改都必須是線性的,由於只有線性的操做,才能保證當前的操做所基於的聚合根的狀態是最新的,這樣才能保證聚合根內數據的完整性,老是知足業務規則的不變性。關於線性操做這點,就像對DB的一張表中的某一條記錄的修改也必須是線性的同樣,數據庫中的同一條記錄不可能同時被兩個線程同時修改。因此,分析到這裏,咱們知道同一個聚合根的多個事件的產生一定是有前後順序的。那如何保證這個前後順序呢?答案是,在聚合根上設計一個版本號,經過版本號的順序遞增來保證對同一個聚合根的修改也老是線性依次的。這個思路其實就是一種樂觀併發控制的思路。聚合根的第一個事件的版本號爲1,第二個事件的版本號爲2,第N個事件的版本號爲N。當第N個事件產生時,它所基於的聚合根的狀態必須是N-1。當某個版本號爲N的事件嘗試持久化到EventStore時,若是EventStore中已經存在了一個版本號爲N的事件,則認爲出現併發衝突,須要告訴上層應用當前事件持久化遇到併發衝突了,而後上層應用須要獲取該聚合根的最新狀態,而後再重試當前命令,而後再產生新的版本號的事件,再持久化到EventStore。數據結構
但願能自動檢測命令是否重複處理
CQRS架構,任何聚合根的修改都是經過命令來完成的。命令就是一個DTO,當咱們要修改一個聚合根的狀態時,就發送一個命令到分佈式MQ便可,而後MQ的消費者處理該命令。可是你們都知道任何分佈式MQ通常都只能作到至少投遞一次(At Least Once)的消息投遞語義。也就是說,一個命令可能會被消費者重複處理。在有些狀況下,某個聚合根若是重複處理某個命令,會致使聚合根的最終狀態不正確,好比重複扣款會致使帳號餘額不正確。因此,咱們但願在框架層面能支持命令的重複處理的檢測。那最理想的檢測位置在哪裏呢?若是是傳統的DB,咱們會在數據庫層面經過創建惟一索引保證命令絕對不會重複執行。那對應到咱們的EventStore,天然也應該在EventStore內部檢測。多線程
核心問題分析
經過上面的問題描述,咱們知道,其實一個EventStore須要解決的問題就兩點:1)以文件的形式持久化事件;2)持久化以前判斷事件的版本號是否衝突、事件的命令是否重複。架構
關於第一點,天然是經過順序寫文件來實現,機械硬盤在順序寫文件的狀況下,性能也是很是高的。寫文件的思路很是簡單,咱們能夠固定單個文件的大小,好比512MB。而後先寫第一個文件,寫滿後新建一個文件,再寫第二個,後面以此類推。併發
關於第二點,本質上是兩個索引的需求:a. 聚合根ID+事件版本號惟一(固然,這裏不只要保證惟一,還要判斷是不是連續遞增);b. 聚合根ID + 命令ID惟一,即針對同一個聚合根的命令不能重複處理;那如何實現這兩個索引的需求呢?第一個索引的實現成本相對較低,咱們只須要在內存維護每一個聚合根的當前版本號,而後當一個事件過來時,判斷事件的版本號是不是當前版本號的下一個版本號便可,若是不是,則認爲版本號非法;第二個索引的事件成本比較高,咱們必須維護每一個聚合根的全部產生的事件對應的命令的ID,而後在某個事件過來時,判斷該事件對應的命令ID是否和已經產生的任何一個事件的命令ID重複,若是有,則認爲出現重複。因此,歸根結底,當須要持久化某個聚合根的事件時,咱們須要加載該聚合根的全部已產生的事件的版本號以及事件對應的命令ID到內存,而後在內存進行判斷,從而檢查當前事件是否知足這兩個索引需求。框架
好了,上面是基本的也是最直接的解決問題的思路了。可是咱們不難發現,要實現上面這兩個問題並不容易。由於:首先咱們的機器的內存大小是有限的,也就是說,沒法把全部的聚合根的事件的索引信息都放在內存。那麼當某個聚合根的事件要持久化時,發現內存中並沒有這個聚合根的事件索引時,必然要從磁盤中加載該聚合根的事件索引。但問題是,咱們的事件因爲爲了追求高性能的寫入到文件,老是隻是簡單的Append追加到最後一個文件的末尾。這樣必然致使某個聚合根的事件可能分散在多個文件中,這樣就給咱們查找這個聚合根的全部相關事件帶來了極大的困難。那該如何權衡的去設計這兩個需求呢?
我以爲設計是一種權衡,咱們老是應該根據咱們的實際業務場景去有側重點的進行設計,優先解決主要問題,而後次要問題儘可能去解決。就像leveldb在設計時,也是側重於寫入時很是簡單快速,而讀取時,可能會比較迂迴曲折。EventStore,是很是典型的高頻寫入但不多讀取的系統。但寫入時須要保證上述的兩個索引需求,因此,應該說這個寫入的要求比leveldb的寫入要求還要高一些。那咱們該如何去權衡呢?
EventStore核心設計思路
- 在內存中維護每一個聚合根的版本索引eventVersion,eventVersion中維護了當前聚合根的全部的版本、每一個版本對應的cmdId,以及每一個版本的事件在event文件中的物理位置;當一個事件過來時,經過這個eventVersion來判斷version,cmdId是否合法(version必須是currentVersion+1,cmdId必須惟一);
- 當寫入一個事件時,只寫入一個文件,event.file文件;假設一個文件的大小爲512MB,一個事件的大小爲1KB,則一個文件大概存儲52W個事件;
- 一個event.file文件寫滿後:
- 完成當前event.file文件,而後新建一個新的event.file文件,接下來的事件寫入新的event.file文件;
- 啓動一個後臺線程,在內存中對當前完成的event.file文件中的event按照聚合根ID和事件版本號進行排序;
- 排序完成後,咱們就知道了該文件中的事件涉及到哪些聚合根,他們的順序,以及最大最小聚合根ID分別是什麼;
- 新建一個和event.file文件同樣大小的臨時文件;
- 在臨時文件的header中記錄當前event.file已排序過;
- 在臨時文件的數據區域將排好序的事件順序寫入文件;
- 臨時文件寫入完成後,將臨時文件替換當前已完成的event.file文件;
- 爲event.file文件新建一個對應的事件索引文件eventIndex.file;
- 將event.file文件中的最大和最小聚合根ID寫入到eventIndex.file索引文件的header;每一個event.file的最大最小的聚合根ID的關係,會在EventStore啓動時自動加載並緩存到內存中,這樣能夠方便咱們快速知道某個聚合根在某個event.file中是否存在事件,由於直接在內存中判斷便可。這個緩存我暫時命名爲aggregateIdRangeCache吧,以便下面更方便的進一步說明如何使用它。
- 將event.file文件中的每一個聚合根的每一個事件的索引信息寫入eventIndex.file文件,事件索引信息包括:聚合根ID+事件版本號+事件的命令ID+事件在event.file文件中的物理位置這4個信息;有了這些索引信息,咱們就能夠只須要訪問事件索引文件就能獲取某個聚合根的全部版本信息(就是上面說的eventVersion)了;
- 但僅僅在事件索引文件中記錄最大最小聚合根ID以及每一個事件的索引信息還不是不夠的。緣由是,當咱們要查找某個聚合根的全部版本信息時,雖然能夠先根據內存中緩存的每一個event.file文件的最大最小聚合根ID快速定位該聚合根在哪些event.file中存在事件(也就是明確了在哪些對應的事件索引文件中存在版本信息),可是當咱們要從這些事件索引文件中找出該聚合根的事件索引到底具體在文件的哪一個位置時,只能從文件的起始位置順序掃描文件才能知道,這樣的順序掃描無疑是不高效的。假設一個event.file文件的大小固定爲512MB,一個事件的大小爲1KB,則一個event.file文件大概存儲52W個事件,每一個事件索引的大小大概爲:24 + 4 + 24 + 8 = 60個字節。因此,這52W個事件的索引信息大概佔用30MB,也就是最終一個事件索引文件的大小大概爲30MB多一點。當咱們要獲取某個聚合根的全部版本信息時,若是每次訪問某個事件索引文件時,老是要順序掃描30MB的文件數據,那無疑效率不高。因此,我還須要進一步想辦法優化,由於事件索引文件裏的事件索引信息都是按照聚合根ID和事件版本號排序的,假設如今有52W個事件索引,則咱們能夠將這52W個事件索引記錄均等切分爲100個點,而後把每一個點對應的事件索引的聚合根ID都記錄到事件索引文件的header中,一個聚合根ID的長度爲24個字節,則100個也就2.4KB左右。這樣一來,當咱們想要知道某個聚合根的事件索引大概在事件索引文件的哪一個位置時,咱們能夠先經過訪問header裏的信息,快速知道應該從哪一個位置去掃描。這樣一來,原本對於一個事件索引文件咱們要掃描30MB的數據,如今變爲只須要掃描百分之一的數據,即300KB,這樣掃描的速度就快不少了。這一段寫的有點囉嗦,但一切都是爲了儘可能詳細的描述個人設計思路,不知道各位看官是否看懂了。
- 除了記錄記錄最大最小聚合根ID以及記錄100個等分的切割點外,還有一點能夠優化來提升獲取聚合根的版本信息的性能,就是:若是內存足夠,當某個eventIndex.file被讀取一次後,EventStore能夠自動將這個eventIndex.file文件緩存到非託管內存中;這樣下次就能夠直接在非託管內存訪問這個eventIndex.file了,減小了磁盤IO的讀取;
- 由於內存大小有限,因此eventVersion不可能所有緩存在內存;因此,當某個聚合根的eventVersion不在內存中時,須要從磁盤加載。加載的思路是:掃描aggregateIdRangeCache,快速找出該聚合根的事件在哪些event.file文件中存在;而後經過上面提到的查找算法快速查找這些event.file文件對應的eventIndex.file文件,這樣就能快速獲取該聚合根的eventVersion信息了;
- 另外,EventStore啓動時,最好須要預加載一些熱門聚合根的eventVersion信息到緩存。那該預加載哪些聚合根呢?咱們能夠在內存中維護一個固定大小(N)的環形數組,環形數組中維護了最近修改的聚合根的ID;當某個聚合根有事件產生,則將該聚合根ID的hashcode取摸N獲得環形數組的下標,而後將該聚合根ID放入該下標;定時將該環形數組中的聚合根ID dump到文件preloadAggregateId.file進行存儲;這樣當EventStore啓動時,就能夠從preloadAggregateId.file加載指定聚合根的eventVersion;
思路總結:
上面的設計的主要思路是:
- 寫入一個事件前先內存中判斷是否容許寫入,若是容許,則順序寫入event.file文件;
- 對一個已經寫入完成的event.file文件,則用一個後臺異步線程對文件中的事件按照聚合根ID和事件版本號進行排序,而後將排序後的臨時event.file文件替換原event.file文件,同時將排序後獲得的事件索引信息寫入eventIndex.file文件;
- 寫入一個事件時,若是當前聚合根的版本信息不在內存,則須要從相關的eventIndex.file文件加載到內存;
- 因爲加載版本信息可能須要訪問多個eventIndex.file文件,會有屢次讀磁盤的IO,對性能影響較大,因此,咱們老是應該儘可能在內存緩存聚合根的版本信息;
- 整個EventStore的性能瓶頸在於內存中能緩存多少聚合根版本信息,若是可以緩存百分百的聚合根版本信息,且能作到沒有GC的問題(儘可能避免),那咱們就能夠作到寫入事件很是快速;因此,如何設計一個支持大容量緩存(好比緩存幾十個GB的數據),且沒有GC問題的高性能緩存服務,就變得很關鍵了;
- 因爲有了事件索引信息以及這麼多的緩存機制,因此,當要查詢某個聚合根的全部事件,也就很是簡單了;
如何解決多線程併發寫的時候的CPU佔用高的問題?
到這裏,咱們分析瞭如何存儲數據,如何寫入數據,還有如何查詢聚合根的全部事件,應該說核心功能的實現思路已經想好了。若是如今是單線程訪問EventStore,我相信性能應該不會很低了。可是,實際的狀況是N多客戶端會同時併發的訪問EventStore。這個時候就會致使EventStore服務器會有不少線程要求同時寫入事件到數據文件,可是你們知道寫文件必須是單線程的,若是是多線程,那也要用鎖的機制,保證同一個時刻只能有一個線程在寫文件。最簡單的辦法就是寫文件時用一個lock搞定。可是通過測試發現簡單的使用lock,在多線程的狀況下,會致使CPU很高。由於每一個線程在處理當前事件時,因爲要寫文件或讀文件,都是IO操做,因此鎖的佔用時間比較長,致使不少線程都在阻塞等待。
爲了解決這個問題,我作了一些調研,最後決定使用雙緩衝隊列的技術來解決。大體思路是:
設計兩個隊列,將要寫入的事件先放入隊列1,而後當前要真正處理的事件放在隊列2。這樣就作到了把接收數據和處理數據這兩個過程在物理上分離,先快速接收數據並放在隊列1,而後處理時把隊列1裏的數據放入隊列2,而後隊列2裏的數據單線程線性處理。這裏的一個關鍵問題是,如何把隊列1裏的數據傳給隊列2呢?是一個個拷貝嗎?不是。這種作法過低效。更好的辦法是用交換兩個隊列的引用的方式。具體思路這裏我不展開了,你們能夠網上找一下雙緩衝隊列的概念。這個設計我以爲最大的好處是,能夠有效的下降多線程寫入數據時對鎖的佔用時間,原本一次鎖佔用後要直接處理當前事件的,而如今只須要把事件放入隊列便可。雙緩衝隊列能夠在不少場景下被使用,我認爲,只要是多個消息生產者併發產生消息,而後單個消費者單線程消費消息的場景,均可以使用。並且這個設計還有一個好處,就是咱們能夠有機會單線程批量處理隊列2裏的數據,進一步提升處理數據的吞吐能力。
如何緩存大量事件索引信息?
最簡單的辦法是使用支持併發訪問的字典,如ConcurrentDictionary<T,K>,Java中就是ConcurrentHashmap。可是通過測試發現ConcurrentDictionary在key增長到3000多萬的時候就會很是慢,因此我本身實現了一個簡單的緩存服務,初步測試下來,基本知足要求。具體的設計思路本文先不介紹了,總之咱們但願實現一個進程內的,支持緩存大量key/value的一個字典,支持併發操做,不要由於內存佔用越多而致使緩存能力的降低,儘可能不要有GC的問題,能知足這些需求就OK。
如何擴容?
咱們再來看一下最後一個我認爲比較重要的問題,就是如何擴容。
雖然咱們單臺EventStore機器只要硬盤夠大,就能夠存儲至關多的事件。可是硬盤再大也有上限,因此擴容的需求老是有的。因此如何擴容(將數據遷移到其餘服務器上)呢?經過上面的設計咱們瞭解到,EventStore中最核心的文件就是event.file,其他文件均可以經過event.file文件來生成。因此,咱們擴容時只須要遷移event.file文件便可。
那如何擴容呢?假設如今有4臺EventStore機器,要擴容到8臺。
有兩個辦法:
- 土豪的作法:準備8臺全新的機器,而後把原來4臺機器的所有數據分散到新準備的8臺機器上,而後再把老機器上的數據所有刪除;
- 屌絲的作法:準備4臺全新的機器,而後把原來4臺機器的一半數據分散到新準備的4臺機器上,而後再把老機器上的那一半數據刪除;
對比之下,能夠很容易發現土豪的作法比較簡單,由於只須要考慮如何遷移數據到新機器便可,不須要考慮遷移後把已經遷移過去的數據還要刪除。大致的思路是:
- 採用拉的方式,新的8臺目標機器都在向老的4臺源機器拖事件數據;目標機器記錄當前拖到哪裏了,以便若是遇到意外中斷中止後,下次重啓能繼續從該位置繼續拖;
- 每臺源機器都掃描全部的事件數據文件,一個個事件進行掃描,掃描的起始位置由當前要拖數據的目標機器給出;
- 每臺目標機器該拖哪些事件數據?預先在源機器上配置好此次擴容的目標機器的全部惟一標識,如IP;而後當某一臺目標機器過來拖數據時,告知本身的機器的IP。而後源機器根據IP就能知道該目標機器在全部目標機器中排第幾,而後源機器就能知道應該把哪些事件數據同步給該目標機器了。舉個例子:假設當前目標機器的IP在全部IP中排名第3,則針對每一個事件,獲取事件的聚合根ID,而後將聚合根ID hashcode取摸8,若是餘數爲3,則認爲該事件須要同步給該目標機器,不然就跳過該事件;經過這樣的思路,咱們能夠保證同一個聚合根的全部事件都最終同步到了同一臺新的目標機器。只要咱們的聚合根ID夠均勻,那最終必定是均勻的把全部聚合根的事件均勻的同步到目標機器上。
- 當目標機器上同步完整了一個event.file後,就自動異步生成其對應的eventIndex.file文件;
擴容過程的數據同步遷移的思路差很少了。可是擴容過程不只僅只有數據遷移,還有客戶端路由切換等。那如客戶端何動態切換路由信息呢?或者說如何作到不停機動態擴容呢?呵呵。這個實際上是一個外圍的技術。只要數據遷移的速度跟得上數據寫入的速度,而後再配合動態推送新的路由配置信息到全部的客戶端。最終就能實現動態庫容了。這個問題我這裏先不深刻了,搞過數據庫動態擴容的朋友應該都瞭解原理。無非就是一個全量數據遷移、增量數據遷移、數據校驗、短暫中止寫服務,切換路由配置信息這幾個關鍵的步驟。我上面介紹的是最核心的數據遷移的思路。
結束語
本文介紹了我以前一直想作的一個基於文件版本的EventStore的關鍵設計思路,但願經過這篇文章把本身的思路系統地整理出來。一方面經過寫文章能夠進一步確信本身的思路是否OK,由於若是你文章寫不出來,其實思路必定是哪裏有問題,寫文章的過程就是大腦整理思緒的過程。因此,寫文章也是檢查本身設計的一種好方法。另外一方面,也能夠經過本身的原創分享,但願和你們交流,但願你們能給我一些意見或建議。這樣也許能夠在我動手寫代碼前能及時糾正一些設計上的錯誤。最後再補充一點,語言不重要,重要的是架構設計、數據結構,以及算法。誰說C#語言作不出好東西呢?呵呵。