CQRS之旅——旅程6(咱們系統的版本管理)

旅程6:咱們系統的版本管理

準備下一站:升級和遷移

「變化是生活的調味品。」威廉·考珀
複製代碼

此階段的最高目標是瞭解如何升級包含實現CQRS模式和事件源的限界上下文的系統。團隊在這一階段實現的用戶場景包括對代碼的更改和對數據的更改:更改了一些現有的數據模式並添加了新的數據模式。除了升級系統和遷移數據外,團隊還計劃在沒有停機時間的狀況下進行升級和遷移,以便在Microsoft Azure中運行實時系統。git

本章的工做術語定義:

本章使用了一些術語,咱們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的「深刻CQRS和ES」。github

  • 命令(Command):命令是要求系統執行更改系統狀態的操做。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自用戶發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操做時)。單個接收方處理一個命令。命令總線(command bus)傳輸命令,而後命令處理程序將這些命令發送到聚合。發送命令是一個沒有返回值的異步操做。web

  • 事件(Event):一個事件,好比OrderConfirmed,描述了系統中發生的一些事情,一般是一個命令的結果。領域模型中的聚合引起事件。事件也能夠來自其餘限界上下文。多個訂閱者能夠處理特定的事件。聚合將事件發佈到事件總線。處理程序在事件總線上註冊特定類型的事件,而後將事件傳遞給訂閱服務器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。數據庫

  • 冪等性(Idempotency):冪等性是一個操做的特性,這意味着該操做能夠屢次應用而不改變結果。例如,「將x的值設置爲10」的操做是冪等的,而「將x的值加1」的操做不是冪等的。在消息傳遞環境中,若是消息能夠屢次傳遞而不改變結果,則消息是冪等的:這多是由於消息自己的性質,也多是由於系統處理消息的方式。c#

用戶故事:

在這個過程的這個階段,團隊實現了下面描述的用戶故事。windows

不停機升級

V2版本的目標是升級系統,包括任何須要的數據遷移,而不須要把系統停機。若是這在當前實現中不可行,那麼停機時間應該最小化,而且應該修改系統,以便在未來支持零停機時間升級(從V3版本開始)。瀏覽器

Beth(業務經理)發言:
確保咱們可以在不停機的狀況下進行升級,這對咱們在市場中的信譽相當重要。安全

顯示剩餘座位數量

目前,當註冊者建立一個訂單時,沒有顯示每種座位類型的剩餘座位數量。當註冊者選擇購買座位時,UI應該顯示此信息。服務器

處理不須要付費的座位

目前,當註冊者選擇不須要付費的座位時,UI流仍然會將註冊者帶到支付頁面,即便不須要支付任何費用。系統應該檢測何時沒有支付,並調整流程,讓註冊者直接進入訂單的確認頁面。session

架構

該應用程序旨在部署到Microsoft Azure。在旅程的這個階段,應用程序由兩個角色組成,一個包含ASP.Net MVC Web應用程序的web角色和一個包含消息處理程序和領域對象的工做角色。應用程序在寫端和讀端都使用Azure SQL DataBase實例進行數據存儲。應用程序使用Azure服務總線來提供其消息傳遞基礎設施。下圖展現了這個高級體系結構。

在研究和測試解決方案時,能夠在本地運行它,可使用Azure compute emulator,也能夠直接運行MVC web應用程序,並運行承載消息處理程序和領域域對象的控制檯應用程序。在本地運行應用程序時,可使用本地SQL Server Express數據庫,並使用一個在SQL Server Express數據庫實現的簡單的消息傳遞基礎設施。

有關運行應用程序的選項的更多信息,請參見附錄1「發佈說明」。

模式和概念

在旅程的這個階段,團隊處理的大多數關鍵挑戰都與如何最好地執行從V1到V2的遷移有關。本節將介紹其中的一些挑戰。

處理「事件定義發生更改"的狀況

當團隊檢查V2的發佈需求,很明顯,咱們須要改變在訂單和註冊限界上下文中使用的一些事件來適應一些新特性:RegistrationProcessManager將會改變,當訂單有一個不須要付費的座位時系統將提供一個更好的用戶體驗。

訂單和註冊限界上下文使用事件源,所以在遷移到V2以後,事件存儲將包含舊事件,但將開始保存新事件。當系統事件被重放時,系統必須能正確處理全部的舊事件和新事件。

團隊考慮了兩種方法來處理系統中的這類更改。

在基礎設施中進行事件映射或過濾

在基礎設施中映射和過濾事件消息是一種選擇。此方法是對舊的事件消息和消息格式進行處理,在它們到達領域以前在基礎設施的某個位置處理它們。您能夠過濾掉再也不相關的舊消息,並使用映射將舊格式的消息轉換爲新格式。這種方法最初比較複雜,由於它須要對基礎設施進行更改,可是它能夠保持領域域的純粹,領域只須要理解當前的新事件集合就能夠了。

在聚合中處理多個版本的消息

在聚合中處理多個版本的消息是另外一種選擇。在這種方法中,全部消息類型(包括舊消息和新消息)都傳遞到領域,每一個聚合必須可以處理舊消息和新消息。從短時間來看,這多是一個合適的策略,但它最終會致使域模型受到遺留事件處理程序的污染。

團隊爲V2版本選擇了這個選項,由於它包含了最少數量的代碼更改。

Jana(軟件架構師)發言:
當前在聚合中處理舊事件和新事件並不妨礙您之後使用第一種選擇:在基礎設施中使用映射/過濾機制。

履行消息冪等性

V2版本中要解決的一個關鍵問題是使系統更加健壯。在V1版本中,在某些場景中,可能會屢次處理某些消息,致使系統中的數據不正確或不一致。

