在前面兩篇文章中,我詳細介紹了基本事件系統的實現,包括事件派發和訂閱、經過事件處理器執行上下文來解決對象生命週期問題,以及一個基於RabbitMQ的事件總線的實現。接下來對於事件驅動型架構的討論,就須要結合一個實際的架構案例來進行分析。在領域驅動設計的討論範疇,CQRS架構自己就是事件驅動的,所以,我打算首先介紹一下CQRS架構下相關部分的實現,而後再繼續討論事件驅動型架構實現的具體問題。html
固然,CQRS架構自己的實現也是根據實際狀況的不一樣,須要具體問題具體分析的,不只如此,CQRS架構的實現也是很是複雜的,毫不是一套文章一套案例可以解釋清楚並涵蓋所有的。因此,我不會把大部分篇幅放在CQRS架構實現的細節上,而是會着重介紹與咱們的主題相關的內容,並對無關的內容進行弱化。或許,在這個系列文章結束的時候,咱們會獲得一個完整的、可以運行的CQRS架構系統,不過,這套系統極有可能僅供技術研討和學習使用,沒法直接用於生產環境。git
基於這樣的前提,咱們今天首先看一下CQRS架構中聚合與聚合根的實現,或許你會以爲目前討論的內容與你本打算關心的事件驅動架構沒什麼關係,而事實是,CQRS架構中聚合與聚合根的實現是徹底面向事件驅動的,而這部份內容也會爲咱們以後的討論作下鋪墊。不只如此,我還會在本文討論一些基於.NET/C#的軟件架構設計的思考與實踐(請注意文章中我添加了Note字樣而且字體加粗的句子),所以,我仍是會推薦你繼續讀完這篇文章。github
早在2010年,我針對CQRS架構總結過一篇文章,題目是:《EntityFramework之領域驅動設計實踐【擴展閱讀】:CQRS體系結構模式》,固然,這篇文章跟Entity Framework本沒啥關係,只是延續了領域驅動設計這一話題進行的擴展討論罷了。這篇文章介紹了CQRS架構模式所產生的背景、結構,以及相關的一些概念,好比:最近很是流行的詞語:「事件溯源」、解決事件溯源性能問題的「快照」、用於存取事件數據的「事件存儲(Event Store)」,還有從新認識了什麼叫作「對象的狀態」,等等。此外,在後續的博文中,我也常常對CQRS架構中的實現細節作些探討,有興趣的讀者能夠翻看我過去的博客文章。整體上講,CQRS架構基本符合下圖所描述的結構:sql
看上去是否是特別複雜?沒錯,特別複雜,並且每一個部分均可以使用不一樣的工具、框架,以不一樣的形式進行實現。整個架構甚至能夠是語言、平臺異構的,還能夠跟外部系統進行整合,實現大數據分析、呈現等等,玩法可謂之五花八門,這些通通都不在咱們的討論範圍以內。咱們今天打算討論的,就是上圖右上部分「領域模型」框框裏的主題:CQRS架構中的聚合與聚合根。數據庫
說到聚合與聚合根,瞭解過領域驅動設計(DDD)的讀者確定對這兩個概念很是熟悉。一般狀況下,具備相同生命週期,組合起來可以共同表述一種領域概念的一組模型對象,就能夠組成一個聚合。在每一個聚合中,銜接各個領域模型對象,並向外提供統一訪問聚合的對象,就是聚合根。聚合中的全部對象,離開聚合根,就不能完整地表述一個領域概念。好比:收貨地址沒法離開客戶,訂單詳情沒法離開訂單,庫存沒法離開貨品等等。因此從定義上來看,一個聚合大概就是這樣:編程
好吧,對這些概念比較熟悉的讀者來講,我在此算是多囉嗦了幾句。接下來,讓咱們結合CQRS架構中命令處理器對領域模型的更改過程來看看,除了以上這些常規特徵以外,聚合與聚合根還有哪些特殊之處。當命令處理器接到操做命令時,便開始對領域模型進行更改,步驟以下:設計模式
接下來在事件消息總線和事件處理器中將會發生的事情,咱們從此還會討論,這裏就很少說了。從這個過程,咱們不可貴出:緩存
聽起來是否是很是複雜?確實如此。那咱們就先從領域事件入手,逐步實現CQRS中的聚合與聚合根。安全
領域事件,顧名思義,就是從領域模型中產生的事件消息。概念上很簡單,好比,客戶登陸網站,就會由客戶登陸實體產生一個事件派發出去,例如CustomerLoggedOnEvent,表示客戶登陸這件事已經發生了。雖然在DDD的實踐中,領域事件更多地在CQRS架構中被討論,其實即使是非事件驅動型架構,也能夠經過領域模型來發布消息,達到系統解耦的目的。架構
延續以前的設計,咱們的領域事件繼承了IEvent接口,並增長了三個屬性/方法,此外,爲了編程方便,咱們實現了領域事件的抽象類,UML類圖以下:
圖中的綠色部分就是在以前咱們的事件模型上新加的接口和類,用以表述領域事件的概念。其中:
好了,若是說咱們將發生在某聚合上的領域事件保存到關係型數據庫,那麼,當須要得到該聚合的全部領域事件時,只須要下面一句SQL就好了:
SELECT * FROM [Events] WHERE [AggregateRootId]=aggregateRootId AND [AggregateRootType]=aggregateRootType ORDER BY [Sequence] ASC
這就是最簡單的事件存儲數據庫的實現了。不過,咱們暫時不介紹這些內容。
事實上,與標準的事件(IEvent接口)相比,除了上面三個主要的屬性以外,領域事件還能夠包含更多的屬性和方法,這就要看具體的需求和設計了。不過目前爲止,咱們定義這三個屬性已經夠用了,不要把問題搞得太複雜。
有了領域事件的基本模型,咱們開始設計CQRS下的聚合。
因爲外界訪問聚合都是經過聚合根來實現的,所以,針對聚合的操做都會被委託給聚合根來處理。好比,當用戶地址發生變化時,服務層會調用Customer.ChangeAddress方法,這個方法就會產生一個領域事件,並經過內聯的事件處理機制更改聚合中Address值對象中的狀態。因而,從技術角度,聚合的設計也就是聚合根的實現。
首先須要設計的是與聚合相關的概念所表述的接口、類及其之間的關係。結合領域驅動設計中的概念,咱們獲得下面的設計:
其中,實體(IEntity)、聚合根(IAggregateRoot)都是你們耳熟能詳的領域驅動設計的概念。因爲實體都是經過Id進行惟一標識,因此,IEntity會有一個id的屬性,爲了簡單起見,咱們使用Guid做爲它的類型。聚合根(IAggregateRoot)繼承於IEntity接口,有趣的是,在咱們目前的場景中,IAggregateRoot並不包含任何成員,它僅僅是一個空接口,在整個框架代碼中,它僅做爲泛型的類型約束。Note:這種作法其實也是很是常見的一種框架設計模式。具備事件溯源能力的聚合根(IAggregateRootWithEventSourcing)又繼承於IAggregateRoot接口,而且有以下三個成員:
此外,你還發現咱們還有兩個神奇的接口:IPurgable和IPersistedVersionSetter。這兩個接口的職責是:
Note:爲何不將這兩個接口中的方法直接放在IAggregateRootWithEventSourcing中呢?是由於單一職責原則。聚合自己不該該存在所謂之「清空緩存」或者「設置保存版本號」這樣的概念,這樣的概念對於技術人員來講比較容易理解,但是若是將這些技術細節加入領域模型中,就會污染領域模型,形成領域專家沒法理解領域模型,這是違背面向對象分析與設計的單一職責原則的,也違背了領域驅動設計的原則。那麼,即便把這些方法經過額外的接口獨立出去,實現了IAggregateRootWithEventSourcing接口的類型,不仍是要實現這兩個接口中的方法嗎?這樣,聚合的訪問者不仍是能夠訪問這兩個額外的方法嗎?的確如此,這些接口是須要被實現的,可是咱們可使用C#中接口的顯式實現,這樣的話,若是不將IAggregateRootWithEventSourcing強制轉換成IPurgable或者IPersistedVersionSetter的話,是沒法直接經過聚合根對象自己來訪問這些方法的,這起到了很是好的保護做用。接口的顯式實如今軟件系統的框架設計中也是經常使用手段。
在上面的類圖中,IAggregateRootWithEventSourcing最終由AggregateRootWithEventSourcing抽象類實現。不要抱怨類的名字太長,它有助於咱們理解這一類型在咱們的領域模型中的角色和功能。下面的代碼列出了該抽象類的主要部分的實現:
public abstract class AggregateRootWithEventSourcing : IAggregateRootWithEventSourcing { private readonly Lazy<Dictionary<string, MethodInfo>> registeredHandlers; private readonly Queue<IDomainEvent> uncommittedEvents = new Queue<IDomainEvent>(); private Guid id; private long persistedVersion = 0; private object sync = new object(); protected AggregateRootWithEventSourcing() : this(Guid.NewGuid()) { } protected AggregateRootWithEventSourcing(Guid id) { registeredHandlers = new Lazy<Dictionary<string, MethodInfo>>(() => { var registry = new Dictionary<string, MethodInfo>(); var methodInfoList = from mi in this.GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) let returnType = mi.ReturnType let parameters = mi.GetParameters() where mi.IsDefined(typeof(HandlesInlineAttribute), false) && returnType == typeof(void) && parameters.Length == 1 && typeof(IDomainEvent).IsAssignableFrom(parameters[0].ParameterType) select new { EventName = parameters[0].ParameterType.FullName, MethodInfo = mi }; foreach (var methodInfo in methodInfoList) { registry.Add(methodInfo.EventName, methodInfo.MethodInfo); } return registry; }); Raise(new AggregateCreatedEvent(id)); } public Guid Id => id; long IPersistedVersionSetter.PersistedVersion { set => Interlocked.Exchange(ref this.persistedVersion, value); } public IEnumerable<IDomainEvent> UncommittedEvents => uncommittedEvents; public long Version => this.uncommittedEvents.Count + this.persistedVersion; void IPurgable.Purge() { lock (sync) { uncommittedEvents.Clear(); } } public void Replay(IEnumerable<IDomainEvent> events) { ((IPurgable)this).Purge(); events.OrderBy(e => e.Timestamp) .ToList() .ForEach(e => { HandleEvent(e); Interlocked.Increment(ref this.persistedVersion); }); } [HandlesInline] protected void OnAggregateCreated(AggregateCreatedEvent @event) { this.id = @event.NewId; } protected void Raise<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent { lock (sync) { // 首先處理事件數據。 this.HandleEvent(domainEvent); // 而後設置事件的元數據,包括當前事件所對應的聚合根類型以及 // 聚合的ID值。 domainEvent.AggregateRootId = this.id; domainEvent.AggregateRootType = this.GetType().AssemblyQualifiedName; domainEvent.Sequence = this.Version + 1; // 最後將事件緩存在「未提交事件」列表中。 this.uncommittedEvents.Enqueue(domainEvent); } } private void HandleEvent<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent { var key = domainEvent.GetType().FullName; if (registeredHandlers.Value.ContainsKey(key)) { registeredHandlers.Value[key].Invoke(this, new object[] { domainEvent }); } } }
上面的代碼不算複雜,它根據上面的分析和描述,實現了IAggregateRootWithEventSourcing接口,篇幅緣由,就很少作解釋了,不過有幾點仍是能夠鑑賞一下的:
如今,咱們已經實現了CQRS架構下的聚合與聚合根,雖然實際上這個結構有可能比咱們的實現更爲複雜,可是目前的這個設計已經可以知足咱們進一步研究討論的需求了。下面,咱們再更進一步,看看CQRS中倉儲應該如何實現。
爲何說是「初探」?由於咱們目前打算實現的倉儲暫時不包含事件派發的邏輯,這部份內容我會在後續文章中講解。首先看看,倉儲的接口是什麼樣的。在CQRS架構中,倉儲只具有兩種操做:
你或許會問,那根據某個條件查詢知足該條件的全部聚合對象呢?注意,這是CQRS架構中查詢部分的職責,不屬於咱們的討論範圍。
一般,倉儲的接口定義以下:
public interface IRepository { Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot) where TAggregateRoot : class, IAggregateRootWithEventSourcing; Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id) where TAggregateRoot : class, IAggregateRootWithEventSourcing; }
與以前領域事件的設計相似,咱們爲倉儲定義一個抽象類,全部倉儲的實現都應該基於這個抽象類:
public abstract class Repository : IRepository { protected Repository() { } public async Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id) where TAggregateRoot : class, IAggregateRootWithEventSourcing { var domainEvents = await LoadDomainEventsAsync(typeof(TAggregateRoot), id); var aggregateRoot = ActivateAggregateRoot<TAggregateRoot>(); aggregateRoot.Replay(domainEvents); return aggregateRoot; } public async Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot) where TAggregateRoot : class, IAggregateRootWithEventSourcing { var domainEvents = aggregateRoot.UncommittedEvents; await this.PersistDomainEventsAsync(domainEvents); aggregateRoot.PersistedVersion = aggregateRoot.Version; aggregateRoot.Purge(); } protected abstract Task<IEnumerable<IDomainEvent>> LoadDomainEventsAsync(Type aggregateRootType, Guid id); protected abstract Task PersistDomainEventsAsync(IEnumerable<IDomainEvent> domainEvents); private TAggregateRoot ActivateAggregateRoot<TAggregateRoot>() where TAggregateRoot : class, IAggregateRootWithEventSourcing { var constructors = from ctor in typeof(TAggregateRoot).GetTypeInfo().GetConstructors() let parameters = ctor.GetParameters() where parameters.Length == 0 || (parameters.Length == 1 && parameters[0].ParameterType == typeof(Guid)) select new { ConstructorInfo = ctor, ParameterCount = parameters.Length }; if (constructors.Count() > 0) { TAggregateRoot aggregateRoot; var constructorDefinition = constructors.First(); if (constructorDefinition.ParameterCount == 0) { aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(null); } else { aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(new object[] { Guid.NewGuid() }); } // 將AggregateRoot下的全部事件清除。事實上,在AggregateRoot的構造函數中,已經產生了AggregateCreatedEvent。 aggregateRoot.Purge(); return aggregateRoot; } return null; } }
代碼也是很是簡單、容易理解的:GetByIdAsync方法根據給定的聚合根類型以及ID值,從後臺存儲中讀取全部屬於該聚合的領域事件,並在聚合上進行回放,以便將聚合恢復到存儲前的狀態;SaveAsync方法則從聚合根上得到全部未被提交的領域事件,將這些事件保存到後臺存儲,而後設置聚合的「已保存版本」,最後清空未提交事件的緩存。剩下的就是如何實現LoadDomainEventsAsync以及PersistDomainEventsAsync兩個方法了。而這兩個方法,本來就應該是事件存儲對象的職責範圍了。
Note:你也許會問:若是某個聚合從開始到如今,已經發生了大量的領域事件了,那麼這樣一條條地將事件回放到聚合上,豈不是性能很是低下?沒錯,這個問題咱們能夠經過快照來解決。在後續文章中我會介紹。你還會問:日積月累,事件存儲系統中的事件數量豈不是會愈來愈多嗎?須要刪除嗎?答案是:不刪!不過能夠對數據進行歸檔,或者依賴一些第三方框架來處理這個問題,可是,從領域驅動設計的角度,領域事件表明着整個領域模型系統中發生過的全部事情,事情既然已經發生,就沒法再被抹去,所以,刪除事件存儲系統中的事件是不合理的。那數據量愈來愈大怎麼辦?答案是:或許,存儲硬件設備要比業務數據更便宜。
倉儲的實現咱們暫且探索到這一步,目前咱們只須要有一個正確的聚合保存、讀取(經過領域事件重塑)的邏輯就能夠了,並不須要關心事件自己是如何被讀取被保存的。接下來,咱們在.NET Core的測試項目中,藉助Moq框架,經過Mock一個假想的倉儲,來驗證整個系統從聚合、聚合根的實現到倉儲設計的正確性。
Moq是一個很好的Mock框架,簡單輕量,並且支持.NET Core,在單元測試的項目中使用Moq是一種很好的實踐。Moq上手很是簡單,只須要在單元測試項目上添加Moq的NuGet依賴包就能夠開始着手編寫測試用例了。爲了測試咱們的聚合根以及倉儲對聚合根保存、讀取的設計,首先咱們定義一個簡單的聚合:
public class Book : AggregateRootWithEventSourcing { public void ChangeTitle(string newTitle) { this.Raise(new BookTitleChangedEvent(newTitle)); } public string Title { get; private set; } [HandlesInline] private void OnTitleChanged(BookTitleChangedEvent @event) { this.Title = @event.NewTitle; } public override string ToString() { return Title; } }
Book類是一個聚合根,它繼承AggregateRootWithEventSourcing抽象類,同時它有一個屬性,Title,表示書的名稱,而ChangeTitle方法(業務方法)會直接產生一個BookTitleChangedEvent領域事件,以後,OnTitleChanged成員函數會負責將領域事件中的NewTitle的值設置到Book聚合根的Title狀態上,完成書本標題的更新。與之相關的BookTitleChangedEvent的定義以下:
public class BookTitleChangedEvent : DomainEvent { public BookTitleChangedEvent(string newTitle) { this.NewTitle = newTitle; } public string NewTitle { get; set; } public override string ToString() { return $"{Sequence} - {NewTitle}"; } }
首先,下面兩個測試用例用於測試Book聚合自己產生領域事件的過程是否正確,若是正確,那麼當Book自己本構造時,會產生一個AggregateCreatedEvent,若是更改書本的標題,則又會產生一個BookTitleChangedEvent,因此,第一個測試中,book的版本應該爲1,而第二個則爲2:
[Fact] public void CreateBookTest() { // Arrange & Act var book = new Book(); // Assert Assert.NotEqual(Guid.Empty, book.Id); Assert.Equal(1, book.Version); } [Fact] public void ChangeBookTitleEventTest() { // Arrange var book = new Book(); // Act book.ChangeTitle("Hit Refresh"); // Assert Assert.Equal("Hit Refresh", book.Title); Assert.Equal(2, book.UncommittedEvents.Count()); Assert.Equal(2, book.Version); }
接下來,測試倉儲保存Book聚合的正確性,由於咱們沒有實現一個有效的倉儲實例,所以,這裏藉助Moq幫咱們動態生成。在下面的代碼中,讓Moq對倉儲抽象類的PersisDomainEventsAsync受保護成員進行動態生成,指定當它被任何IEnumerable<IDomainEvent>做爲參數調用時,都將這些事件保存到一個本地的List中,因而,最後只須要檢查List中的領域事件是否符合咱們的要求就能夠了。代碼以下:
[Fact] public async Task PersistBookTest() { // Arrange var domainEventsList = new List<IDomainEvent>(); var mockRepository = new Mock<Repository>(); mockRepository.Protected().Setup<Task>("PersistDomainEventsAsync", ItExpr.IsAny<IEnumerable<IDomainEvent>>()) .Callback<IEnumerable<IDomainEvent>>(evnts => domainEventsList.AddRange(evnts)) .Returns(Task.CompletedTask); var book = new Book(); // Act book.ChangeTitle("Hit Refresh"); await mockRepository.Object.SaveAsync(book); // Assert Assert.Equal(2, domainEventsList.Count); Assert.Empty(book.UncommittedEvents); Assert.Equal(2, book.Version); }
同理,咱們還能夠測試倉儲讀取聚合並恢復聚合狀態的正確性,一樣仍是使用Moq對倉儲的LoadDomainEventsAsync進行Mock:
[Fact] public async Task RetrieveBookTest() { // Arrange var fakeId = Guid.NewGuid(); var domainEventsList = new List<IDomainEvent> { new AggregateCreatedEvent(fakeId), new BookTitleChangedEvent("Hit Refresh") }; var mockRepository = new Mock<Repository>(); mockRepository.Protected().Setup<Task<IEnumerable<IDomainEvent>>>("LoadDomainEventsAsync", ItExpr.IsAny<Type>(), ItExpr.IsAny<Guid>()) .Returns(Task.FromResult(domainEventsList.AsEnumerable())); // Act var book = await mockRepository.Object.GetByIdAsync<Book>(fakeId); // Assert Assert.Equal(fakeId, book.Id); Assert.Equal("Hit Refresh", book.Title); Assert.Equal(2, book.Version); Assert.Empty(book.UncommittedEvents); }
好了,其它的幾個測試用例就很少作介紹了,使用Visual Studio運行一下測試而後查看結果就能夠了:
本文又是一篇長篇幅的文章,好吧,要介紹的東西太多,並且這些內容又不能單獨割開成多個主題,因此也就很難控制篇幅了。文章主要介紹了基於CQRS架構的聚合以及聚合根的設計與實現,同時引出了倉儲的部分實現,這些內容也是爲從此進一步討論事件驅動型架構作準備。本文介紹的內容對於一個真實的CQRS系統實現來講仍是有必定差距的,但整體結構也大體如此。文中還說起了快照的概念,這部份內容我從此在介紹事件存儲的實現部分還會詳細討論,下一章打算擴展一下倉儲自己,瞭解一下倉儲對領域事件的派發,以及事件處理器對領域事件的處理。
本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo裏,經過不一樣的release tag來區分針對不一樣章節的源代碼。本文的源代碼請參考chapter_4這個tag,以下: