淺談命令查詢職責分離(CQRS)模式

 

淺談命令查詢職責分離(CQRS)模式

 

在經常使用的三層架構中,一般都是經過數據訪問層來修改或者查詢數據,通常修改和查詢使用的是相同的實體。在一些業務邏輯簡單的系統中可能沒有什麼問題,可是隨着系統邏輯變得複雜,用戶增多,這種設計就會出現一些性能問題。雖然在DB上能夠作一些讀寫分離的設計,但在業務上若是在讀寫方面混合在一塊兒的話,仍然會出現一些問題。html

本文介紹了命令查詢職責分離模式(Command Query Responsibility Segregation,CQRS),該模式從業務上分離修改 (Command,增,刪,改,會對系統狀態進行修改)和查詢(Query,查,不會對系統狀態進行修改)的行爲。從而使得邏輯更加清晰,便於對不一樣部分進行鍼對性的優化。文章首先簡要介紹了傳統的CRUD方式存在的問題,接着介紹了CQRS模式,最後以一個簡單的在線日記系統演示瞭如何實現CQRS模式。要談到讀寫操做,首先咱們來看傳統的CRUD的問題。前端

一 CRUD方式的問題

在之前的管理系統中,命令(Command,一般用來更新數據,操做DB)和查詢(Query)一般使用的是在數據訪問層中Repository中的實體對象(這些對象是對DB中表的映射),這些實體有多是SQLServer中的一行數據或者多個表。git

一般對DB執行的增,刪,改,查(CRUD)都是針對的系統的實體對象。如經過數據訪問層獲取數據,而後經過數據傳輸對象DTO傳給表現層。或者,用戶須要更新數據,經過DTO對象將數據傳給Model,而後經過數據訪問層寫回數據庫,系統中的全部交互都是和數據查詢和存儲有關,能夠認爲是數據驅動(Data-Driven)的,以下圖:程序員

 Traditional CRUD Architecture

對於一些比較簡單的系統,使用這種CRUD的設計方式可以知足要求。特別是經過一些代碼生成工具及ORM等可以很是方便快速的實現功能。web

可是傳統的CRUD方法有一些問題數據庫

  • 使用同一個對象實體來進行數據庫讀寫可能會太粗糙,大多數狀況下,好比編輯的時候可能只須要更新個別字段,可是卻須要將整個對象都穿進去,有些字段實際上是不須要更新的。在查詢的時候在表現層可能只須要個別字段,可是須要查詢和返回整個實體對象。
  • 使用同一實體對象對同一數據進行讀寫操做的時候,可能會遇到資源競爭的狀況,常常要處理的鎖的問題,在寫入數據的時候,須要加鎖。讀取數據的時候須要判斷是否容許髒讀。這樣使得系統的邏輯性和複雜性增長,而且會對系統吞吐量的增加會產生影響。
  • 同步的,直接與數據庫進行交互在大數據量同時訪問的狀況下可能會影響性能和響應性,而且可能會產生性能瓶頸。
  • 因爲同一實體對象都會在讀寫操做中用到,因此對於安全和權限的管理會變得比較複雜。

這裏面很重要的一個問題是,系統中的讀寫頻率比,是偏向讀,仍是偏向寫,就如同通常的數據結構在查找和修改上時間複雜度不同,在設計系統的結構時也須要考慮這樣的問題。解決方法就是咱們常常用到的對數據庫進行讀寫分離。 讓主數據庫處理事務性的增,刪,改操做(Insert,Update,Delete)操做,讓從數據庫處理查詢操做(Select操做),數據庫複製被用來將事務性操做致使的變動同步到集羣中的從數據庫。這只是從DB角度處理了讀寫分離,可是從業務或者系統上面讀和寫仍然是存放在一塊兒的。他們都是用的同一個實體對象。編程

要從業務上將讀和寫分離,就是接下來要介紹的命令查詢職責分離模式。設計模式

二 什麼是CQRS

CQRS最先來自於Betrand Meyer(Eiffel語言之父,開-閉原則OCP提出者)在 Object-Oriented Software Construction 這本書中提到的一種 命令查詢分離 (Command Query Separation,CQS) 的概念。其基本思想在於,任何一個對象的方法能夠分爲兩大類:安全

  • 命令(Command):不返回任何結果(void),但會改變對象的狀態。
  • 查詢(Query):返回結果,可是不會改變對象的狀態,對系統沒有反作用。

根據CQS的思想,任何一個方法均可以拆分爲命令和查詢兩部分,好比:微信

private int i = 0;
private int Increase(int value)
{
    i += value;
    return i;
}

這個方法,咱們執行了一個命令即對變量i進行相加,同時又執行了一個Query,即查詢返回了i的值,若是按照CQS的思想,該方法能夠拆成Command和Query兩個方法,以下:

private void IncreaseCommand(int value)
{
    i += value;
}
private int QueryValue()
{
    return i;
}

操做和查詢分離使得咱們可以更好的把握對象的細節,可以更好的理解哪些操做會改變系統的狀態。固然CQS也有一些缺點,好比代碼須要處理多線程的狀況。

