CQRS+ES項目解析-Diary.CQRS

在《當咱們在討論CQRS時,咱們在討論些神馬》中,咱們討論了當使用CQRS的過程當中,須要關心的一些問題。其中與CQRS關聯最爲緊密的模式莫過於Event Sourcing了,CQRS與ES的結合,爲咱們構造高性能、可擴展系統提供了基本思路。本文將介紹
Kanasz Robert在《Introduction to CQRS》中的示例項目Diary.CQRS。html

獲取Diary.CQRS項目

該項目爲Kanasz Robert爲了介紹CQRS模式而寫的一個測試項目,原始項目能夠經過訪問《Introduction to CQRS》來獲取,因爲項目版本比較舊,沒有使用nuget管理程序包等,致使下載之後並不能正常運行,我下載了這個項目,升級到Visual Studio 2017,從新引用了StructMap框架(使用nuget),移除了Web層報錯的代碼,並上傳到博客園,能夠從這裏下載:Diary.CQRS.rarweb

Diary.CQRS項目簡介

Diary.CQRS項目的場景爲日記本管理,提供了新增、編輯、刪除、列表等功能,整個解決方案分爲三個項目:數據庫

  • Diary.CQRS:核心項目,完成了EventBus、CommandBus、Domain、Storage等功能,也是咱們分析的重點。
  • Diary.CQRS.Configuration:服務配置,經過ServiceLocator類進行依賴注入、服務查找功能。
  • Diary.CQRS.Web:用戶界面,MVC項目。

這是一個很好的入門項目,功能簡單、結構清晰,概念覆蓋全面。若是CQRS是一個城堡,那麼Diary.CQRS則是打開第一重門的鑰匙,接下來讓咱們一塊兒推開這扇門吧。安全

Diary.CQRS.Web

運行項目,最早看到的是一個Web頁面,以下圖:架構

image

很簡單,只有一個Add按鈕,當咱們點擊之後,會進入添加的頁面:併發

image

咱們填上一些內容,而後點擊Save按鈕,就會返回到列表頁,咱們能夠看到已添加的條目:app

image

而後咱們進行編輯操做,點擊列表中的Edit按鈕,跳轉到編輯頁面:框架

image

雖然頁面中顯示的是Add,但確實是Edit頁面。咱們編輯之後點擊Save按鈕,而後返回列表頁便可看到編輯後的內容。ide

在列表頁中,若是咱們點擊Delete按鈕,則會刪除改條目。函數

到此爲止,咱們已經看到了這個項目的全部頁面,一個簡單的CURD操做。咱們繼續看它的代碼(在HomeController中)。

Index:列表頁面

public ActionResult Index()
{
    ViewBag.Model = ServiceLocator.ReportDatabase.GetItems();
    return View();
}

經過ServiceLocator定位ReportDatabase,並從ReportDatabase中獲取全部條目。

Add:新增頁面

public ActionResult Add()
{
    return View();
}

[HttpPost]
public ActionResult Add(DiaryItemDto item)
{
    ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));
    return RedirectToAction("Index");
}

兩個方法:

  • Add()方法,處理Get請求,返回新增視圖;
  • Add(DiaryItemDto item)方法,接收DiaryItemDto參數,處理Post請求,建立併發送CreateItemCommand命令,而後返回到Index頁面

Edit:編輯頁面

public ActionResult Edit(Guid id)
{
    var item = ServiceLocator.ReportDatabase.GetById(id);
    var model = new DiaryItemDto()
    {
        Description = item.Description,
        From = item.From,
        Id = item.Id,
        Title = item.Title,
        To = item.To,
        Version = item.Version
    };
    return View(model);
}

[HttpPost]
public ActionResult Edit(DiaryItemDto item)
{
    ServiceLocator.CommandBus.Send(new ChangeItemCommand(item.Id, item.Title, item.Description, item.From, item.To, item.Version));
    return RedirectToAction("Index");
}