Jana(軟件架構師)發言:
消息冪等性在任何使用消息傳遞的系統中都很重要,這不只僅是在實現CQRS模式或使用事件源的系統中。

在某些場景中,設計冪等消息是可能的,例如:使用「將座位配額設置爲500」的消息,而不是「在座位配額中增長100」的消息。您能夠安全地屢次處理第一個消息,但不能處理第二個消息。

然而,並不老是可以使用冪等消息,所以團隊決定使用Azure服務總線的重複刪除特性,以確保它只傳遞一次消息。團隊對基礎設施進行了一些更改,以確保Azure服務總線可以檢測重複消息,並配置Azure服務總線來執行重複消息檢測。

要了解Contoso是如何實現這一點的,請參閱下面的「不讓命令消息重複」一節。此外,咱們須要考慮系統中的消息處理程序如何從隊列和Topic檢索消息。當前的方法使用Azure服務總線peek/lock機制。這是一個分紅三個階段的過程:

  1. 處理程序從隊列或Topic檢索消息,並在其中留下消息的鎖定副本。其餘客戶端沒法看到或訪問鎖定的消息。
  2. 處理程序處理消息。
  3. 處理程序從隊列中刪除鎖定的消息。若是鎖定的消息在固定時間後沒有解鎖或刪除,則解鎖該消息並使其可用,以便再次檢索。

若是步驟因爲某種緣由失敗,這意味着系統能夠不止一次地處理消息。

Jana(軟件架構師)發言:
該團隊計劃在旅程的下一階段解決這個問題(步驟失敗的問題)。更多信息,請參見第7章「添加彈性和優化性能」。

阻止屢次處理事件

在V1中,在某些場景裏,若是在處理事件時發生錯誤,系統可能屢次處理事件。爲了不這種狀況,團隊修改了體系結構,以便每一個事件處理程序都有本身對Azure Topic的訂閱。下圖顯示了兩個不一樣的模型。

在V1中,可能發生如下行爲:

  1. EventProcessor實例從服務總線中的全部訂閱者那裏接收到OrderPlaced事件。
  2. EventProcessor實例有兩個已註冊的處理程序,RegistrationProcessManagerRouterOrderViewModelGenerator處理程序類,因此會在兩個裏都觸發調用Handle方法。
  3. OrderViewModelGenerator類中的Handle方法執行成功。
  4. RegistrationProcessManagerRouter類中的Handle方法拋出異常。
  5. EventProcessor實例捕獲到異常而後拋棄掉事件消息。消息將自動放回訂閱中。
  6. EventProcessor實例第二次從全部訂閱者那裏接收到OrderPlaced事件。
  7. 事件又觸發兩個處理方法,致使RegistrationProcessManagerRouter類和OrderViewModelGenerator第二次處理事件消息。
  8. 每當RegistrationProcessManagerRouter類拋出異常時,OrderViewModelGenerator類都會觸發處理該事件。

在V2模型中,若是處理程序類拋出異常,EventProcessor實例將事件消息放回與該處理程序類關聯的訂閱。重試邏輯如今只會致使EventProcessor實例重試引起異常的處理程序,所以沒有其餘處理程序會從新處理消息。

集成事件的持久化

在V1版本中提出的一個問題是,系統如何持久化從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件。這些事件包括關於會議建立和發佈的信息,以及座位類型和配額更改的詳細信息。

在V1版本中,訂單和註冊上下文中的ConferenceViewModelGenerator類經過更新視圖模型並向SeatsAvailability聚合發送命令來處理這些事件,以告訴它更改座位配額值。

這種方法意味着訂單和註冊限界上下文不存儲任何歷史記錄,這可能會致使問題。例如,其餘視圖從這裏中查找座椅類型描述時,這裏只包含座椅類型描述的最新值。所以,在其餘地方重播一組事件可能會從新生成另外一個包含不正確座椅類型描述的讀取模型投影。

團隊考慮瞭如下五個方法來糾正這種狀況:

  • 將全部事件保存在原始限界上下文中(會議管理限界上下文中),並使用共享的事件存儲,訂單和註冊限界上下文中能夠訪問該存儲來重播這些事件。接收限界上下文能夠重放事件流,直到它須要查看的以前的座椅類型描述時爲止。
  • 當全部事件到達接收限界上下文(訂單和註冊限界上下文)時保存它們。
  • 讓視圖模型生成器中的命令處理程序保存事件,只選擇它須要的那些。
  • 讓視圖模型生成器中的命令處理程序保存不一樣的事件,實際上就是爲此視圖模型使用事件源。
  • 未來自全部限界上下文的全部命令和事件消息存儲在消息日誌中。

第一種選擇並不老是可行的。在這種特殊狀況下,它能夠工做,由於同一個團隊同時實現了限界上下文和基礎設施,使得使用共享事件存儲變得很容易。

Gary(CQRS專家)發言:
儘管從純粹主義者的角度來看,第一個選項破壞了限界上下文之間的嚴格隔離,但在某些場景中,它多是一個可接受的實用解決方案。

第三種選擇可能存在的風險是,所需的事件集合可能在將來發生變化。若是咱們如今不保存事件,它們將永遠丟失。

儘管第五個選項存儲了全部命令和事件,其中一些可能永遠都不須要再次引用,但它確實提供了一個完整的日誌,記錄了系統中發生的全部事情。這對於故障診斷頗有用,還能夠幫助您知足還沒有肯定的需求。該團隊選擇了這個選項而不是選項二,由於它提供了一個更通用的機制,可能具備將來的好處。