CQRS是對CQS模式的進一步改進成的一種簡單模式。 它由Greg Young在CQRS, Task Based UIs, Event Sourcing agh! 這篇文章中提出。「CQRS只是簡單的將以前只須要建立一個對象拆分紅了兩個對象,這種分離是基於方法是執行命令仍是執行查詢這一原則來定的(這個和CQS的定義一致)」。

CQRS使用分離的接口將數據查詢操做(Queries)和數據修改操做(Commands)分離開來,這也意味着在查詢和更新過程當中使用的數據模型也是不同的。這樣讀和寫邏輯就隔離開來了。

A basic CQRS architecture

使用CQRS分離了讀寫職責以後,能夠對數據進行讀寫分離操做來改進性能,可擴展性和安全。以下圖:

A CQRS architecture with separate read and write stores

主數據庫處理CUD,從庫處理R,從庫的的結構能夠和主庫的結構徹底同樣,也能夠不同,從庫主要用來進行只讀的查詢操做。在數量上從庫的個數也能夠根據查詢的規模進行擴展,在業務邏輯上,也能夠根據專題從主庫中劃分出不一樣的從庫。從庫也能夠實現成ReportingDatabase,根據查詢的業務需求,從主庫中抽取一些必要的數據生成一系列查詢報表來存儲。

reportingDatabase

使用ReportingDatabase的一些優勢一般可使得查詢變得更加簡單高效:

  • ReportingDatabase的結構和數據表會針對經常使用的查詢請求進行設計。
  • ReportingDatabase數據庫一般會去正規化,存儲一些冗餘而減小必要的Join等聯合查詢操做,使得查詢簡化和高效,一些在主數據庫中用不到的數據信息,在ReportingDatabase能夠不用存儲。
  • 能夠對ReportingDatabase重構優化,而不用去改變操做數據庫。
  • 對ReportingDatabase數據庫的查詢不會給操做數據庫帶來任何壓力。
  • 能夠針對不一樣的查詢請求創建不一樣的ReportingDatabase庫。

固然這也有一些缺點,好比從庫數據的更新。若是使用SQLServer,自己也提供了一些如故障轉移和複製機制來方便部署。

三 何時能夠考慮CQRS

CQRS模式有一些優勢:

  1. 分工明確,能夠負責不一樣的部分
  2. 將業務上的命令和查詢的職責分離可以提升系統的性能、可擴展性和安全性。而且在系統的演化中可以保持高度的靈活性,可以防止出現CRUD模式中,對查詢或者修改中的某一方進行改動,致使另外一方出現問題的狀況。
  3. 邏輯清晰,可以看到系統中的那些行爲或者操做致使了系統的狀態變化。
  4. 能夠從數據驅動(Data-Driven) 轉到任務驅動(Task-Driven)以及事件驅動(Event-Driven).

在下場景中,能夠考慮使用CQRS模式:

  1. 當在業務邏輯層有不少操做須要相同的實體或者對象進行操做的時候。CQRS使得咱們能夠對讀和寫定義不一樣的實體和方法,從而能夠減小或者避免對某一方面的更改形成衝突
  2. 對於一些基於任務的用戶交互系統,一般這類系統會引導用戶經過一系列複雜的步驟和操做,一般會須要一些複雜的領域模型,而且整個團隊已經熟悉領域驅動設計技術。寫模型有不少和業務邏輯相關的命令操做的堆,輸入驗證,業務邏輯驗證來保證數據的一致性。讀模型沒有業務邏輯以及驗證堆,僅僅是返回DTO對象爲視圖模型提供數據。讀模型最終和寫模型相一致。
  3. 適用於一些須要對查詢性能和寫入性能分開進行優化的系統,尤爲是讀/寫比很是高的系統,橫向擴展是必須的。好比,在不少系統中讀操做的請求時遠大於寫操做。爲適應這種場景,能夠考慮將寫模型抽離出來單獨擴展,而將寫模型運行在一個或者少數幾個實例上。少許的寫模型實例可以減小合併衝突發生的狀況
  4. 適用於一些團隊中,一些有經驗的開發者能夠關注複雜的領域模型,這些用到寫操做,而另外一些經驗較少的開發者能夠關注用戶界面上的讀模型。
  5. 對於系統在未來會隨着時間不段演化,有可能會包含不一樣版本的模型,或者業務規則常常變化的系統
  6. 須要和其餘系統整合,特別是須要和事件溯源Event Sourcing進行整合的系統,這樣子系統的臨時異常不會影響整個系統的其餘部分。

可是在如下場景中,可能不適宜使用CQRS:

  1. 領域模型或者業務邏輯比較簡單,這種狀況下使用CQRS會把系統搞複雜。
  2. 對於簡單的,CRUD模式的用戶界面以及與之相關的數據訪問操做已經足夠的話,不必使用CQRS,這些都是一個簡單的對數據進行增刪改查。
  3. 不適合在整個系統中處處使用該模式。在整個數據管理場景中的特定模塊中CQRS可能比較有用。可是在有些地方使用CQRS會增長系統沒必要要的複雜性。

