【轉】淺談命令查詢職責分離(CQRS)模式

原文連接:https://www.cnblogs.com/yangecnu/p/Introduction-CQRS.htmlhtml

在經常使用的三層架構中,一般都是經過數據訪問層來修改或者查詢數據,通常修改和查詢使用的是相同的實體。在一些業務邏輯簡單的系統中可能沒有什麼問題,可是隨着系統邏輯變得複雜,用戶增多,這種設計就會出現一些性能問題。雖然在DB上能夠作一些讀寫分離的設計,但在業務上若是在讀寫方面混合在一塊兒的話,仍然會出現一些問題。數據庫

本文介紹了命令查詢職責分離模式(Command Query Responsibility Segregation,CQRS),該模式從業務上分離修改 (Command,增,刪,改,會對系統狀態進行修改)和查詢(Query,查,不會對系統狀態進行修改)的行爲。從而使得邏輯更加清晰,便於對不一樣部分進行鍼對性的優化。文章首先簡要介紹了傳統的CRUD方式存在的問題,接着介紹了CQRS模式,最後以一個簡單的在線日記系統演示瞭如何實現CQRS模式。要談到讀寫操做,首先咱們來看傳統的CRUD的問題。設計模式

一 CRUD方式的問題

在之前的管理系統中,命令(Command,一般用來更新數據,操做DB)和查詢(Query)一般使用的是在數據訪問層中Repository中的實體對象(這些對象是對DB中表的映射),這些實體有多是SQLServer中的一行數據或者多個表。安全

一般對DB執行的增,刪,改,查(CRUD)都是針對的系統的實體對象。如經過數據訪問層獲取數據,而後經過數據傳輸對象DTO傳給表現層。或者,用戶須要更新數據,經過DTO對象將數據傳給Model,而後經過數據訪問層寫回數據庫,系統中的全部交互都是和數據查詢和存儲有關,能夠認爲是數據驅動(Data-Driven)的,以下圖:數據結構

 Traditional CRUD Architecture

對於一些比較簡單的系統,使用這種CRUD的設計方式可以知足要求。特別是經過一些代碼生成工具及ORM等可以很是方便快速的實現功能。多線程

可是傳統的CRUD方法有一些問題架構

  • 使用同一個對象實體來進行數據庫讀寫可能會太粗糙,大多數狀況下,好比編輯的時候可能只須要更新個別字段,可是卻須要將整個對象都穿進去,有些字段實際上是不須要更新的。在查詢的時候在表現層可能只須要個別字段,可是須要查詢和返回整個實體對象。
  • 使用同一實體對象對同一數據進行讀寫操做的時候,可能會遇到資源競爭的狀況,常常要處理的鎖的問題,在寫入數據的時候,須要加鎖。讀取數據的時候須要判斷是否容許髒讀。這樣使得系統的邏輯性和複雜性增長,而且會對系統吞吐量的增加會產生影響。
  • 同步的,直接與數據庫進行交互在大數據量同時訪問的狀況下可能會影響性能和響應性,而且可能會產生性能瓶頸。
  • 因爲同一實體對象都會在讀寫操做中用到,因此對於安全和權限的管理會變得比較複雜。

這裏面很重要的一個問題是,系統中的讀寫頻率比,是偏向讀,仍是偏向寫,就如同通常的數據結構在查找和修改上時間複雜度不同,在設計系統的結構時也須要考慮這樣的問題。解決方法就是咱們常常用到的對數據庫進行讀寫分離。 讓主數據庫處理事務性的增,刪,改操做(Insert,Update,Delete)操做,讓從數據庫處理查詢操做(Select操做),數據庫複製被用來將事務性操做致使的變動同步到集羣中的從數據庫。這只是從DB角度處理了讀寫分離,可是從業務或者系統上面讀和寫仍然是存放在一塊兒的。他們都是用的同一個實體對象。異步

要從業務上將讀和寫分離,就是接下來要介紹的命令查詢職責分離模式。ide

二 什麼是CQRS

CQRS最先來自於Betrand Meyer(Eiffel語言之父,開-閉原則OCP提出者)在 Object-Oriented Software Construction 這本書中提到的一種 命令查詢分離(Command Query Separation,CQS) 的概念。其基本思想在於,任何一個對象的方法能夠分爲兩大類:函數

  • 命令(Command):不返回任何結果(void),但會改變對象的狀態。
  • 查詢(Query):返回結果,可是不會改變對象的狀態,對系統沒有反作用。

根據CQS的思想,任何一個方法均可以拆分爲命令和查詢兩部分,好比:

private int i = 0;
private int Increase(int value)
{
    i += value;
    return i;
}

這個方法,咱們執行了一個命令即對變量i進行相加,同時又執行了一個Query,即查詢返回了i的值,若是按照CQS的思想,該方法能夠拆成Command和Query兩個方法,以下:

private void IncreaseCommand(int value)
{
    i += value;
}
private int QueryValue()
{
    return i;
}

操做和查詢分離使得咱們可以更好的把握對象的細節,可以更好的理解哪些操做會改變系統的狀態。固然CQS也有一些缺點,好比代碼須要處理多線程的狀況。

CQRS是對CQS模式的進一步改進成的一種簡單模式。 它由Greg Young在CQRS, Task Based UIs, Event Sourcing agh! 這篇文章中提出。「CQRS只是簡單的將以前只須要建立一個對象拆分紅了兩個對象,這種分離是基於方法是執行命令仍是執行查詢這一原則來定的(這個和CQS的定義一致)」。

CQRS使用分離的接口將數據查詢操做(Queries)和數據修改操做(Commands)分離開來,這也意味着在查詢和更新過程當中使用的數據模型也是不同的。這樣讀和寫邏輯就隔離開來了。

A basic CQRS architecture

使用CQRS分離了讀寫職責以後,能夠對數據進行讀寫分離操做來改進性能,可擴展性和安全。以下圖:

A CQRS architecture with separate read and write stores

主數據庫處理CUD,從庫處理R,從庫的的結構能夠和主庫的結構徹底同樣,也能夠不同,從庫主要用來進行只讀的查詢操做。在數量上從庫的個數也能夠根據查詢的規模進行擴展,在業務邏輯上,也能夠根據專題從主庫中劃分出不一樣的從庫。從庫也能夠實現成ReportingDatabase,根據查詢的業務需求,從主庫中抽取一些必要的數據生成一系列查詢報表來存儲。

reportingDatabase

使用ReportingDatabase的一些優勢一般可使得查詢變得更加簡單高效:

  • ReportingDatabase的結構和數據表會針對經常使用的查詢請求進行設計。
  • ReportingDatabase數據庫一般會去正規化,存儲一些冗餘而減小必要的Join等聯合查詢操做,使得查詢簡化和高效,一些在主數據庫中用不到的數據信息,在ReportingDatabase能夠不用存儲。
  • 能夠對ReportingDatabase重構優化,而不用去改變操做數據庫。
  • 對ReportingDatabase數據庫的查詢不會給操做數據庫帶來任何壓力。
  • 能夠針對不一樣的查詢請求創建不一樣的ReportingDatabase庫。

固然這也有一些缺點,好比從庫數據的更新。若是使用SQLServer,自己也提供了一些如故障轉移和複製機制來方便部署。

三 何時能夠考慮CQRS

CQRS模式有一些優勢:

  1. 分工明確,能夠負責不一樣的部分
  2. 將業務上的命令和查詢的職責分離可以提升系統的性能、可擴展性和安全性。而且在系統的演化中可以保持高度的靈活性,可以防止出現CRUD模式中,對查詢或者修改中的某一方進行改動,致使另外一方出現問題的狀況。
  3. 邏輯清晰,可以看到系統中的那些行爲或者操做致使了系統的狀態變化。
  4. 能夠從數據驅動(Data-Driven) 轉到任務驅動(Task-Driven)以及事件驅動(Event-Driven).

