在《當咱們在討論CQRS時,咱們在討論些神馬》中,咱們討論了當使用CQRS的過程當中,須要關心的一些問題。其中與CQRS關聯最爲緊密的模式莫過於Event Sourcing了,CQRS與ES的結合,爲咱們構造高性能、可擴展系統提供了基本思路。本文將介紹
Kanasz Robert在《Introduction to CQRS》中的示例項目Diary.CQRS。html
該項目爲Kanasz Robert爲了介紹CQRS模式而寫的一個測試項目,原始項目能夠經過訪問《Introduction to CQRS》來獲取,因爲項目版本比較舊,沒有使用nuget管理程序包等,致使下載之後並不能正常運行,我下載了這個項目,升級到Visual Studio 2017,從新引用了StructMap框架(使用nuget),移除了Web層報錯的代碼,並上傳到博客園,能夠從這裏下載:Diary.CQRS.rarweb
Diary.CQRS項目的場景爲日記本管理,提供了新增、編輯、刪除、列表等功能,整個解決方案分爲三個項目:數據庫
這是一個很好的入門項目,功能簡單、結構清晰,概念覆蓋全面。若是CQRS是一個城堡,那麼Diary.CQRS則是打開第一重門的鑰匙,接下來讓咱們一塊兒推開這扇門吧。安全
運行項目,最早看到的是一個Web頁面,以下圖:架構
很簡單,只有一個Add按鈕,當咱們點擊之後,會進入添加的頁面:併發
咱們填上一些內容,而後點擊Save按鈕,就會返回到列表頁,咱們能夠看到已添加的條目:app
而後咱們進行編輯操做,點擊列表中的Edit按鈕,跳轉到編輯頁面:框架
雖然頁面中顯示的是Add,但確實是Edit頁面。咱們編輯之後點擊Save按鈕,而後返回列表頁便可看到編輯後的內容。ide
在列表頁中,若是咱們點擊Delete按鈕,則會刪除改條目。函數
到此爲止,咱們已經看到了這個項目的全部頁面,一個簡單的CURD操做。咱們繼續看它的代碼(在HomeController中)。
public ActionResult Index() { ViewBag.Model = ServiceLocator.ReportDatabase.GetItems(); return View(); }
經過ServiceLocator定位ReportDatabase,並從ReportDatabase中獲取全部條目。
public ActionResult Add() { return View(); } [HttpPost] public ActionResult Add(DiaryItemDto item) { ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To)); return RedirectToAction("Index"); }
兩個方法:
public ActionResult Edit(Guid id) { var item = ServiceLocator.ReportDatabase.GetById(id); var model = new DiaryItemDto() { Description = item.Description, From = item.From, Id = item.Id, Title = item.Title, To = item.To, Version = item.Version }; return View(model); } [HttpPost] public ActionResult Edit(DiaryItemDto item) { ServiceLocator.CommandBus.Send(new ChangeItemCommand(item.Id, item.Title, item.Description, item.From, item.To, item.Version)); return RedirectToAction("Index"); }
仍然是兩個方法:
public ActionResult Delete(Guid id) { var item = ServiceLocator.ReportDatabase.GetById(id); ServiceLocator.CommandBus.Send(new DeleteItemCommand(item.Id, item.Version)); return RedirectToAction("Index"); }
對於刪除操做來講,它沒有視圖頁面,接收到請求之後,先獲取該記錄,建立併發送DeleteImteCommand命令,而後返回到Index頁面
題外話:對於改變數據狀態的操做,使用Get請求是不可取的,可能存在安全隱患
經過上面的代碼,你會發現全部的操做都是從ServiceLocator發起的,經過它咱們可以定位到CommandBus和ReportDatabase,從而進行相應的操做,咱們在接下來會介紹ServiceLocator類。
Diary.CQRS.Configuration 項目中定義了ServiceLocator類,這個類的做用是完成IoC容器的服務註冊、服務定位功能。例如咱們能夠經過ServiceLocator獲取到CommandBus實例、獲取ReportDatabase實例。
ServiceLocator使用StructureMap做爲依賴注入框架,提供了服務註冊、服務導航的功能。ServiceLocator類經過靜態構造函數完成對服務註冊和服務實例化工做:
static ServiceLocator() { if (!_isInitialized) { lock (_lockThis) { ContainerBootstrapper.BootstrapStructureMap(); _commandBus = ObjectFactory.GetInstance<ICommandBus>(); _reportDatabase = ObjectFactory.GetInstance<IReportDatabase>(); _isInitialized = true; } } }
首先調用ContainerBootstrapper.BootstrapStructureMap()方法,這個方法裏面包含了對將服務添加到容器的代碼;而後使用容器建立CommandBus和ReportDatabase的實例。
ServiceLocator的重要之處在於對外暴露了兩個相當重要的實例,分別處理CQRS中的Command和Query。
爲何沒有Event相關操做呢?到目前爲止咱們尚未涉及到,由於對於UI層來講,用戶的意圖都是經過Command表示的,而數據的狀態變化纔會觸發Event。
在ServiceLocator中定義了獲取CommandBus和ReportDatabase的方法,咱們順着這兩個對象繼續分析。
在基於消息的系統設計中,咱們常會看到總線的身影,Command也是一種消息,因此使用總線是再合適不過的了。CommandBus就是咱們在Diary.CQRS項目中用到的一種消息總線。
在Diary.CQRS中,它被定義在Messaging目錄,在這個目錄下面,還有與Event相關的EventBus,咱們稍後再進行介紹。
CommandBus實現ICommandBus接口,ICommandBus接口的定義以下:
public interface ICommandBus { void Send<T>(T command) where T : Command; }
它只包含了Send方法,用來將命令發送到對應的處理程序。
CommandBus是ICommand的實現,具體代碼以下:
public class CommandBus:ICommandBus { private readonly ICommandHandlerFactory _commandHandlerFactory; public CommandBus(ICommandHandlerFactory commandHandlerFactory) { _commandHandlerFactory = commandHandlerFactory; } public void Send<T>(T command) where T : Command { var handler = _commandHandlerFactory.GetHandler<T>(); if (handler!=null) { handler.Execute(command); } else { throw new Exception(); } } }
在CommandBus中,顯式依賴ICommandHandlerFactory類,經過構造函數進行注入。那麼 _commandHandlerFactory 的做用是什麼呢?咱們在Send方法中能夠看到,經過 _commandHandlerFactory 能夠獲取到與Command對應的CommandHandler(命令處理程序),在程序的設計上,每個Command都會有一個對應的CommandHandler,而手工判斷類型、實例化處理程序顯然不符合使用習慣,此處採用工廠模式來獲取命令處理程序。
當獲取到與Command對應的CommandHandler後,調用handler的Execute方法,執行該命令。
截止目前爲止,咱們又接觸了三個概念:CommandHandlerFactory、CommandHandler、Command:
使用簡單工廠模式,用來獲取與命令對應的處理程序。它的代碼在Utils文件夾中,它的做用是提供一種獲取Handler的方式,因此它只能做爲工具存在。
接口定義以下:
public interface ICommandHandlerFactory { ICommandHandler<T> GetHandler<T>() where T : Command; }
只有GetHandler一個方法,它的實現是 StructureMapCommandHandlerFactory,即經過StructureMap做爲依賴注入框架來實現的,代碼也比較簡單,這裏再也不貼出來了。
命令是表明用戶的意圖、幷包含與意圖相關的數據,好比用戶想要添加一條數據,這即是一個意圖,因而就有了CreateItemCommand,用戶要在界面上填寫添加操做必須的數據,因而就有了命令的屬性。
關於命令的定義以下:
public interface ICommand { Guid Id { get; } } public class Command : ICommand { public Guid Id { get; private set; } public int Version { get; set; } public Command(Guid id, int version) { Id = id; Version = version; } }
Command類:實現了ICommand接口,並增長了Version屬性,用來標記當前操做對應的聚合跟的版本。
爲何要有版本的概念的?由於當使用ES模式的時候,數據庫中的數據都是事件產生的數據鏡像,保存了某個時間點的數據快照,若是要獲取到最新的數據,則須要經過加載該聚合根對應的全部Event來回放到最新狀態。若是引入版本的概念,每個Event對應一個版本,而景象中的數據也有一個版本,在進行回放的時候,能夠僅加載高版本的Event進行回放,節省了系統資源,並提升了運行效率。
命令處理程序,它的做用是處理與它相對應的命令,處理CQRS的核心,接口定義以下:
public interface ICommandHandler<TCommand> where TCommand : Command { void Execute(TCommand command); }
它接收command做爲參數,執行該命令的處理邏輯。每個命令都有一個與之對應的處理程序。
咱們再從新梳理一下流程,首先用戶要新增一個數據,點擊保存按鈕後,生成CreateItemCommand命令,隨後這個命令被髮送到CommandBus中,CommandBus經過CommandHandlerFactory找到該Command的處理程序,此時在CommandBus的Send方法中,咱們有一個Command和CommandHandler,而後調用CommandHandler的Execute方法,即完成了該方法的處理。至此,Command的處理流程完結。
咱們來看一下CreateItemCommand的代碼:
public class CreateItemCommand : Command { public string Title { get; internal set; } public string Description { get; internal set; } public DateTime From { get; internal set; } public DateTime To { get; internal set; } public CreateItemCommand(Guid aggregateId, string title, string description, int version, DateTime from, DateTime to) : base(aggregateId, version) { Title = title; Description = description; From = from; To = to; } }
它繼承自Command基類,繼承後即擁有了Id和Version屬性,而後又定義了幾個其它的屬性。它只包含數據,與該命令對應的處理程序叫作CreateItemCommandHandler,代碼以下:
public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand> { private IRepository<DiaryItem> _repository; public CreateItemCommandHandler(IRepository<DiaryItem> repository) { _repository = repository; } public void Execute(CreateItemCommand command) { if (command == null) { throw new Exception(); } if (_repository == null) { throw new Exception(); } var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To); aggregate.Version = -1; _repository.Save(aggregate, aggregate.Version); } }
這纔是咱們要分析的核心,在Handler中,咱們看到了Repository,看到了DiaryItem聚合:
在上面的代碼中,因爲是新增,因此聚合的版本爲-1,而後調用倉儲的Save方法進行保存。咱們繼續往下扒,看看倉儲和聚合的實現。
對於Repository的定義,仍然先看一下接口中的定義,代碼以下:
public interface IRepository<T> where T : AggregateRoot, new() { void Save(AggregateRoot aggregate, int expectedVersion); T GetById(Guid id); }
在倉儲中只有兩個方法:
關於IRepository的實現,代碼在Repository.cs中,咱們拆開來進行介紹:
private readonly IEventStorage _eventStorage; private static object _lock = new object(); public Repository(IEventStorage eventStorage) { _eventStorage = eventStorage; }
首先是它的構造函數,強依賴IEventStorage,經過構造函數注入。EventStorage是事件的儲存倉庫,有個更爲熟知的名字EventStore,咱們稍後進行介紹。
public T GetById(Guid id) { IEnumerable<Event> events; var memento = _eventStorage.GetMemento<BaseMemento>(id); if (memento != null) { events = _eventStorage.GetEvents(id).Where(e => e.Version >= memento.Version); } else { events = _eventStorage.GetEvents(id); } var obj = new T(); if (memento != null) { ((IOriginator)obj).SetMemento(memento); } obj.LoadsFromHistory(events); return obj; }
GetById(Guid id)方法經過Id獲取一個聚合對象,獲取一個聚合對象有如下幾個步驟:
加載Event列表,加載到的事件列表將用來作事件回放。
若是獲取到快照的話,則加載版本高於該快照版本的事件列表,若是沒有獲取到快照,則加載所有事件列表。此處在上面已經介紹過,經過快照的方式保存聚合對象,在獲取數據時能夠減小重放事件的數量,起到提升加載速度的做用。
var obj = new T();
。加載歷史事件,完成重放。完成這個步驟之後,聚合根將更新到最新狀態。
經過這幾個步驟之後,咱們獲得了一個最新狀態的聚合根對象。
public void Save(AggregateRoot aggregate, int expectedVersion) { if (aggregate.GetUncommittedChanges().Any()) { lock (_lock) { var item = new T(); if (expectedVersion != -1) { item = GetById(aggregate.Id); if (item.Version != expectedVersion) { throw new Exception(); } } _eventStorage.Save(aggregate); } } }
Save方法,用來保存一個聚合根對象。在這個方法中,參數expectedVersion表示指望的版本,這裏約定-1
爲新增的聚合根,當聚合根爲新增的時候,會直接調用EventStorage中的Save方法。
關於expectedVersion參數,咱們能夠理解爲對併發的控制,只有當expectedVersion與GetById獲取到的聚合根對象的版本相同時才能進行保存操做。
在介紹Repository類的時候,咱們接觸了兩個新的概念:EventStorage和AggregateRoot,接下來咱們分別進行介紹。
AggregateRoot是聚合根,他表示一組強關聯的領域對象,全部對象的狀態變動只能經過聚合根來完成,這樣能夠保證數據的一致性,以及減小併發衝突。應用到EventSourcing模式中,聚合根的好處也是很明顯的,咱們全部對數據狀態的變動都經過聚合根完成,而每次變動,聚合根都會生成相應的事件,在進行事件回放的時候,又經過聚合根來完成歷史事件的加載。由此咱們能夠看到,聚合根對象應該具有生成事件、重放事件的能力。
咱們來看看聚合根基類的定義,在Domain文件夾中:
public abstract class AggregateRoot : IEventProvider{ // ...... }
首先這是一個抽象類,實現了IEventProvider接口,該接口的定義以下:
public interface IEventProvider { void LoadsFromHistory(IEnumerable<Event> history); IEnumerable<Event> GetUncommittedChanges(); }
它定義了兩個方法,咱們分別進行說明:
爲了實現這個接口,聚合根中定義了 List<Event> _changes
對象,用來臨時存儲全部未提交的事件,該對象在構造函數中進行初始化。
AggregateRoot中對於該事件的實現以下:
public void LoadsFromHistory(IEnumerable<Event> history) { foreach (var e in history) { ApplyChange(e, false); } Version = history.Last().Version; EventVersion = Version; } public IEnumerable<Event> GetUncommittedChanges() { return _changes; }
LoadsFromHistory方法遍歷歷史事件,並調用ApplyChange方法更新聚合根的狀態,在完成更新後設置版本號爲最後一個事件的版本。GetUncommittedChanges方法比較簡單,返回對象的_changes事件列表。
接下來咱們看看ApplyChange方法,該方法有兩個實現,代碼以下:
protected void ApplyChange(Event @event) { ApplyChange(@event, true); } protected void ApplyChange(Event @event, bool isNew) { dynamic d = this; d.Handle(Converter.ChangeTo(@event, @event.GetType())); if (isNew) { _changes.Add(@event); } }
這兩個方法定義爲protected,只能被子類訪問。咱們能夠理解爲,ApplyChange(Event @event)方法爲簡化操做,對第二個參數進行了默認爲true的操做,而後調用ApplyChange(Event @event, bool isNew)方法。
在ApplyChange(Event @event, bool isNew)方法中,調用了聚合根的Handle方法,用來處理事件。若是isNew參數爲true,則將事件添加到change列表中,若是爲false,則認爲是在進行事件回放,因此不進行事件的添加。
須要注意的是,聚合根的Handle方法,與EventHandler不一樣,當Event產生之後,首先由它對應的聚合根進行處理,所以聚合根要具有處理該事件的能力,如何具有呢?聚合根要實現IHandle接口,該接口的定義以下:
public interface IHandle<TEvent> where TEvent:Event { void Handle(TEvent e); }
這裏能夠看出,IHandle接口是泛型的,它只對一個具體的Event類型生效,在代碼上的體現以下:
public class DiaryItem : AggregateRoot, IHandle<ItemCreatedEvent>, IHandle<ItemRenamedEvent>, IHandle<ItemFromChangedEvent>, IHandle<ItemToChangedEvent>, IHandle<ItemDescriptionChangedEvent>, IOriginator { //...... }
最後,聚合根還定義了清除全部事件的方法,代碼以下:
public void MarkChangesAsCommitted() { _changes.Clear(); }
MarkChangesAsCommitted()方法用來清空事件列表。
終於到咱們今天的另一個核心內容了,Event是ES中的一等公民,全部的狀態變動最終都以Event的形式進行存儲,當咱們要查看聚合根最新狀態的時候,能夠經過事件回放來獲取。咱們來看看Event的定義:
public interface IEvent { Guid Id { get; } }
IEvent接口定義了一個事件必須擁有惟一的Id進行標識。而後Event實現了IEvent接口:
public class Event:IEvent { public int Version; public Guid AggregateId { get; set; } public Guid Id { get; private set; } }
能夠看到,除了Id屬性外,還添加了兩個字段Version和AggregateId。AggregateId表示該事件關聯的聚合根Id,經過該Id能夠獲取到惟一的聚合根對象;Version表示事件發生時該事件的版本,每次產生新的事件,Version都會進行累加。
從而能夠知道,在EventStorage中,聚合根Id對應的全部Event中的Version是順序累加的,按照Version進行排序能夠獲得事件發生的前後順序。
顧名思義,EventStorage是用來存儲Event的地方。在Diary.CQRS中,EventStorage的定義以下:
public interface IEventStorage { IEnumerable<Event> GetEvents(Guid aggregateId); void Save(AggregateRoot aggregate); T GetMemento<T>(Guid aggregateId) where T : BaseMemento; void SaveMemento(BaseMemento memento); }
Diary.CQRS中使用InMemory的方式實現了EventStorage,屬性和構造函數以下:
private List<Event> _events; private List<BaseMemento> _mementoes; private readonly IEventBus _eventBus; public InMemoryEventStorage(IEventBus eventBus) { _events = new List<Event>(); _mementoes = new List<BaseMemento>(); _eventBus = eventBus; }
當Event生成後,它並無立刻存入EventStorage,而是在Repository顯示調用Save方法時,倉儲將存儲權交給了EventStorage,EventStorage是事件倉庫,事件倉儲在存儲時進行了以下操做:
Save方法的代碼以下:
public void Save(AggregateRoot aggregate) { var uncommittedChanges = aggregate.GetUncommittedChanges(); var version = aggregate.Version; foreach (var @event in uncommittedChanges) { version++; if (version > 2) { if (version % 3 == 0) { var originator = (IOriginator)aggregate; var memento = originator.GetMemento(); memento.Version = version; SaveMemento(memento); } } @event.Version = version; _events.Add(@event); } foreach (var @event in uncommittedChanges) { var desEvent = Converter.ChangeTo(@event, @event.GetType()); _eventBus.Publish(desEvent); } }
至此Event的處理流程就算完結了。此時全部的操做都是在主庫完成的,當事件被髮布之後,訂閱了該事件的全部Handler都將會被觸發。
在Diary.CQRS項目中,EventHandler都被用來處理ReportDatabase了。
當你使用ES模式時,都存在一個嚴重問題,那就是數據查詢的問題。當用戶進行數據檢索是,必然會使用各類查詢條件,然而不管那種事件倉庫都很難知足複雜查詢。爲了解決此問題,ReportDatabase就顯得格外重要。
ReportDatabase的做用被定義爲獲取數據、應對數據查詢、生成報表等,它的結構與主庫不一樣,能夠根據不一樣的業務場景進行定義。
ReportDatabase的數據不是經過業務邏輯進行更新的,它經過訂閱Event進行更新。在本示例中ReportDatabase實現的很簡單,接口定義以下:
public interface IReportDatabase { DiaryItemDto GetById(Guid id); void Add(DiaryItemDto item); void Delete(Guid id); List<DiaryItemDto> GetItems(); }
實現上,經過內存中維護一個列表,每次接收到事件之後,都對相應數據進行更新,此處不在貼出。
在上文中已經介紹過Event,而針對Event的處理,實現邏輯上與Command很是類似,惟一的區別是,命令只能夠有一個對應的處理程序,而事件則能夠有多個處理程序。因此在EventHandlerFactory中獲取處理程序的方法返回了EventHandler列表,代碼以下:
public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event { var handlers = GetHandlerType<T>(); var lstHandlers = handlers.Select(handler => (IEventHandler<T>)ObjectFactory.GetInstance(handler)).ToList(); return lstHandlers; }
在EventBus中,若是一個事件沒有處理程序也不會引起錯誤,若是有一個或多個處理程序,則會以此調用他們的Handle方法,代碼以下:
public void Publish<T>(T @event) where T : Event { var handlers = _eventHandlerFactory.GetHandlers<T>(); foreach (var eventHandler in handlers) { eventHandler.Handle(@event); } }
Diary.CQRS是一個典型的CQRS+ES演示項目,經過對該項目的分析,咱們能瞭解到Command、AggregateRoot、Event、EventStorage、ReportDatabase的基礎知識,瞭解他們相互關係,尤爲是如何進行事件存儲、如何進行事件回放的內容。
另外,咱們發如今使用CQRS+ES的過程當中,項目的複雜度增長了不少,咱們不可避免的要使用EventStore、Messaging等架構,從而影響那些不瞭解CQRS的團隊成員的加入,所以在應用到實際項目的時候,要適可而止,慎重選擇,避免過分設計。
因爲這是一個示例,項目代碼中存在不少不夠嚴謹的地方,你們在學習的過程當中應進行甄別。
因爲本人的知識有限,若是內容中存在不許確或錯誤的地方,還請不吝賜教!