四 CQRS與Event Sourcing的關係

在CQRS中,查詢方面,直接經過方法查詢數據庫,而後經過DTO將數據返回。在操做(Command)方面,是經過發送Command實現,由CommandBus處理特定的Command,而後由Command將特定的Event發佈到EventBus上,而後EventBus使用特定的Handler來處理事件,執行一些諸如,修改,刪除,更新等操做。這裏,全部與Command相關的操做都經過Event實現。這樣咱們能夠經過記錄Event來記錄系統的運行歷史記錄,而且可以方便的回滾到某一歷史狀態。Event Sourcing就是用來進行存儲和管理事件的。這裏不展開介紹。

五 CQRS的簡單實現

CQRS模式在思想上比較簡單,可是實現上仍是有些複雜。它涉及到DDD,以及Event Sourcing,這裏使用codeproject上的 Introduction to CQRS 這篇文章的例子來講明CQRS模式。這個例子是一個簡單的在線記日誌(Diary)系統,實現了日誌的增刪改查功能。總體結構以下:

CQRS

上圖很清晰的說明了CQRS在讀寫方面的分離,在讀方面,經過QueryFacade到數據庫裏去讀取數據,這個庫有多是ReportingDB。在寫方面,比較複雜,操做經過Command發送到CommandBus上,而後特定的CommandHandler處理請求,產生對應的Event,將Eevnt持久化後,經過EventBus特定的EevntHandler對數據庫進行修改等操做。

例子代碼能夠到codeproject上下載,總體結構以下:

DiaryCQRS project

由三個項目構成,Diary.CQRS包含了全部的Domain和消息對象。Configuration經過使用一個名爲StructMap的IOC來初始化一些變量方便Web調用,Web是一個簡單的MVC3項目,在Controller中有與CQRS交互的代碼。

下面分別看Query和Command方面的實現:

Query方向的實現

查詢方面很簡單,日誌列表和明細獲取就是簡單的查詢。下面先看列表查詢部分的代碼。

public ActionResult Index()
{
    ViewBag.Model = ServiceLocator.ReportDatabase.GetItems();
    return View();
}

public ActionResult Edit(Guid id)
{
    var item = ServiceLocator.ReportDatabase.GetById(id);
    var model = new DiaryItemDto()
    {
        Description = item.Description,
        From = item.From,
        Id = item.Id,
        Title = item.Title,
        To = item.To,
        Version = item.Version
    };
    return View(model);
}

ReportDatabase的GetItems和GetById(id)方法就是簡單的查詢,從命名能夠看出他是ReportDatabase。

public class ReportDatabase : IReportDatabase
{
    static List<DiaryItemDto> items = new List<DiaryItemDto>();

    public DiaryItemDto GetById(Guid id)
    {
        return items.Where(a => a.Id == id).FirstOrDefault();
    }

    public void Add(DiaryItemDto item)
    {
        items.Add(item);
    }

    public void Delete(Guid id)
    {
        items.RemoveAll(i => i.Id == id);
    }

    public List<DiaryItemDto> GetItems()
    {
        return items;
    } 
}

ReportDataBase只是在內部維護了一個List的DiaryItemDto列表。在使用的時候,是經過IRepositoryDatabase對其進行操做的,這樣便於mock代碼。

Query方面的代碼很簡單。在實際的應用中,這一塊就是直接對DB進行查詢,而後經過DTO對象返回,這個DB多是應對特定場景的報表數據庫,這樣能夠提高查詢性能。

下面來看Command方向的實現:

Command方向的實現

Command的實現比較複雜,下面以簡單的建立一個新的日誌來講明。

在MVC的Control中,能夠看到Add的Controller中只調用了一句話:

[HttpPost]
public ActionResult Add(DiaryItemDto item)
{
    ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));

    return RedirectToAction("Index");
}

首先聲明瞭一個CreateItemCommand,這個Command只是保存了一些必要的信息。

public class CreateItemCommand:Command
{
    public string Title { get; internal set; }
    public string Description { get;internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }

    public CreateItemCommand(Guid aggregateId, string title, 
        string description,int version,DateTime from, DateTime to)
        : base(aggregateId,version)
    {
        Title = title;
        Description = description;
        From = from;
        To = to;
    }
}

而後將Command發送到了CommandBus上,其實就是讓CommandBus來選擇合適的CommandHandler來處理。

public class CommandBus:ICommandBus
{
    private readonly ICommandHandlerFactory _commandHandlerFactory;

    public CommandBus(ICommandHandlerFactory commandHandlerFactory)
    {
        _commandHandlerFactory = commandHandlerFactory;
    }

    public void Send<T>(T command) where T : Command
    {
        var handler = _commandHandlerFactory.GetHandler<T>();
        if (handler != null)
        {
            handler.Execute(command);
        }
        else
        {
            throw new UnregisteredDomainCommandException("no handler registered");
        }
    }        
}