在下場景中,能夠考慮使用CQRS模式:

  1. 當在業務邏輯層有不少操做須要相同的實體或者對象進行操做的時候。CQRS使得咱們能夠對讀和寫定義不一樣的實體和方法,從而能夠減小或者避免對某一方面的更改形成衝突
  2. 對於一些基於任務的用戶交互系統,一般這類系統會引導用戶經過一系列複雜的步驟和操做,一般會須要一些複雜的領域模型,而且整個團隊已經熟悉領域驅動設計技術。寫模型有不少和業務邏輯相關的命令操做的堆,輸入驗證,業務邏輯驗證來保證數據的一致性。讀模型沒有業務邏輯以及驗證堆,僅僅是返回DTO對象爲視圖模型提供數據。讀模型最終和寫模型相一致。
  3. 適用於一些須要對查詢性能和寫入性能分開進行優化的系統,尤爲是讀/寫比很是高的系統,橫向擴展是必須的。好比,在不少系統中讀操做的請求時遠大於寫操做。爲適應這種場景,能夠考慮將寫模型抽離出來單獨擴展,而將寫模型運行在一個或者少數幾個實例上。少許的寫模型實例可以減小合併衝突發生的狀況
  4. 適用於一些團隊中,一些有經驗的開發者能夠關注複雜的領域模型,這些用到寫操做,而另外一些經驗較少的開發者能夠關注用戶界面上的讀模型。
  5. 對於系統在未來會隨着時間不段演化,有可能會包含不一樣版本的模型,或者業務規則常常變化的系統
  6. 須要和其餘系統整合,特別是須要和事件溯源Event Sourcing進行整合的系統,這樣子系統的臨時異常不會影響整個系統的其餘部分。

可是在如下場景中,可能不適宜使用CQRS:

  1. 領域模型或者業務邏輯比較簡單,這種狀況下使用CQRS會把系統搞複雜。
  2. 對於簡單的,CRUD模式的用戶界面以及與之相關的數據訪問操做已經足夠的話,不必使用CQRS,這些都是一個簡單的對數據進行增刪改查。
  3. 不適合在整個系統中處處使用該模式。在整個數據管理場景中的特定模塊中CQRS可能比較有用。可是在有些地方使用CQRS會增長系統沒必要要的複雜性。

四 CQRS與Event Sourcing的關係

在CQRS中,查詢方面,直接經過方法查詢數據庫,而後經過DTO將數據返回。在操做(Command)方面,是經過發送Command實現,由CommandBus處理特定的Command,而後由Command將特定的Event發佈到EventBus上,而後EventBus使用特定的Handler來處理事件,執行一些諸如,修改,刪除,更新等操做。這裏,全部與Command相關的操做都經過Event實現。這樣咱們能夠經過記錄Event來記錄系統的運行歷史記錄,而且可以方便的回滾到某一歷史狀態。Event Sourcing就是用來進行存儲和管理事件的。這裏不展開介紹。

五 CQRS的簡單實現

CQRS模式在思想上比較簡單,可是實現上仍是有些複雜。它涉及到DDD,以及Event Sourcing,這裏使用codeproject上的 Introduction to CQRS 這篇文章的例子來講明CQRS模式。這個例子是一個簡單的在線記日誌(Diary)系統,實現了日誌的增刪改查功能。總體結構以下:

CQRS

上圖很清晰的說明了CQRS在讀寫方面的分離,在讀方面,經過QueryFacade到數據庫裏去讀取數據,這個庫有多是ReportingDB。在寫方面,比較複雜,操做經過Command發送到CommandBus上,而後特定的CommandHandler處理請求,產生對應的Event,將Eevnt持久化後,經過EventBus特定的EevntHandler對數據庫進行修改等操做。

例子代碼能夠到codeproject上下載,總體結構以下:

DiaryCQRS project

由三個項目構成,Diary.CQRS包含了全部的Domain和消息對象。Configuration經過使用一個名爲StructMap的IOC來初始化一些變量方便Web調用,Web是一個簡單的MVC3項目,在Controller中有與CQRS交互的代碼。

下面分別看Query和Command方面的實現:

Query方向的實現

查詢方面很簡單,日誌列表和明細獲取就是簡單的查詢。下面先看列表查詢部分的代碼。

public ActionResult Index()
{
    ViewBag.Model = ServiceLocator.ReportDatabase.GetItems();
    return View();
}

