《領域驅動設計之PHP實現》全書翻譯 - 領域事件

第六章 領域事件

  1. 《領域驅動設計之PHP實現》全書翻譯 - DDD入門
  2. 《領域驅動設計之PHP實現》全書翻譯 - 架構風格
  3. 《領域驅動設計之PHP實現》全書翻譯 - 值對象
  4. 《領域驅動設計之PHP實現》全書翻譯 - 實體
  5. 《領域驅動設計之PHP實現》全書翻譯 - 服務
  6. 《領域驅動設計之PHP實現》全書翻譯 - 領域事件
  7. 《領域驅動設計之PHP實現》全書翻譯 - 模塊
  8. 《領域驅動設計之PHP實現》全書翻譯 - 聚合
  9. 《領域驅動設計之PHP實現》全書翻譯 - 工廠
  10. 《領域驅動設計之PHP實現》全書翻譯 - 倉儲
  11. 《領域驅動設計之PHP實現》全書翻譯 - 應用服務
  12. 《領域驅動設計之PHP實現》全書翻譯 - 集成上下文
  13. 《用PHP實現六邊形架構》

軟件事件即系統中其它組件感興趣的事物。PHP 程序員工做中廣泛不使用事件,由於這不是該語言的特性。不過,如今更常見的是,新的框架和庫採用它們來提供一種新的解耦,重用和加速代碼的途徑。php

領域事件是領域發生改變時相關的事件。領域事件即發生在領域內的事件,是領域專家所關心的。程序員

在領域驅動設計中,領域事件是基礎構建塊,它們能夠:web

  • 與其它界限上下文通信
  • 提升性能和可擴展性,推動最終一致性
  • 做爲歷史歸檔記錄

領域事件的本質是異步通訊。有關此主題的詳細信息,咱們推薦 Gregor Hohpe 和 Bobby Woolf 《企業集成模式:設計,構建及部署消息傳遞解決方案》一書。算法

簡介

假設有一個 Javascript 2D 平臺遊戲,在屏幕上同時有大量不一樣組件交互。其中一個組件表示剩餘生命值,另外一個顯示全部得分,還有一個顯示完成當前等級還剩餘的時間。每次你的角色跳到敵人身上,分數就會增長。當你的得分高於一個分數值時,就會獲取額外一條命。當你的角色撿起一把鑰匙,一扇門一般就會打開。可是全部這些組件是如何相互交互的呢?此場景最佳構架又是什麼?數據庫

這裏有兩個可選的方案:第一種是每一個組件與它所鏈接的組件結合起來。不過,在上面的例子中,意味着有大量組件耦合在一塊兒,每一個額外的功能增長都須要開發者修改代碼。但你還記得開閉原則(OCP)嗎?增長一個新的組件不該該使它必須更新第一個組件,這會有太多要維護的工做。json

第二種,更好的方法,就是將全部組件鏈接到一個單獨的對象上,該對象處理遊戲中全部重要的事件。它接收來自每一個組件的事件並轉發給特定的組件。例如,得分組件可能對一個 EnemyKilled 事件感興趣,而 LifeCaptered 事件對玩家實體和剩餘生命數組件至關有用。一般這種方式,全部組件與一個管理全部通知的單獨組件耦合。使用這種方法,增長或者移除組件都不會影響現有的組件。segmentfault

當開發一個單體應用時,事件對於解耦組件很是有用。當用分佈式方式來開發整個領域時,事件對於領域中發揮做用的每一個服務或者應用之間的解耦至關有用。關鍵點是同樣的,但規模卻不一樣。設計模式

定義

領域事件是一種特殊類型的事件,用來通知本地或者遠程領域限界上下文的變化。
Vaughn Vernon 定義領域事件爲:數組

領域中發生的事情。

Eric Evans 定義領域事件爲:服務器

領域事件即領域模型中一個完整的部分,是領域中發生的事件的表現形式。突然不相碰的領域活動,同時明確領域專家但願追蹤的,或者被通知的事件,或與其它領域對象狀態改變有關的事件。

Martin Flower 定義領域事件爲:

一類捕獲影響領域的感興趣的記錄。

在 web 應用裏的領域事件例子有用戶註冊定貨轉移用戶以及添加產品等。

小故事

在一家售票代理機構中,運營經理決定提升 U2 秀節目的價格。她進入後臺,編輯該節目。一個ShowPriceChanged 領域事件被髮布,而且在同一事務裏將新的節目價格持久到數據庫中。

一個批處理進程獲取該領域事件並它投遞到 RabbitMQ 隊列中。領域事件被成成兩個隊列:一是同一個本地限界上下文,另外一個遠程事件用於商務智能目的。

在第一個隊列中,一個工做進程經過事件裏的 ID 檢索相應的節目,並將其推送到 Elasticsearch 服務器,從而使得用戶在搜索時能夠看到最新價格。

在第二個隊列中,另外一個進程將信息插入到一個日誌服務器或者數據池,在這能夠運行報表或者數據挖掘進程。

一個不能使用領域事件集成到系統的外部應用能夠經過本地限界上下文提供的 REST API,訪問全部的 ShowPriceChanged 事件。

如你所見,領域事件在處理最終一致性和整合不一樣限界上下文時很是有用。聚合建立併發布事件。訂閱者能夠存儲事件以及以後轉發它們給其它遠程訂閱者。

隱喻

星期二咱們去巴布飯店吃飯,用信用卡支付。這可能被建模爲一個事件,事件類型爲 "下單",主題是 "個人信用卡", 發生時間爲 "星期二"。若是巴布飯使用舊的手動系統,直到週五才傳輸交易,那麼交易將在週五生效。

事情就這樣發生了。並非全部事情都有意義,一些值得記錄但並不會發生反應。然而,通常是最感興趣的事情才發生反應。許多須要對感興趣的事件作出反應。多數狀況下你須要知道爲何一個系統會作出這樣的反應。

經過將系統的輸入傳輸到領域事件流中,你能夠記錄全部的系統輸入。這有助於你組織你的處理邏輯,還容許你保留系統輸入的審覈日誌。

練習
嘗試在你當前的領域中定位潛在的領域事件

真實案例

在進入瞭解領域事件的細節以前,讓咱們來看一個真實的領域事件實例,以及它們是怎樣對咱們的應用和整個領域起到幫助的。