持久化事件的目的是,當訂單和註冊上下文須要有關當前座位配額的信息時,能夠回放這些事件,以便計算剩餘座位的數量。要一致地計算這些數字,必須始終以相同的順序回放事件。這種順序有幾種選擇:

  • 會議管理限界上下文發送事件的順序。
  • 訂單和註冊上下文接收事件的順序。
  • 訂單和註冊上下文處理事件的順序。

大多數狀況下,這些順序是相同的。沒有什麼正確的順序。你只須要選擇一個和它保持一致就好了。所以,選擇由簡單性決定。在本例中,最簡單的方法是按照訂單和註冊限界上下文中處理程序接收事件的順序持久化事件(第二個選項)。

Markus(軟件開發人員)發言:
這種選擇一般不會出如今事件源中。每一個聚合會都以固定的順序建立事件,這就是系統用於持久存儲事件的順序。在此場景中,集成事件不是由單個聚合建立的。

爲這些事件保存時間戳也有相似的問題。若是未來須要查看特定時間剩餘的座位數量,那麼時間戳可能會頗有用。這裏的選擇是,當事件在會議管理限界上下文中建立時,仍是在訂單和註冊限界上下文中接收時,應該建立時間戳?當會議管理限界上下文建立事件時,訂單和註冊限界上下文可能因爲某種緣由離線。所以,團隊決定在會議管理有界上下文發佈事件時建立時間戳。

消息排序

團隊建立並運行來驗證V1版本的驗收測試,凸顯出了消息排序的一個潛在問題:執行會議管理限界上下文的驗收測試向訂單和註冊限界上下文發送了一系列命令,這些命令有時會出現順序錯誤。

Markus(軟件開發人員)發言:
當人類用戶真實測試系統的這一部分時,不太會注意到這種效果,由於發出命令的時間間隔要長得多,這使得消息不太可能無序地到達。

團隊考慮了兩種方法來確保消息以正確的順序到達。

  • 第一個方法是使用消息會話,這是Azure服務總線的一個特性。若是您使用消息會話,這將確保會話內的消息以與它們發送時相同的順序傳遞。
  • 第二種方法是修改應用程序中的處理程序,經過使用發送消息時添加到消息中的序列號或時間戳來檢測無序消息。若是接收處理程序檢測到一條無序消息,它將拒絕該消息,並在處理了在被拒絕消息以前發送的消息以後,將其放回稍後處理的隊列或Topic。

在這種狀況下,首選的解決方案是使用Azure服務總線消息會話,由於這隻須要對現有代碼進行更少的更改。這兩種方法都會給消息傳遞帶來一些額外的延遲,可是團隊並不認爲這會對系統的性能產生顯著的影響。

實現細節

本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份代碼拷貝頗有用,這樣您就能夠繼續學習了。您能夠從Download center下載一個副本,或者在GitHub上查看存儲庫:github.com/mspnp/cqrs-…。您能夠從GitHub上的Tags頁面下載V2版本的代碼。

備註:不要指望代碼示例與參考實現中的代碼徹底匹配。本章描述了CQRS過程當中的一個步驟,隨着咱們瞭解更多並重構代碼,實現可能會發生變化。
複製代碼

**添加對「不須要支付的訂單」的支持

作出這一改變有三個具體的目標,它們都是相關的。咱們但願:

  • 修改RegistrationProcessManager類和相關聚合,以處理不須要支付的訂單。
  • 修改UI中的導航,當訂單不須要支付時跳過付款步驟。
  • 確保系統在升級到V2以後可以正確地工做,包括使用新事件和舊事件。

RegistrationProcessManager類的更改

在此以前,RegistrationProcessManager類在收到來自UI的註冊者已完成支付的通知後發送了一個ConfirmOrderPayment命令。如今,若是有一個不須要支付訂單,UI將直接向訂單聚合發送一個ConfirmOrder命令。若是訂單須要支付,RegistrationProcessManager類在從UI接收到成功支付的通知後,再向訂單聚合發送一個ConfirmOrder命令。

Jana(軟件架構師)發言:
注意,命令的名稱已從ConfirmOrderPayment更改成ConfirmOrder。這反映了訂單不須要知道任何關於付款的信息。它只須要知道訂單已經確認。相似地,如今有一個新的OrderConfirmed事件用於替代舊的OrderPaymentConfirmed事件。

當訂單聚合接收到ConfirmOrder命令時,它將引起一個OrderConfirmed事件。除被持久化外,該事件還由如下對象處理:

  • OrderViewModelGenerator類,它在其中更新讀取模型中的訂單狀態。
  • SeatAssignments聚合,在其中初始化一個新的SeatAssignments實例。
  • RegistrationProcessManager類,它在其中觸發一個提交座位預訂的命令。

UI的更改

UI中的主要更改是在RegistrationController MVC控制器類中的SpecifyRegistrantAndPaymentDetails action裏的。以前,此action方法返回InitiateRegistrationWithThirdPartyProcessorPayment(action result)。如今,若是Order對象的新IsFreeOfCharge屬性爲true,它將返回一個CompleteRegistrationWithoutPayment(action result)。不然,它返回一個CompleteRegistrationWithThirdPartyProcessorPayment(action result)。

[HttpPost]
public ActionResult SpecifyRegistrantAndPaymentDetails(AssignRegistrantDetails command, string paymentType, int orderVersion)
{
    ...

    var pricedOrder = this.orderDao.FindPricedOrder(orderId);
    if (pricedOrder.IsFreeOfCharge)
    {
        return CompleteRegistrationWithoutPayment(command, orderId);
    }

    switch (paymentType)
    {
        case ThirdPartyProcessorPayment:

            return CompleteRegistrationWithThirdPartyProcessorPayment(command, pricedOrder, orderVersion);

        case InvoicePayment:
            break;

        default:
            break;
    }

    ...
}
複製代碼