public ActionResult Edit(Guid id)
{
    var item = ServiceLocator.ReportDatabase.GetById(id);
    var model = new DiaryItemDto()
    {
        Description = item.Description,
        From = item.From,
        Id = item.Id,
        Title = item.Title,
        To = item.To,
        Version = item.Version
    };
    return View(model);
}

ReportDatabase的GetItems和GetById(id)方法就是簡單的查詢,從命名能夠看出他是ReportDatabase。

public class ReportDatabase : IReportDatabase
{
    static List<DiaryItemDto> items = new List<DiaryItemDto>();

    public DiaryItemDto GetById(Guid id)
    {
        return items.Where(a => a.Id == id).FirstOrDefault();
    }

    public void Add(DiaryItemDto item)
    {
        items.Add(item);
    }

    public void Delete(Guid id)
    {
        items.RemoveAll(i => i.Id == id);
    }

    public List<DiaryItemDto> GetItems()
    {
        return items;
    } 
}

ReportDataBase只是在內部維護了一個List的DiaryItemDto列表。在使用的時候,是經過IRepositoryDatabase對其進行操做的,這樣便於mock代碼。

Query方面的代碼很簡單。在實際的應用中,這一塊就是直接對DB進行查詢,而後經過DTO對象返回,這個DB多是應對特定場景的報表數據庫,這樣能夠提高查詢性能。

下面來看Command方向的實現:

Command方向的實現

Command的實現比較複雜,下面以簡單的建立一個新的日誌來講明。

在MVC的Control中,能夠看到Add的Controller中只調用了一句話:

[HttpPost]
public ActionResult Add(DiaryItemDto item)
{
    ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));

    return RedirectToAction("Index");
}

首先聲明瞭一個CreateItemCommand,這個Command只是保存了一些必要的信息。

public class CreateItemCommand:Command
{
    public string Title { get; internal set; }
    public string Description { get;internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }

    public CreateItemCommand(Guid aggregateId, string title, 
        string description,int version,DateTime from, DateTime to)
        : base(aggregateId,version)
    {
        Title = title;
        Description = description;
        From = from;
        To = to;
    }
}

而後將Command發送到了CommandBus上,其實就是讓CommandBus來選擇合適的CommandHandler來處理。

public class CommandBus:ICommandBus
{
    private readonly ICommandHandlerFactory _commandHandlerFactory;

    public CommandBus(ICommandHandlerFactory commandHandlerFactory)
    {
        _commandHandlerFactory = commandHandlerFactory;
    }

    public void Send<T>(T command) where T : Command
    {
        var handler = _commandHandlerFactory.GetHandler<T>();
        if (handler != null)
        {
            handler.Execute(command);
        }
        else
        {
            throw new UnregisteredDomainCommandException("no handler registered");
        }
    }        
}

這個裏面須要值得注意的是CommandHandlerFactory這個類型的GetHandler方法,他接受一個類型爲T的泛型,這裏就是咱們以前傳入的CreateItemCommand。來看他的GetHandler方法。

public class StructureMapCommandHandlerFactory : ICommandHandlerFactory
{
    public ICommandHandler<T> GetHandler<T>() where T : Command
    {
        var handlers = GetHandlerTypes<T>().ToList();

        var cmdHandler = handlers.Select(handler => 
            (ICommandHandler<T>)ObjectFactory.GetInstance(handler)).FirstOrDefault();
            
        return cmdHandler;
    }
        
    private IEnumerable<Type> GetHandlerTypes<T>() where T : Command
    {
        var handlers = typeof(ICommandHandler<>).Assembly.GetExportedTypes()
            .Where(x => x.GetInterfaces()
                .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ICommandHandler<>) ))
                .Where(h=>h.GetInterfaces()
                    .Any(ii=>ii.GetGenericArguments()
                        .Any(aa=>aa==typeof(T)))).ToList();

           
        return handlers;
    }

}

這裏能夠看到,他首先查找當前的程序集中(ICommandHandler)所在的程序集中的全部的實現了ICommandHandler的接口的類型,而後在全部的類型找查找實現了該泛型接口而且泛型的類型參數類型爲T類型的全部類型。以上面的代碼爲例,就是要找出實現了ICommandHandler<CreateItemCommand>接口的類型。能夠看到就是CreateItemCommandHandler類型。