讓咱們考慮一個簡單的 Application Service,新用戶註冊,例如一個電子商務上下文。Application Service 會在其它章節闡述,因此沒必要在表面上操心太多。相反的,僅須要關注執行方法:

class SignUpUserService implements ApplicationService
{
    private $userRepository;
    private $userFactory;
    private $userTransformer;

    public function __construct(
        UserRepository $userRepository,
        UserFactory $userFactory,
        UserTransformer $userTransformer
    )
    {
        $this->userRepository = $userRepository;
        $this->userFactory = $userFactory;
        $this->userTransformer = $userTransformer;
    }

    /**
     * @param SignUpUserRequest $request
     * @return User
     * @throws UserAlreadyExistsException
     */
    public function execute(SignUpUserRequest $request)
    {
        $email = $request->email();
        $password = $request->password();
        $user = $this->userRepository->userOfEmail($email);
        if ($user) {
            throw new UserAlreadyExistsException();
        }
        $user = $this->userFactory->build(
            $this->userRepository->nextIdentity(),
            $email,
            $password
        );
        $this->userRepository->add($user);
        $this->userTransformer->write($user);
    }
}

如上所示,Application Service 部分會檢查用戶是否存在。若是不存在,則會建立一個新用戶並添加到 UserRepository 中。

如今考慮一個附加需求:一個新用戶在註冊時須要用郵件提醒。不須要想太多,首先咱們想到的方法就是更新 Application Service,加入一段能夠完成這項工做的代碼,多是 EmailSender 這種在添加方法以後運行的代碼。不過,如今讓咱們考慮另外一種方法。

觸發一個 UserRegistered 事件,另外一個組件監聽到後發送郵件怎麼樣?這種新方法有一些很是酷的好處。首先,在新用戶註冊時,咱們不須要每次再去更新 Application Service 的代碼。其次,它更易於測試。Application Service 也變得更簡單。每次有新的動做開發時,咱們僅須要爲此動做寫測試用例。

後來在同一個電子商務項目中,咱們被告知集成一個非 PHP 編寫的開源遊戲化平臺。每次用戶在咱們的電子商務上下文下單或者瀏覽產品時,他們能夠在他們的電子商務用戶主頁上看到所獲取的徽章或者被郵件通知到。咱們該如何爲此問題建模呢?

按照第一種方法,咱們將用以前確認電子郵件的方法來更新應用服務,來整合到新的平臺中。使用領域事件的方法,咱們能夠爲 UserRegistered 建立另外一個 listener 事件,該事件能夠用 REST 或者 SOA 的方式鏈接到遊戲平臺。更妙的是,它能夠將事件放到 RabbitMQ 這樣的消息隊列,以便遊戲限界上下文能夠訂閱並自動收到通知。咱們電子商務限界上下文根本不須要了解遊戲上下文。

特徵

領域事件一般是不可變的,由於他們是過去某些內容的記錄。除了事件的描述外,一個領域事件一般包含一個事件發生時刻的時間戳以及事件中涉及的實體標識。此外,一個領域事件一般具備單獨的時間戳,來指示事件什麼時候進入系統,以及輸入事件的人員身份。領域事件自己的標識能夠基於這些屬性集。例如,若是同一個事件的兩個實例到達一個節點,它們能夠被視爲相同。

領域事件的本質就是,你可使用它來捕獲應用中那麼能夠觸發改變的事物,或者領域中其它應用中你感興趣的改變。這些隨後被處理的事件對象會致使系統的改變,並被存儲在審記系統中。

命名約定

全部事件都必須用過去時動詞表示,由於它們都在過去發生的。例如,CustomerRelocatedCargoShipped,或者 InventoryLossageRecorded。在英語中有一些有趣的例子,人們可能會傾向於使用名詞,而不是過去時動詞。例如一個對天然災害感興趣的國會議來講,"地震"或者"傾塌"就是相關事件。咱們建議儘可能避免在領域事件中使用相似名詞的誘惑,而是堅持用動詞的過去時態。

領域事件與通用語言

當咱們討論"重定位用戶"的反作用時,請思考通用語言的不一樣。這個事件使概念變得明確,而之前,聚合或者多個聚合之間發生的改變會留下隱式的概念,這些都須要探索和定義。例如,在大多數系統中,當Hibernate或者實體框架這樣的庫上發生反作用時,它不會影響到領域。從客戶端的角度來看,這些事件是隱式和透明的。事件的引入使概念變得明確,並使之成爲通用語言的一部分。"重定位用戶"不只僅是改變某些內容,還會在語言中顯式的產生CustomerRelocatedEvent事件。

不變性

正如咱們說起過的,領域事件關注的是領域內過去發生的改變。根據定義,你不可能改變過去,除非你是Marty McFly而且有一個DeLorean(譯者注:這裏是《回到將來》電影裏的角色)。所以,請記住領域事件是不可變的。

Symfony 事件分派器

一些 PHP 框架支持事件。不過,不要混淆這些事件與領域事件。它們在特徵和目的上是不一樣的。例如,SymfonyEvent Dispatcher 組件,若是你須要爲一個狀態機實現一個事件系統,則能夠依賴它。在Symfony中,在請求與響應的轉換過程也是由事件處理。可是,Symfony Events是可變的,而且每一個listeners偵聽器都可以修改,添加或者更新事件中的信息。

事件建模

爲了準確地描述你的領域業務,你須要與領域專家緊密合做,來定義通用語言。這須要使用領域事件,實體,值對象等等來完成領域概念。在對事件建模時,依據通用語言,在它們的限界上下文內去命名事件及它們的屬性。若是一個事件是一個聚合上的命令執行操做的結果,則名稱一般派生自執行的命令。事件名稱必須反映事件過去的性質,這一點很是和重要。

讓咱們考慮用戶註冊功能。領域事件須要表示它。下面的代碼顯示了基本領域事件的最小接口:

interface DomainEvent
{
    /**
     * @return DateTimeImmutable
     */
    public function occurredOn();
}

正如你所見,最小的必要信息就是DateTimeImmutable,這是爲了知道事件是什麼時候發生的。

如今讓咱們用下面的代碼來建模用戶註冊事件。正如咱們在上面提到的,事件名稱必須是動詞過去式,那麼UserRegistered是個不錯的選擇:

class UserRegistered implements DomainEvent
{
    private $userId;

    public function __construct(UserId $userId)
    {
        $this->userId = $userId;
        $this->occurredOn = new \DateTimeImmutable();
    }

    public function userId()
    {
        return $this->userId;
    }

    public function occurredOn()
    {
        return $this->occurredOn;
    }
}

通知訂閱者新用戶建立所必需的最少許信息就是 UserId。有了這個信息,任何過程,命令,或者應用服務 - 無論是是否來自同一限界上下文 - 均可能都此事件作出反應。

通常來講

  • 領域事件一般被設計爲不變的
  • 構造器將初始化領域事件的所有狀態
  • 領域事件有 getters 訪問器來訪問它們的屬性
  • 領域事件包含執行此動做的聚合根
  • 領域事件包含其它與第一個事件關聯的聚合根
  • 領域事件包含觸發事件的參數(若是有用的話)

可是,若是相同或者不一樣的限界上下文須要更多信息的話會發生什麼?下面讓咱們看看用更多信息來建模領域事件 - 例如,郵箱地址:

class UserRegistered implements DomainEvent
{
    private $userId;
    private $userEmail;

    public function __construct(UserId $userId, $userEmail)
    {
        $this->userId = $userId;
        $this->userEmail = $userEmail;
        $this->occurredOn = new DateTimeImmutable();
    }

    public function userId()
    {
        return $this->userId;
    }

    public function userEmail()
    {
        return $this->userEmail;
    }

    public function occurredOn()
    {
        return $this->occurredOn;
    }
}

上面,咱們添加了郵箱地址,添加更多信息到一個領域事件能夠幫助提升性能或者使不一樣限界上下文整合理簡單化。從另外一個限界上下文的視角來考慮,也有助於建模事件。當在上游的限界上下文建立一個新用戶時,下游的上下文則建立它本身的用戶。添加用戶郵箱能夠保存一個同步請求到上游上下文,以防萬一下游的上下文須要它。

你是否還記得遊戲化機制的例子?爲了建立遊平臺用戶,也就是所說的玩家,那麼一個來自電子商務限界上下文的 UserId 可能就夠了。但若是遊戲平臺要用郵件通知用戶中獎消息怎麼樣?在這種狀況下,郵箱地址則是必要的。因此,若是郵箱地址包含在源領域事件中,咱們就能夠作到。若是不在,遊戲限界上下文就須要用 REST 或者 SOA 從電子商務上下文中獲取這些信息。

爲什麼不用整個用戶實體

想知道你是否應該在限界上下文的領域事件中包含整個用戶實體?咱們的建議是不須要。領域事件通常用於內部的給定上下文或者外部其它上下文的消息通訊。換句話說,在 C2C 電子商務產品目錄限界上下文中的賣方是誰,產品反饋中的產品評論做者是誰。二者能夠共享相同的ID或者電子郵件,可是賣方和做者是不一樣的概念,表明來自不一樣的限界上下文。所以,來自一個限界上下文的實體在另外一個上下文沒有任何意義或徹底不一樣。

Doctrine 事件

領域事件不只僅是作批量做業,例如發送郵件或者與其它上下文通訊。它們也對性能和可擴展提高感興趣。讓咱們看一個例子。

考慮如下場景:你有一個電子商務應用,你的主要持久化機制工具是 MySQL,可是對於瀏覽或者過濾你的產品目錄,你用了一個更好的方法,例如 Elasticsearch 或者 Solr。在 Elasticsearch 裏,你最獲取到存儲在完整數據庫中的一部分信息。如何保持數據同步?內容團隊經過後臺工具更新目錄時會發生什麼?

有人不時重建整個目錄的索引。這很是昂貴且緩慢。一種更明智的方法是更新與已更新的產品的一個或一些文檔。咱們該怎麼作呢?答案是使用領域事件。

不過,假如你已經在用 Doctrine 了,這些對你來講就不怎麼新鮮了。根據 Doctrine 2 ORM 2 Documentation:

Doctrine 2 具備輕量級事件系統,該系統是 Common 包的一部分。Doctrine 使用它來高度系統事件,主要是生命週期事件。你也能夠將其用於你的自定義事件。

此外,它聲明瞭:

生命週期回調定義在一個實體類上。它們使你能夠在該實體的實例遇到相關生命週期事件時觸發回調。每一個生命週期事件能夠定義多個回調。生命週期回調最好用於特定實體類生命週期的簡單操做上。

讓咱們看一個來自 Doctrine Events Documentation 中的例子:

/** @Entity @HasLifecycleCallbacks */
class User
{
// ...
    /**
     * @Column(type="string", length=255)
     */
    public $value;
    /** @Column(name="created_at", type="string", length=255) */
    private $createdAt;

    /** @PrePersist */
    public function doStuffOnPrePersist()
    {
        $this->createdAt = date('Y-m-d H:i:s');
    }

    /** @PrePersist */
    public function doOtherStuffOnPrePersist()
    {
        $this->value = 'changed from prePersist callback!';
    }

    /** @PostPersist */
    public function doStuffOnPostPersist()
    {
        $this->value = 'changed from postPersist callback!';
    }

    /** @PostLoad */
    public function doStuffOnPostLoad()
    {
        $this->value = 'changed from postLoad callback!';
    }

    /** @PreUpdate */
    public function doStuffOnPreUpdate()
    {
        $this->value = 'changed from preUpdate callback!';
    }
}

你能夠將特定任務掛載到 Doctrine 實體生命週期的每一個不一樣的重要時刻。例如,在 PostPersist 上,你能夠生成實體的 JSON 文檔並將其放到 Elasticsearch 中。這樣,就很容易使不一樣持久化機制間的數據保持一致。

Doctrine 事件是一個很好的例子來講明用事件圍繞你的實體的好處。可是你能夠想知道使用它們的問題是什麼。這是由於它們耦合到框架,它們是同步的,而且它們在你的應用程序級別上起做用,卻不是出於通訊的目的。因此這就是爲何儘管難以實施和處理,領域事件仍然很是有趣的緣由。

持久化領域事件