CompleteRegistrationWithThirdPartyProcessorPayment將用戶重定向到ThirdPartyProcessorPayment action,CompleteRegistrationWithoutPayment方法將用戶直接重定向到ThankYou action。

數據遷移

會議管理限界上下文在其Azure SQL數據庫實例中的PricedOrders表中存儲來自訂單和註冊限界上下文的訂單信息。之前,會議管理限界上下文接收OrderPaymentConfirmed事件,如今它接收OrderConfirmed事件,該事件包含一個附加的IsFreeOfCharge屬性。這將成爲數據庫中的一個新列。

Markus(軟件開發人員)發言:
在遷移過程當中,咱們不須要修改該表中的現有數據,由於布爾值的默認值爲false。全部現有條目都是在系統支持不須要付費的訂單以前建立的。

在遷移過程當中,任何正在運行的ConfirmOrderPayment命令均可能丟失,由於它們再也不由訂單聚合處理。您應該驗證當前的命令總線沒有這些命令。

Poe(IT運維人員)發言:
咱們須要仔細計劃如何部署V2版本,以便確保全部現有的、正在運行的ConfirmOrderPayment命令都由運行V1版本的工做角色實例處理。

系統將RegistrationProcessManager類實例的狀態保存到SQL數據庫表中。這個表的架構沒有變化。遷移後您將看到的唯一更改是StateValue列中的一個新添加值。這反映了RegistrationProcessManager類中的ProcessState枚舉中額外的PaymentConfirmationReceived值,以下面的代碼示例所示:

public enum ProcessState
{
    NotStarted = 0,
    AwaitingReservationConfirmation = 1,
    ReservationConfirmationReceived = 2,
    PaymentConfirmationReceived = 3,
}
複製代碼

在V1版本中,事件源系統爲訂單聚合保存的事件包括OrderPaymentConfirmed事件。所以,事件存儲區包含此事件類型的實例。在V2版本中,OrderPaymentConfirmed事件被替換爲OrderConfirmed事件。

團隊決定在V2版本中,當反序列化事件時,不在基礎設施級別映射和過濾事件。這意味着,當系統從事件存儲中重播這些事件時,處理程序必須同時理解舊事件和新事件。下面的代碼示例在SeatAssignmentsHandler類中顯示了這一點:

static SeatAssignmentsHandler()
{
    Mapper.CreateMap<OrderPaymentConfirmed, OrderConfirmed>();
}

public SeatAssignmentsHandler(IEventSourcedRepository<Order> ordersRepo, IEventSourcedRepository<SeatAssignments> assignmentsRepo)
{
    this.ordersRepo = ordersRepo;
    this.assignmentsRepo = assignmentsRepo;
}

public void Handle(OrderPaymentConfirmed @event)
{
    this.Handle(Mapper.Map<OrderConfirmed>(@event));
}

public void Handle(OrderConfirmed @event)
{
    var order = this.ordersRepo.Get(@event.SourceId);
    var assignments = order.CreateSeatAssignments();
    assignmentsRepo.Save(assignments);
}
複製代碼

您還能夠在OrderViewModelGenerator類中看到一樣的技術。

Order類中的方法略有不一樣,由於這是持久化到事件存儲中的事件之一。下面的代碼示例顯示了Order類中受保護構造函數的一部分:

protected Order(Guid id)
    : base(id)
{
    ...
    base.Handles<OrderPaymentConfirmed>(e => this.OnOrderConfirmed(Mapper.Map<OrderConfirmed>(e)));
    base.Handles<OrderConfirmed>(this.OnOrderConfirmed);
    ...
}
複製代碼

Jana(軟件架構師)發言:
以這種方式處理舊事件對於這個場景很是簡單,由於唯一須要更改的是事件的名稱。若是事件的屬性也發生了變化,狀況會更加複雜。未來,Contoso將考慮在基礎設施中進行映射,以免遺留事件污染領域模型。

在UI中顯示剩餘座位

作出這一改變有三個具體的目標,它們都是相關的。咱們想要:

  • 修改系統,在會議系統的讀模型中包含每一個座位類型的剩餘座位數量信息。
  • 修改UI以顯示每種座位類型的剩餘座位數量。
  • 確保升級到V2後系統功能正常。

向讀模型添加關於剩餘座位數量的信息

系統要能顯示剩餘座位數量的信息來自兩個地方:

  • 當業務客戶建立新的座位類型或修改座位配額時,會議管理限界上下文將引起SeatCreatedSeatUpdated事件。
  • 在訂單和註冊限界上下文中,當註冊者建立一個訂單的時候,可用座位(SeatsAvailability)聚合將引起SeatsReserved、SeatsReservationCancelled和AvailableSeatsChanged事件。

備註:ConferenceViewModelGenerator類不使用SeatCreatedSeatUpdated事件。

訂單和註冊限界上下文中的ConferenceViewModelGenerator類如今處理這些事件,並使用它們來計算和存儲讀模型中的座位類型數量。下面的代碼示例顯示了ConferenceViewModelGenerator類中的相關處理程序:

public void Handle(AvailableSeatsChanged @event)
{
    this.UpdateAvailableQuantity(@event, @event.Seats);
}