public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
{
    private IRepository<DiaryItem> _repository;

    public CreateItemCommandHandler(IRepository<DiaryItem> repository)
    {
        _repository = repository;
    }

    public void Execute(CreateItemCommand command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }
        if (_repository == null)
        {
            throw new InvalidOperationException("Repository is not initialized.");
        }
        var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To);
        aggregate.Version = -1;
        _repository.Save(aggregate, aggregate.Version);
    }
}

找到以後而後使用IOC實例化了該對象返回。

如今CommandBus中,找到了處理特定Command的Handler。而後執行該類型的Execute方法。

能夠看到在該類型中實例化了一個名爲aggregate的DiaryItem對象。這個和咱們以前查詢所用到的DiaryItemDto有所不一樣,這個一個領域對象,裏面包含了一系列事件。

public class DiaryItem : AggregateRoot, 
    IHandle<ItemCreatedEvent>,
    IHandle<ItemRenamedEvent>,
    IHandle<ItemFromChangedEvent>, 
    IHandle<ItemToChangedEvent>,
    IHandle<ItemDescriptionChangedEvent>,
    IOriginator
{
    public string Title { get; set; }

    public DateTime From { get; set; }
    public DateTime To { get; set; }
    public string Description { get; set; }

    public DiaryItem()
    {
            
    }

    public DiaryItem(Guid id,string title, string description,  DateTime from, DateTime to)
    {
        ApplyChange(new ItemCreatedEvent(id, title,description, from, to));
    }

    public void ChangeTitle(string title)
    {
        ApplyChange(new ItemRenamedEvent(Id, title));
    }

    public void Handle(ItemCreatedEvent e)
    {
        Title = e.Title;
        From = e.From;
        To = e.To;
        Id = e.AggregateId;
        Description = e.Description;
        Version = e.Version;
    }

    public void Handle(ItemRenamedEvent e)
    {
        Title = e.Title;
    }
    ...
}

ItemCreatedEvent 事件的定義以下,其實就是用來存儲傳輸過程當中須要用到的數據。

public class ItemCreatedEvent:Event
{
    public string Title { get; internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }
    public string Description { get;internal set; }

    public ItemCreatedEvent(Guid aggregateId, string title ,
        string description, DateTime from, DateTime to)
    {
        AggregateId = aggregateId;
        Title = title;
        From = from;
        To = to;
        Description = description;
    }
}

能夠看到在Domain對象中,除了定義基本的字段外,還定義了一些相應的事件,好比在構造函數中,其實是發起了一個名爲ItemCreateEvent的事件,同時還定義了處理時間的邏輯,這些邏輯都放在名爲Handle的接口方法發,例如ItemCerateEvent的處理方法爲Handle(ItemCreateEvent)方法。

ApplyChange方法在AggregateRoot對象中,他是彙集根,這是DDD中的概念。經過這個根能夠串起全部對象。 該類實現了IEventProvider接口,他保存了全部在_changes中的全部沒有提交的變動,其中的ApplyChange的用來爲特定的Event查找Eventhandler的方法:

public abstract class AggregateRoot : IEventProvider
{
    private readonly List<Event> _changes;

    public Guid Id { get; internal set; }
    public int Version { get; internal set; }
    public int EventVersion { get; protected set; }

    protected AggregateRoot()
    {
        _changes = new List<Event>();
    }

    public IEnumerable<Event> GetUncommittedChanges()
    {
        return _changes;
    }

    public void MarkChangesAsCommitted()
    {
        _changes.Clear();
    }

    public void LoadsFromHistory(IEnumerable<Event> history)
    {
        foreach (var e in history) ApplyChange(e, false);
        Version = history.Last().Version;
        EventVersion = Version;
    }

    protected void ApplyChange(Event @event)
    {
        ApplyChange(@event, true);
    }