持久化事件老是一個好的想法。大家中的一些人能夠想知道爲何不能直接發佈領域事件到一個消息或者日誌系統。這是由於持久化它們有一些有趣的好處:

  • 你能夠經過 REST 接口暴露你的領域事件給其它限界上下文
  • 你能夠在推送領域事件和聚合變化到 RabbitMQ 持久化它們到同一數據庫事務。(你並不想發送未發生的事件通知,就像你並不但願錯過發生過的事件通知。)
  • 商務智能系統可使用這些數據來分析和預測趨勢。
  • 你能夠審覈你的實體變化。
  • 對於事件源(Event Souring)機制,你能夠從領域事件中重建聚合。

事件存儲

咱們在哪持久化領域事件?在一個事件存儲器(Event Store)。事件存儲器是一個領域事件倉儲,它做爲一個抽象(接口或抽象類)存在於咱們的領域空間中。它的職責是附帶領域事件並對進行查詢。一種可能的基本接口以下:

interface EventStore
{
    public function append(DomainEvent $aDomainEvent);

    public function allStoredEventsSince($anEventId);
}

然而,根據你領域事件的用途,上一個接口能夠有更多的方法來查詢事件。

在實現方面,你能夠決定使用 Doctrine Respository, DBAL,或者普通的 PDO。由於領域事件是不可變的,因此使用 Doctrine Repository 會加大沒必要要的性能損失,儘管對於中小型程序而言,Doctrine 可能還夠用。讓咱們看下 Doctrine 的可能實現:

class DoctrineEventStore extends EntityRepository implements EventStore
{
    private $serializer;

    public function append(DomainEvent $aDomainEvent)
    {
        $storedEvent = new StoredEvent(
            get_class($aDomainEvent),
            $aDomainEvent->occurredOn(),
            $this->serializer()->serialize($aDomainEvent, 'json')
        );
        $this->getEntityManager()->persist($storedEvent);
    }

    public function allStoredEventsSince($anEventId)
    {
        $query = $this->createQueryBuilder('e');
        if ($anEventId) {

            $query->where('e.eventId > :eventId');
            $query->setParameters(['eventId' => $anEventId]);
        }
        $query->orderBy('e.eventId');
        return $query->getQuery()->getResult();
    }

    private function serializer()
    {
        if (null === $this->serializer) {
            /** \JMS\Serializer\Serializer\SerializerBuilder */
            $this->serializer = SerializerBuilder::create()->build();
        }
        return $this->serializer;
    }
}

StoreEvent 須要 Doctrine 實體映射到數據庫。正如你所見,在附帶和持久化 Store 以後,是沒有 flush 方法調用的,若是這個操做在 Doctrine 事務內,那麼是沒必要要的。所以,咱們暫時擱置這裏,咱們會在應用服務一章中再深刻探討。

如今咱們來看 StoreEvent 的實現:

class StoredEvent implements DomainEvent
{
    private $eventId;
    private $eventBody;
    private $occurredOn;
    private $typeName;

    /**
     * @param string $aTypeName
     * @param \DateTimeImmutable $anOccurredOn
     * @param string $anEventBody
     */
    public function __construct(
        $aTypeName, \DateTimeImmutable $anOccurredOn, $anEventBody
    )
    {
        $this->eventBody = $anEventBody;
        $this->typeName = $aTypeName;
        $this->occurredOn = $anOccurredOn;
    }

    public function eventBody()
    {
        return $this->eventBody;
    }

    public function eventId()
    {
        return $this->eventId;
    }

    public function typeName()
    {
        return $this->typeName;
    }

    public function occurredOn()
    {
        return $this->occurredOn;
    }
}

下面是它的映射:

Ddd\Domain\Event\StoredEvent:
  type: entity
  table: event
  repositoryClass:
    Ddd\Infrastructure\Application\Notification\DoctrineEventStore
  id:
    eventId:
      type: integer
      column: event_id
      generator:
      strategy: AUTO
  fields:
    eventBody:
      column: event_body
      type: text
    typeName:
      column: type_name
      type: string
      length: 255
    occurredOn:
      column: occurred_on
      type: datetime

爲了用不一樣字段來持久化領域事件,咱們將不得不將這些字段鏈接成一個序列化的字符串。 typeName 字段說明領域事件的領域廣度。一個實體或者值對象在限界上下文裏纔有意義,但領域事件在限界上下文間定義了通信協議。

在分佈式系統中,會發生垃圾。你將不得不處理未發佈,在事務鏈中某個地方丟失或已屢次發佈的領域事務。這就是爲何必須使用 ID 持久化領域事件很重要,這能夠輕鬆跟蹤哪一個領域事件已經發布,哪一個已經丟失。

從領域模型中發佈事件

領域事件應該在它們表明的事實發生時發佈。例如,當一個新用戶已經註冊時,一個新的 UserRegistered 事件應該被髮布。

參考下面的報紙比喻:

  • 建模一個領域事件就像寫一篇新文章
  • 發佈一個領域事件就像把文章印刷到報紙上
  • 傳播一個領域事件就像投遞報紙,讓每一個人均可以讀到這篇文章

發佈領域事件推薦的方法就是使用一個簡單的監聽者 - 觀察者模式來實現 DomainEventPublisher

從實體中發佈事件

繼續用咱們應用中新用戶註冊的例子,咱們看看相應的領域事件是怎樣發佈的:

class User
{
    protected $userId;
    protected $email;
    protected $password;

    public function __construct(UserId $userId, $email, $password)
    {
        $this->setUserId($userId);
        $this->setEmail($email);
        $this->setPassword($password);
        DomainEventPublisher::instance()->publish(
            new UserRegistered($this->userId)
        );
    }
    // ...
}

如示例所示,用戶建立時,一個新的 UserRegistered 事件將發佈。這在實體的構造函數內完成,而不是外面。由於用這個方法,能夠輕鬆保持咱們領域的一致性;任何建立新用戶的客戶端都會發布相應的事件。另外一方面,這使得須要建立用戶實體而不使用其構造函數的基礎結構變得更加複雜。例如,Doctrine 使用序列化和反序列化技術來從新建立對象而不調用構造函數。然而,若是你必須建立本身的應用程序,這將不會像 Doctrine 那樣容易。

通常來說,從簡單數據(例如數組)構造對象稱爲水合(水化反應)。讓咱們看看一種簡單的方法來構建從數據庫中獲取的新用戶。首先,讓咱們經過應用工廠方法(Factory Method)模式將領域事件的發佈提取爲本身的方法。