public void Handle(SeatsReserved @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

public void Handle(SeatsReservationCancelled @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats)
{
    using (var repository = this.contextFactory.Invoke())
    {
        var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId);
        if (dto != null)
        {
            if (@event.Version > dto.SeatsAvailabilityVersion)
            {
                foreach (var seat in seats)
                {
                    var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType);
                    if (seatDto != null)
                    {
                        seatDto.AvailableQuantity += seat.Quantity;
                    }
                    else
                    {
                        Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType);
                    }
                }

                dto.SeatsAvailabilityVersion = @event.Version;

                repository.Save(dto);
            }
            else
            {
                Trace.TraceWarning ...
            }
        }
        else
        {
            Trace.TraceError ...
        }
    }
}
複製代碼

UpdateAvailableQuantity方法將事件上的版本與讀模型的當前版本進行比較,以檢測可能的重複消息。

Markus(軟件開發人員)發言:
此檢查僅檢測重複的消息,而不是超出序列的消息。

修改UI以顯示剩餘的座位數量

如今,當UI向會議的讀模型查詢座位類型列表時,列表包括當前可用的座位數量。下面的代碼示例顯示了RegistrationController MVC控制器如何使用SeatType類的AvailableQuantity

private OrderViewModel CreateViewModel()
{
    var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id);
    var viewModel =
        new OrderViewModel
        {
            ConferenceId = this.ConferenceAlias.Id,
            ConferenceCode = this.ConferenceAlias.Code,
            ConferenceName = this.ConferenceAlias.Name,
            Items =
                seatTypes.Select(
                    s =>
                        new OrderItemViewModel
                        {
                            SeatType = s,
                            OrderItem = new DraftOrderItem(s.Id, 0),
                            AvailableQuantityForOrder = s.AvailableQuantity,
                            MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20)
                        }).ToList(),
        };

    return viewModel;
}
複製代碼

數據遷移

保存會議讀模型數據的數據庫有一個新列來保存用於檢查重複事件的版本號,而保存座位類型讀模型數據有一個新列來保存可用的座椅數量。

做爲數據遷移的一部分,有必要爲每一個可用座位(SeatsAvailability)聚合重放事件存儲中的全部事件,以便正確計算可用數量。

不讓命令消息重複

系統目前使用Azure服務總線傳輸消息。當系統從ConferenceProcessor類的啓動代碼初始化Azure服務總線時,它配置Topic來檢測重複的消息,以下面的ServiceBusConfig類的代碼示例所示:

private void CreateTopicIfNotExists() 
{     
    var topicDescription =         
        new TopicDescription(this.topic)         
        {             
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow,         
        };     
    try     
    {         
        this.namespaceManager.CreateTopic(topicDescription);     
    }     
    catch (MessagingEntityAlreadyExistsException) { } 
} 
複製代碼
備註:您能夠在Settings.xml文件中配置DuplicateDetectionHistoryTimeWindow
能夠向Topic元素添加這個屬性。默認值是1小時。
複製代碼

可是,爲了使重複檢測工做正常,您必須確保每一個消息都有一個唯一的ID。下面的代碼示例顯示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand
{
    public MarkSeatsAsReserved()
    {
        this.Id = Guid.NewGuid();
        this.Seats = new List<SeatQuantity>();
    }

    public Guid Id { get; set; }

    public Guid OrderId { get; set; }

    public List<SeatQuantity> Seats { get; set; }

    public DateTime Expiration { get; set; }
}
複製代碼

CommandBus類中的BuildMessage方法使用命令Id建立一個唯一的消息Id, Azure服務總線可使用這個消息Id來檢測重複:

private BrokeredMessage BuildMessage(Envelope command) 
{ 
    var stream = new MemoryStream(); 
    ...

    var message = new BrokeredMessage(stream, true);
    if (!default(Guid).Equals(command.Body.Id))
    {
        message.MessageId = command.Body.Id.ToString();
    }

...

    return message;
} 
複製代碼

保證消息順序

團隊決定使用Azure服務總線消息會話來保證系統中的消息順序。

系統從ConferenceProcessor類中的OnStart方法配置Azure服務總線Topic和訂閱。Settings.xml配置文件中的配置指定了具體的訂閱使用會話。ServiceBusConfig類中的如下代碼示例顯示了系統如何建立和配置訂閱。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription)
{
    var subscriptionDescription =
        new SubscriptionDescription(topic.Path, subscription.Name)
        {
            RequiresSession = subscription.RequiresSession
        };

    try
    {
        namespaceManager.CreateSubscription(subscriptionDescription);
    }
    catch (MessagingEntityAlreadyExistsException) { }
}
複製代碼

如下來自SessionSubscriptionReceiver類的代碼示例演示瞭如何使用會話接收消息:

private void ReceiveMessages(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session;
        try
        {
            session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
        }
        catch (Exception e)
        {
            ...
        }

        if (session == null)
        {
            Thread.Sleep(100);
            continue;
        }


        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = null;
            try
            {
                try
                {
                    message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
                }
                catch (Exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // If we have no more messages for this session, exit and try another.
                    break;
                }

                this.MessageReceived(this, new BrokeredMessageEventArgs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
        }

        this.receiveRetryPolicy.ExecuteAction(() => session.Close());
    }
}

private MessageSession DoAcceptMessageSession()
{
    try
    {
        return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
    }
    catch (TimeoutException)
    {
        return null;
    }
}
複製代碼

Markus(軟件開發人員)發言:
您可能會發現,將使用消息會話的ReceiveMessages方法的這個版本與SubscriptionReceiver類中的原始版本進行比較是頗有用的。

您必須確保當你發送消息包含一個會話ID,這樣才能使用消息會話接收一條消息。系統使用事件的SourceID做爲會話ID,以下面的代碼示例所示的EventBus類中的BuildMessage方法:

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();
複製代碼