仍然是兩個方法:

  • Edit(Guid id)方法,接收Guid做爲參數,並從ReportDatabase中獲取數據,構建dto對象返回給頁面
  • Edit(DiaryItemDto item)方法,接收DiaryItemDto對象,處理Post請求,接收到請求之後根據dto對象建立ChangeItemCommand命令,而後返回到Index頁面

Delete:刪除操做

public ActionResult Delete(Guid id)
{
    var item = ServiceLocator.ReportDatabase.GetById(id);
    ServiceLocator.CommandBus.Send(new DeleteItemCommand(item.Id, item.Version));
    return RedirectToAction("Index");
}

對於刪除操做來講,它沒有視圖頁面,接收到請求之後,先獲取該記錄,建立併發送DeleteImteCommand命令,而後返回到Index頁面

題外話:對於改變數據狀態的操做,使用Get請求是不可取的,可能存在安全隱患

經過上面的代碼,你會發現全部的操做都是從ServiceLocator發起的,經過它咱們可以定位到CommandBus和ReportDatabase,從而進行相應的操做,咱們在接下來會介紹ServiceLocator類。

Diary.CQRS.Configuration

Diary.CQRS.Configuration 項目中定義了ServiceLocator類,這個類的做用是完成IoC容器的服務註冊、服務定位功能。例如咱們能夠經過ServiceLocator獲取到CommandBus實例、獲取ReportDatabase實例。

服務註冊

ServiceLocator使用StructureMap做爲依賴注入框架,提供了服務註冊、服務導航的功能。ServiceLocator類經過靜態構造函數完成對服務註冊和服務實例化工做:

static ServiceLocator()
{
    if (!_isInitialized)
    {
        lock (_lockThis)
        {
            ContainerBootstrapper.BootstrapStructureMap();
            _commandBus = ObjectFactory.GetInstance<ICommandBus>();
            _reportDatabase = ObjectFactory.GetInstance<IReportDatabase>();
            _isInitialized = true;
        }
    }
}

首先調用ContainerBootstrapper.BootstrapStructureMap()方法,這個方法裏面包含了對將服務添加到容器的代碼;而後使用容器建立CommandBus和ReportDatabase的實例。

  • CommandBus:命令總線,對應Command操做,用來發送命令,程序中須要定義相應的命令處理器,從而完成具體的操做。
  • ReportDatabase:報表數據庫,對應Query操做,用來獲取數據。

ServiceLocator的重要之處在於對外暴露了兩個相當重要的實例,分別處理CQRS中的Command和Query。

爲何沒有Event相關操做呢?到目前爲止咱們尚未涉及到,由於對於UI層來講,用戶的意圖都是經過Command表示的,而數據的狀態變化纔會觸發Event。

Diary.CQRS

在ServiceLocator中定義了獲取CommandBus和ReportDatabase的方法,咱們順着這兩個對象繼續分析。

CommandBus

在基於消息的系統設計中,咱們常會看到總線的身影,Command也是一種消息,因此使用總線是再合適不過的了。CommandBus就是咱們在Diary.CQRS項目中用到的一種消息總線。

在Diary.CQRS中,它被定義在Messaging目錄,在這個目錄下面,還有與Event相關的EventBus,咱們稍後再進行介紹。

CommandBus實現ICommandBus接口,ICommandBus接口的定義以下:

public interface ICommandBus
{
    void Send<T>(T command) where T : Command;
}

它只包含了Send方法,用來將命令發送到對應的處理程序。

CommandBus是ICommand的實現,具體代碼以下:

public class CommandBus:ICommandBus
{
    private readonly ICommandHandlerFactory _commandHandlerFactory;

    public CommandBus(ICommandHandlerFactory commandHandlerFactory)
    {
        _commandHandlerFactory = commandHandlerFactory;
    }

    public void Send<T>(T command) where T : Command
    {
        var handler = _commandHandlerFactory.GetHandler<T>();
        if (handler!=null)
        {
            handler.Execute(command);
        }
        else
        {
            throw new Exception();
        }
    }
}

在CommandBus中,顯式依賴ICommandHandlerFactory類,經過構造函數進行注入。那麼 _commandHandlerFactory 的做用是什麼呢?咱們在Send方法中能夠看到,經過 _commandHandlerFactory 能夠獲取到與Command對應的CommandHandler(命令處理程序),在程序的設計上,每個Command都會有一個對應的CommandHandler,而手工判斷類型、實例化處理程序顯然不符合使用習慣,此處採用工廠模式來獲取命令處理程序。

當獲取到與Command對應的CommandHandler後,調用handler的Execute方法,執行該命令。

截止目前爲止,咱們又接觸了三個概念:CommandHandlerFactory、CommandHandler、Command:

  • CommandHandlerFactory:命令處理程序工廠,經過GetHandler方法獲取到與命令對應的處理程序
  • CommandHandler:命令處理程序,用於執行對應的命令
  • Command:命令,描述用戶的意圖、幷包含與意圖相關的數據

CommandHandlerFactory

使用簡單工廠模式,用來獲取與命令對應的處理程序。它的代碼在Utils文件夾中,它的做用是提供一種獲取Handler的方式,因此它只能做爲工具存在。

接口定義以下:

public interface ICommandHandlerFactory
{
    ICommandHandler<T> GetHandler<T>() where T : Command;
}

只有GetHandler一個方法,它的實現是 StructureMapCommandHandlerFactory,即經過StructureMap做爲依賴注入框架來實現的,代碼也比較簡單,這裏再也不貼出來了。

Command和CommandHandler

命令是表明用戶的意圖、幷包含與意圖相關的數據,好比用戶想要添加一條數據,這即是一個意圖,因而就有了CreateItemCommand,用戶要在界面上填寫添加操做必須的數據,因而就有了命令的屬性。

關於命令的定義以下:

public interface ICommand
{
    Guid Id { get; }
}

public class Command : ICommand
{
    public Guid Id { get; private set; }
    public int Version { get; set; }

    public Command(Guid id, int version)
    {
        Id = id;
        Version = version;
    }
}
  • ICommand接口:包含Id屬性,這個Id表示Command對應聚合的Id。聚合是領域驅動開發(DDD)的概念,表示一組強關聯的領域對象,而對聚合中狀態的變動,只能經過聚合根(AggregateRoot)來完成。
  • Command類:實現了ICommand接口,並增長了Version屬性,用來標記當前操做對應的聚合跟的版本。

    爲何要有版本的概念的?由於當使用ES模式的時候,數據庫中的數據都是事件產生的數據鏡像,保存了某個時間點的數據快照,若是要獲取到最新的數據,則須要經過加載該聚合根對應的全部Event來回放到最新狀態。若是引入版本的概念,每個Event對應一個版本,而景象中的數據也有一個版本,在進行回放的時候,能夠僅加載高版本的Event進行回放,節省了系統資源,並提升了運行效率。

命令處理程序,它的做用是處理與它相對應的命令,處理CQRS的核心,接口定義以下:

public interface ICommandHandler<TCommand> where TCommand : Command
{
    void Execute(TCommand command);
}

它接收command做爲參數,執行該命令的處理邏輯。每個命令都有一個與之對應的處理程序。

咱們再從新梳理一下流程,首先用戶要新增一個數據,點擊保存按鈕後,生成CreateItemCommand命令,隨後這個命令被髮送到CommandBus中,CommandBus經過CommandHandlerFactory找到該Command的處理程序,此時在CommandBus的Send方法中,咱們有一個Command和CommandHandler,而後調用CommandHandler的Execute方法,即完成了該方法的處理。至此,Command的處理流程完結。

CreateItemCommand和CreateItemCommandHandler

咱們來看一下CreateItemCommand的代碼:

public class CreateItemCommand : Command
{
    public string Title { get; internal set; }
    public string Description { get; internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }

    public CreateItemCommand(Guid aggregateId, string title,
        string description, int version, DateTime from, DateTime to)
        : base(aggregateId, version)
    {
        Title = title;
        Description = description;
        From = from;
        To = to;
    }
}

它繼承自Command基類,繼承後即擁有了Id和Version屬性,而後又定義了幾個其它的屬性。它只包含數據,與該命令對應的處理程序叫作CreateItemCommandHandler,代碼以下:

public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
{
    private IRepository<DiaryItem> _repository;

    public CreateItemCommandHandler(IRepository<DiaryItem> repository)
    {
        _repository = repository;
    }

    public void Execute(CreateItemCommand command)
    {
        if (command == null)
        {
            throw new Exception();
        }
        if (_repository == null)
        {
            throw new Exception();
        }
        var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To);
        aggregate.Version = -1;
        _repository.Save(aggregate, aggregate.Version);
    }
}

這纔是咱們要分析的核心,在Handler中,咱們看到了Repository,看到了DiaryItem聚合:

  • IRepository :倉儲類,表明數據的儲存方式,經過倉儲可以進行數據操做
  • DiaryItem:領域對象,聚合根,全部數據狀態的變動只能經過聚合根來修改

在上面的代碼中,因爲是新增,因此聚合的版本爲-1,而後調用倉儲的Save方法進行保存。咱們繼續往下扒,看看倉儲和聚合的實現。

Repository

對於Repository的定義,仍然先看一下接口中的定義,代碼以下:

public interface IRepository<T> where T : AggregateRoot, new()
{
    void Save(AggregateRoot aggregate, int expectedVersion);
    T GetById(Guid id);
}

在倉儲中只有兩個方法:

  • Save(AggregateRoot aggregate, int expectedVersion):保存指望版本的聚合根
  • GetById(Guid id):根據聚合根Id獲取聚合根

關於IRepository的實現,代碼在Repository.cs中,咱們拆開來進行介紹:

private readonly IEventStorage _eventStorage;
private static object _lock = new object();

public Repository(IEventStorage eventStorage)
{
    _eventStorage = eventStorage;
}

首先是它的構造函數,強依賴IEventStorage,經過構造函數注入。EventStorage是事件的儲存倉庫,有個更爲熟知的名字EventStore,咱們稍後進行介紹。

public T GetById(Guid id)
{
    IEnumerable<Event> events;
    var memento = _eventStorage.GetMemento<BaseMemento>(id);
    if (memento != null)
    {
        events = _eventStorage.GetEvents(id).Where(e => e.Version >= memento.Version);
    }
    else
    {
        events = _eventStorage.GetEvents(id);
    }
    var obj = new T();
    if (memento != null)
    {
        ((IOriginator)obj).SetMemento(memento);
    }
    obj.LoadsFromHistory(events);
    return obj;
}

GetById(Guid id)方法經過Id獲取一個聚合對象,獲取一個聚合對象有如下幾個步驟:

  • 首先會從EventStorage中獲取到該聚合的快照(memento的翻譯爲記憶碎片、記念品、備忘錄,用來聚合對象的快照)。
  • 加載Event列表,加載到的事件列表將用來作事件回放。

    若是獲取到快照的話,則加載版本高於該快照版本的事件列表,若是沒有獲取到快照,則加載所有事件列表。此處在上面已經介紹過,經過快照的方式保存聚合對象,在獲取數據時能夠減小重放事件的數量,起到提升加載速度的做用。

  • 實例化聚合根,對應代碼中的var obj = new T();
  • 從快照中設置聚合根的狀態。在獲取到快照之後,若是快照不爲空,則調用聚合根的SetMemento方法設置爲快照中的狀態,SetMemento方法定義在IOriginator接口中,聚合根鬚要實現該接口。
  • 加載歷史事件,完成重放。完成這個步驟之後,聚合根將更新到最新狀態。

經過這幾個步驟之後,咱們獲得了一個最新狀態的聚合根對象。