根據 Wikipedia

模板方法模式是一種行爲設計模式,它在一個操做裏定義了一個算法的程序骨架,而將實現延遲到子步驟中

class User
{
    protected $userId;
    protected $email;
    protected $password;

    public function __construct(UserId $userId, $email, $password)
    {
        $this->setUserId($userId);
        $this->setEmail($email);
        $this->setPassword($password);
        $this->publishEvent();
    }

    protected function publishEvent()
    {
        DomainEventPublisher::instance()->publish(
            new UserRegistered($this->userId)
        );
    }
    // ...
}

如今,讓咱們用一個新的基礎架構實體來擴展當前的 User 類,該實體將爲咱們完成這項工做。這裏的小技巧是使 publishEvent 方法不執行任何操做,以便領域事件不會被髮布:

class CustomOrmUser extends User
{
    protected function publishEvent()
    {
    }

    public static function fromRawData($data)
    {
        return new self(
            new UserId($data['user_id']),
            $data['email'],
            $data['password']
        );
    }
}

記住要謹使用此方法。你可能會從持久化機制中得到無效的對象。由於領域規則老是在變化。另外一種不使用父構造函數的方法可能以下:

class CustomOrmUser extends User
{
    public function __construct()
    {
    }

    public static function fromRawData($data)
    {
        $user = new self();
        $user->userId = new UserId($data['user_id']);
        $user->email = $data['email'];
        $user->password = $data['password'];
        return $user;
    }
}

用這種方法,父構造函數不能被調用而且 User 的屬性必須被保護。其它的方法還有反射,在本色構造函數裏傳標識,使用諸如 Proxy-Manager 的代理庫,或者使用像 Doctrine 這樣的 ORM。

其它發佈領域事件的方法

正如你在以前的例子中所見,咱們使用了靜態類來發布領域事件。做爲替代方案,其餘人,尤爲是在使用事件源時,會建議在實體內用一個字段保存全部觸發的事件。爲了訪問全部事件,在聚合裏使用 getter 方法器。這也是一種有效的方法。可是,有時很難跟蹤哪些實體已觸發事件。在非實體的地方觸發事件也可能很困難,例如:領域服務。從好的方面來講,測試一個實體是否觸發了事件將容易得多。

從領域或者應用服務中發佈事件

你應該努力從更深層的事務鏈發佈領域事件。實體或值對象的內部越近越好。正如咱們在上一節中看到的,有時候這並不容易,但最終對於客戶端來講卻更簡單。咱們看到開發者從應用服務或者領域服務中發佈領域事件。這看起來更容易實現,但最終將致使貧血領域模型。這與在領域服務中推送業務邏輯而不是放到你的實體中沒有什麼不一樣。

Domain Event Publisher 是怎樣工做的

領域發佈發佈者(Domain Event Publisher)是一個單例類,來自於咱們須要發佈領域事件的限界上下文。它同時支付附加監聽器,Domain Event Subscriber 會監聽他們感興趣的任何領域事件。這與使用 on 方法的 jQuery 訂閱事件沒有太大差異:

class DomainEventPublisher
{
    private $subscribers;
    private static $instance = null;

    public static function instance()
    {
        if (null === static::$instance) {
            static::$instance = new static();
        }
        return static::$instance;
    }

    private function __construct()
    {
        $this->subscribers = [];
    }

    public function __clone()
    {
        throw new BadMethodCallException('Clone is not supported');
    }

    public function subscribe(
        DomainEventSubscriber $aDomainEventSubscriber
    )
    {
        $this->subscribers[] = $aDomainEventSubscriber;
    }

    public function publish(DomainEvent $anEvent)
    {
        foreach ($this->subscribers as $aSubscriber) {
            if ($aSubscriber->isSubscribedTo($anEvent)) {
                $aSubscriber->handle($anEvent);
            }
        }
    }
}

publish 方法經過全部可能的訂閱者,來檢查它們是否對發佈的領域事件感興趣。若是是,訂閱者的 handle 方法將被調用。

subscribe 方法添加一個新的 DomainEventSubscriber,它將監聽指定的領域事件類型:

interface DomainEventSubscriber
{
    /**
     * @param DomainEvent $aDomainEvent
     */
    public function handle($aDomainEvent);

    /**
     * * @param DomainEvent $aDomainEvent
     * @return bool
     */
    public function isSubscribedTo($aDomainEvent);
}

正如咱們已經討論過的,持久化全部領域事件是個好主意。咱們能夠在咱們的應用程序中經過使用指定的訂閱者來輕鬆地持久化全部已發佈的領域事件。咱們如今建立一個 DomainEventSubscriber,它會監聽全部領域事件,不管什麼類型,都會持久化到咱們的事件存儲器 (EventStore) 中。

class PersistDomainEventSubscriber implements DomainEventSubscriber
{
    private $eventStore;

    public function __construct(EventStore $anEventStore)
    {
        $this->eventStore = $anEventStore;
    }

    public function handle($aDomainEvent)
    {
        $this->eventStore->append($aDomainEvent);
    }

    public function isSubscribedTo($aDomainEvent)
    {
        return true;
    }
}

$eventStore 能夠是自定義的 Doctrine Repository, 或者正如所看到的其它有能力持久化DomainEvents 到數據庫的對象。

設置領域事件監聽者

設置 DomainEventPublisher 訂閱者最好的地方是哪裏?這看須要。對於可能影響整個請求週期的全局訂閱者,最好的位置多是 DomainEventPublisher 自身初始化的地方。對於受特殊應用服務影響的訂閱者,服務實例化的地方多是個更好的選擇。讓咱們來看一個使用 Silex 的例子。

Silex 裏,註冊 DomainEventPublisher 最好的方法就是經過使用一個應用中間件持久化全部領域事件。根據 Silex 2.0 Documentation:

一個 before 應用中間件容許你在 controller 執行前調整請求。

這是訂閱負責將這些事件持久化到數據庫的監聽器的正確位置,這些事件將在之後發送到 RabbitMQ:

// ...
$app['em'] = $app->share(function () {
    return (new EntityManagerFactory())->build();
});
$app['event_repository'] = $app->share(function ($app) {
    return $app['em']->getRepository(
        'Ddd\Domain\Model\Event\StoredEvent'
    );
});
$app['event_publisher'] = $app->share(function ($app) {
    return DomainEventPublisher::instance();
});
$app->before(
    function (Symfony\Component\HttpFoundation\Request $request)
    use ($app) {
        $app['event_publisher']->subscribe(
            new PersistDomainEventSubscriber(
                $app['event_repository']
            )
        );
    }
);

使用此設置,每次聚合發佈領域事件時,它將被持久化到數據庫中。任務完成。

練習

若是你使用 Symfony, Laravel, 或者其它 PHP 框架,找到一種方法,來訂閱全局指定訂閱者,圍繞你的領域事件執行任務。

若是你要在請求即將完成時對全部領域事件執行任何操做,則能夠建立一個監聽器,該監聽器將全部已發佈的的領域事件存儲在內存中。若是你添加一個 getter 訪問器到這個監聽器,來返回全部領域事件,則能夠決定要作什麼。如前文所建議,若是你不想或沒法持久化事件到同一事務,這將很是有用。

測試領域事件

你已經知道了如何發佈領域事件,但你怎樣對此作單元測試並確保 UserRegistered 真的被觸發?最簡單的方法就是,咱們建議用一個指定的 EventListener,它被看成一個 Spy 來記錄領域事件是否發佈。讓咱們看看 User 實體的單元測試例子:

use Ddd\Domain\DomainEventPublisher;
use Ddd\Domain\DomainEventSubscriber;

class UserTest extends \PHPUnit_Framework_TestCase
{
// ...
    /**
     * @test
     */
    public function itShouldPublishUserRegisteredEvent()
    {
        $subscriber = new SpySubscriber();
        $id = DomainEventPublisher::instance()->subscribe($subscriber);
        $userId = new UserId();
        new User($userId, 'valid@email.com', 'password');
        DomainEventPublisher::instance()->unsubscribe($id);
        $this->assertUserRegisteredEventPublished($subscriber, $userId);
    }

    private function assertUserRegisteredEventPublished(
        $subscriber, $userId
    )
    {
        $this->assertInstanceOf(
            'UserRegistered', $subscriber->domainEvent
        );
        $this->assertTrue(
            $subscriber->domainEvent->serId()->equals($userId)
        );
    }
}

class SpySubscriber implements DomainEventSubscriber
{
    public $domainEvent;

    public function handle($aDomainEvent)
    {
        $this->domainEvent = $aDomainEvent;
    }

    public function isSubscribedTo($aDomainEvent)
    {
        return true;
    }
}

對於上面有一些替代方案。你能夠爲 DomainEventPublisher 或者某些反射框架使用靜態 setter 來檢測調用。不過,咱們認爲咱們分享的方法更爲天然。最後但並不是最不重要的一點就是,請記住清理 Spy 訂閱。以避免影響其餘單元測試的執行。

廣播事件給遠程限界上下文

爲了將一組領域事件傳達給本地或者遠程限界上下文,主要有兩種策略:消息或者 REST API。第一個方法是使用諸如 RabbitMQ 之類的消息系統來傳輸領域事件。第二個應時建立一個 REST API,來訪問特定上下文的領域事件。

消息中間件

隨着全部領域事件持久化到數據庫中,惟一剩下的事情就是將它們推送到咱們最喜歡的消息系統中。咱們更喜歡 RabbitMQ,不過其餘任何系統(例如 ActiveMQ 或者 ZeroMQ)都能任務。要使用 PHP 整合 RabbitMQ,沒有不少選擇,但 php-amqplib 能夠完成這項工做。

首先, 咱們須要一種可以將持久化的領域事件發送 RabbitMQ 的服務。你能夠想要爲全部事件而查詢 EventStore,併發送每一個事件,這不是壞事。然而,咱們能夠屢次推送同一領域事件,一般來講,咱們須要將從新發布的領域事件減小到最少。若是重發的領域事件爲0,那就更好了。爲了避免重發領域事件,咱們須要某種組件來跟蹤哪些領域事件已經被推送,哪些仍然殘餘。最後但並不是最不重要的一點就是,一旦咱們知道必須推送哪些領域事件,就將它們發送,並追蹤發佈到消息系統中的最後一個事件。讓咱們看一下該服務的可能實現:

class NotificationService
{
    private $serializer;
    private $eventStore;
    private $publishedMessageTracker;
    private $messageProducer;

    public function __construct(
        EventStore $anEventStore,
        PublishedMessageTracker $aPublishedMessageTracker,
        MessageProducer $aMessageProducer,
        Serializer $aSerializer
    )
    {
        $this->eventStore = $anEventStore;
        $this->publishedMessageTracker = $aPublishedMessageTracker;
        $this->messageProducer = $aMessageProducer;
        $this->serializer = $aSerializer;
    }

    /**
     * @return int
     */
    public function publishNotifications($exchangeName)
    {
        $publishedMessageTracker = $this->publishedMessageTracker();
        $notifications = $this->listUnpublishedNotifications(
            $publishedMessageTracker
                ->mostRecentPublishedMessageId($exchangeName)
        );
        if (!$notifications) {
            return 0;
        }
        $messageProducer = $this->messageProducer();
        $messageProducer->open($exchangeName);
        try {
            $publishedMessages = 0;
            $lastPublishedNotification = null;
            foreach ($notifications as $notification) {
                $lastPublishedNotification = $this->publish(
                    $exchangeName,
                    $notification,
                    $messageProducer
                );
                $publishedMessages++;
            }
        } catch (\Exception $e) {
// Log your error (trigger_error, Monolog, etc.)
        }
        $this->trackMostRecentPublishedMessage(
            $publishedMessageTracker,
            $exchangeName,
            $lastPublishedNotification
        );
        $messageProducer->close($exchangeName);
        return $publishedMessages;
    }

    protected function publishedMessageTracker()
    {
        return $this->publishedMessageTracker;
    }

    /**
     * @return StoredEvent[]
     */
    private function listUnpublishedNotifications(
        $mostRecentPublishedMessageId
    )
    {
        return $this
            ->eventStore()
            ->allStoredEventsSince($mostRecentPublishedMessageId);
    }

    protected function eventStore()
    {
        return $this->eventStore;
    }

    private function messageProducer()
    {
        return $this->messageProducer;
    }