經過這種方式,您能夠確保以正確的順序接收來自單個源的全部消息。

Poe(IT運維人員)發言:
在V2版本中,團隊更改了系統建立Azure服務總線Topic和訂閱的方式。以前,SubscriptionReceiver類建立了它們(若是它們還不存在)。如今,系統在應用程序啓動時使用配置數據建立它們。這發生在啓動過程的早期,以免在系統初始化訂閱以前將消息發送到Topic時丟失消息的風險。

然而,只有當消息按正確的順序傳遞到總線上時,會話才能保證按順序傳遞消息。若是系統異步發送消息,則必須特別注意確保消息以正確的順序放在總線上。在咱們的系統中,來自每一個單獨聚合實例的事件按順序到達是很重要的,可是咱們不關心來自不一樣聚合實例的事件的順序。所以,儘管系統異步發送事件,EventStoreBusPublisher實例仍然會在發送下一個事件以前等待前一個事件已發送的確認。如下來自TopicSender類的示例說明了這一點:

public void Send(Func<BrokeredMessage> messageFactory)
{
    var resetEvent = new ManualResetEvent(false);
    Exception exception = null;
    this.retryPolicy.ExecuteAction(
        ac =>
        {
            this.DoBeginSendMessage(messageFactory(), ac);
        },
        ar =>
        {
            this.DoEndSendMessage(ar);
        },
        () => resetEvent.Set(),
        ex =>
        {
            Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetEvent.Set();
        });

    resetEvent.WaitOne();
    if (exception != null)
    {
        throw exception;
    }
}
複製代碼

Jana(軟件架構師)發言:
此代碼示例展現了系統如何使用Transient Fault Handling Application Block來讓異步調用可靠。

有關消息排序和Azure服務總線的更多信息,請參見Microsoft Azure Queues and Microsoft Azure Service Bus Queues - Compared and Contrasted

有關異步發送消息和排序的信息,請參閱博客文章Microsoft Azure Service Bus Splitter and Aggregator

從會議管理限界上下文中持久化事件

團隊決定建立一個包含全部發送的命令和事件的消息日誌。這將使訂單和註冊限界上下文可以從會議管理限界上下文查詢此日誌,以獲取其構建讀模型所需的事件。這不是事件源,由於咱們沒有使用這些事件來重建聚合的狀態,儘管咱們使用相似的技術來捕獲和持久化這些集成事件。

Gary(CQRS專家)發言:
此消息日誌確保不會丟失任何消息,以便未來可以知足其餘需求。

向消息添加額外元數據

系統如今將全部消息保存到消息日誌中。爲了方便查詢特定命令或事件,系統如今向每一個消息添加了更多的元數據。之前,唯一的元數據是事件類型,如今,事件元數據包括事件類型、命名空間、程序集和路徑。系統將元數據添加到EventBus類中的事件和CommandBus類中的命令中。

捕獲消息並將消息持久化到消息日誌中

系統使用Azure服務總線中對會議/命令和會議/事件topic的額外訂閱來接收系統中每條消息的副本。而後,它將消息附加到Azure表存儲中。下面的代碼示例顯示了AzureMessageLogWriter類的實例,它用於將消息保存到表中:

public class MessageLogEntity : TableServiceEntity 
{ 
    public string Kind { get; set; }     
    public string CorrelationId { get; set; }     
    public string MessageId { get; set; }     
    public string SourceId { get; set; }     
    public string AssemblyName { get; set; }     
    public string Namespace { get; set; }     
    public string FullName { get; set; }     
    public string TypeName { get; set; }     
    public string SourceType { get; set; }     
    public string CreationDate { get; set; }     
    public string Payload { get; set; } 
} 
複製代碼

Kind屬性指定消息是命令仍是事件。MessageId和CorrelationId屬性由消息傳遞基礎設施設置的,其他屬性是從消息元數據中設置的。

下面的代碼示例顯示了這些消息的分區和RowKey的定義:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"),
RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId
複製代碼

注意,RowKey保存了消息最初發送的順序,並添加到消息ID上,以確保唯一性,以防兩條消息同時入隊。

Jana(軟件架構師)發言:
這與事件存儲不一樣,在事件存儲區中,分區鍵標識聚合實例,而RowKey標識聚合的版本號。

數據遷移

當Contoso將系統從V1遷移到V2時,它將使用消息日誌在訂單和註冊限界上下文中重建會議和價格訂單的讀模型。

Gary(CQRS專家)發言:
Contoso能夠在須要重建與聚合無關的事件構建的讀模型時來使用消息日誌,例如來自會議管理限界上下文的集成事件。

會議讀模型包含會議的信息,幷包含來自會議管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的信息。

價格訂單讀模型持有來自於SeatCreated和SeatUpdated事件的信息,這些事件來自於會議管理限界上下文。

然而,在V1中,這些事件消息沒有被持久化,所以讀模型不能在V2中從新填充。爲了解決這個問題,團隊實現了一個數據遷移實用程序,它使用一種最佳方法來生成包含要存儲在消息日誌中的丟失數據的事件。例如,在遷移到V2以後,消息日誌不包含ConferenceCreated事件,所以遷移實用程序在會議管理限界上下文使用的數據庫中找到這些信息,並建立丟失的事件。您能夠在MigrationToV2項目的Migrator類中的GeneratePastEventLogMessagesForConferenceManagement方法中看到這是如何完成的。

Markus(軟件開發人員)發言:
您能夠在這個類中看到,Contoso還將全部現有的事件源事件複製到消息日誌中。

