前一篇文章介紹了Conference案例的架構設計,本篇文章開始介紹Conference案例的代碼實現。因爲代碼比較多,一開始就所有介紹全部細節,估計不少人接受不了,也理解不了。因此,我先進行一次QuickStart的介紹,即選取某個簡單典型的場景從前到後過一下每一個環節。這樣你們就可以快速對代碼的重要關鍵環節有大概的理解。另外,我如今正在作ENode的官網,到時會像axon framework同樣,介紹ENode框架自己、使用場景、性能數據、案例,以及論壇社區等功能;html
本文打算選擇Conference案例中一個不太複雜的場景(發佈會議),來快速過一下須要開發者實現的代碼環節。架構
首先,咱們來看一下發佈一個會議的UI入口,前面的文章介紹過,當客戶建立好一個會議後,他能夠先編輯會議的全部座位類型,而後若是容許預訂者預約了,那他就須要先發布這個會議。就像淘寶的賣家要上架商品後商品纔會對買家可見同樣。本質上,發佈一個會議,其實就是將會議聚合根的isPublished修改成true。UI以下圖所示:併發
當客戶點擊Publish按鈕後,前臺提交HttpPost請求到服務端,而後請求就會被ASP.NET MVC的Controller處理,Controller的Action的邏輯以下:app
上面的代碼比較清晰,咱們先判斷當前的_conference實例是否爲空,若是爲空,則直接返回HttpNotFound結果。那_conference實例哪裏來的?這裏考慮到ConferenceController中的大部分Action都要使用當前的_conference實例,因此咱們爲了代碼的複用,在Controller的OnActionExecuting方法裏,提早獲取了當前的_conference實例,代碼我稍後再貼。_conference實例有了以後咱們就能夠構建一個PublishConference的命令。該命令只須要一個當前要發佈的Conference的ID便可。而後,咱們調用ExecuteCommandAsync方法,去異步執行一個命令。而後,咱們使用await關鍵字異步等待命令的處理結果。最後判斷結果是否成功,作相應的處理。框架
ExecuteCommandAsync方法:異步
該方法內部使用_commandService的ExecuteAsync方法來異步執行一個命令。_commandService是ENode框架提供的分佈式的命令發送或執行服務,該服務是經過ConferenceController的構造函數注入的,代碼以下:分佈式
使用ENode框架開發的Controller,通常是須要兩個服務,一個是commanService,用於發送命令;另外一個是某個queryService,用語查詢數據;Controller依賴這兩個服務充分體現了CQRS架構的特色。固然,有時查詢服務可能不止一個,那就能夠注入多個,看咱們本身須要便可。ENode使用的依賴注入框架是Autofac。你們可能在想,爲什麼要弄一個ExecuteCommandAsync方法出來呢?由於要處理超時的狀況,假如一個命令處理超時了(好比5s),那Controller的Action也須要當即返回了。TimeoutAfter的代碼以下:函數
你們能夠看到TimeoutAfter方法內部,爲了實現當超過指定時間後要求的Task還未處理完的狀況,咱們建立了一個延後指定時間執行的Task,而後經過Task.WhenAny方法異步等待任務執行,最後判斷完成的Task是哪一個,從而實現超時的處理。這個作法是我在網上找到的,以爲還不錯,這個作法可讓咱們在實現徹底異步的同時還能實現超時處理。最後,咱們看一下OnActionExecuting方法:性能
OnActionExecutingui
這個方法的代碼的邏輯也比較簡單,就是根據HttpRequest中包含的slug參數先獲取一個Conference聚合根;若是存在,則進一步根據accessCode參數檢查accessCode參數是否合法。經過合法,則認爲提供的slug和accessCode有效。你們能夠把slug理解爲惟必定位一個Conference的,而accessCode是使用該Conference的密碼。因爲這個只是一個案例,因此咱們經過這種簡單有效的方法來爲用戶受權。
瞭解了Controller的實現,咱們接下來看看Command的定義,Command是一個DTO對象。代碼以下:
很是簡單,因爲ENode框架基類提供的Command類已經提供了一個AggregateRootId的屬性,因此咱們的PublishConference命令無需再定義其餘額外的屬性了。須要提一下的是,ENode框架要求,全部Command要建立或修改的聚合根的ID必須在Command發送以前賦值,這個是框架的一個約束,我認爲這個一般不是問題。若是你但願聚合根的ID是一個long,那也許你須要本身部署一個全局long生成服務了,有興趣的朋友能夠和我交流實現方案,我有實現經驗。若是你的ID是一個字符串,那用ENode框架提供的ObjectId類便可,它能夠幫你自動生成一個24位長度的全局惟一順序字符串。接下來咱們看看Command Handler的實現。
一個CommandHandler中的代碼一般是一句話,ENode框架的最大好處是可讓開發者無需關注C端的技術實現,開發者只須要關心如何實現本身的業務邏輯便可。如上圖所示,咱們會先定義一個ConferenceCommandHandler的類,而後實現ICommandHandler<PublishConference>接口,而後進一步實現對應的Handle方法。在Handle方法內部,咱們只須要從當前的上下文根據Command所關聯的聚合根ID獲取當前要操做的聚合根,而後調用聚合根的業務方法便可。咱們不須要像經典DDD那樣把聚合根從IConferenceRepository中取出來,再修改聚合根,再保存聚合根。而且經典DDD每每還會和工做單元(Unit of Work)配合;由於經典DDD,是支持一個應用層的方法同時修改多個聚合根的,而ENode框架是要求一個命令一次只能建立或修改一個聚合根,即架構設計上就是面向最終一致性的,主要目的爲了實現更高的吞吐量,這點開發者須要明確與瞭解。CommandHandler,從代碼實現的角度,我相信ENode框架提供的方式是很是簡單和直接的,沒有任何多餘的東西。你們能夠看到使用ENode框架開發,大部分狀況是不須要定義Repository的。下面咱們來看看Domain聚合根的實現。
使用ENode框架開發的領域模型,聚合根的實現一般是這樣的:
當前咱們這裏被調用的方法是Publish,該方法內部,先判斷當前聚合根是否已經處於發佈狀態,若是是,則拋出異常便可,固然,你選擇忽略也沒問題;若是不是,則調用ApplyEvent方法Apply當前領域事件。ApplyEvent方法的邏輯是,先找到當前事件對應的Handle方法,而後調用該Handle方法;而後調用完成後,把當前事件放入一個聚合根內部的事件隊列中。
若是對ENode框架的實現有必定了解的朋友應該知道,ENode在處理一個命令時,ENode框架處理Command的核心流程是這樣的:
上面這個是正常流程,在這裏我順便提一下,爲了讓你們更好的理解內部實現的機制。經過上面這些介紹,我想你們應該至少能夠理解上面的Publish方法和Handle方法了吧。
另外,有些朋友可能會想,爲什麼是先產生事件,再修改狀態呢?
主要緣由是由於這個Handle方法是會在事件溯源(ES)的時候被重複利用的。當咱們要從EventStore經過ES還原某個聚合根時,咱們是先從EventStore獲取該聚合根所產生的全部的事件,而後對每一個事件調用聚合根的對應的Handle方法,從而實現聚合根狀態的還原。這個過程也就是咱們常說的事件溯源,即ES(Event Sourcing)。
須要強調的是,聚合根應該在產生事件以前把各類業務規則和業務邏輯實現掉,而後只有當前操做知足全部的業務規則時,才調用ApplyEvent方法。而後在聚合根裏的全部Handle方法中,就是僅僅簡單的等於號賦值操做,不能有任何業務邏輯,這點很是重要。爲何要這樣呢?由於假如咱們把一些業務規則和邏輯放在Handle方法中,好比if怎麼樣的時候作什麼賦值,else的時候作另外的賦值。那假如哪一天咱們的Handle方法裏的判斷邏輯變化了,那咱們經過事件溯源還原出來的聚合根的狀態就不對了。這點應該不難理解吧。
從更高層面(哲學)的角度來理解,EventStore中存儲的事件並非完整的歷史。事件+聚合根的Handle方法纔是完整的歷史,二者結合才能夠完整地將聚合根的狀態還原到最新狀態。由於是歷史,歷史沒法改變,因此咱們的事件和Handle方法也都不能修改;或者若是真的要修改,也必須確保兼容老的結構和實現,這點很是重要。下面咱們來看看Event Handler的實現:
EventHandler的做用是根據C端聚合根產生的事件來更新CQRS的讀庫。須要注意的是ENode整個框架對外提供的API基本都是異步IO的(實際上內部的實現也都是異步IO的,只有整個鏈路都是異步得,才能發揮異步的好處)。因此咱們更新讀庫時,須要使用ADO.NET提供的Async方法類更新DB。這裏我使用ENode自帶的Dapper輕量級高性能ORM來實現對讀庫的更新。上面的代碼中就是更新Conference表的IsPublish字段。可是爲了確保避免併發致使的數據覆蓋,因此咱們須要嚴謹的利用樂觀控制來確保數據不會被覆蓋,ENode要求咱們使用Version機制來實現樂觀鎖。
關於併發控制的討論,其實還有很是多的細節能夠討論。我以前寫過一篇文章,你們有興趣的能夠去看一下,本文的目的是作一個QuickStart,因此不作過多展開了。TryUpdateRecordAsync方法的內部實現以下,很簡單,我就不作介紹了。
還有一點須要特別提一下,就是爲什麼要使用Dapper而不使用EF這種ORM框架。由於ENode框架實現的是CQRS+ES的架構。因此,咱們在更新讀庫時,是根據事件更新讀庫。那怎麼樣的更新是最快的呢?就是直接經過Insert或Update語句來更新DB。而若是經過EF這種框架,由於是面向OO的ORM,因此通常是須要先從DB取出數據轉換爲對象,再更新對象,再保存對象這樣的思路。這個過程我我的認爲,對於CQRS+ES架構的應用來講,是比較繁瑣和低效(2次IO,先讀出來,再保存回去)的。咱們在更新讀庫時,更好的方式應該是利用像Dapper這樣的ORM框架,簡單直接的更新讀庫(一次IO操做便可)。另外,我經過對Dapper作了一些簡單的二次封裝,能夠作到用最直接的代碼實現目的,且兼顧了代碼的可讀性、可維護性、靈活性,以及性能。另外,查詢數據時,經過Dapper也很是簡單,並且還支持返回dynamic對象。Dapper是基於約定的框架,不須要作ORM映射方面的配置。我我的認爲使用在CQRS+ES架構中是很是合理的。因此,對我來講,EF能夠退休了,呵呵。
好了,上面介紹了發佈會議的全部須要用戶寫的代碼,是否是很簡單呢?我我的認爲和經典DDD的架構相比,因爲有ENode框架的支持,因此開發基於CQRS+ES架構的應用,是很是簡單的。下一篇要寫什麼還沒想好,你們還想了解什麼,能夠及時給我反饋啊。