public void Save(AggregateRoot aggregate, int expectedVersion)
{
    if (aggregate.GetUncommittedChanges().Any())
    {
        lock (_lock)
        {
            var item = new T();
            if (expectedVersion != -1)
            {
                item = GetById(aggregate.Id);
                if (item.Version != expectedVersion)
                {
                    throw new Exception();
                }
            }
            _eventStorage.Save(aggregate);
        }
    }
}

Save方法,用來保存一個聚合根對象。在這個方法中,參數expectedVersion表示指望的版本,這裏約定-1爲新增的聚合根,當聚合根爲新增的時候,會直接調用EventStorage中的Save方法。

關於expectedVersion參數,咱們能夠理解爲對併發的控制,只有當expectedVersion與GetById獲取到的聚合根對象的版本相同時才能進行保存操做。

在介紹Repository類的時候,咱們接觸了兩個新的概念:EventStorage和AggregateRoot,接下來咱們分別進行介紹。

AggregateRoot

AggregateRoot是聚合根,他表示一組強關聯的領域對象,全部對象的狀態變動只能經過聚合根來完成,這樣能夠保證數據的一致性,以及減小併發衝突。應用到EventSourcing模式中,聚合根的好處也是很明顯的,咱們全部對數據狀態的變動都經過聚合根完成,而每次變動,聚合根都會生成相應的事件,在進行事件回放的時候,又經過聚合根來完成歷史事件的加載。由此咱們能夠看到,聚合根對象應該具有生成事件、重放事件的能力。

咱們來看看聚合根基類的定義,在Domain文件夾中:

public abstract class AggregateRoot : IEventProvider{
    // ......
}

首先這是一個抽象類,實現了IEventProvider接口,該接口的定義以下:

public interface IEventProvider
{
    void LoadsFromHistory(IEnumerable<Event> history);
    IEnumerable<Event> GetUncommittedChanges();
}

它定義了兩個方法,咱們分別進行說明:

  • LoadsFromHistory()方法:加載歷史事件,還原聚合根的最新狀態,咱們在Repository中已經用過這個方法。
  • GetUncommittedChanges()方法:獲取未提交的事件。一個命令可能形成聚合根發生屢次更改,每次更改都會產生一個事件,這些事件被暫時的保存在聚合根對象中,經過該方法能夠獲取到未提交的事件列表。

爲了實現這個接口,聚合根中定義了 List<Event> _changes對象,用來臨時存儲全部未提交的事件,該對象在構造函數中進行初始化。

AggregateRoot中對於該事件的實現以下:

public void LoadsFromHistory(IEnumerable<Event> history)
{
    foreach (var e in history)
    {
        ApplyChange(e, false);
    }
    Version = history.Last().Version;
    EventVersion = Version;
}

public IEnumerable<Event> GetUncommittedChanges()
{
    return _changes;
}

LoadsFromHistory方法遍歷歷史事件,並調用ApplyChange方法更新聚合根的狀態,在完成更新後設置版本號爲最後一個事件的版本。GetUncommittedChanges方法比較簡單,返回對象的_changes事件列表。

接下來咱們看看ApplyChange方法,該方法有兩個實現,代碼以下:

protected void ApplyChange(Event @event)
{
    ApplyChange(@event, true);
}

protected void ApplyChange(Event @event, bool isNew)
{
    dynamic d = this;
    d.Handle(Converter.ChangeTo(@event, @event.GetType()));
    if (isNew)
    {
        _changes.Add(@event);
    }
}

這兩個方法定義爲protected,只能被子類訪問。咱們能夠理解爲,ApplyChange(Event @event)方法爲簡化操做,對第二個參數進行了默認爲true的操做,而後調用ApplyChange(Event @event, bool isNew)方法。

在ApplyChange(Event @event, bool isNew)方法中,調用了聚合根的Handle方法,用來處理事件。若是isNew參數爲true,則將事件添加到change列表中,若是爲false,則認爲是在進行事件回放,因此不進行事件的添加。