以下面所示,Migrator類中的RegenerateViewModels方法從新構建讀取的模型。它經過調用Query方法從消息日誌中檢索全部事件,而後使用ConferenceViewModelGeneratorPricedOrderViewModelUpdater類來處理消息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString)
{
    var commandBus = new NullCommandBus();

    Database.SetInitializer<ConferenceRegistrationDbContext>(null);

    var handlers = new List<IEventHandler>();
    handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus));
    handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString)));

    using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
    {
        context.UpdateTables();
    }

    try
    {
        var dispatcher = new MessageDispatcher(handlers);
        var events = logReader.Query(new QueryCriteria { });

        dispatcher.DispatchMessages(events);
    }
    catch
    {
        using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
        {
            context.RollbackTablesMigration();
        }

        throw;
    }
}
複製代碼

Jana(軟件架構師)發言:
查詢可能不會很快,由於它將從多個分區檢索實體。

注意這個方法如何使用NullCommandBus實例來接收來自ConferenceViewModelGenerator實例的任何命令,由於咱們只是在這裏從新構建讀模型。

之前,PricedOrderViewModelGenerator使用ConferenceDao類來獲取關於座位的信息。如今,它是自治的,並直接處理SeatCreatedSeatUpdated事件來維護這些信息。做爲遷移的一部分,必須將此信息添加到讀模型中。在前面的代碼示例中,PricedOrderViewModelUpdater類只處理SeatCreatedSeatUpdated事件,並將缺失的信息添加到價格訂單讀模型中。

從V1遷移到V2

從V1遷移到V2須要更新已部署的應用程序代碼並遷移數據。在生產環境中執行遷移以前,應該始終在測試環境中演練遷移。如下是所需步驟:

  1. 將V2版本部署到Azure的staging環境中。V2版本有一個MaintenanceMode屬性,最初設置爲true。在此模式下,應用程序向用戶顯示一條消息,說明站點當前正在進行維護,而工做角色將不處理消息。
  2. 準備好以後,將V2版本(仍然處於維護模式,MaintenanceMode爲true)切換到Azure生產環境中。
  3. 讓V1版本(如今在staging環境中運行)運行幾分鐘,以確保全部正在運行的消息都完成了它們的處理。
  4. 運行遷移程序來遷移數據(參見下面)。
  5. 成功完成數據遷移後,將每種工做角色的MaintenanceMode屬性更改成false。
  6. V2版本如今運行在Azure中。

Jana(軟件架構師)發言:
團隊考慮使用單獨的應用程序在升級過程當中向用戶顯示一條消息,告訴他們站點正在進行維護。然而,在V2版本中使用MaintenanceMode屬性提供了一個更簡單的過程,併爲應用程序添加了一個潛在有用的新特性。

Poe(IT運維人員)發言:
因爲對事件存儲的更改,不可能執行從V1到V2的無停機升級。然而,團隊所作的更改將確保從V2遷移到V3將不須要停機時間。

Markus(軟件開發人員)發言:
團隊對遷移實用程序應用了各類優化,例如批處理操做,以最小化停機時間。

下面幾節總結了從V1到V2的數據遷移。這些步驟中的一些在前面已經討論過,涉及到應用程序的特定更改或加強。

團隊爲V2引入的一個更改是,將全部命令和事件消息的副本保存在消息日誌中,以便做爲將來的證據,經過捕獲未來可能使用的全部內容來保證應用程序的安全性。遷移過程考慮到了這個新特性。

由於遷移過程複製了大量的數據,因此您應該在Azure工做角色中運行遷移過程,以最小化成本。遷移實用程序是一個控制檯應用程序,所以您可使用Azure和遠程桌面服務。有關如何在Azure角色實例中運行應用程序的信息,請參見Using Remote Desktop with Microsoft Azure Roles

Poe(IT運維人員)發言:
在一些組織中,安全策略不容許您在Azure生產環境使用遠程桌面服務。可是,您只須要一個在遷移期間承載遠程桌面會話的工做角色,您能夠在遷移完成後刪除它。您還能夠將遷移代碼做爲工做角色而不是控制檯應用程序運行,並確保它記錄遷移的狀態,以便您驗證。

爲會議管理限界上下文生成過去的日誌消息

遷移過程的一部分是在可能的狀況下從新建立V1版本處理後丟棄的消息,而後將它們添加到消息日誌中。在V1版本中,全部從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件都以這種方式丟失了。系統不能從新建立全部丟失的事件,但能夠建立表示遷移時系統狀態的事件。

有關更多信息,請參見本章前面的「從會議管理限界上下文中持久化事件」一節。

遷移事件源裏的事件

在V2版本中,事件存儲爲每一個事件存儲額外的元數據,以便於查詢事件。遷移過程將全部事件從現有事件存儲複製到具備新模式的新事件存儲。

Jana(軟件架構師)發言:
原始事件不會以任何方式更新,而是被視爲不可變的。

同時,系統將全部這些事件的副本添加到V2版本中引入的消息日誌中。

有關更多信息,請參見MigrationToV2項目中Migrator類中的MigrateEventSourcedAndGeneratePastEventLogs

重建讀模型**

V2版本包括對訂單和註冊限界上下文中讀模型定義的幾個更改。MigrationToV2項目在訂單和註冊限界上下文中從新構建會議的讀模型和價格訂單的讀模型。

有關更多信息,請參見本章前面的「從會議管理限界上下文中持久化事件」一節。

對測試的影響

在這個過程的這個階段,測試團隊繼續擴展驗收測試集合。他們還建立了一組測試來驗證數據遷移過程。

再說SpecFlow