這個裏面須要值得注意的是CommandHandlerFactory這個類型的GetHandler方法,他接受一個類型爲T的泛型,這裏就是咱們以前傳入的CreateItemCommand。來看他的GetHandler方法。

public class StructureMapCommandHandlerFactory : ICommandHandlerFactory
{
    public ICommandHandler<T> GetHandler<T>() where T : Command
    {
        var handlers = GetHandlerTypes<T>().ToList();

        var cmdHandler = handlers.Select(handler => 
            (ICommandHandler<T>)ObjectFactory.GetInstance(handler)).FirstOrDefault();
            
        return cmdHandler;
    }
        
    private IEnumerable<Type> GetHandlerTypes<T>() where T : Command
    {
        var handlers = typeof(ICommandHandler<>).Assembly.GetExportedTypes()
            .Where(x => x.GetInterfaces()
                .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ICommandHandler<>) ))
                .Where(h=>h.GetInterfaces()
                    .Any(ii=>ii.GetGenericArguments()
                        .Any(aa=>aa==typeof(T)))).ToList();

           
        return handlers;
    }

}

這裏能夠看到,他首先查找當前的程序集中(ICommandHandler)所在的程序集中的全部的實現了ICommandHandler的接口的類型,而後在全部的類型找查找實現了該泛型接口而且泛型的類型參數類型爲T類型的全部類型。以上面的代碼爲例,就是要找出實現了ICommandHandler<CreateItemCommand>接口的類型。能夠看到就是CreateItemCommandHandler類型。

public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
{
    private IRepository<DiaryItem> _repository;

    public CreateItemCommandHandler(IRepository<DiaryItem> repository)
    {
        _repository = repository;
    }

    public void Execute(CreateItemCommand command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }
        if (_repository == null)
        {
            throw new InvalidOperationException("Repository is not initialized.");
        }
        var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To);
        aggregate.Version = -1;
        _repository.Save(aggregate, aggregate.Version);
    }
}

找到以後而後使用IOC實例化了該對象返回。

如今CommandBus中,找到了處理特定Command的Handler。而後執行該類型的Execute方法。

能夠看到在該類型中實例化了一個名爲aggregate的DiaryItem對象。這個和咱們以前查詢所用到的DiaryItemDto有所不一樣,這個一個領域對象,裏面包含了一系列事件。

public class DiaryItem : AggregateRoot, 
    IHandle<ItemCreatedEvent>,
    IHandle<ItemRenamedEvent>,
    IHandle<ItemFromChangedEvent>, 
    IHandle<ItemToChangedEvent>,
    IHandle<ItemDescriptionChangedEvent>,
    IOriginator
{
    public string Title { get; set; }

    public DateTime From { get; set; }
    public DateTime To { get; set; }
    public string Description { get; set; }

    public DiaryItem()
    {
            
    }

    public DiaryItem(Guid id,string title, string description,  DateTime from, DateTime to)
    {
        ApplyChange(new ItemCreatedEvent(id, title,description, from, to));
    }

    public void ChangeTitle(string title)
    {
        ApplyChange(new ItemRenamedEvent(Id, title));
    }

    public void Handle(ItemCreatedEvent e)
    {
        Title = e.Title;
        From = e.From;
        To = e.To;
        Id = e.AggregateId;
        Description = e.Description;
        Version = e.Version;
    }

    public void Handle(ItemRenamedEvent e)
    {
        Title = e.Title;
    }
    ...
}

ItemCreatedEvent 事件的定義以下,其實就是用來存儲傳輸過程當中須要用到的數據。

public class ItemCreatedEvent:Event
{
    public string Title { get; internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }
    public string Description { get;internal set; }

    public ItemCreatedEvent(Guid aggregateId, string title ,
        string description, DateTime from, DateTime to)
    {
        AggregateId = aggregateId;
        Title = title;
        From = from;
        To = to;
        Description = description;
    }
}

能夠看到在Domain對象中,除了定義基本的字段外,還定義了一些相應的事件,好比在構造函數中,其實是發起了一個名爲ItemCreateEvent的事件,同時還定義了處理時間的邏輯,這些邏輯都放在名爲Handle的接口方法發,例如ItemCerateEvent的處理方法爲Handle(ItemCreateEvent)方法。

ApplyChange方法在AggregateRoot對象中,他是彙集根,這是DDD中的概念。經過這個根能夠串起全部對象。 該類實現了IEventProvider接口,他保存了全部在_changes中的全部沒有提交的變動,其中的ApplyChange的用來爲特定的Event查找Eventhandler的方法:

public abstract class AggregateRoot : IEventProvider
{
    private readonly List<Event> _changes;

    public Guid Id { get; internal set; }
    public int Version { get; internal set; }
    public int EventVersion { get; protected set; }

    protected AggregateRoot()
    {
        _changes = new List<Event>();
    }

    public IEnumerable<Event> GetUncommittedChanges()
    {
        return _changes;
    }

    public void MarkChangesAsCommitted()
    {
        _changes.Clear();
    }

