到達旅程的終點:最後的任務。
「你不能飛的像一隻長着鷦鷯翅膀的老鷹那樣。」亨利·哈德遜
複製代碼
咱們旅程的最後階段的三個主要目標是使系統對故障更具彈性,提升UI的響應能力,並確保咱們的設計是可伸縮的。增強系統的工做主要集中在訂單和註冊限界上下文中的RegistrationProcessManager類。性能改進工做的重點是當訂單建立時UI與領域域模型的交互方式。前端
本章使用了一些術語,咱們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的「深刻CQRS和ES」。git
命令(Command):命令是要求系統執行更改系統狀態的操做。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自用戶發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操做時)。命令由單個接收方只處理一次,命令要麼經過命令總線(command bus)傳遞給接收方,要麼直接在進程中傳遞。若是是經過總線傳遞的,則該命令是異步發送的,在進程中傳遞,命令將同步發送。程序員
事件(Event):一個事件,好比OrderConfirmed,描述了系統中發生的一些事情,一般是一個命令的結果。領域模型中的聚合引起事件。事件也能夠來自其餘限界上下文。多個訂閱者能夠處理特定的事件。聚合將事件發佈到事件總線。處理程序在事件總線上註冊特定類型的事件,而後將事件傳遞給訂閱服務器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。github
快照(Snapshots):快照是一種能夠應用於事件源的優化。在從新還原(rehydrated)聚合時,不須要重播與聚合相關的全部持久化事件,而是加載聚合狀態的最新副本,而後只重播保存快照後持久的事件。經過這種方式,能夠減小必須從事件存儲里加載的數據量。web
冪等性(Idempotency):冪等性是一個操做的特性,這意味着該操做能夠屢次應用而不改變結果。例如,「將x的值設置爲10」的操做是冪等的,而「將x的值加1」的操做不是冪等的。在消息傳遞環境中,若是消息能夠屢次傳遞而不改變結果,則消息是冪等的:這多是由於消息自己的性質,也多是由於系統處理消息的方式。sql
最終一致性(Eventual consistency):最終一致性是一個一致性模型,它不能保證當即訪問更新的值。對數據對象進行更新後,存儲系統不保證對該對象的後續訪問將返回更新後的值。然而,存儲系統確實保證,若是在足夠長的時間內沒有對對象進行新的更新,那麼最終全部訪問均可以返回最後更新的值。數據庫
該應用程序旨在部署到Microsoft Azure。在旅程的這個階段,應用程序由兩個角色組成,一個包含ASP.Net MVC Web應用程序的web角色和一個包含消息處理程序和領域對象的工做角色。應用程序在寫端和讀端都使用Azure SQL DataBase實例進行數據存儲。在某些地方,應用程序還在寫端使用Azure table,在讀端使用Azure blob來存儲數據。應用程序使用Azure服務總線來提供其消息傳遞基礎設施。下圖展現了這個高級體系結構。c#
在研究和測試解決方案時,能夠在本地運行它,可使用Azure compute emulator,也能夠直接運行MVC web應用程序,並運行承載消息處理程序和領域域對象的控制檯應用程序。在本地運行應用程序時,可使用本地SQL Server Express數據庫,並使用一個在SQL Server Express數據庫實現的簡單的消息傳遞基礎設施。windows
有關運行應用程序的選項的更多信息,請參見附錄1「發佈說明」。api
在旅程的這個階段,團隊研究了增強RegistrationProcessManager類的方法。該類負責管理訂單和註冊上下文中的聚合之間的交互,並確保它們彼此一致。若是要將限界上下文做爲一個總體來維護其一致狀態,那麼流程管理器必須可以適應各類各樣的故障條件。
一般,流程管理器接收傳入的事件,而後在限界上下文內部基於流程管理器的狀態發出一個或多個命令到聚合。當流程管理器發出命令時,它一般會更改本身的狀態。
訂單和註冊限界上下文包含RegistrationProcessManager類。此流程管理器在此限界上下文中和支付限界上下文中負責經過路由事件和命令協調聚合的活動。所以,流程管理器負責確保這些限界上下文中的聚合正確地彼此同步。
Gary(CQRS專家)發言:
一個聚合決定了寫模型中的一致性邊界,這個邊界和系統持久存儲數據的一致性相關。流程管理器管理不一樣聚合(可能在不一樣的限界上下文中)之間的關係,並確保聚合最終彼此一致。
註冊過程的失敗可能對系統產生不利後果:聚合可能彼此不一樣步,這可能致使系統中出現不可預測的行爲,或者一些進程可能最終成爲殭屍進程,繼續運行並使用資源,但永遠不會完成。團隊肯定了如下與RegistrationProcessManager流程管理器相關的特定故障場景。流程管理器也許會:
這些設想可概括爲兩個具體的問題:
若是流程管理器自己的行爲是冪等的,那麼若是它第二次接收並處理一個事件,則不會致使系統中的不一致。使流程管理器的行爲具備冪等性,能夠避免前三種故障條件中固有的問題。崩潰以後,您能夠簡單地從新啓動流程管理器,並第二次從新處理傳入的事件。
您可讓流程管理器發送的全部命令都是冪等的,來替代讓流程管理器冪等。從新啓動流程管理器可能會致使第二次發送命令,但若是這些命令是冪等的,則不會對流程或系統產生不利影響。要使此方法生效,您仍然須要修改流程管理器,以確保它至少發送一次全部命令。若是命令是冪等的,那麼屢次發送它們並不重要,可是若是根本不發送就很重要。
在V1版本中,大多數消息處理要麼已是冪等的,要麼系統檢測到重複的消息並將它們發送到dead-letter隊列。例外狀況是OrderPlaced事件和SeatsReserved事件,所以團隊修改了系統V3版本處理這兩個事件的方式,以解決這個問題。
須要事務行爲來確保當RegistrationProcessManager類保存其狀態時,系統始終會發送命令。這要求團隊實現一個僞事務,由於將Azure服務總線和SQL數據庫表一塊兒放到分佈式事務中既不可取也不可行。
團隊爲V3版本所採用的解決方案是確保系統持久保存RegistrationProcessManager生成的全部命令,同時持久保存RegistrationProcessManager實例的狀態。而後,系統嘗試發送命令,並在成功發送以後將它們從存儲中刪除。每當從存儲中加載RegistrationProcessManager實例時,系統還檢查未發送的消息。
在這個階段,咱們使用Visual Studio運行性能和壓力測試,以分析響應時間並肯定瓶頸。團隊使用Visual Studio Load Test來模擬訪問應用程序的不一樣用戶數量,並在代碼中添加了額外的跟蹤,以記錄時間信息,以便進行詳細分析。團隊在Azure中建立了性能測試環境,在Azure VM角色實例中運行測試控制器和測試代理。這使咱們可以經過使用測試代理模擬不一樣數量的虛擬用戶來測試Contoso會議管理系統在不一樣負載下的執行狀況。
做爲這項工做的結果,團隊對系統進行了許多更改,以優化其性能。
Gary(CQRS專家)發言:
儘管在旅程中,團隊在項目結束時進行了性能測試和優化工做,但一般在你想作的時候就作這個工做是有意義的,這能夠解決可伸縮性問題並儘快加固代碼。若是您正在構建本身的基礎設施,而且須要可以處理高吞吐量,則尤爲如此。
Markus(軟件開發人員)發言:
由於實現CQRS模式會致使對組成系統的許多不一樣部分的職責進行很是清晰的分離,因此咱們發現添加優化和增強相對容易,由於許多必要的更改在系統中都很是容易定位。
當註冊者建立一個訂單時,她將訪問UI中的如下頁面序列。
有關UI中的屏幕和流程的更多信息,請參閱第5章「準備發佈V1版本」中的「基於任務的UI」一節。
在V2版本中,系統必須在註冊頁面付款頁面之間處理如下命令和事件:
此外,MVC控制器在發送初始RegisterToConference命令以前經過查詢讀模型來填充訂單,從而驗證是否有足夠的座位可用。
當團隊使用Visual Studio Load Test和不一樣的用戶負載模式來對應用程序作負載測試時,咱們注意到高負載經常發生在UI等待領域完成其處理時和讀模型接收寫模型數據時。這樣沒法顯示下一個頁面。特別是,隨着V2版本部署到中型的web和工做角色實例後,咱們發現:
備註:從UI在服務總線上發送初始命令到讀模型中出現訂價訂單,從而使UI可以向用戶顯示下一個屏幕。5秒是咱們但願看到的最大等待時間。
爲了解決這個問題,團隊肯定了兩個優化目標:UI和領域之間的交互,以及基礎設施。咱們決定首先處理UI和領域之間的交互。當這不能充分提升性能時,咱們還進行了基礎設施優化。
團隊與領域專家討論了在UI向領域發送RegisterToConference命令以前,是否老是須要驗證座位可用性。
Gary(CQRS專家)發言:
這個場景說明了與最終一致性相關的一些實際問題。讀端(在本例中是訂價訂單視圖模型)最終與寫端保持一致。一般,當您實現CQRS模式時,您應該可以接受最終的一致性,而不須要在UI中等待更改傳播到讀取端。然而,在這種狀況下,UI必須等待寫模型傳播到與特定順序相關的讀端信息。這可能代表原系統這一部分的分析和設計存在問題。
領域專家明確表示,系統應該在接受付款以前確認座位是否可用。Contoso不但願出售座位以後向註冊人解釋,這些座位是不可用的。所以,該團隊尋找了簡化流程的方法,直到註冊者看到付款屏幕爲止。
Beth(業務經理)發言:
這種謹慎的策略並不適用於全部狀況。在某些狀況下,即便不能當即完成訂單,企業也可能寧願接受這筆錢。企業可能知道庫存很快就會補充,或者客戶很樂意等待。在咱們的場景中,儘管Contoso能夠在沒有票的狀況下將錢退還給註冊者,註冊者也許仍然會購買機票,由於他覺得系統已經確認過,這筆錢是無法退還的。因此這很明顯是一個業務和領域專家要作的決策。
團隊肯定了對UI流的如下兩個優化。
大多數狀況下,會議有足夠的座位,註冊者沒必要相互爭奪來預訂座位。隨着大會的門票接近售罄,只有很短的一段時間內,報名者纔會爭奪最後幾個座位。
若是會議有足夠的可用座位,那麼註冊者到達付款界面卻發現系統沒法預訂座位的風險就很小。在這種狀況下,V2版本里,在到達付款頁面以前執行的一些處理能夠在付款頁面上當用戶輸入信息的時候異步發生,這樣就減小了註冊者在看到付款頁面前經歷延遲的機會。
Jana(軟件架構師)發言:
從本質上講,咱們所依賴的事實是,預訂會成功,因此避免了耗時的檢查。但咱們仍然要在註冊人付款以前執行檢查,以確保座位是可用的。
可是,若是控制器在發送RegisterToConference命令以前就檢查並發現沒有足夠的座位來完成訂單,則能夠從新顯示註冊屏幕,使註冊者可以根據當前可用性更新其訂單。
Jana(軟件架構師)發言:
對這一戰略的一個可能改進是,在發送RegisterToConference命令以前,看看是否可能有足夠的座位可用。這能夠減小注冊者在最後幾個座位售罄時調整訂單的次數。然而,這種場景發生的頻率很低,可能不值得實現。
在V2版本中,MVC控制器不顯示付款頁面,直到領域發佈OrderTotalsCalculated事件,而且系統更新了price-order視圖模型。此事件是控制器顯示屏幕以前發生的最後一個事件。
若是系統更早地計算總數並更新價格訂單視圖模型,控制器就能夠更早地顯示付款頁面。團隊肯定,訂單聚合能夠在訂單下單時計算總數,而不是在預訂完成時計算總數。這將使UI流比V2版本更快的走到付款頁面。
「天天都有一些新的事實浮現,一些新的障礙那些威脅着咱們的最嚴重的障礙。我想這就是爲何這款遊戲如此值得一玩的緣由。」 羅伯特·弗爾肯·斯科特
團隊在旅程的這個階段添加的第二組優化與系統的基礎設施相關。這些更改同時處理了系統的性能和可伸縮性。下面的部分描述了咱們在這裏所作的最重要的更改。
做爲優化過程的一部分,團隊更新了系統,以確保在服務總線上發送的全部消息都是異步發送的。這種優化旨在提升應用程序的整體響應能力,並提升消息的吞吐量。做爲此更改的一部分,團隊還使用了Transient Fault Handling Application Block來處理使用服務總線時遇到的任何瞬時錯誤。
Markus(軟件開發人員)發言:
這種優化致使了對基礎設施代碼的重大更改。將異步調用與Transient Fault Handling Application Block相結合是複雜的,咱們將受益於c# 4.5中的一些新的簡化語法!
Jana(軟件架構師)發言:
有關在使用Azure服務總線時幫助優化性能的其餘通過驗證的實踐,請參閱本指南:Best Practices for Performance Improvements Using Service Bus Brokered Messaging
V2版本對命令和事件使用相同的消息傳遞基礎設施——Azure服務總線。團隊評估了Contoso會議管理系統是否須要使用相同的基礎設施發送全部命令消息。
在決定是否繼續使用Azure服務總線傳輸全部命令消息時,咱們考慮了許多因素。
咱們肯定了一組命令,系統能夠從會議web應用程序在進程中同步地發送這些命令。爲了實現這種優化,咱們必須向會議web應用程序添加一些基礎設施元素(事件存儲庫、事件總線和事件發佈者)。之前,這些基礎設施元素只在系統的工做角色中。
異步命令是不存在的,它其實是另外一個事件。若是我必須接受一個你發給個人消息而且若是我不一樣意必須發出一個事件。那這就不是你要我作什麼,而是你告訴我什麼已經作完了。乍一看,這彷佛只有一點點不一樣,但它有不少含義。
Why do lots of developers use one-way command messaging (async handling) when it's not needed? Greg Young - DDD/CQRS Group
性能測試還發現了使用可用座位(SeatsAvailability)聚合的瓶頸,咱們使用快照的形式解決了這個瓶頸。
Jana(軟件架構師)發言:
一旦團隊肯定了這個瓶頸,就很容易實現和測試這個解決方案。咱們在實現CQRS模式時所遵循的方法的優勢之一是:咱們能夠在系統中進行小的局部更改。更新不須要咱們去跨系統的多個部分進行復雜的更改。
當系統從事件存儲中從新還原(rehydrates)聚合實例時,它必須加載並重播與該聚合實例關聯的全部事件。這裏可能的優化是存儲聚合狀態在最近某個時間點的滾動快照,以便系統只須要加載快照和後續事件,從而減小必須從新加載和重播的事件數量。在Contoso會議管理系統中,隨着時間的推移,惟一可能會累積大量事件的聚合是可用座位(SeatsAvailability)聚合。咱們決定使用Memento模式做爲快照解決方案的基礎,以便與可用座位(SeatsAvailability)聚合一塊兒使用。咱們實現的解決方案使用一個memento來捕獲座位可用性聚合的狀態,而後在緩存中保存一個memento的副本。而後,系統嘗試處理緩存的數據,而不是老是從事件存儲中從新加載聚合。
Gary(CQRS專家)發言:
一般,在事件源上下文中,快照是持久化的,而不是咱們在項目中實現的臨時本地緩存。
就提升系統中事件消息的吞吐量而言,並行發佈事件被證實是最重要的優化之一。爲了獲得最好的結果,團隊進行了屢次迭代:
Jana(軟件架構師)發言:
當系統從服務總線檢索消息時,咱們在SubscriptionReceiver和SessionSubscriptionReceiver類中採用了相同的動態節流方法。
另外一個優化是向Azure服務總線Topic訂閱添加過濾器,以免讀取那些稍後將被與訂閱關聯的處理程序忽略的消息。
Markus(軟件開發人員)發言:
這裏咱們利用了Azure服務總線提供的特性。
這使可用座位(SeatsAvailability)聚合的接收者可以使用支持會話的訂閱。這是爲了確保每一個聚合實例只有一個寫入者,由於可用座位(SeatsAvailability)聚合是一個高爭用的聚合。這阻止了咱們在擴展時接收大量併發異常。
Jana(軟件架構師)發言:
在其餘地方,咱們使用帶有會話的訂閱來保證事件的順序。在本例中,咱們使用會話是出於不一樣的緣由——以確保每一個聚合實例只有一個寫入者。
這個優化緩存了會議web網站處處使用的幾個讀模型。它包含邏輯來決定如何基於特定會議的可用座位的數量來保持緩存中的數據:若是有不少空位,系統能夠緩存數據很長一段時間,可是若是不多有空位就不緩存數據。
團隊還對服務總線進行了劃分,以使應用程序更具可伸縮性,並避免在系統發送的消息量接近服務總線可以處理的最大吞吐量時進行節流。每一個服務總線Topic能夠由Azure中的不一樣節點處理,所以經過使用多個Topic,咱們能夠增長潛在的吞吐量。咱們考慮瞭如下分區方案:
有關這些劃分方案的詳細討論,請參閱Martin L. Abbott和Michael T. Fisher所寫的《可伸縮性規則:Web站點伸縮的50個原則》(Addison-Wesley, 2011)中的第11章「異步通訊和消息總線」。
咱們決定爲訂單聚合和可用聚合發佈的事件使用單獨的Topic,由於這些聚合負責了經過服務總線流動的大多數事件。
Gary(CQRS專家)發言:
並非全部的信息都具備相同的重要性。您還可使用消息總線來處理單獨的、按優先級排列的不一樣的消息類型,甚至能夠考慮不爲某些消息使用消息總線。
Jana(軟件架構師)發言:
將服務總線與系統的任何其餘關鍵組件同樣對待。這意味着您應該確保您的服務總線能夠伸縮。此外,請記住,並不是全部數據對您的業務都具備相同的價值。僅僅由於您有一個服務總線,並不意味着全部東西都必須通過它。明智的作法是消除低價值、高成本的流量。
團隊還執行了一些額外的優化,這些優化在下面的實現細節部分中列出。團隊在這一階段的主要目標是優化系統,以確保UI呈現對用戶有足夠好的響應。咱們還能夠執行其餘優化,這將有助於進一步提升性能,並優化系統使用資源的方式。例如,團隊考慮的進一步優化是擴展視圖模型生成器,該生成器填充系統中的各類讀取模型。每一個承載視圖模型生成器實例的web角色都必須經過建立對Azure服務總線主題的訂閱來處理寫端發佈的事件。
除了在提升應用程序性能的旅程的最後階段所作的更改以外,團隊還肯定了一些其餘更改,這些更改將致使進一步的改進。可是,這個旅程的可用時間有限,因此不可能在V3版本中進行這些更改。
Markus(軟件開發人員)發言:
經過接受一個服務總線會話,只要您保持鎖,就只有一個會話的寫入者和監聽者。這減小了樂觀併發異常。這種設計特別適合可用座位聚合的讀和寫模型。對於具備很是小分區的訂單聚合關聯的讀模型,您能夠從服務總線獲取多個小會話,並在每一個會話上使用存儲轉發方法。儘管系統中的讀和寫模型均可以從這種方法中受益,可是在咱們指望數據最終是一致的、而不是徹底一致的讀模型中實現起來更容易。
該網站已經緩存了一些常常訪問的讀模型數據,可是咱們能夠將緩存的使用擴展到系統的其餘區域。CQRS模式意味着咱們能夠將緩存視爲最終一致的讀模型的一部分,若是須要,還可使用不一樣的緩存或根本不使用緩存來訪問來自系統不一樣部分的讀模型數據。
咱們能夠改進可用座位(SeatsAvailability)聚合的緩存快照實現。本章稍後將詳細描述當前實現,其目的是始終檢查事件存儲,以查找在系統建立最新緩存快照以後到達的事件。當咱們接收到要處理的新命令時,若是咱們能夠檢查是否仍然使用與系統建立最新緩存快照時相同的服務總線會話,那麼咱們就能夠知道事件存儲中是否還有其餘事件。若是會話沒有更改,那麼咱們就知道本身是唯一的寫入者,所以沒有必要檢查事件存儲。若是會話已經更改,那麼其餘人可能已經將與聚合相關的事件寫入到存儲中,咱們須要進行檢查。
應用程序當前使用相同的優先級監聽全部服務總線訂閱上的全部消息。在實踐中,有些信息比其餘信息更重要。所以,當應用程序處於壓力之下時,咱們應該優先處理一些消息,以最小化對核心應用程序功能的影響。例如,咱們能夠識別某些願意接受更多延遲的讀模型。
Poe(IT運維人員)發言:
咱們還能夠在負載增長時使用自動縮放來擴展應用程序(例如使用Autoscaling Application Block),可是添加新實例須要時間。經過肯定某些消息類型的優先級,咱們能夠在自動縮放添加資源的同時,繼續在應用程序的關鍵領域提供性能。
當前實現使用隨機生成的Guid做爲存儲在SQL數據庫實例中的全部實體的鍵。當系統處於高負載下時,若是使用順序Guid,特別是與彙集索引相關的Guid,它的性能可能會更好。有關順序Guid的討論,請參見The Cost of GUIDs as Primary Keys。
做爲系統優化的一部分,咱們如今在進程中處理一些命令,而不是經過服務總線發送它們。咱們能夠將此擴展到其餘命令,並可能擴展到流程管理器。
在當前實現中,流程管理器處理傳入消息,而後存儲庫嘗試同步發送傳出消息(若是服務總線因爲節流行爲引起任何異常,則使用 Transient Fault Handling Application Block重試發送命令)。咱們能夠替代使用一種相似於EventStoreBusPublisher類的機制以讓流程管理器保存一個消息列表,這些消息必須在一個事務裏連同它的狀態一塊兒發送,而後通知系統的另外一部分,這個部分的職責是當有一些新消息準備好要發送的時候負責來發送消息。
Markus(軟件開發人員)發言:
負責發送消息的系統部分能夠異步發送消息。它還能夠爲發送消息實現動態節流,並動態控制要使用多少個並行發送器。
咱們當前的事件存儲實現是:爲存儲在事件存儲裏的每個事件發佈一個單獨的,小的消息到消息總線上。咱們能夠將其中一些消息組合在一塊兒,以減小服務總線上的I/O操做總數。例如,大型會議的可用座位(SeatsAvailability)聚合實例發佈大量事件,訂單(Order)聚合以突發方式發佈事件(當建立訂單(Order)聚合時,它同時發佈OrderPlaced事件和OrderTotalsCalculated事件)。這還將有助於減小系統中的延遲,由於目前,在那些順序很重要的場景中,咱們必須在發送下一個事件以前等待一個事件已被髮送的確認。將事件序列分組到一條消息中意味着咱們不須要在發佈單個事件之間等待確認。
Contoso會議管理系統容許您部署web和工做者角色的多個實例,從而擴展應用程序以處理更大的負載。然而,該設計並非徹底可伸縮的,由於系統的其餘一些元素,例如消息總線和數據存儲對最大可實現的吞吐量有限制。本節概述了咱們能夠對系統進行的一些更改,以刪除其中的一些約束,並顯著提升系統的可伸縮性。此次旅程的可用時間有限,因此沒能在V3版本中進行這些更改。
數據分區:系統在不一樣的分區中存儲不一樣類型的數據。在啓動代碼中,您能夠看到不一樣的限界上下文如何使用不一樣的鏈接字符串鏈接到SQL數據庫實例。可是,每一個限界上下文目前使用一個SQL數據庫實例,咱們能夠將其更改成使用多個不一樣的實例,每一個實例都包含系統使用的特定數據集。例如,訂單和註冊限界上下文能夠爲不一樣的讀取模型使用不一樣的SQL數據庫實例。咱們還能夠考慮使用federations特性來使用分片擴展一些SQL數據庫實例。
「數據持久性是大多數可伸縮SaaS企業面臨的最困難的技術問題。」 -Evan Cooke, CTO, Twilio,Scaling High-Availability Infrastructure in the Cloud
Jana(軟件架構師)發言:
在系統將數據存儲在Azure表存儲中的地方,咱們選擇用鍵對數據進行分區以實現可伸縮性。做爲使用SQL數據庫federations對數據進行切分的替代方法,咱們能夠將SQL數據庫實例中當前的一些讀模型數據移動到Azure表存儲或blob存儲中。
進一步劃分服務總線:經過爲不一樣的事件發佈者使用不一樣的Topic,咱們已經對服務總線進行了劃分,以免在系統發送的消息量接近服務總線可以處理的最大吞吐量時進行節流。咱們可使用多個類似的Topic來進一步劃分主題,並經過循環監聽它們來分擔負載。有關此方法的詳細描述,請參見Abbott和Fisher在Scalability Rules: 50 Principles for Scaling Web Sites, (Addison-Wesley, 2011)中的第11章"Asynchronous Communication and Message Buses"
存儲和轉發:咱們在前面關於性能改進的小節中介紹了存儲和轉發設計。經過批處理多個操做,您不只減小了到數據存儲的往返次數,並減小了系統中的延遲,還加強了系統的可伸縮性,由於發出更少的請求能夠減小對數據存儲的壓力。
監聽節流指示器並對其做出反應:目前,系統使用Transient Fault Handling Application Block來檢測瞬時錯誤條件,好比從Azure服務總線、SQL數據庫實例和Azure表存儲中檢測節流指示器。系統使用Block在這些場景中實現重試,一般使用指數回退策略。目前,咱們在單個訂閱級別使用動態節流,可是,咱們但願修改它來對特定主題的全部訂閱執行動態節流。相似地,咱們但願在SQL數據庫實例級和Azure存儲賬戶級實現動態節流。
Jana(軟件架構師)發言:
在應用程序裏實現動態節流的一個例子是從服務阻止節流,看EventStoreBusPublisher SubscriptionReceiver, SessionSubscriptionReceiver類是怎樣使用DynamicThrottling類來管理他們所使用的並行程度來發送或接收消息的。
Poe(IT運維人員)發言:
每個服務(Azure服務總線, SQL數據庫,Azure storage)都有本身獨特的方式來實現節流行爲,並在負載太重時通知您。例如,請參見SQL Azure Throttling。重要的是要了解應用程序使用的不一樣服務可能會對您的應用程序形成的全部節流。
Poe(IT運維人員)發言:
團隊還考慮使用Azure SQL數據庫商業版來取代Azure SQL數據庫Web版,但通過調查,咱們肯定目前版本之間的惟一區別是最大數據庫大小。不一樣版本沒有進行調優以支持不一樣類型的工做負載,並且兩個版本實現了相同的節流行爲。
有關可伸縮性的其餘信息,請參閱:
在談到可伸縮性和高可用性時,重要的是不要抱有錯誤的樂觀態度。儘管使用許多建議的實踐,應用程序每每能夠更有效地伸縮,而且對失敗更有彈性,但它們仍然容易出現高需求瓶頸。確保爲性能測試和實現性能目標分配足夠的時間。
「我常說,任何冒險工做的三分之二都是作準備」 Amelia Earhart
團隊計劃在Azure中進行從V2到V3版本的無停機遷移。爲了實現這一點,遷移過程使用一個運行在Azure工做者角色中的特殊處理器來執行一些遷移步驟。
遷移過程仍然須要您完成一個配置步驟來關閉V2處理器並打開V3處理器。回想起來,咱們應該使用一種不一樣的機制來簡化從V2到V3處理器的轉換,該轉換基於處理程序自己的反饋,以指示它們什麼時候完成了處理。
有關這些步驟的詳細信息,請參見附錄1「發佈說明」。
Poe(IT運維人員)發言:
在生產環境中執行遷移以前,應該始終在測試環境中演練遷移。
在從V2遷移到V3期間,咱們必須執行的步驟之一是經過重播事件日誌中的事件來從新構建DraftOrder和PricedOrder視圖模型,以填充新的V3讀模型表。咱們能夠異步執行此操做。然而,在某個時候,咱們須要開始將事件從活動的應用程序發送到這些讀模型。此外,咱們須要保持這些讀模型的V2和V3版本都是最新的,直到遷移過程完成,由於V2前端web角色須要V2的讀取模型數據可用,直到切換到V3前端web角色。在切換到V3前端時,咱們必須確保V3讀取的模型徹底是最新的。
爲了使這些讀取模型保持最新,咱們建立了一個做爲Azure工做者角色的臨時處理器,它在遷移過程當中運行。有關更多細節,請參閱會Conference解決方案中的MigrationToV3項目。該處理器執行的步驟是:
遷移過程首先從事件存儲中重播事件,以填充新的V3讀模型。當這一切完成時,咱們中止包含事件處理程序的V2處理器,並在V3處理器中啓動新的處理程序。當它們運行並跟蹤新Topic訂閱中積累的事件時,ad-hoc處理器還使V2的讀模型保持最新,由於此時咱們仍然擁有V2前端。當V3工做者角色準備好時,咱們能夠執行一個VIP切換來使用新的V3前端。在V3前端運行以後,咱們再也不須要V2讀模型。
使用這種方法要解決的問題之一是,如何肯定新的V3處理器應該在何時從處理事件日誌中的存檔事件切換處處理實時的事件流。在將事件寫入事件日誌的過程當中存在一些延遲,所以瞬時切換可能致使一些事件的丟失。團隊決定容許V3處理器暫時能夠同時處理存檔事件和實時事件,這意味着可能會有重複的事件,相同的事件存在於事件存儲區和由新訂閱累積的事件列表中。可是,咱們能夠檢測這些副本並相應地處理它們。
Markus(軟件開發人員)發言:
一般,咱們依賴於基礎設施來檢測重複的消息。在這個重複事件可能來自不一樣來源的特定場景中,咱們不能依賴於基礎設施,必須顯式地將重複檢測邏輯添加到代碼中。
咱們考慮的另外一種方法是在V3處理器中同時包含V2和V3處理。使用這種方法,在遷移期間不須要一個特別的工做人員角色來處理V2事件。可是,咱們決定將特定於遷移的代碼保存在一個單獨的項目中,以免V3發行版因爲包含只在遷移期間須要的功能而膨脹。
Jana(軟件架構師)發言:
若是咱們在V3處理器中同時包含V2和V3處理,遷移過程會稍微容易一些。但咱們認爲,這種方法的好處被沒必要在V3處理器中維護重複功能的好處所抵消。
遷移的每一個步驟之間的間隔須要一些時間來完成,所以遷移不會致使停機,可是用戶確實會遇到延遲。咱們能夠從處理切換開關的一些更快的機制中獲益,好比中止V2處理器並啓動V3處理器。
本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份代碼拷貝頗有用,這樣您就能夠繼續學習了。您能夠從Download center下載一個副本,或者在GitHub上查看存儲庫:github.com/mspnp/cqrs-…。您能夠從GitHub上的Tags頁面下載V3版本的代碼。
備註:不要指望代碼示例與參考實現中的代碼徹底匹配。本章描述了CQRS過程當中的一個步驟,隨着咱們瞭解更多並重構代碼,實現可能會發生變化。
複製代碼
本節描述了團隊如何經過檢查SeatsReserved和OrderPlaced消息的重複實例來強化RegistrationProcessManager流程管理器。
一般,RegistrationProcessManager類向SeatAvailability聚合發送一個MakeSeatReservation命令,SeatAvailability聚合在進行預訂時發佈一個SeatsReserved事件,RegistrationProcessManager接收此通知。RegistrationProcessManager在建立訂單和更新訂單時都發送一條MakeSeatReservation命令。SeatsReserve事件到達的時候可能不是按順序的,可是,系統應該尊重與最後發送的命令相關的事件。本節描述的解決方案使RegistrationProcessManager可以識別最新的SeatsReserved消息,而後忽略任何較早的消息,而不是從新處理它們。
在RegistrationProcessManager類發送MakeSeatReservation命令以前,它將該命令的Id保存在SeatReservationCommandId變量中,以下面的代碼示例所示:
public void Handle(OrderPlaced message)
{
if (this.State == ProcessState.NotStarted)
{
this.ConferenceId = message.ConferenceId;
this.OrderId = message.SourceId;
// Use the order id as an opaque reservation id for the seat reservation.
// It could be anything else, as long as it is deterministic from the
// OrderPlaced event.
this.ReservationId = message.SourceId;
this.ReservationAutoExpiration = message.ReservationAutoExpiration;
var expirationWindow =
message.ReservationAutoExpiration.Subtract(DateTime.UtcNow);
if (expirationWindow > TimeSpan.Zero)
{
this.State = ProcessState.AwaitingReservationConfirmation;
var seatReservationCommand =
new MakeSeatReservation
{
ConferenceId = this.ConferenceId,
ReservationId = this.ReservationId,
Seats = message.Seats.ToList()
};
this.SeatReservationCommandId = seatReservationCommand.Id;
this.AddCommand(new Envelope<ICommand>(seatReservationCommand)
{
TimeToLive = expirationWindow.Add(TimeSpan.FromMinutes(1)),
});
...
}
複製代碼
而後,當它處理SeatsReserved事件時,它檢查該事件的CorrelationId屬性是否匹配SeatReservationCommandId變量的最新值,以下面的代碼示例所示:
public void Handle(Envelope<SeatsReserved> envelope)
{
if (this.State == ProcessState.AwaitingReservationConfirmation)
{
if (envelope.CorrelationId != null)
{
if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) != 0)
{
// Skip this event.
Trace.TraceWarning("Seat reservation response for reservation id {0} does not match the expected correlation id.", envelope.Body.ReservationId);
return;
}
}
...
}
複製代碼
注意這個Handle方法如何處理Envelope實例而不是SeatsReserved實例。做爲V3版本的一部分,事件被封裝在一個包含CorrelationId屬性的Envelope實例中。EventDispatcher中的DoDispatchMessage方法分配關聯Id的值。
Markus(軟件開發人員)發言:
做爲添加此功能的反作用,EventProcessor類在將事件轉發給處理程序時,不能再使用dynamic關鍵字。如今在V3中,它使用了新的EventDispatcher類,該類使用反射來標識給定消息類型的正確處理程序。
在性能測試期間,團隊發現了這個特定的SeatsReserved事件的另外一個問題。因爲系統在加載時其餘地方出現了延遲,所以第二份SeatsReserved事件被髮布了。而後,這個Handle方法拋出一個異常,致使系統在將消息發送到dead-letter隊列以前屢次重試處理該消息。爲了解決這個特定的問題,團隊修改了這個方法,添加了else if子句,以下面的代碼示例所示:
public void Handle(Envelope<SeatsReserved> envelope)
{
if (this.State == ProcessState.AwaitingReservationConfirmation)
{
...
}
else if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) == 0)
{
Trace.TraceInformation("Seat reservation response for request {1} for reservation id {0} was already handled. Skipping event.", envelope.Body.ReservationId, envelope.CorrelationId);
}
else
{
throw new InvalidOperationException("Cannot handle seat reservation at this stage.");
}
}
複製代碼
Markus(軟件開發人員)發言:
此優化僅應用於此特定消息。注意,它使用了以前保存在實例中的SeatReservationCommandId屬性的值。若是但願對其餘消息執行這種檢查,則須要在流程管理器中存儲更多信息。
爲了檢測重複的OrderPlaced事件,RegistrationProcessManagerRouter類如今執行一個檢查,以查看事件是否已經被處理。V3版本的新代碼以下面的代碼示例所示:
public void Handle(OrderPlaced @event)
{
using (var context = this.contextFactory.Invoke())
{
var pm = context.Find(x => x.OrderId == @event.SourceId);
if (pm == null)
{
pm = new RegistrationProcessManager();
}
pm.Handle(@event);
context.Save(pm);
}
}
複製代碼
Azure中不可能有包含將RegistrationProcessManager持久化到存儲裏併發送命令的事務。所以,團隊決定保存流程管理器生成的全部命令,以便在流程崩潰時不會丟失這些命令,它們能夠稍後發送。咱們使用另外一個進程來可靠地處理髮送命令。
Markus(軟件開發人員)發言:
已經遷移到V3版本的遷移實用程序更新了數據庫模式,以適應新的存儲需求。
下面來自SqlProcessDataContext類的代碼示例顯示了系統如何持久化全部命令以及進程管理器的狀態:
public void Save(T process)
{
var entry = this.context.Entry(process);
if (entry.State == System.Data.EntityState.Detached)
this.context.Set<T>().Add(process);
var commands = process.Commands.ToList();
UndispatchedMessages undispatched = null;
if (commands.Count > 0)
{
// If there are pending commands to send, we store them as undispatched.
undispatched = new UndispatchedMessages(process.Id)
{
Commands = this.serializer.Serialize(commands)
};
this.context.Set<UndispatchedMessages>().Add(undispatched);
}
try
{
this.context.SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
throw new ConcurrencyException(e.Message, e);
}
this.DispatchMessages(undispatched, commands);
}
複製代碼
下面來自SqlProcessDataContext類的代碼示例展現了系統如何發送命令消息:
private void DispatchMessages(UndispatchedMessages undispatched, List<Envelope<ICommand>> deserializedCommands = null)
{
if (undispatched != null)
{
if (deserializedCommands == null)
{
deserializedCommands = this.serializer.Deserialize<IEnumerable<Envelope<ICommand>>>(undispatched.Commands).ToList();
}
var originalCommandsCount = deserializedCommands.Count;
try
{
while (deserializedCommands.Count > 0)
{
this.commandBus.Send(deserializedCommands.First());
deserializedCommands.RemoveAt(0);
}
}
catch (Exception)
{
// We catch a generic exception as we don't know what implementation of ICommandBus we might be using.
if (originalCommandsCount != deserializedCommands.Count)
{
// If we were able to send some commands, then update the undispatched messages.
undispatched.Commands = this.serializer.Serialize(deserializedCommands);
try
{
this.context.SaveChanges();
}
catch (DbUpdateConcurrencyException)
{
// If another thread already dispatched the messages, ignore and surface original exception instead.
}
}
throw;
}
// We remove all the undispatched messages for this process manager.
this.context.Set<UndispatchedMessages>().Remove(undispatched);
this.retryPolicy.ExecuteAction(() => this.context.SaveChanges());
}
}
複製代碼
DispatchMessages方法還從SqlProcessDataContext類中的Find方法調用,以便當系統從新還原(rehydrates)RegistrationProcessManager實例時,它會嘗試發送任何未發送的消息。
第一個優化是容許UI直接導航到註冊者頁面,前提是會議還有不少座位可用。RegistrationController類的StartRegistration方法介紹了這個變化,它如今會在建立預約併發送RegisterToConference命令以前執行一個額外的檢查,確認有足夠的剩餘座位,以下面的代碼示例所示:
[HttpPost]
public ActionResult StartRegistration(RegisterToConference command, int orderVersion)
{
var existingOrder = orderVersion != 0 ? this.orderDao.FindDraftOrder(command.OrderId) : null;
var viewModel = existingOrder == null ? this.CreateViewModel() : this.CreateViewModel(existingOrder);
viewModel.OrderId = command.OrderId;
if (!ModelState.IsValid)
{
return View(viewModel);
}
// Checks that there are still enough available seats, and the seat type IDs submitted are valid.
ModelState.Clear();
bool needsExtraValidation = false;
foreach (var seat in command.Seats)
{
var modelItem = viewModel.Items.FirstOrDefault(x => x.SeatType.Id == seat.SeatType);
if (modelItem != null)
{
if (seat.Quantity > modelItem.MaxSelectionQuantity)
{
modelItem.PartiallyFulfilled = needsExtraValidation = true;
modelItem.OrderItem.ReservedSeats = modelItem.MaxSelectionQuantity;
}
}
else
{
// Seat type no longer exists for conference.
needsExtraValidation = true;
}
}
if (needsExtraValidation)
{
return View(viewModel);
}
command.ConferenceId = this.ConferenceAlias.Id;
this.commandBus.Send(command);
return RedirectToAction(
"SpecifyRegistrantAndPaymentDetails",
new { conferenceCode = this.ConferenceCode, orderId = command.OrderId, orderVersion = orderVersion });
}
複製代碼
若是沒有足夠的可用座位,控制器將從新顯示當前屏幕,顯示當前可用的座位數量,以便註冊者修改其訂單。
更改的其他部分在RegistrationController類中的SpecifyRegistrantAndPaymentDetails方法中。下面來自V2版本的代碼示例顯示,在優化以前,控制器在繼續跳轉到註冊頁面以前調用WaitUntilSeatsAreConfirmed方法:
[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
var order = this.WaitUntilSeatsAreConfirmed(orderId, orderVersion);
if (order == null)
{
return View("ReservationUnknown");
}
if (order.State == DraftOrder.States.PartiallyReserved)
{
return this.RedirectToAction("StartRegistration", new { conferenceCode = this.ConferenceCode, orderId, orderVersion = order.OrderVersion });
}
if (order.State == DraftOrder.States.Confirmed)
{
return View("ShowCompletedOrder");
}
if (order.ReservationExpirationDate.HasValue && order.ReservationExpirationDate < DateTime.UtcNow)
{
return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
}
var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
if (pricedOrder == null)
{
return View("ReservationUnknown");
}
this.ViewBag.ExpirationDateUTC = order.ReservationExpirationDate;
return View(
new RegistrationViewModel
{
RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
Order = pricedOrder
});
}
複製代碼
下面的代碼示例顯示了這個方法的V3版本,它再也不等待預訂被確認:
[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
if (pricedOrder == null)
{
return View("PricedOrderUnknown");
}
if (!pricedOrder.ReservationExpirationDate.HasValue)
{
return View("ShowCompletedOrder");
}
if (pricedOrder.ReservationExpirationDate < DateTime.UtcNow)
{
return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
}
return View(
new RegistrationViewModel
{
RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
Order = pricedOrder
});
}
複製代碼
備註:咱們將在稍後的旅程中使這個方法異步。
複製代碼
UI流程的第二個優化是在流程的前面執行訂單總數的計算。在上面的代碼示例中,SpecifyRegistrantAndPaymentDetails方法仍然調用WaitUntilOrderIsPriced方法,這將暫停界面流直到系統計算出訂單的總數並使其可用於控制器(在讀端保存在priced-order視圖模型中)。
實現此功能的關鍵變動是在訂單(Order)聚合裏。Order類中的構造函數如今調用CalculateTotal方法並引起OrderTotalsCalculated事件,以下面的代碼示例所示:
public Order(Guid id, Guid conferenceId, IEnumerable<OrderItem> items, IPricingService pricingService)
: this(id)
{
var all = ConvertItems(items);
var totals = pricingService.CalculateTotal(conferenceId, all.AsReadOnly());
this.Update(new OrderPlaced
{
ConferenceId = conferenceId,
Seats = all,
ReservationAutoExpiration = DateTime.UtcNow.Add(ReservationAutoExpiration),
AccessCode = HandleGenerator.Generate(6)
});
this.Update(new OrderTotalsCalculated { Total = totals.Total, Lines = totals.Lines != null ? totals.Lines.ToArray() : null, IsFreeOfCharge = totals.Total == 0m });
}
複製代碼
以前,在V2版本中,訂單(Order)聚合一直等到收到MarkAsReserved命令才調用CalculateTotal方法。
本節概述了系統如今如何異步地在Azure服務總線上執行全部I/O。
SubscriptionReceiver和SessionSubscriptionReceiver類如今異步接收消息,而不是在ReceiveMessages方法的循環中同步接收消息。
有關詳細信息,請參閱SubscriptionReceiver類中的ReceiveMessages方法或SessionSubscriptionReceiver類中的ReceiveMessagesAndCloseSession方法。
Markus(軟件開發人員)發言:
此代碼示例還展現瞭如何使用Transient Fault Handling Application Block來可靠地異步接收來自服務總線Topic的消息。異步循環使代碼更難以讀取,但效率更高。這是推薦的最佳實踐。這段代碼將受益於c# 4中新的async關鍵字。
系統使用peek/lock機制從服務總線Topic訂閱中檢索消息。要了解系統如何異步執行這些操做,請參閱SubscriptionReceiver和SessionSubscriptionReceiver類中的ReceiveMessages方法。這提供了一個系統如何使用異步api的例子。
應用程序如今異步發送服務總線上的全部消息。有關詳細信息,請參見TopicSender類。
在V2版本中,系統使用Azure服務總線將全部命令傳遞給它們的接收者。這意味着系統異步地交付命令。在V3版本中,MVC控制器如今同步地在進程中發送命令,以便經過繞過命令總線並將命令直接傳遞給處理程序來改進UI中的響應時間。此外,在ConferenceProcessor工做者角色中,發送到訂單(Order)聚合的命令使用相同的機制在進程中同步發送。
Markus(軟件開發人員)發言:
咱們仍然異步地向可用座位(SeatsAvailability)聚合發送命令,由於隨着RegistrationProcessManager的多個實例並行運行,將會出現爭用,由於多個線程都試圖訪問可用座位(SeatsAvailability)聚合的同一個實例。
團隊實現這種行爲經過添加SynchronousCommandBusDecorator和CommandDispatcher類到基礎設施而且在web角色啓動的時候註冊它們,以下面的代碼展現了Global.asax.Azure.cs文件裏的OnCreateContainer方法**:
var commandBus = new CommandBus(new TopicSender(settings.ServiceBus, "conference/commands"), metadata, serializer);
var synchronousCommandBus = new SynchronousCommandBusDecorator(commandBus);
container.RegisterInstance<ICommandBus>(synchronousCommandBus);
container.RegisterInstance<ICommandHandlerRegistry>(synchronousCommandBus);
container.RegisterType<ICommandHandler, OrderCommandHandler>("OrderCommandHandler");
container.RegisterType<ICommandHandler, ThirdPartyProcessorPaymentCommandHandler>("ThirdPartyProcessorPaymentCommandHandler");
container.RegisterType<ICommandHandler, SeatAssignmentsHandler>("SeatAssignmentsHandler");
複製代碼
備註:在Conference.Azure.cs文件中也有相似的代碼,用於配置工做角色,以便在進程中發送一些命令。
複製代碼
下面的代碼示例展現了SynchronousCommandBusDecorator類如何實現命令消息的發送:
public class SynchronousCommandBusDecorator : ICommandBus, ICommandHandlerRegistry
{
private readonly ICommandBus commandBus;
private readonly CommandDispatcher commandDispatcher;
public SynchronousCommandBusDecorator(ICommandBus commandBus)
{
this.commandBus = commandBus;
this.commandDispatcher = new CommandDispatcher();
}
...
public void Send(Envelope<ICommand> command)
{
if (!this.DoSend(command))
{
Trace.TraceInformation("Command with id {0} was not handled locally. Sending it through the bus.", command.Body.Id);
this.commandBus.Send(command);
}
}
...
private bool DoSend(Envelope<ICommand> command)
{
bool handled = false;
try
{
var traceIdentifier = string.Format(CultureInfo.CurrentCulture, " (local handling of command with id {0})", command.Body.Id);
handled = this.commandDispatcher.ProcessMessage(traceIdentifier, command.Body, command.MessageId, command.CorrelationId);
}
catch (Exception e)
{
Trace.TraceWarning("Exception handling command with id {0} synchronously: {1}", command.Body.Id, e.Message);
}
return handled;
}
}
複製代碼
注意這個類是如未嘗試在不使用服務總線的狀況下同步發送命令,可是若是它找不到該命令的處理程序,它將返回到使用服務總線。下面的代碼示例展現了CommandDispatcher類如何試圖定位處理程序並傳遞命令消息:
public bool ProcessMessage(string traceIdentifier, ICommand payload, string messageId, string correlationId)
{
var commandType = payload.GetType();
ICommandHandler handler = null;
if (this.handlers.TryGetValue(commandType, out handler))
{
Trace.WriteLine("-- Handled by " + handler.GetType().FullName + traceIdentifier);
((dynamic)handler).Handle((dynamic)payload);
return true;
}
else
{
return false;
}
}
複製代碼
在Contoso會議管理系統中,惟一有事件源的聚合就是可用座位(SeatAvailability)聚合。它可能包含大量的事件而且能夠從快照中獲益。
Markus(軟件開發人員)發言:
由於咱們選擇使用memento模式,因此聚合狀態的快照存儲在memento中。
下面的代碼示例來自AzureEventSourcedRepository類中的Save方法,展現了若是存在緩存且聚合實現了IMementoOriginator接口,系統如何建立緩存的memento對象。
public void Save(T eventSourced, string correlationId)
{
...
this.cacheMementoIfApplicable.Invoke(eventSourced);
}
複製代碼
而後,當系統調用AzureEventSourcedRepository類中的Find方法加載聚合時,它會檢查是否有緩存的memento其中包含要使用對象狀態的快照:
private readonly Func<Guid, Tuple<IMemento, DateTime?>> getMementoFromCache;
...
public T Find(Guid id)
{
var cachedMemento = this.getMementoFromCache(id);
if (cachedMemento != null && cachedMemento.Item1 != null)
{
IEnumerable<IVersionedEvent> deserialized;
if (!cachedMemento.Item2.HasValue || cachedMemento.Item2.Value < DateTime.UtcNow.AddSeconds(-1))
{
deserialized = this.eventStore.Load(GetPartitionKey(id), cachedMemento.Item1.Version + 1).Select(this.Deserialize);
}
else
{
deserialized = Enumerable.Empty<IVersionedEvent>();
}
return this.originatorEntityFactory.Invoke(id, cachedMemento.Item1, deserialized);
}
else
{
var deserialized = this.eventStore.Load(GetPartitionKey(id), 0)
.Select(this.Deserialize)
.AsCachedAnyEnumerable();
if (deserialized.Any())
{
return this.entityFactory.Invoke(id, deserialized);
}
}
return null;
}
複製代碼
若是緩存條目在最近幾秒鐘內更新了,那麼它頗有可能沒有過時,由於咱們只有一個寫入者用於高爭用聚合。所以,當memento建立以後,咱們樂觀的不用在事件存儲中檢查新事件。不然,咱們須要在事件存儲中檢查建立memento以後到達的事件。
下面的代碼示例顯示了SeatsAvailability類如何將其狀態數據快照添加到要緩存的memento對象中:
public IMemento SaveToMemento()
{
return new Memento
{
Version = this.Version,
RemainingSeats = this.remainingSeats.ToArray(),
PendingReservations = this.pendingReservations.ToArray(),
};
}
複製代碼
在第5章「準備發佈V1版本」中,您瞭解了系統如何在將事件保存到事件存儲時發佈事件。這種優化使系統可以並行發佈其中一些事件,而不是按順序發佈。重要的是,與特定聚合實例關聯的事件必須按照正確的順序發送,所以系統只爲不一樣的分區鍵建立新任務。下面的代碼示例來自EventStoreBusPublisher類中的Start方法,展現瞭如何定義並行任務:
Task.Factory.StartNew(
() =>
{
try
{
foreach (var key in GetThrottlingEnumerable(this.enqueuedKeys.GetConsumingEnumerable(cancellationToken), this.throttlingSemaphore, cancellationToken))
{
if (!cancellationToken.IsCancellationRequested)
{
ProcessPartition(key);
}
else
{
this.enqueuedKeys.Add(key);
return;
}
}
}
catch (OperationCanceledException)
{
return;
}
},
TaskCreationOptions.LongRunning);
複製代碼
SubscriptionReceiver和SessionSubscriptionReceiver類使用相同的DynamicThrottling類來動態限制從服務總線檢索消息的速度。
團隊向Azure服務總線訂閱添加了過濾器,以將每一個訂閱接收到的消息限制爲訂閱打算處理的消息。您能夠在Settings.Template.xml文件中看到這些過濾器的定義,以下面的代碼片斷所示:
<Topic Path="conference/events" IsEventBus="true">
<Subscription Name="log" RequiresSession="false"/>
<Subscription Name="Registration.RegistrationPMOrderPlaced" RequiresSession="false" SqlFilter="TypeName IN ('OrderPlaced')"/>
<Subscription Name="Registration.RegistrationPMNextSteps" RequiresSession="false" SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/>
<Subscription Name="Registration.OrderViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderUpdated','OrderPartiallyReserved','OrderReservationCompleted','OrderRegistrantAssigned','OrderConfirmed','OrderPaymentConfirmed')"/>
<Subscription Name="Registration.PricedOrderViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/>
<Subscription Name="Registration.ConferenceViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('ConferenceCreated','ConferenceUpdated','ConferencePublished','ConferenceUnpublished','SeatCreated','SeatUpdated','AvailableSeatsChanged','SeatsReserved','SeatsReservationCancelled')"/>
<Subscription Name="Registration.SeatAssignmentsViewModelGenerator" RequiresSession="true" SqlFilter="TypeName IN ('SeatAssignmentsCreated','SeatAssigned','SeatUnassigned','SeatAssignmentUpdated')"/>
<Subscription Name="Registration.SeatAssignmentsHandler" RequiresSession="true" SqlFilter="TypeName IN ('OrderConfirmed','OrderPaymentConfirmed')"/>
<Subscription Name="Conference.OrderEventHandler" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderRegistrantAssigned','OrderTotalsCalculated','OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatAssigned','SeatAssignmentUpdated','SeatUnassigned')"/>
...
</Topic>
複製代碼
在V2版本中,系統沒有爲命令使用會話,由於咱們不須要命令的順序保證。然而,咱們如今但願爲命令使用會話來保證每一個可用座位(SeatsAvailability)聚合實例都有一個監聽者,這將在不從這個高爭用聚合中得到大量併發異常的狀況下幫助咱們進行擴展。
Conference.Processor.Azure.cs文件中的如下代碼示例顯示了系統如何建立一個專用的SessionSubscriptionReceiver實例來接收發送到可用座位(SeatsAvailability)聚合的消息:
var seatsAvailabilityCommandProcessor =
new CommandProcessor(new SessionSubscriptionReceiver(azureSettings.ServiceBus, Topics.Commands.Path, Topics.Commands.Subscriptions.SeatsAvailability, false), serializer);
...
container.RegisterInstance<IProcessor>("SeatsAvailabilityCommandProcessor", seatsAvailabilityCommandProcessor);
複製代碼
下面的代碼示例顯示了新的抽象SeatsAvailabilityCommand類,其中包含一個基於與該命令關聯的會議的會話ID:
public abstract class SeatsAvailabilityCommand : ICommand, IMessageSessionProvider
{
public SeatsAvailabilityCommand()
{
this.Id = Guid.NewGuid();
}
public Guid Id { get; set; }
public Guid ConferenceId { get; set; }
string IMessageSessionProvider.SessionId
{
get { return "SeatsAvailability_" + this.ConferenceId.ToString(); }
}
}
複製代碼
命令總線如今使用一個單獨的訂閱來訂閱爲可用座位(SeatsAvailability)聚合指定的命令。
Markus(軟件開發人員)發言:
團隊對RegistrationProcessManager流程管理器應用了相似的技術,爲OrderPlaced事件建立單獨的訂閱來處理新訂單。一個單獨的訂閱接收指定給流程管理器的全部其餘事件。
做爲V3版本中性能優化的一部分,團隊爲存儲在訂單和註冊限界上下文讀模型中的會議信息添加了緩存行爲。這減小了讀取這些經常使用數據所花費的時間。
下面的代碼示例來自CachingConferenceDao類中的GetPublishedSeatTypes方法,展現了系統如何根據可用座位的數量決定是否緩存會議數據:
TimeSpan timeToCache;
if (seatTypes.All(x => x.AvailableQuantity > 200 || x.AvailableQuantity <= 0))
{
timeToCache = TimeSpan.FromMinutes(5);
}
else if (seatTypes.Any(x => x.AvailableQuantity < 30 && x.AvailableQuantity > 0))
{
// There are just a few seats remaining. Do not cache.
timeToCache = TimeSpan.Zero;
}
else if (seatTypes.Any(x => x.AvailableQuantity < 100 && x.AvailableQuantity > 0))
{
timeToCache = TimeSpan.FromSeconds(20);
}
else
{
timeToCache = TimeSpan.FromMinutes(1);
}
if (timeToCache > TimeSpan.Zero)
{
this.cache.Set(key, seatTypes, new CacheItemPolicy { AbsoluteExpiration = DateTimeOffset.UtcNow.Add(timeToCache) });
}
複製代碼
Jana(軟件架構師)發言:
您能夠看到,經過調整緩存時間,甚至決定根本不緩存數據,咱們是如何管理「顯示陳舊數據」相關的風險。
系統如今還使用緩存來保存PricedOrderViewModelGenerator類中的座位類型描述。
爲了減小流經服務總線Topic的消息數量,咱們建立了兩個附加主題來傳輸訂單(Order)和可用座位(SeatAvailability)聚合發佈的事件,從而對服務總線進行了分區。這有助於咱們避免在應用程序承受很是高的負載時被服務總線節流。Settings.xml文件中的如下片斷展現了這些新Topic的定義:
Topic Path="conference/orderevents" IsEventBus="true">
<Subscription Name="logOrders" RequiresSession="false"/>
<Subscription Name="Registration.RegistrationPMOrderPlacedOrders" RequiresSession="false" SqlFilter="TypeName IN ('OrderPlaced')"/>
<Subscription Name="Registration.RegistrationPMNextStepsOrders" RequiresSession="false" SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/>
<Subscription Name="Registration.OrderViewModelGeneratorOrders" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderUpdated','OrderPartiallyReserved','OrderReservationCompleted', 'OrderRegistrantAssigned','OrderConfirmed','OrderPaymentConfirmed')"/>
<Subscription Name="Registration.PricedOrderViewModelOrders" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed', 'OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/>
<Subscription Name="Registration.SeatAssignmentsViewModelOrders" RequiresSession="true" SqlFilter="TypeName IN ('SeatAssignmentsCreated','SeatAssigned','SeatUnassigned','SeatAssignmentUpdated')"/>
<Subscription Name="Registration.SeatAssignmentsHandlerOrders" RequiresSession="true" SqlFilter="TypeName IN ('OrderConfirmed','OrderPaymentConfirmed')"/>
<Subscription Name="Conference.OrderEventHandlerOrders" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderRegistrantAssigned','OrderTotalsCalculated', 'OrderConfirmed','OrderExpired','SeatAssignmentsCreated','SeatAssigned','SeatAssignmentUpdated','SeatUnassigned')"/>
</Topic>
<Topic Path="conference/availabilityevents" IsEventBus="true">
<Subscription Name="logAvail" RequiresSession="false"/>
<Subscription Name="Registration.RegistrationPMNextStepsAvail" RequiresSession="false" SqlFilter="TypeName IN ('OrderUpdated','SeatsReserved','PaymentCompleted','OrderConfirmed')"/>
<Subscription Name="Registration.PricedOrderViewModelAvail" RequiresSession="true" SqlFilter="TypeName IN ('OrderPlaced','OrderTotalsCalculated','OrderConfirmed', 'OrderExpired','SeatAssignmentsCreated','SeatCreated','SeatUpdated')"/>
<Subscription Name="Registration.ConferenceViewModelAvail" RequiresSession="true" SqlFilter="TypeName IN ('ConferenceCreated','ConferenceUpdated','ConferencePublished', 'ConferenceUnpublished','SeatCreated','SeatUpdated','AvailableSeatsChanged', 'SeatsReserved','SeatsReservationCancelled')"/>
</Topic>
複製代碼
本節概述了團隊優化應用程序性能和提升其彈性的一些額外方法:
在此以前,系統生成Guid,用於聚合的Id,例如訂單和註冊聚合。使用Guid.NewGuid方法,它生成隨機Guid。若是在SQL數據庫實例中使用這些Guid做爲主鍵值,這將致使索引中頻繁的頁面分割,從而對數據庫的性能產生負面影響。在V3版本中,團隊添加了一個實用程序類來生成連續的Guid。這確保SQL數據庫表中的新條目老是追加在後面,這提升了數據庫的總體性能。下面的代碼示例顯示了新的GuidUtil類:
public static class GuidUtil
{
private static readonly long EpochMilliseconds = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks / 10000L;
/// <summary>
/// Creates a sequential GUID according to SQL Server's ordering rules.
/// </summary>
public static Guid NewSequentialId()
{
// This code was not reviewed to guarantee uniqueness under most conditions, nor completely optimize for avoiding
// page splits in SQL Server when doing inserts from multiple hosts, so do not re-use in production systems.
var guidBytes = Guid.NewGuid().ToByteArray();
// Get the milliseconds since Jan 1 1970.
byte[] sequential = BitConverter.GetBytes((DateTime.Now.Ticks / 10000L) - EpochMilliseconds);
// Discard the 2 most significant bytes, as we only care about the milliseconds increasing, but the highest ones should be 0 for several thousand years to come.
if (BitConverter.IsLittleEndian)
{
guidBytes[10] = sequential[5];
guidBytes[11] = sequential[4];
guidBytes[12] = sequential[3];
guidBytes[13] = sequential[2];
guidBytes[14] = sequential[1];
guidBytes[15] = sequential[0];
}
else
{
Buffer.BlockCopy(sequential, 2, guidBytes, 10, 6);
}
return new Guid(guidBytes);
}
}
複製代碼
有關進一步信息,請參見The Cost of GUIDs as Primary Keys和Good Page Splits and Sequential GUID Key Generation
團隊將公共會議web應用程序中的一些MVC控制器轉換爲異步控制器。這避免了阻塞一些ASP.NET線程並使咱們可以在裏面使用Task類。
例如,團隊修改了控制器在讀模型中使用計時器的輪詢更新方式。
當系統從Azure服務總線檢索消息時,團隊啓用了prefetch選項。此選項使系統可以在一次到服務器的往返中檢索多個消息,並有助於減小從服務總線Topic檢索現有消息的延遲。
下面來自SubscriptionReceiver類的代碼示例展現瞭如何啓用此選項。
protected SubscriptionReceiver(ServiceBusSettings settings, string topic, string subscription, bool processInParallel, ISubscriptionReceiverInstrumentation instrumentation, RetryStrategy backgroundRetryStrategy)
{
this.settings = settings;
this.topic = topic;
this.subscription = subscription;
this.processInParallel = processInParallel;
this.tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(settings.TokenIssuer, settings.TokenAccessKey);
this.serviceUri = ServiceBusEnvironment.CreateServiceUri(settings.ServiceUriScheme, settings.ServiceNamespace, settings.ServicePath);
var messagingFactory = MessagingFactory.Create(this.serviceUri, tokenProvider);
this.client = messagingFactory.CreateSubscriptionClient(topic, subscription);
if (this.processInParallel)
{
this.client.PrefetchCount = 18;
}
else
{
this.client.PrefetchCount = 14;
}
...
}
複製代碼
在V2版本中,SessionSubscriptionReceiver建立會話來依次接收來自Azure服務總線的消息。可是,若是您正在使用會話,則只能處理來自該會話的消息。在切換到另外一個會話以前,其餘消息將被忽略。在V3版本中,SessionSubscriptionReceiver並行建立多個會話,使系統可以同時接收來自多個會話的消息。
有關詳細信息,請參見SessionSubscriptionReceiver類中的AcceptSession方法。
Markus(軟件開發人員)發言:
AcceptSession方法使用Transient Fault Handling Application Block來可靠地接收會話。
當系統經過向RegistrationProcessManager類添加時間戳屬性來保存RegistrationProcessManager類時,團隊還添加了一個樂觀併發性檢查,以下面的代碼示例所示:
[ConcurrencyCheck]
[Timestamp]
public byte[] TimeStamp { get; private set; }
複製代碼
有關更多信息,請參見MSDN網站上的Code First Data Annotations。
在進行了樂觀併發性檢查以後,咱們還刪除了SessionSubscriptionReceiver類中的鎖,這是系統中潛在的瓶頸。
Azure服務總線代理消息能夠爲TimeToLive屬性分配一個值,當time-to-live過時時,消息將自動發送到dead-letter隊列。若是與MakeSeatReservation命令關聯的訂單已通過期,應用程序將使用服務總線的這個特性來避免處理MakeSeatReservation命令。
咱們在PricedOrderViewModelGenerator類中標識了許多位置,能夠在這些位置優化代碼。之前,當這個類處理正在預約或過時的訂單時,系統對SQL數據庫實例進行兩次調用,如今系統只發起一個調用。
在旅程的這個階段,團隊從新組織了Conference.Specflow項目,以更好地反映測試的目的。
Conference.Specflow項目中的Features\Integration文件夾中的測試旨在直接測試領域的行爲,經過查看發送和接收的命令和事件來驗證領域的行爲。這些測試的目的是讓程序員而不是領域專家可以理解,並使用比廣泛使用的語言更專業的詞彙表來表示。除了驗證領域的行爲並幫助開發人員理解系統中的命令流和事件流以外,這些測試還被證實對於在事件丟失或接收順序錯誤的場景中測試領域的行爲很是有用。
Conference文件夾包含會議管理限界上下文的集成測試,而Registration文件夾包含訂單和註冊限界上下文的測試。
Markus(軟件開發人員)發言:
這些集成測試假定命令處理程序信任命令的發送者發送的是有效的命令消息。這一點可能不適用於正在設計測試的其餘系統。
UserInterface文件夾包含驗收測試。這些測試在第4章「擴展和加強訂單和註冊限界上下文」有更詳細的描述。Controllers文件夾包含使用MVC控制器做爲入口點的測試,Views文件夾包含使用WatiN經過其UI驅動系統的測試。
在咱們的CQRS旅程和V3僞產品發佈的最後階段中,重點是彈性和性能。下一章總結了咱們在整個旅程中所學到的教訓,並提出了一些建議,若是咱們有機會從新開始咱們所學到的知識,咱們可能會作得有所不一樣。