原文連接:https://www.cnblogs.com/yangecnu/p/Introduction-CQRS.htmlhtml
在經常使用的三層架構中,一般都是經過數據訪問層來修改或者查詢數據,通常修改和查詢使用的是相同的實體。在一些業務邏輯簡單的系統中可能沒有什麼問題,可是隨着系統邏輯變得複雜,用戶增多,這種設計就會出現一些性能問題。雖然在DB上能夠作一些讀寫分離的設計,但在業務上若是在讀寫方面混合在一塊兒的話,仍然會出現一些問題。數據庫
本文介紹了命令查詢職責分離模式(Command Query Responsibility Segregation,CQRS),該模式從業務上分離修改 (Command,增,刪,改,會對系統狀態進行修改)和查詢(Query,查,不會對系統狀態進行修改)的行爲。從而使得邏輯更加清晰,便於對不一樣部分進行鍼對性的優化。文章首先簡要介紹了傳統的CRUD方式存在的問題,接着介紹了CQRS模式,最後以一個簡單的在線日記系統演示瞭如何實現CQRS模式。要談到讀寫操做,首先咱們來看傳統的CRUD的問題。設計模式
在之前的管理系統中,命令(Command,一般用來更新數據,操做DB)和查詢(Query)一般使用的是在數據訪問層中Repository中的實體對象(這些對象是對DB中表的映射),這些實體有多是SQLServer中的一行數據或者多個表。安全
一般對DB執行的增,刪,改,查(CRUD)都是針對的系統的實體對象。如經過數據訪問層獲取數據,而後經過數據傳輸對象DTO傳給表現層。或者,用戶須要更新數據,經過DTO對象將數據傳給Model,而後經過數據訪問層寫回數據庫,系統中的全部交互都是和數據查詢和存儲有關,能夠認爲是數據驅動(Data-Driven)的,以下圖:數據結構
對於一些比較簡單的系統,使用這種CRUD的設計方式可以知足要求。特別是經過一些代碼生成工具及ORM等可以很是方便快速的實現功能。多線程
可是傳統的CRUD方法有一些問題:架構
這裏面很重要的一個問題是,系統中的讀寫頻率比,是偏向讀,仍是偏向寫,就如同通常的數據結構在查找和修改上時間複雜度不同,在設計系統的結構時也須要考慮這樣的問題。解決方法就是咱們常常用到的對數據庫進行讀寫分離。 讓主數據庫處理事務性的增,刪,改操做(Insert,Update,Delete)操做,讓從數據庫處理查詢操做(Select操做),數據庫複製被用來將事務性操做致使的變動同步到集羣中的從數據庫。這只是從DB角度處理了讀寫分離,可是從業務或者系統上面讀和寫仍然是存放在一塊兒的。他們都是用的同一個實體對象。異步
要從業務上將讀和寫分離,就是接下來要介紹的命令查詢職責分離模式。ide
CQRS最先來自於Betrand Meyer(Eiffel語言之父,開-閉原則OCP提出者)在 Object-Oriented Software Construction 這本書中提到的一種 命令查詢分離(Command Query Separation,CQS) 的概念。其基本思想在於,任何一個對象的方法能夠分爲兩大類:函數
根據CQS的思想,任何一個方法均可以拆分爲命令和查詢兩部分,好比:
private int i = 0;
private int Increase(int value) { i += value; return i; }
這個方法,咱們執行了一個命令即對變量i進行相加,同時又執行了一個Query,即查詢返回了i的值,若是按照CQS的思想,該方法能夠拆成Command和Query兩個方法,以下:
private void IncreaseCommand(int value) { i += value; } private int QueryValue() { return i; }
操做和查詢分離使得咱們可以更好的把握對象的細節,可以更好的理解哪些操做會改變系統的狀態。固然CQS也有一些缺點,好比代碼須要處理多線程的狀況。
CQRS是對CQS模式的進一步改進成的一種簡單模式。 它由Greg Young在CQRS, Task Based UIs, Event Sourcing agh! 這篇文章中提出。「CQRS只是簡單的將以前只須要建立一個對象拆分紅了兩個對象,這種分離是基於方法是執行命令仍是執行查詢這一原則來定的(這個和CQS的定義一致)」。
CQRS使用分離的接口將數據查詢操做(Queries)和數據修改操做(Commands)分離開來,這也意味着在查詢和更新過程當中使用的數據模型也是不同的。這樣讀和寫邏輯就隔離開來了。
使用CQRS分離了讀寫職責以後,能夠對數據進行讀寫分離操做來改進性能,可擴展性和安全。以下圖:
主數據庫處理CUD,從庫處理R,從庫的的結構能夠和主庫的結構徹底同樣,也能夠不同,從庫主要用來進行只讀的查詢操做。在數量上從庫的個數也能夠根據查詢的規模進行擴展,在業務邏輯上,也能夠根據專題從主庫中劃分出不一樣的從庫。從庫也能夠實現成ReportingDatabase,根據查詢的業務需求,從主庫中抽取一些必要的數據生成一系列查詢報表來存儲。
使用ReportingDatabase的一些優勢一般可使得查詢變得更加簡單高效:
固然這也有一些缺點,好比從庫數據的更新。若是使用SQLServer,自己也提供了一些如故障轉移和複製機制來方便部署。
CQRS模式有一些優勢:
在下場景中,能夠考慮使用CQRS模式:
可是在如下場景中,可能不適宜使用CQRS:
在CQRS中,查詢方面,直接經過方法查詢數據庫,而後經過DTO將數據返回。在操做(Command)方面,是經過發送Command實現,由CommandBus處理特定的Command,而後由Command將特定的Event發佈到EventBus上,而後EventBus使用特定的Handler來處理事件,執行一些諸如,修改,刪除,更新等操做。這裏,全部與Command相關的操做都經過Event實現。這樣咱們能夠經過記錄Event來記錄系統的運行歷史記錄,而且可以方便的回滾到某一歷史狀態。Event Sourcing就是用來進行存儲和管理事件的。這裏不展開介紹。
CQRS模式在思想上比較簡單,可是實現上仍是有些複雜。它涉及到DDD,以及Event Sourcing,這裏使用codeproject上的 Introduction to CQRS 這篇文章的例子來講明CQRS模式。這個例子是一個簡單的在線記日誌(Diary)系統,實現了日誌的增刪改查功能。總體結構以下:
上圖很清晰的說明了CQRS在讀寫方面的分離,在讀方面,經過QueryFacade到數據庫裏去讀取數據,這個庫有多是ReportingDB。在寫方面,比較複雜,操做經過Command發送到CommandBus上,而後特定的CommandHandler處理請求,產生對應的Event,將Eevnt持久化後,經過EventBus特定的EevntHandler對數據庫進行修改等操做。
例子代碼能夠到codeproject上下載,總體結構以下:
由三個項目構成,Diary.CQRS包含了全部的Domain和消息對象。Configuration經過使用一個名爲StructMap的IOC來初始化一些變量方便Web調用,Web是一個簡單的MVC3項目,在Controller中有與CQRS交互的代碼。
下面分別看Query和Command方面的實現:
查詢方面很簡單,日誌列表和明細獲取就是簡單的查詢。下面先看列表查詢部分的代碼。
public ActionResult Index() { ViewBag.Model = ServiceLocator.ReportDatabase.GetItems(); return View(); } public ActionResult Edit(Guid id) { var item = ServiceLocator.ReportDatabase.GetById(id); var model = new DiaryItemDto() { Description = item.Description, From = item.From, Id = item.Id, Title = item.Title, To = item.To, Version = item.Version }; return View(model); }
ReportDatabase的GetItems和GetById(id)方法就是簡單的查詢,從命名能夠看出他是ReportDatabase。
public class ReportDatabase : IReportDatabase { static List<DiaryItemDto> items = new List<DiaryItemDto>(); public DiaryItemDto GetById(Guid id) { return items.Where(a => a.Id == id).FirstOrDefault(); } public void Add(DiaryItemDto item) { items.Add(item); } public void Delete(Guid id) { items.RemoveAll(i => i.Id == id); } public List<DiaryItemDto> GetItems() { return items; } }
ReportDataBase只是在內部維護了一個List的DiaryItemDto列表。在使用的時候,是經過IRepositoryDatabase對其進行操做的,這樣便於mock代碼。
Query方面的代碼很簡單。在實際的應用中,這一塊就是直接對DB進行查詢,而後經過DTO對象返回,這個DB多是應對特定場景的報表數據庫,這樣能夠提高查詢性能。
下面來看Command方向的實現:
Command的實現比較複雜,下面以簡單的建立一個新的日誌來講明。
在MVC的Control中,能夠看到Add的Controller中只調用了一句話:
[HttpPost] public ActionResult Add(DiaryItemDto item) { ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To)); return RedirectToAction("Index"); }
首先聲明瞭一個CreateItemCommand,這個Command只是保存了一些必要的信息。
public class CreateItemCommand:Command { public string Title { get; internal set; } public string Description { get;internal set; } public DateTime From { get; internal set; } public DateTime To { get; internal set; } public CreateItemCommand(Guid aggregateId, string title, string description,int version,DateTime from, DateTime to) : base(aggregateId,version) { Title = title; Description = description; From = from; To = to; } }
而後將Command發送到了CommandBus上,其實就是讓CommandBus來選擇合適的CommandHandler來處理。
public class CommandBus:ICommandBus { private readonly ICommandHandlerFactory _commandHandlerFactory; public CommandBus(ICommandHandlerFactory commandHandlerFactory) { _commandHandlerFactory = commandHandlerFactory; } public void Send<T>(T command) where T : Command { var handler = _commandHandlerFactory.GetHandler<T>(); if (handler != null) { handler.Execute(command); } else { throw new UnregisteredDomainCommandException("no handler registered"); } } }
這個裏面須要值得注意的是CommandHandlerFactory這個類型的GetHandler方法,他接受一個類型爲T的泛型,這裏就是咱們以前傳入的CreateItemCommand。來看他的GetHandler方法。
public class StructureMapCommandHandlerFactory : ICommandHandlerFactory { public ICommandHandler<T> GetHandler<T>() where T : Command { var handlers = GetHandlerTypes<T>().ToList(); var cmdHandler = handlers.Select(handler => (ICommandHandler<T>)ObjectFactory.GetInstance(handler)).FirstOrDefault(); return cmdHandler; } private IEnumerable<Type> GetHandlerTypes<T>() where T : Command { var handlers = typeof(ICommandHandler<>).Assembly.GetExportedTypes() .Where(x => x.GetInterfaces() .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ICommandHandler<>) )) .Where(h=>h.GetInterfaces() .Any(ii=>ii.GetGenericArguments() .Any(aa=>aa==typeof(T)))).ToList(); return handlers; } }
這裏能夠看到,他首先查找當前的程序集中(ICommandHandler)所在的程序集中的全部的實現了ICommandHandler的接口的類型,而後在全部的類型找查找實現了該泛型接口而且泛型的類型參數類型爲T類型的全部類型。以上面的代碼爲例,就是要找出實現了ICommandHandler<CreateItemCommand>接口的類型。能夠看到就是CreateItemCommandHandler類型。
public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand> { private IRepository<DiaryItem> _repository; public CreateItemCommandHandler(IRepository<DiaryItem> repository) { _repository = repository; } public void Execute(CreateItemCommand command) { if (command == null) { throw new ArgumentNullException("command"); } if (_repository == null) { throw new InvalidOperationException("Repository is not initialized."); } var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To); aggregate.Version = -1; _repository.Save(aggregate, aggregate.Version); } }
找到以後而後使用IOC實例化了該對象返回。
如今CommandBus中,找到了處理特定Command的Handler。而後執行該類型的Execute方法。
能夠看到在該類型中實例化了一個名爲aggregate的DiaryItem對象。這個和咱們以前查詢所用到的DiaryItemDto有所不一樣,這個一個領域對象,裏面包含了一系列事件。
public class DiaryItem : AggregateRoot, IHandle<ItemCreatedEvent>, IHandle<ItemRenamedEvent>, IHandle<ItemFromChangedEvent>, IHandle<ItemToChangedEvent>, IHandle<ItemDescriptionChangedEvent>, IOriginator { public string Title { get; set; } public DateTime From { get; set; } public DateTime To { get; set; } public string Description { get; set; } public DiaryItem() { } public DiaryItem(Guid id,string title, string description, DateTime from, DateTime to) { ApplyChange(new ItemCreatedEvent(id, title,description, from, to)); } public void ChangeTitle(string title) { ApplyChange(new ItemRenamedEvent(Id, title)); } public void Handle(ItemCreatedEvent e) { Title = e.Title; From = e.From; To = e.To; Id = e.AggregateId; Description = e.Description; Version = e.Version; } public void Handle(ItemRenamedEvent e) { Title = e.Title; } ... }
ItemCreatedEvent 事件的定義以下,其實就是用來存儲傳輸過程當中須要用到的數據。
public class ItemCreatedEvent:Event { public string Title { get; internal set; } public DateTime From { get; internal set; } public DateTime To { get; internal set; } public string Description { get;internal set; } public ItemCreatedEvent(Guid aggregateId, string title , string description, DateTime from, DateTime to) { AggregateId = aggregateId; Title = title; From = from; To = to; Description = description; } }
能夠看到在Domain對象中,除了定義基本的字段外,還定義了一些相應的事件,好比在構造函數中,其實是發起了一個名爲ItemCreateEvent的事件,同時還定義了處理時間的邏輯,這些邏輯都放在名爲Handle的接口方法發,例如ItemCerateEvent的處理方法爲Handle(ItemCreateEvent)方法。
ApplyChange方法在AggregateRoot對象中,他是彙集根,這是DDD中的概念。經過這個根能夠串起全部對象。 該類實現了IEventProvider接口,他保存了全部在_changes中的全部沒有提交的變動,其中的ApplyChange的用來爲特定的Event查找Eventhandler的方法:
public abstract class AggregateRoot : IEventProvider { private readonly List<Event> _changes; public Guid Id { get; internal set; } public int Version { get; internal set; } public int EventVersion { get; protected set; } protected AggregateRoot() { _changes = new List<Event>(); } public IEnumerable<Event> GetUncommittedChanges() { return _changes; } public void MarkChangesAsCommitted() { _changes.Clear(); } public void LoadsFromHistory(IEnumerable<Event> history) { foreach (var e in history) ApplyChange(e, false); Version = history.Last().Version; EventVersion = Version; } protected void ApplyChange(Event @event) { ApplyChange(@event, true); } private void ApplyChange(Event @event, bool isNew) { dynamic d = this; d.Handle(Converter.ChangeTo(@event, @event.GetType())); if (isNew) { _changes.Add(@event); } } }
在ApplyChange的實現中,this其實就是對應的實現了AggregateRoot的DiaryItem的Domain對象,調用的Handle方法就是咱們以前在DiaryItem中定義的行爲。而後將該event保存在內部的未提交的事件列表中。相關的信息及事件都保存在了定義的aggregate對象中並返回。
而後Command繼續執行,而後調用了_repository.Save(aggregate, aggregate.Version);這個方法。先看這個Repository對象。
public class Repository<T> : IRepository<T> where T : AggregateRoot, new() { private readonly IEventStorage _storage; private static object _lockStorage = new object(); public Repository(IEventStorage storage) { _storage = storage; } public void Save(AggregateRoot aggregate, int expectedVersion) { if (aggregate.GetUncommittedChanges().Any()) { lock (_lockStorage) { var item = new T(); if (expectedVersion != -1) { item = GetById(aggregate.Id); if (item.Version != expectedVersion) { throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified", item.Id)); } } _storage.Save(aggregate); } } } public T GetById(Guid id) { IEnumerable<Event> events; var memento = _storage.GetMemento<BaseMemento>(id); if (memento != null) { events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version); } else { events = _storage.GetEvents(id); } var obj = new T(); if(memento!=null) ((IOriginator)obj).SetMemento(memento); obj.LoadsFromHistory(events); return obj; } }
這個方法主要是用來對事件進行持久化的。 全部的聚合的變更都會存在該Repository中,首先,檢查當前的聚合是否和以前存儲在storage中的聚合一致,若是不一致,則表示對象在其餘地方被更改過,拋出ConcurrencyException,不然將該變更保存在Event Storage中。
IEventStorage用來存儲全部的事件,其實現類型爲InMemoryEventStorage。
public class InMemoryEventStorage:IEventStorage { private List<Event> _events; private List<BaseMemento> _mementos; private readonly IEventBus _eventBus; public InMemoryEventStorage(IEventBus eventBus) { _events = new List<Event>(); _mementos = new List<BaseMemento>(); _eventBus = eventBus; } public IEnumerable<Event> GetEvents(Guid aggregateId) { var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p); if (events.Count() == 0) { throw new AggregateNotFoundException(string.Format("Aggregate with Id: {0} was not found", aggregateId)); } return events; } public void Save(AggregateRoot aggregate) { var uncommittedChanges = aggregate.GetUncommittedChanges(); var version = aggregate.Version; foreach (var @event in uncommittedChanges) { version++; if (version > 2) { if (version % 3 == 0) { var originator = (IOriginator)aggregate; var memento = originator.GetMemento(); memento.Version = version; SaveMemento(memento); } } @event.Version=version; _events.Add(@event); } foreach (var @event in uncommittedChanges) { var desEvent = Converter.ChangeTo(@event, @event.GetType()); _eventBus.Publish(desEvent); } } public T GetMemento<T>(Guid aggregateId) where T : BaseMemento { var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault(); if (memento != null) return (T) memento; return null; } public void SaveMemento(BaseMemento memento) { _mementos.Add(memento); } }
在GetEvent方法中,會找到全部的聚合根Id相關的事件。在Save方法中,將全部的事件保存在內存中,而後每隔三個事件創建一個快照。能夠看到這裏面使用了備忘錄模式。
而後在foreach循環中,對於全部的沒有提交的變動,EventBus將該事件發佈出去。
如今,全部的發生變動的事件已經記錄下來了。事件已經被髮布到EventBus上,而後對應的EventHandler再處理對應的事件,而後與DB交互。如今來看EventBus的Publish方法。
public class EventBus:IEventBus { private IEventHandlerFactory _eventHandlerFactory; public EventBus(IEventHandlerFactory eventHandlerFactory) { _eventHandlerFactory = eventHandlerFactory; } public void Publish<T>(T @event) where T : Event { var handlers = _eventHandlerFactory.GetHandlers<T>(); foreach (var eventHandler in handlers) { eventHandler.Handle(@event); } } }
能夠看到EventBus的Publish和CommandBus中的Send方法很類似,都是首先經過EventHandlerFactory查找對應Event的Handler,而後調用其Handler方法。好比
public class StructureMapEventHandlerFactory : IEventHandlerFactory { public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event { var handlers = GetHandlerType<T>(); var lstHandlers = handlers.Select(handler => (IEventHandler<T>) ObjectFactory.GetInstance(handler)).ToList(); return lstHandlers; } private static IEnumerable<Type> GetHandlerType<T>() where T : Event { var handlers = typeof(IEventHandler<>).Assembly.GetExportedTypes() .Where(x => x.GetInterfaces() .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(IEventHandler<>))) .Where(h => h.GetInterfaces() .Any(ii => ii.GetGenericArguments() .Any(aa => aa == typeof(T)))) .ToList(); return handlers; } }
而後返回並實例化了ItemCreatedEventHandler 對象,該對象的實現以下:
public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent> { private readonly IReportDatabase _reportDatabase; public ItemCreatedEventHandler(IReportDatabase reportDatabase) { _reportDatabase = reportDatabase; } public void Handle(ItemCreatedEvent handle) { DiaryItemDto item = new DiaryItemDto() { Id = handle.AggregateId, Description = handle.Description, From = handle.From, Title = handle.Title, To=handle.To, Version = handle.Version }; _reportDatabase.Add(item); } }
能夠看到在Handler方法中,從事件中獲取參數,而後新建DTO對象,而後將該對象更新到DB中。
到此,整個Command執行完成。
CQRS是一種思想很簡單清晰的設計模式,他經過在業務上分離操做和查詢來使得系統具備更好的可擴展性及性能,使得可以對系統的不一樣部分進行擴展和優化。在CQRS中,全部的涉及到對DB的操做都是經過發送Command,而後特定的Command觸發對應事件來完成操做,這個過程是異步的,而且全部涉及到對系統的變動行爲都包含在具體的事件中,結合Eventing Source模式,能夠記錄下全部的事件,而不是以往的某一點的數據信息,這些信息能夠做爲系統的操做日誌,能夠來對系統進行回退或者重放。
CQRS 模式在實現上有些複雜,不少地方好比AggregationRoot、Domain Object都涉及到DDD中的相關概念,本人對DDD不太懂。這裏僅爲了演示CQRS模式,因此使用的例子是codeproject上的,末尾列出了一些參考文章,若是您想了解更多,能夠有針對性的閱讀。
最後,但願CQRS模式能讓您在設計高性能,可擴展性的程序時可以多一種選擇和考慮。