    public void LoadsFromHistory(IEnumerable<Event> history)
    {
        foreach (var e in history) ApplyChange(e, false);
        Version = history.Last().Version;
        EventVersion = Version;
    }

    protected void ApplyChange(Event @event)
    {
        ApplyChange(@event, true);
    }

    private void ApplyChange(Event @event, bool isNew)
    {
        dynamic d = this;

        d.Handle(Converter.ChangeTo(@event, @event.GetType()));
        if (isNew)
        {
            _changes.Add(@event);
        }
    }
}

在ApplyChange的實現中,this其實就是對應的實現了AggregateRoot的DiaryItem的Domain對象,調用的Handle方法就是咱們以前在DiaryItem中定義的行爲。而後將該event保存在內部的未提交的事件列表中。相關的信息及事件都保存在了定義的aggregate對象中並返回。

而後Command繼續執行,而後調用了_repository.Save(aggregate, aggregate.Version);這個方法。先看這個Repository對象。

public class Repository<T> : IRepository<T> where T : AggregateRoot, new()
{
    private readonly IEventStorage _storage;
    private static object _lockStorage = new object();

    public Repository(IEventStorage storage)
    {
        _storage = storage;
    } 

    public void Save(AggregateRoot aggregate, int expectedVersion)
    {
        if (aggregate.GetUncommittedChanges().Any())
        {
            lock (_lockStorage)
            {
                var item = new T();

                if (expectedVersion != -1)
                {
                    item = GetById(aggregate.Id);
                    if (item.Version != expectedVersion)
                    {
                        throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",
                                                                        item.Id));
                    }
                }

                _storage.Save(aggregate);
            }
        }
    }

    public T GetById(Guid id)
    {
        IEnumerable<Event> events;
        var memento = _storage.GetMemento<BaseMemento>(id);
        if (memento != null)
        {
            events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version);
        }
        else
        {
            events = _storage.GetEvents(id);
        }
        var obj = new T();
        if(memento!=null)
            ((IOriginator)obj).SetMemento(memento);
            
        obj.LoadsFromHistory(events);
        return obj;
    }
}

這個方法主要是用來對事件進行持久化的。 全部的聚合的變更都會存在該Repository中,首先,檢查當前的聚合是否和以前存儲在storage中的聚合一致,若是不一致,則表示對象在其餘地方被更改過,拋出ConcurrencyException,不然將該變更保存在Event Storage中。

IEventStorage用來存儲全部的事件,其實現類型爲InMemoryEventStorage。

public class InMemoryEventStorage:IEventStorage
{
    private List<Event> _events;
    private List<BaseMemento> _mementos;

    private readonly IEventBus _eventBus;

    public InMemoryEventStorage(IEventBus eventBus)
    {
        _events = new List<Event>();
        _mementos = new List<BaseMemento>();
        _eventBus = eventBus;
    }

    public IEnumerable<Event> GetEvents(Guid aggregateId)
    {
        var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p);
        if (events.Count() == 0)
        {
            throw new AggregateNotFoundException(string.Format("Aggregate with Id: {0} was not found", aggregateId));
        }
        return events;
    }

    public void Save(AggregateRoot aggregate)
    {
        var uncommittedChanges = aggregate.GetUncommittedChanges();
        var version = aggregate.Version;
            
        foreach (var @event in uncommittedChanges)
        {
            version++;
            if (version > 2)
            {
                if (version % 3 == 0)
                {
                    var originator = (IOriginator)aggregate;
                    var memento = originator.GetMemento();
                    memento.Version = version;
                    SaveMemento(memento);
                }
            }
            @event.Version=version;
            _events.Add(@event);
        }
        foreach (var @event in uncommittedChanges)
        {
            var desEvent = Converter.ChangeTo(@event, @event.GetType());
            _eventBus.Publish(desEvent);
        }
    }

    public T GetMemento<T>(Guid aggregateId) where T : BaseMemento
    {
        var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault();
        if (memento != null)
            return (T) memento;
        return null;
    }

    public void SaveMemento(BaseMemento memento)
    {
        _mementos.Add(memento);
    }
}

在GetEvent方法中,會找到全部的聚合根Id相關的事件。在Save方法中,將全部的事件保存在內存中,而後每隔三個事件創建一個快照。能夠看到這裏面使用了備忘錄模式。

而後在foreach循環中,對於全部的沒有提交的變動,EventBus將該事件發佈出去。

如今,全部的發生變動的事件已經記錄下來了。事件已經被髮布到EventBus上,而後對應的EventHandler再處理對應的事件,而後與DB交互。如今來看EventBus的Publish方法。

public class EventBus:IEventBus
{
    private IEventHandlerFactory _eventHandlerFactory;

    public EventBus(IEventHandlerFactory eventHandlerFactory)
    {
        _eventHandlerFactory = eventHandlerFactory;
    }
        
    public void Publish<T>(T @event) where T : Event
    {
        var handlers = _eventHandlerFactory.GetHandlers<T>();
        foreach (var eventHandler in handlers)
        {
            eventHandler.Handle(@event);
        }
    }
}

能夠看到EventBus的Publish和CommandBus中的Send方法很類似,都是首先經過EventHandlerFactory查找對應Event的Handler,而後調用其Handler方法。好比

public class StructureMapEventHandlerFactory : IEventHandlerFactory
{
    public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event
    {
        var handlers = GetHandlerType<T>();
            
        var lstHandlers = handlers.Select(handler => (IEventHandler<T>) ObjectFactory.GetInstance(handler)).ToList();
        return lstHandlers;
    }

    private static IEnumerable<Type> GetHandlerType<T>() where T : Event
    {
           
        var handlers = typeof(IEventHandler<>).Assembly.GetExportedTypes()
            .Where(x => x.GetInterfaces()
                .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(IEventHandler<>)))
                .Where(h => h.GetInterfaces()
                    .Any(ii => ii.GetGenericArguments()
                        .Any(aa => aa == typeof(T))))
                 .ToList();
        return handlers;
    }
}

而後返回並實例化了ItemCreatedEventHandler 對象,該對象的實現以下:

public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent>
{
    private readonly IReportDatabase _reportDatabase;
    public ItemCreatedEventHandler(IReportDatabase reportDatabase)
    {
        _reportDatabase = reportDatabase;
    }
    public void Handle(ItemCreatedEvent handle)
    {
        DiaryItemDto item = new DiaryItemDto()
            {
                Id = handle.AggregateId,
                Description =  handle.Description,
                From = handle.From,
                Title = handle.Title,
                To=handle.To,
                Version =  handle.Version
            };

        _reportDatabase.Add(item);
    }
}

能夠看到在Handler方法中,從事件中獲取參數,而後新建DTO對象,而後將該對象更新到DB中。

到此,整個Command執行完成。

六 結語

CQRS是一種思想很簡單清晰的設計模式,他經過在業務上分離操做和查詢來使得系統具備更好的可擴展性及性能,使得可以對系統的不一樣部分進行擴展和優化。在CQRS中,全部的涉及到對DB的操做都是經過發送Command,而後特定的Command觸發對應事件來完成操做,這個過程是異步的,而且全部涉及到對系統的變動行爲都包含在具體的事件中,結合Eventing Source模式,能夠記錄下全部的事件,而不是以往的某一點的數據信息,這些信息能夠做爲系統的操做日誌,能夠來對系統進行回退或者重放。

CQRS 模式在實現上有些複雜,不少地方好比AggregationRoot、Domain Object都涉及到DDD中的相關概念,本人對DDD不太懂。這裏僅爲了演示CQRS模式,因此使用的例子是codeproject上的,末尾列出了一些參考文章,若是您想了解更多,能夠有針對性的閱讀。

最後,但願CQRS模式能讓您在設計高性能,可擴展性的程序時可以多一種選擇和考慮。

七 參考文獻

  1. Introduction to CQRS http://www.codeproject.com/Articles/555855/Introduction-to-CQRS
  2. CQRS http://martinfowler.com/bliki/CQRS.html
  3. CQRS Journey http://msdn.microsoft.com/en-us/library/jj554200.aspx
  4. Command and Query Responsibility Segregation (CQRS) Pattern http://msdn.microsoft.com/en-us/library/dn568103.aspx
  5. EntityFramework之領域驅動設計實踐:CQRS體系結構模式 http://www.cnblogs.com/daxnet/archive/2010/08/02/1790299.html
  6. Event Sourcing Pattern http://msdn.microsoft.com/en-us/library/dn589792.aspx
做者: yangecnuyangecnu's Blog on 博客園
出處: http://www.cnblogs.com/yangecnu/
做品yangecnu 創做,採用 知識共享署名-非商業性使用-禁止演繹 2.5 中國大陸許可協議進行許可。 歡迎轉載,但任何轉載必須保留完整文章,在顯要地方顯示署名以及原文連接。如您有任何疑問或者受權方面的協商,請 給我留言
 
分類: Design Patterns
標籤: CQRS
好文要頂 關注我 收藏該文 聯繫我
47
1
 
(請您對文章作出評價)
 
« 上一篇: 熔斷器設計模式
» 下一篇: 使用ServiceStack構建Web服務
posted @ 2014-08-26 18:52 yangecnu 閱讀( 14935) 評論( 26) 編輯 收藏
 
 
 
#1樓 2014-08-26 18:55 田園裏的蟋蟀  
幹得漂亮,CQRS 就從這篇博文開始學起,感謝分享。
 
#2樓 2014-08-26 19:20 netfocus  
不錯,做爲入門指導,很好。
 
#3樓 2014-08-26 19:47 xuefly  
學習了 +1
 
#4樓 2014-08-26 21:32 Eric.HT  
最近DDD熱起來了。博客園終於又開始有人關注CQRS了
 
#5樓 2014-08-26 22:16 友人M  
認真讀了兩遍、大概明白咋回事兒了,回頭還要來再讀。mark。
 
#6樓 2014-08-27 08:57 richiezhang  
寫的挺好,支持一下
 