以前,這組SpecFlow測試以兩種方式實現:經過自動化web瀏覽器模擬用戶交互,或者直接在MVC控制器上操做。這兩種方法都有各自的優缺點,咱們在第4章「擴展和加強訂單和註冊限界上下文」中討論過。

在與另外一位專家討論了這些測試以後,團隊還實現了第三種方法。從領域驅動設計(DDD)方法的角度來看,UI不是領域模型的一部分,核心團隊的重點應該是在領域專家的幫助下理解領域,並在領域中實現業務邏輯。UI只是機械部分,用於使用戶可以與領域進行交互。所以,驗收測試應該包括驗證領域模型是否以領域專家指望的方式工做。所以,團隊使用SpecFlow建立了一組驗收測試,這些測試旨在在不影響系統UI部分的狀況下測試領域。

下面的代碼示例顯示了SelfRegistrationEndToEndWithDomain.feature文件,該文件在Conference.AcceptanceTests項目中的Features\Domain\Registration文件夾裏,注意When和Then子句怎麼使用命令和事件的。

Gary(CQRS專家)發言:
一般,若是您的領域模型只使用聚合,您會指望When子句發送命令,Then子句查看事件或異常。然而,在本例中,領域模型包含一個經過發送命令來響應事件的流程管理器。測試將檢查是否發送了全部預期的命令,並引起了全部預期的事件。

Feature: Self Registrant end to end scenario for making a Registration for a Conference site with Domain Commands and Events
    In order to register for a conference
    As an Attendee
    I want to be able to register for the conference, pay for the Registration Order and associate myself with the paid Order automatically


Scenario: Make a reservation with the selected Order Items
Given the list of the available Order Items for the CQRS summit 2012 conference
    | seat type | rate | quota |
    | General admission | $199 | 100 |
    | CQRS Workshop | $500 | 100 |
    | Additional cocktail party | $50 | 100 |
And the selected Order Items
    | seat type | quantity |
    | General admission | 1 |
    | Additional cocktail party | 1 |
When the Registrant proceeds to make the Reservation
    # command:RegisterToConference
Then the command to register the selected Order Items is received 
    # event: OrderPlaced
And the event for Order placed is emitted
    # command: MakeSeatReservation
And the command for reserving the selected Seats is received
    # event: SeatsReserved
And the event for reserving the selected Seats is emitted
    # command: MarkSeatsAsReserved
And the command for marking the selected Seats as reserved is received
    # event: OrderReservationCompleted 
And the event for completing the Order reservation is emitted
    # event: OrderTotalsCalculated
And the event for calculating the total of $249 is emitted
複製代碼

下面的代碼示例顯示了feature文件的一些步驟實現。這些步驟使用命令總線發送命令。

[When(@"the Registrant proceed to make the Reservation")]
public void WhenTheRegistrantProceedToMakeTheReservation()
{
    registerToConference = ScenarioContext.Current.Get<RegisterToConference>();
    var conferenceAlias = ScenarioContext.Current.Get<ConferenceAlias>();

    registerToConference.ConferenceId = conferenceAlias.Id;
    orderId = registerToConference.OrderId;
    this.commandBus.Send(registerToConference);

    // Wait for event processing
    Thread.Sleep(Constants.WaitTimeout);
}

[Then(@"the command to register the selected Order Items is received")]
public void ThenTheCommandToRegisterTheSelectedOrderItemsIsReceived()
{
    var orderRepo = EventSourceHelper.GetRepository<Registration.Order>();
    Registration.Order order = orderRepo.Find(orderId);

    Assert.NotNull(order);
    Assert.Equal(orderId, order.Id);
}

[Then(@"the event for Order placed is emitted")]
public void ThenTheEventForOrderPlacedIsEmitted()
{
    var orderPlaced = MessageLogHelper.GetEvents<OrderPlaced>(orderId).SingleOrDefault();

    Assert.NotNull(orderPlaced);
    Assert.True(orderPlaced.Seats.All(
        os => registerToConference.Seats.Count(cs => cs.SeatType == os.SeatType && cs.Quantity == os.Quantity) == 1));
}
複製代碼

在遷移過程當中發現的bug

當測試團隊在遷移以後在系統上運行測試時,咱們發現訂單和註冊限界上下文中座位類型的數量與遷移以前的數量不一樣。調查揭示瞭如下緣由。

若是會議從未發佈過,則會議管理限界上下文容許業務客戶刪除座位類型,但不會引起集成事件向訂單和註冊限界上下文報告這一狀況。因此,當業務客戶建立新的座位類型時,訂單和註冊限界上下文從會議管理限界上下文接收事件,而不是當業務客戶刪除座位類型時。

遷移過程的一部分建立一組集成事件,以替換V1版本處理後丟棄的事件。它經過讀取會議管理限界上下文使用的數據庫來建立這些事件。此過程沒有爲已刪除的座位類型建立集成事件。

總之,在V1版本中,已刪除的座位類型錯誤地出如今訂單和註冊限界上下文的讀模型中。在遷移到V2版本以後,這些已刪除的座位類型沒有出如今訂單和註冊限界上下文的讀模型中。

Poe(IT運維人員)發言:
測試遷移過程不只驗證遷移是否按預期運行,並且可能揭示應用程序自己的bug。

總結

在咱們旅程的這個階段,咱們對系統進行了版本控制,並完成了V2僞生產版本。這個新版本包含了一些額外的功能和特性,好比支持不須要付費的訂單和在UI中顯示更多信息。

咱們還對基礎設施作了一些改變。例如,咱們使更多的消息具備冪等性,如今持久化集成事件。下一章將描述咱們旅程的最後階段,咱們將繼續加強基礎設施,並在準備發佈V3版本時增強系統。

相關文章
相關標籤/搜索