    private void ApplyChange(Event @event, bool isNew)
    {
        dynamic d = this;

        d.Handle(Converter.ChangeTo(@event, @event.GetType()));
        if (isNew)
        {
            _changes.Add(@event);
        }
    }
}

在ApplyChange的實現中,this其實就是對應的實現了AggregateRoot的DiaryItem的Domain對象,調用的Handle方法就是咱們以前在DiaryItem中定義的行爲。而後將該event保存在內部的未提交的事件列表中。相關的信息及事件都保存在了定義的aggregate對象中並返回。

而後Command繼續執行,而後調用了_repository.Save(aggregate, aggregate.Version);這個方法。先看這個Repository對象。

public class Repository<T> : IRepository<T> where T : AggregateRoot, new()
{
    private readonly IEventStorage _storage;
    private static object _lockStorage = new object();

    public Repository(IEventStorage storage)
    {
        _storage = storage;
    } 

    public void Save(AggregateRoot aggregate, int expectedVersion)
    {
        if (aggregate.GetUncommittedChanges().Any())
        {
            lock (_lockStorage)
            {
                var item = new T();

                if (expectedVersion != -1)
                {
                    item = GetById(aggregate.Id);
                    if (item.Version != expectedVersion)
                    {
                        throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",
                                                                        item.Id));
                    }
                }

                _storage.Save(aggregate);
            }
        }
    }

    public T GetById(Guid id)
    {
        IEnumerable<Event> events;
        var memento = _storage.GetMemento<BaseMemento>(id);
        if (memento != null)
        {
            events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version);
        }
        else
        {
            events = _storage.GetEvents(id);
        }
        var obj = new T();
        if(memento!=null)
            ((IOriginator)obj).SetMemento(memento);
            
        obj.LoadsFromHistory(events);
        return obj;
    }
}

這個方法主要是用來對事件進行持久化的。 全部的聚合的變更都會存在該Repository中,首先,檢查當前的聚合是否和以前存儲在storage中的聚合一致,若是不一致,則表示對象在其餘地方被更改過,拋出ConcurrencyException,不然將該變更保存在Event Storage中。

IEventStorage用來存儲全部的事件,其實現類型爲InMemoryEventStorage。

public class InMemoryEventStorage:IEventStorage
{
    private List<Event> _events;
    private List<BaseMemento> _mementos;

    private readonly IEventBus _eventBus;

    public InMemoryEventStorage(IEventBus eventBus)
    {
        _events = new List<Event>();
        _mementos = new List<BaseMemento>();
        _eventBus = eventBus;
    }

    public IEnumerable<Event> GetEvents(Guid aggregateId)
    {
        var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p);
        if (events.Count() == 0)
        {
            throw new AggregateNotFoundException(string.Format("Aggregate with Id: {0} was not found", aggregateId));
        }
        return events;
    }

    public void Save(AggregateRoot aggregate)
    {
        var uncommittedChanges = aggregate.GetUncommittedChanges();
        var version = aggregate.Version;
            
        foreach (var @event in uncommittedChanges)
        {
            version++;
            if (version > 2)
            {
                if (version % 3 == 0)
                {
                    var originator = (IOriginator)aggregate;
                    var memento = originator.GetMemento();
                    memento.Version = version;
                    SaveMemento(memento);
                }
            }
            @event.Version=version;
            _events.Add(@event);
        }
        foreach (var @event in uncommittedChanges)
        {
            var desEvent = Converter.ChangeTo(@event, @event.GetType());
            _eventBus.Publish(desEvent);
        }
    }

    public T GetMemento<T>(Guid aggregateId) where T : BaseMemento
    {
        var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault();
        if (memento != null)
            return (T) memento;
        return null;
    }

    public void SaveMemento(BaseMemento memento)
    {
        _mementos.Add(memento);
    }
}

在GetEvent方法中,會找到全部的聚合根Id相關的事件。在Save方法中,將全部的事件保存在內存中,而後每隔三個事件創建一個快照。能夠看到這裏面使用了備忘錄模式。

而後在foreach循環中,對於全部的沒有提交的變動,EventBus將該事件發佈出去。