#7樓 2014-08-27 09:02 玩吧華華  
對於簡單的,CRUD模式的用戶界面以及與之相關的數據訪問操做已經足夠的話,不必使用 CRUD操做,這些都是一個簡單的對數據進行增刪改查。

好像描述錯了!
 
#8樓 2014-08-27 09:53 Da ge  
以前對於cqrs仍是有點迷糊,看了樓主的文章後突然明朗了
 
#9樓 2014-08-27 10:03 xuefly  
個人文字大都難以分類,我感受這篇跟CQRS沾邊
http://www.cnblogs.com/xuefly/p/3726664.html
 
#10樓 2014-08-27 10:12 大佛腳下  
Nice
 
#11樓 [ 樓主] 2014-08-27 10:46 yangecnu  
@玩吧華華
謝謝指正,已經改正。
 
#12樓 2014-08-27 10:51 玩吧華華  
@yangecnu
學習了,看完後對後半部分不太理解,支持樓主繼續分享
 
#13樓 2014-08-27 12:08 田園裏的蟋蟀  
今天大概花了40分鐘,來閱讀樓主的這篇博文,這還只是大體的閱讀,並無深刻,須要思考的地方不少,樓主的其餘博文我沒閱讀過,談一些我對這篇博文的一些見解。

首先,樓主的排版或字體閱讀起來很是舒服,前半部分看起來很是「官方」,後半部分就很專業,還有一點就是,和清培兄的博文風格很類似,就是有時候一段話很長(能夠加個","),因此閱讀起來有些吃力,另外隨手記錄了一些錯別字或語句不一樣的地方:

1,「用戶須要更新DTO對象」,DTO只是傳輸對象,不存在更新的概念。
2,「是偏向寫,仍是偏向度,就像通常的數據結構查找和修改時間複雜度不同同樣」
3,「這只是從數據庫角度處理了讀寫分離」,前面表述的是「從數據庫」,這個詞閱讀起來有點誤解,能夠加粗或高亮。
4,「這種分離是基於方法是執行命令仍是執行查詢這一原則來定的(這個和CQS的定義一致)」
5,「從庫的的結構能夠和主庫能夠徹底同樣」
6,「其個數也能夠根據查詢的數據庫不一樣而進行擴展」
7,「對查詢或者修改的某一方的修改致使另外一方出現問題的狀況」
8,「如今咱們的全部的發生變動的事件已經記錄下來了」

我沒有學習過CQRS模式,對樓主後半部分的代碼有兩個疑問,但願能夠指教一下:

1,ReportDatabase中是「Q」模式,爲何代碼中還有Add和Delete操做。
2,Repository<T>看樓主表述,應該是「C」模式,爲何還有GetById操做?

最後,很是感謝樓主的分享。。。
 
#14樓 [ 樓主] 2014-08-27 13:07 yangecnu  
@田園裏的蟋蟀
謝謝這麼認真仔細的閱讀,我其實也是第一次接觸這個,對DDD也並不熟悉,不少都是直接翻譯或者總結其餘參考文獻。可能有些地方處理的不太好。相關地方根據建議修改了。
關於您提的兩個問題:
1.在codeproject的Diary這個例子中。並無把DB分爲主庫和ReportDatabase,保存數據庫只有一個地方,命名爲了ReportDB,做者這樣命名,多是想和保存事件的那個庫EventStorage區分開來。這裏稱之爲ReportDatabase確實偏頗。
2.Repository<T>中主要是對事件進行保存和處理。其中GetById是根據Id查找對應的事件,看在事件發送和處理這段時間,該有沒有發生過更改,若是發生過更改,則觸發異常。
 
#15樓 2014-08-27 13:50 xuefly  
當前請求究竟是Command仍是Query由當前請求的響應者決定。前端控制器收到一條消息好比記爲message1,這個請求內容爲message1的請求可能通過一個至關複雜的處理管道,管道還會分叉,每一個分叉中的消息可能和原始的message1有所不一樣,新的分叉中的消息好比記爲message1.1吧,message1.1是由message和一些系統狀態組合出的新的message對象,這個message1.1繼續往下走,有可能還有message1.1.1, message1.1.2,message1.1.3……,message被這麼有組織有偏移的分類後就滿是些很是具體的message了,這些具體的message就能夠用具體的邏輯處理了。問題的關鍵是在這個「message+系統」的聯合運動過程當中每一次對message的分類都不能損失信息(最初的那個message包含了全部信息,這個大message進入咱們的系統,配合上咱們的系統的狀態來分解message,這個分解是按照咱們事先書寫好的程序邏輯進行的,咱們這些程序員每天寫程序的本質就是對事物按照人類使用了幾萬年的古老分類法進行分類,反正這個問題我表述不清,反正就是事物的運動不會損失信息,反正就是message若是要持久化的話這個持久化不該損失任何信息,持久化模型可能跟message dto模型是不一樣結構的,但配合上咱們的系統狀態後每一條message的持久化必定不會損失信息)。還有一個問題是在這個「message在系統中的運動」這個過程當中咱們還會對message從另外一個角度進行分類,這另外一個角度是當前的message是否已經被「handled」,message1被MessageHandler1 handled後記爲message1',message1'的結構跟message1有多是不一樣的,message1'稱做MessageHandler1輸出的event,而這條MessageHandler1輸出的message1'可能又會被送入MessageHandler2處理,message1'對於MessageHandler1來講是輸出是event,但對於MessageHandler2來講倒是Command。至此咱們將全部的message按照輸入輸出的角度能夠分做兩類:輸入的是Command,輸出的是Event。雖然這條message可能在咱們的服務端通過一條極其複雜的帶有分叉的處理管道,但如果這條message的處理被要求是事務性的就難辦了,若是失敗了咱們如何把這條message從那套極其複雜的管道中退回來?退回來是很難的,可是必定是可以辦到的,由於咱們的message在任何一個處理環節都沒有損失信息(惟一損失的是時間)既然沒有損失任何信息固然是能夠原路返回的,原路返回的時候別忘了把系統狀態返回過來,系統要回到收到包含這條message的請求以前的狀態。咱們的系統毫不可能回到收到這條message以前的狀態!由於時間是沒法後退的,因此這是爲何咱們常常聽到「最終一致性」這個詞。