須要注意的是,聚合根的Handle方法,與EventHandler不一樣,當Event產生之後,首先由它對應的聚合根進行處理,所以聚合根要具有處理該事件的能力,如何具有呢?聚合根要實現IHandle接口,該接口的定義以下:

public interface IHandle<TEvent> where TEvent:Event
{
    void Handle(TEvent e);
}

這裏能夠看出,IHandle接口是泛型的,它只對一個具體的Event類型生效,在代碼上的體現以下:

public class DiaryItem : AggregateRoot,
    IHandle<ItemCreatedEvent>,
    IHandle<ItemRenamedEvent>,
    IHandle<ItemFromChangedEvent>,
    IHandle<ItemToChangedEvent>,
    IHandle<ItemDescriptionChangedEvent>,
    IOriginator
{
    //......
}

最後,聚合根還定義了清除全部事件的方法,代碼以下:

public void MarkChangesAsCommitted()
{
    _changes.Clear();
}

MarkChangesAsCommitted()方法用來清空事件列表。

Event

終於到咱們今天的另一個核心內容了,Event是ES中的一等公民,全部的狀態變動最終都以Event的形式進行存儲,當咱們要查看聚合根最新狀態的時候,能夠經過事件回放來獲取。咱們來看看Event的定義:

public interface IEvent
{
    Guid Id { get; }
}

IEvent接口定義了一個事件必須擁有惟一的Id進行標識。而後Event實現了IEvent接口:

public class Event:IEvent
{
    public int Version;
    public Guid AggregateId { get; set; }
    public Guid Id { get; private set; }
}

能夠看到,除了Id屬性外,還添加了兩個字段Version和AggregateId。AggregateId表示該事件關聯的聚合根Id,經過該Id能夠獲取到惟一的聚合根對象;Version表示事件發生時該事件的版本,每次產生新的事件,Version都會進行累加。

從而能夠知道,在EventStorage中,聚合根Id對應的全部Event中的Version是順序累加的,按照Version進行排序能夠獲得事件發生的前後順序。

EventStorage

顧名思義,EventStorage是用來存儲Event的地方。在Diary.CQRS中,EventStorage的定義以下:

public interface IEventStorage
{
    IEnumerable<Event> GetEvents(Guid aggregateId);
    void Save(AggregateRoot aggregate);
    T GetMemento<T>(Guid aggregateId) where T : BaseMemento;
    void SaveMemento(BaseMemento memento);
}
  • GetEvents(Guid aggregateId):根據聚合根Id獲取該聚合根的全部事件
  • Save(AggregateRoot aggregate):保存方法,入參爲聚合根對象,在實現上則是獲取聚合根中全部未提交的事件,隨後對這些事件進行處理
  • GetMemento():獲取快照
  • SaveMemento():存儲快照

Diary.CQRS中使用InMemory的方式實現了EventStorage,屬性和構造函數以下:

private List<Event> _events;
private List<BaseMemento> _mementoes;
private readonly IEventBus _eventBus;

public InMemoryEventStorage(IEventBus eventBus)
{
    _events = new List<Event>();
    _mementoes = new List<BaseMemento>();
    _eventBus = eventBus;
}
  • _events:事件列表,內存中存儲事件的位置,全部事件最終都會存儲在該列表中
  • _mementoes:快照列表,用於存儲聚合根的某個事件版本的狀態
  • _eventBus:事件總線,用於發佈任務

當Event生成後,它並無立刻存入EventStorage,而是在Repository顯示調用Save方法時,倉儲將存儲權交給了EventStorage,EventStorage是事件倉庫,事件倉儲在存儲時進行了以下操做:

  • 獲取聚合根中全部未提交的Event,同時獲取到聚合根當前的版本號
  • 遍歷未提交Event列表,根據聚合根版本號自動爲Event生成版本號,保持自增加的特性;
  • 生成聚合根快照。示例中每3個版本生成一次,並保持到事件倉儲中。
  • 將任務添加到事件倉庫中。
  • 再次遍歷未提交Event列表,此時將進行任務發佈,調用事件總線的Publish方法進行發佈。