如今,全部的發生變動的事件已經記錄下來了。事件已經被髮布到EventBus上,而後對應的EventHandler再處理對應的事件,而後與DB交互。如今來看EventBus的Publish方法。

public class EventBus:IEventBus
{
    private IEventHandlerFactory _eventHandlerFactory;

    public EventBus(IEventHandlerFactory eventHandlerFactory)
    {
        _eventHandlerFactory = eventHandlerFactory;
    }
        
    public void Publish<T>(T @event) where T : Event
    {
        var handlers = _eventHandlerFactory.GetHandlers<T>();
        foreach (var eventHandler in handlers)
        {
            eventHandler.Handle(@event);
        }
    }
}

能夠看到EventBus的Publish和CommandBus中的Send方法很類似,都是首先經過EventHandlerFactory查找對應Event的Handler,而後調用其Handler方法。好比

public class StructureMapEventHandlerFactory : IEventHandlerFactory
{
    public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event
    {
        var handlers = GetHandlerType<T>();
            
        var lstHandlers = handlers.Select(handler => (IEventHandler<T>) ObjectFactory.GetInstance(handler)).ToList();
        return lstHandlers;
    }

    private static IEnumerable<Type> GetHandlerType<T>() where T : Event
    {
           
        var handlers = typeof(IEventHandler<>).Assembly.GetExportedTypes()
            .Where(x => x.GetInterfaces()
                .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(IEventHandler<>)))
                .Where(h => h.GetInterfaces()
                    .Any(ii => ii.GetGenericArguments()
                        .Any(aa => aa == typeof(T))))
                 .ToList();
        return handlers;
    }
}

而後返回並實例化了ItemCreatedEventHandler 對象,該對象的實現以下:

public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent>
{
    private readonly IReportDatabase _reportDatabase;
    public ItemCreatedEventHandler(IReportDatabase reportDatabase)
    {
        _reportDatabase = reportDatabase;
    }
    public void Handle(ItemCreatedEvent handle)
    {
        DiaryItemDto item = new DiaryItemDto()
            {
                Id = handle.AggregateId,
                Description =  handle.Description,
                From = handle.From,
                Title = handle.Title,
                To=handle.To,
                Version =  handle.Version
            };

        _reportDatabase.Add(item);
    }
}

能夠看到在Handler方法中,從事件中獲取參數,而後新建DTO對象,而後將該對象更新到DB中。

到此,整個Command執行完成。

六 結語

CQRS是一種思想很簡單清晰的設計模式,他經過在業務上分離操做和查詢來使得系統具備更好的可擴展性及性能,使得可以對系統的不一樣部分進行擴展和優化。在CQRS中,全部的涉及到對DB的操做都是經過發送Command,而後特定的Command觸發對應事件來完成操做,這個過程是異步的,而且全部涉及到對系統的變動行爲都包含在具體的事件中,結合Eventing Source模式,能夠記錄下全部的事件,而不是以往的某一點的數據信息,這些信息能夠做爲系統的操做日誌,能夠來對系統進行回退或者重放。

CQRS 模式在實現上有些複雜,不少地方好比AggregationRoot、Domain Object都涉及到DDD中的相關概念,本人對DDD不太懂。這裏僅爲了演示CQRS模式,因此使用的例子是codeproject上的,末尾列出了一些參考文章,若是您想了解更多,能夠有針對性的閱讀。

最後,但願CQRS模式能讓您在設計高性能,可擴展性的程序時可以多一種選擇和考慮。

七 參考文獻

  1. Introduction to CQRS http://www.codeproject.com/Articles/555855/Introduction-to-CQRS
  2. CQRS http://martinfowler.com/bliki/CQRS.html
  3. CQRS Journey http://msdn.microsoft.com/en-us/library/jj554200.aspx
  4. Command and Query Responsibility Segregation (CQRS) Pattern http://msdn.microsoft.com/en-us/library/dn568103.aspx
  5. EntityFramework之領域驅動設計實踐:CQRS體系結構模式 http://www.cnblogs.com/daxnet/archive/2010/08/02/1790299.html
  6. Event Sourcing Pattern http://msdn.microsoft.com/en-us/library/dn589792.aspx
相關文章
相關標籤/搜索