    private function publish(
        $exchangeName,
        StoredEvent $notification,
        MessageProducer $messageProducer
    )
    {
        $messageProducer->send(
            $exchangeName,
            $this->serializer()->serialize($notification, 'json'),
            $notification->typeName(),
            $notification->eventId(),
            $notification->occurredOn()
        );
        return $notification;
    }

    private function serializer()
    {
        return $this->serializer;
    }

    private function trackMostRecentPublishedMessage(
        PublishedMessageTracker $publishedMessageTracker,
        $exchangeName,
        $notification
    )
    {
        $publishedMessageTracker->trackMostRecentPublishedMessage(
            $exchangeName, $notification
        );
    }
}

NotificationService 依賴三個接口。咱們已經看到 EventStore,它主要負責增長和查詢領域事件。第二個是 PublishedMessageTracker,主要用來追蹤已推送的消息。第三個就是 MessageProducer,一個表示咱們消息系統的接口:

interface PublishedMessageTracker
{
    /**
     * @param string $exchangeName
     * @return int
     */
    public function mostRecentPublishedMessageId($exchangeName);

    /**
     * @param string $exchangeName
     * @param StoredEvent $notification
     */
    public function trackMostRecentPublishedMessage(
        $exchangeName, $notification
    );
}

mostRecentPublishedMessageId 方法返回 最後發佈消息的 ID,所以這個過程能夠從下一次開始。trackMostRecentPublishedMessage 負責追蹤哪一個消息是最後發送的,目的是在你可能須要時重發消息。exchangeName 表明咱們將要把領域事件發往的通訊頻道。讓咱們看看一個 Doctrine 實現的 PublishedMessageTracker

class DoctrinePublishedMessageTracker extends EntityRepository\
implements PublishedMessageTracker
{
    /**
     * @param $exchangeName
     * @return int
     */
    public function mostRecentPublishedMessageId($exchangeName)
    {
        $messageTracked = $this->findOneByExchangeName($exchangeName);
        if (!$messageTracked) {
            return null;
        }
        return $messageTracked->mostRecentPublishedMessageId();
    }

    /**
     * @param $exchangeName
     * @param StoredEvent $notification
     */
    public function trackMostRecentPublishedMessage(
        $exchangeName, $notification
    )
    {
        if (!$notification) {
            return;
        }
        $maxId = $notification->eventId();
        $publishedMessage = $this->findOneByExchangeName($exchangeName);
        if (null === $publishedMessage) {
            $publishedMessage = new PublishedMessage(
                $exchangeName,
                $maxId
            );
        }
        $publishedMessage->updateMostRecentPublishedMessageId($maxId);
        $this->getEntityManager()->persist($publishedMessage);
        $this->getEntityManager()->flush($publishedMessage);
    }
}

這裏的代碼很是簡單明瞭。咱們惟一須要的極端狀況就是,系統還沒有發佈任何領域事件。

爲何是交換機名稱?

咱們將在第 12 章集成限界上下文一章中更爲詳細地介紹這一點。可是,當系統正在運行而且新的限界上下文開始起做用時,你可能會對全部領域事件重發到新的限界上下文感興趣。所以,跟蹤上一次發佈的領域事件及其改善的頻道可能會在之後派上用場。

爲了跟蹤已發佈的領域事件,咱們須要一個交換機名稱一個通知 ID。下面是一種可能的實現:

class PublishedMessage
{
    private $mostRecentPublishedMessageId;
    private $trackerId;
    private $exchangeName;

    /**
     * @param string $exchangeName
     * @param int $aMostRecentPublishedMessageId
     */
    public function __construct(
        $exchangeName, $aMostRecentPublishedMessageId
    )
    {
        $this->mostRecentPublishedMessageId =
            $aMostRecentPublishedMessageId;
        $this->exchangeName = $exchangeName;
    }

    public function mostRecentPublishedMessageId()
    {
        return $this->mostRecentPublishedMessageId;
    }

    public function updateMostRecentPublishedMessageId($maxId)
    {
        $this->mostRecentPublishedMessageId = $maxId;
    }

    public function trackerId()
    {
        return $this->trackerId;
    }
}

這是其對應的映射關係:

Ddd\Domain\Event\PublishedMessage:
  type: entity
  table: event_published_message_tracker
  repositoryClass:
    Ddd\Infrastructure\Application\Notification\
    DoctrinePublished\MessageTracker
  id:
  trackerId:
    column: tracker_id
    type: integer
    generator:
    strategy: AUTO
  fields:
    mostRecentPublishedMessageId:
      column: most_recent_published_message_id
      type: bigint
    exchangeName:
      type: string
      column: exchange_name

如今,讓咱們看看 MessageProducer 接口用來作什麼的,以及它的實現細節:

interface MessageProducer
{
    public function open($exchangeName);

    /**
     * @param $exchangeName
     * @param string $notificationMessage
     * @param string $notificationType
     * * @param int $notificationId
     * @param \DateTimeImmutable $notificationOccurredOn
     * @return
     */
    public function send(
        $exchangeName,
        $notificationMessage,
        $notificationType,
        $notificationId,
        \DateTimeImmutable $notificationOccurredOn
    );

    public function close($exchangeName);
}

簡單!openclose 方法打開和關閉一個消息系統鏈接。send 方法攜帶一個消息體(消息名稱及消息 ID),併發送到咱們的消息引擎,而不用關心它是什麼。由於咱們選擇的是 RabbitMQ,咱們須要實現鏈接及發送過程:

abstract class RabbitMqMessaging
{
    protected $connection;
    protected $channel;

    public function __construct(AMQPConnection $aConnection)
    {
        $this->connection = $aConnection;
        $this->channel = null;
    }

    private function connect($exchangeName)
    {
        if (null !== $this->channel) {
            return;
        }
        $channel = $this->connection->channel();
        $channel->exchange_declare(
            $exchangeName, 'fanout', false, true, false
        );
        $channel->queue_declare(
            $exchangeName, false, true, false, false
        );
        $channel->queue_bind($exchangeName, $exchangeName);
        $this->channel = $channel;
    }

    public function open($exchangeName)
    {
    }

    protected function channel($exchangeName)
    {
        $this->connect($exchangeName);
        return $this->channel;
    }