能夠致使系統的狀態變動的就是Command,是Command就要有辦法控制(head、get、read、query等操做是不是有反作用的,是否會致使你的系統狀態的變動在於你的業務邊界是什麼。資源每被get一次訪問計數加了1,若是在你的業務邊界下訪問次數加1有意義的話狀態就變動了,沒意義就不用變動不用加1)。

全部的運動都應該是能夠控制的。好比var age = 30;這麼一條賦值語句,這個語句中有個動詞「=」,它的意思是賦值,賦值是動詞。賦值致使了系統狀態的變動,全部的命令都應該是可控的,怎麼控制?只須要用個using語句一包裹就能夠控制了: using(var acCotnext = new ACContext(subject, resource,action)){ var age = 30; }
摘自: http://git.oschina.net/anycmd/anycmd
 
#16樓 2014-09-07 16:25 永遠的阿哲  
很是感謝lz的翻譯,我也仔細研讀了好久,收穫良多。
lz在文中引用了在線記日誌(Diary)系統做爲示例項目,我有如下兩點的思考。

1.領域模型有兩套對屬性的修改方式,一種是直接修改屬性,一種是經過Change屬性方法來修改屬性。可是隻有後面一種方式才能生成Event事件。這樣就帶來了bug的可能性,也不直觀。有沒有可能在保持傳統編程習慣的前提下達到設計意圖呢?

2.領域對象實現了不少IHandle<>接口,表示其具備處理這些事件的能力。聚合根對象裏有一個ApplyChange方法,是將事件對象對領域對象的改變應用到領域對象上。在代碼中做者使用了動態類型的方式進行動態調用,繞過了接口實現的約束,我認爲這是不穩當的。我認爲正確的作法是檢查領域對象是否實現了對這個事件的處理接口,若是沒有實現,則應跳過或拋出異常。

我以爲,CQRS模式增大了項目設計與實現的複雜度,對人的要求也較高。固然,它帶來的好處也是明顯的,即很優雅的實現了對數據生命週期的徹底掌控。
 
#17樓 2015-01-08 22:47 kuangkro  
我對CQRS架構有幾點疑問,望解答:
Command和Event是否可以引用Domain的entity或valueobject?爲何?
假設:Event和Command不可以使用entity和ValueObject,如今咱們的Event裏面包含複雜對象屬性,咱們在Apply這個Event的時候勢必會引入這些複雜對象屬性的命令空間,那麼會不會讓這個Aggregate變得不純粹呢?
若是咱們須要引入一個DomainService來處理一些業務,那應該在那一步去調用這個DomianService?
CommandHandler的做用是什麼?可以包含業務邏輯嘛?
咱們在EventHandler裏面能夠在發送命令嗎?
 
#18樓 2015-05-05 15:19 茗::流  
感受 差一個 執行流程圖
 
#19樓 2015-05-08 14:54 陳航  
讓樓主補一個啊
 
#20樓 2015-05-12 09:00 euler  
命令查詢職責分離(CQRS)模式
 
#21樓 2015-09-06 10:56 fishkuro  
一個字段一個ChangeEvent,這麼麻煩,tm的想屎的心都有。
 
#22樓 2015-11-01 18:26 god__love_you  
very good
 
#23樓 2015-11-11 15:12 yanqinqiang  
這個架構對DDD的知識要求很高, 但每每DDD在實際項目中衆說紛紜,很難達成統一。
 
#24樓 2015-11-23 19:35 Futuer  
很不錯的文章,多謝樓主
 
#25樓 2016-02-12 14:40 黑色街角  
果斷mark一下!謝謝樓主!
 
#26樓 2016-04-09 11:53 nandychen  
講的很好,1024個贊。
相關文章
相關標籤/搜索