Save方法的代碼以下:

public void Save(AggregateRoot aggregate)
{
    var uncommittedChanges = aggregate.GetUncommittedChanges();
    var version = aggregate.Version;

    foreach (var @event in uncommittedChanges)
    {
        version++;
        if (version > 2)
        {
            if (version % 3 == 0)
            {
                var originator = (IOriginator)aggregate;
                var memento = originator.GetMemento();
                memento.Version = version;
                SaveMemento(memento);
            }
        }
        @event.Version = version;
        _events.Add(@event);
    }
    foreach (var @event in uncommittedChanges)
    {
        var desEvent = Converter.ChangeTo(@event, @event.GetType());
        _eventBus.Publish(desEvent);
    }
}

至此Event的處理流程就算完結了。此時全部的操做都是在主庫完成的,當事件被髮布之後,訂閱了該事件的全部Handler都將會被觸發。

在Diary.CQRS項目中,EventHandler都被用來處理ReportDatabase了。

ReportDatabase

當你使用ES模式時,都存在一個嚴重問題,那就是數據查詢的問題。當用戶進行數據檢索是,必然會使用各類查詢條件,然而不管那種事件倉庫都很難知足複雜查詢。爲了解決此問題,ReportDatabase就顯得格外重要。

ReportDatabase的做用被定義爲獲取數據、應對數據查詢、生成報表等,它的結構與主庫不一樣,能夠根據不一樣的業務場景進行定義。

ReportDatabase的數據不是經過業務邏輯進行更新的,它經過訂閱Event進行更新。在本示例中ReportDatabase實現的很簡單,接口定義以下:

public interface IReportDatabase
{
    DiaryItemDto GetById(Guid id);
    void Add(DiaryItemDto item);
    void Delete(Guid id);
    List<DiaryItemDto> GetItems();
}

實現上,經過內存中維護一個列表,每次接收到事件之後,都對相應數據進行更新,此處不在貼出。

EventHandler、EventHandlerFactory和EventBus

在上文中已經介紹過Event,而針對Event的處理,實現邏輯上與Command很是類似,惟一的區別是,命令只能夠有一個對應的處理程序,而事件則能夠有多個處理程序。因此在EventHandlerFactory中獲取處理程序的方法返回了EventHandler列表,代碼以下:

public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event
{
    var handlers = GetHandlerType<T>();

    var lstHandlers = handlers.Select(handler => (IEventHandler<T>)ObjectFactory.GetInstance(handler)).ToList();
    return lstHandlers;
}

在EventBus中,若是一個事件沒有處理程序也不會引起錯誤,若是有一個或多個處理程序,則會以此調用他們的Handle方法,代碼以下:

public void Publish<T>(T @event) where T : Event
{
    var handlers = _eventHandlerFactory.GetHandlers<T>();
    foreach (var eventHandler in handlers)
    {
        eventHandler.Handle(@event);
    }
}

總結

Diary.CQRS是一個典型的CQRS+ES演示項目,經過對該項目的分析,咱們能瞭解到Command、AggregateRoot、Event、EventStorage、ReportDatabase的基礎知識,瞭解他們相互關係,尤爲是如何進行事件存儲、如何進行事件回放的內容。

另外,咱們發如今使用CQRS+ES的過程當中,項目的複雜度增長了不少,咱們不可避免的要使用EventStore、Messaging等架構,從而影響那些不瞭解CQRS的團隊成員的加入,所以在應用到實際項目的時候,要適可而止,慎重選擇,避免過分設計。

因爲這是一個示例,項目代碼中存在不少不夠嚴謹的地方,你們在學習的過程當中應進行甄別。

因爲本人的知識有限,若是內容中存在不許確或錯誤的地方,還請不吝賜教!

相關文章
相關標籤/搜索