    public function close($exchangeName)
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class RabbitMqMessageProducer
    extends RabbitMqMessaging
    implements MessageProducer
{
    /**
     * @param $exchangeName
     * @param string $notificationMessage
     * @param string $notificationType
     * @param int $notificationId
     * @param \DateTimeImmutable $notificationOccurredOn
     */
    public function send(
        $exchangeName,
        $notificationMessage,
        $notificationType,
        $notificationId,
        \DateTimeImmutable $notificationOccurredOn
    )
    {
        $this->channel($exchangeName)->basic_publish(
            new AMQPMessage(
                $notificationMessage,
                [
                    'type' => $notificationType,
                    'timestamp' => $notificationOccurredOn->getTimestamp(),
                    'message_id' => $notificationId
                ]
            ),
            $exchangeName
        );
    }
}

如今咱們有了一個 DomainService,能夠將領域事件推送到 RabbitMQ 這樣的消息系統中,是時候執行它們了。咱們須要選擇一種交付機制來運行服務。咱們我的建議是建立一個 Symfony Console 命令:

class PushNotificationsCommand extends Command
{
    protected function configure()
    {
        $this
            ->setName('domain:events:spread')
            ->setDescription('Notify all domain events via messaging')
            ->addArgument(
                'exchange-name',
                InputArgument::OPTIONAL,
                'Exchange name to publish events to',
                'my-bc-app'
            );
    }

    protected function execute(
        InputInterface $input, OutputInterface $output
    )
    {
        $app = $this->getApplication()->getContainer();
        $numberOfNotifications =
            $app['notification_service']
                ->publishNotifications(
                    $input->getArgument('exchange-name')
                );
        $output->writeln(
            sprintf(
                '<comment>%d</comment>' .
                '<info>notification(s) sent!</info>',
                $numberOfNotifications
            )
        );
    }
}

按照這個 Silex 例子,讓咱們看看定義在 Silex Pimple Service Container 中的 $app['notification_service'] 的定義:

// ...
$app['event_store'] = $app->share(function ($app) {
    return $app['em']->getRepository('Ddd\Domain\Event\StoredEvent');
});
$app['message_tracker'] = $app->share(function ($app) {
    return $app['em']
        ->getRepository('Ddd\Domain\Event\Published\Message');
});
$app['message_producer'] = $app->share(function () {
    return new RabbitMqMessageProducer(
        new AMQPStreamConnection('localhost', 5672, 'guest', 'guest')
    );
});
$app['message_serializer'] = $app->share(function () {
    return SerializerBuilder::create()->build();
});
$app['notification_service'] = $app->share(function ($app) {
    return new NotificationService(
        $app['event_store'],
        $app['message_tracker'],
        $app['message_producer'],
        $app['message_serializer']
    );
});
//...

用 REST 同步領域服務

有了消息傳統中已經實現的 EventStore,應該很容易添加一些分佈功能,領域查詢事件以及渲染 JSON 或者 XML 表述,以發佈 REST API。爲何這麼有趣?嗯,分佈式系統使用消息中間件必須面對許多不一樣的問題,例如消息未到達,消息重複到達,或消息到達失序。這就是爲何須要一個 API 來發布你的領域事件,以便其它限界上下文能夠要求一些缺失信息的緣由。僅做爲示例,考慮一個 /event 端點的 HTTP 請求。一個可能的實現以下:

[
  {
    "id": 1,
    "version": 1,
    "typeName": "Lw\\Domain\\Model\\User\\UserRegistered",
    "eventBody": {
      "user_id": {
        "id": "459a4ffc-cd57-4cf0-b3a2-0f2ccbc48234"
      }
    },
    "occurredOn": {
      "date": "2016-05-26 06:06:07.000000",
      "timezone_type": 3,
      "timezone": "UTC"
    }
  },
  {
    "id": 2,
    "version": 2,
    "typeName": "Lw\\Domain\\Model\\Wish\\WishWasMade",
    "eventBody": {
      "wish_id": {
        "id": "9e90435a-395c-46b0-b4c4-d4b769cbf201"
      },
      "user_id": {
        "id": "459a4ffc-cd57-4cf0-b3a2-0f2ccbc48234"
      },
      "address": "john@example.com",
      "content": "This is my new wish!"
    },
    "occurredOn": {
      "date": "2016-05-26 06:06:27.000000",
      "timezone_type": 3,
      "timezone": "UTC"
    },
    "timeTaken": "650"
  }
  //...
]

如你在前面的示例中所見,咱們在一個 JSON REST API 中暴露一組領域事件。在輸出示例中,你能夠看到一個關於每一個領域事件的 JSON 表述。這有一些有趣的要點。首先是 version 字段。有時你的領域事件會發展:它們會包含更多字段,它們會改變某些現有字段的行爲,或者會刪除某些現有字段。這就是在領域事件中添加 version 字段很重要的緣由。若是其餘限界上下文正在監聽此類事件,則它們可使用 version 字段以不一樣方式解析領域事件。在對 REST API 進行版本控制時,你也可能會遇到相同的問題。

另一個就是名稱。若是你想使用領域事件的類名,那麼大多數狀況下均可以。問題是當團隊因爲重構而決定更改類名時,在這種狀況下,全部監聽該名稱的限界上下文都將中止工做。僅當你在同一隊列中發佈不一樣領域事件時,纔會出現此問題。若是你將每一個領域事件類型發佈到不一樣的隊列中,則不是真正的問題,但若是你選擇這種方法,那麼將面臨一系列不一樣的問題,例如接收無序事件。像許多其餘狀況下同樣,這須要權衡。咱們強烈建議你閱讀 **《企業集成模式:設計,構建和部署消息系統解決方案》。在這本書裏,你將學習使用異步方法集成多個應用程序的不一樣模式。因爲領域事件在集成頻道發送消息,所以全部消息模式都適用於它們。

練習

考慮爲領域事件使用 REST API 的利弊。考慮限界上下文耦合。你也能夠爲你當前的應用實現 REST API。

小結

咱們看到了使用基本接口建模一個合適的領域事件的技巧,也瞭解到在何處發佈領域事件(越接近實體越好),而且瞭解到將這些領域事件傳播到本地和遠程限界上下文的策略。如今,剩下的惟一事情就是在消息系統中監聽通知,讀取通知,並執行相應的應用服務或命令。咱們將在第 12 章,集成有限上下文 和第 5 章,服務 中看到如何執行此操做。

相關文章
相關標籤/搜索