使用領域事件來捕獲發生在領域中的一些事情。git
領域驅動實踐者發現他們能夠經過了解更多發生在問題域中的事件,來更好的理解問題域。這些事件,就是領域事件,主要是與領域專家一塊兒進行知識提煉環節中得到。redis
領域事件,能夠用於一個限界上下文內的領域模型,也可使用消息隊列在限界上下文間進行異步通訊。spring
領域事件是領域專家所關心的發生在領域中的一些事件。數據庫
將領域中所發生的活動建模成一系列離散事件。每一個事件都用領域對象表示。領域事件是領域模型的組成部分,表示領域中所發生的事情。設計模式
領域事件的主要用途:緩存
領域事件表示已經發生的某種事實,該事實在發生後便不會改變。所以,領域事件一般建模成值對象。bash
但,這也有特殊的狀況,爲了迎合序列化和反序列化框架需求,在建模時,常常會進行必定的妥協。架構
在建模領域事件時,咱們應該根據限界上下文中的通用語言來命名事件。app
若是事件由聚合上的命令操做產生,一般根據該操做方法的名字來命名事件。事件名字代表聚合上的命令方法在執行成功後的事實。即事件命名須要反映過去發生過的事情。框架
public class AccountEnabledEvent extends AbstractAggregateEvent<Long, Account> {
public AccountEnabledEvent(Account source) {
super(source);
}
}
複製代碼
事件的屬性主要用於驅動後續業務流程。固然,也會擁有一些通用屬性。
事件具備一些通用屬性,如:
通用屬性可使用事件接口來規範。
接口或類 | 含義 |
---|---|
DomainEvent | 通用領域事件接口 |
AggregateEvent | 由聚合發佈的通用領域事件接口 |
AbstractDomainEvent | DomainEvent 實現類,維護 id 和 建立時間 |
AbstractAggregateEvent | AggregateEvent 實現類,繼承子 AbstractDomainEvent,並添加 source 屬性 |
但,事件最主要的仍是業務屬性。咱們須要考慮,是誰致使事件的發生,這可能涉及產生事件的聚合或其餘參與該操做的聚合,也多是其餘任何類型的操做數據。
事件是事實的描述,自己不會有太多的業務操做。
領域事件一般被設計爲不變對象,事件所攜帶的數據已經反映出該事件的來源。事件構造函數完成狀態初始化,同時提供屬性的 getter 方法。
這裏須要注意的是事件惟一標識,一般狀況下,事件是不可變的,那爲何會涉及惟一標識的概念呢?
對於從聚合中發佈出來的領域事件,使用事件的名稱、產生事件的標識、事件發生的時間等足以對不一樣的事件進行區分。但,這樣會增長事件比較的複雜性。
對於由調用方發佈的事件,咱們將領域事件建模成聚合,能夠直接使用聚合的惟一標識做爲事件的標識。
事件惟一標識的引入,會大大減小事件比較的複雜性。但,其最大的意義在於限界上下文的集成。
當咱們須要將領域事件發佈到外部的限界上下文時,惟一標識就是一種必然。爲了保證事件投遞的冪等性,在發送端,咱們可能會進行屢次發送嘗試,直至明確發送成功爲止;而在接收端,當接收到事件後,須要對事件進行重複性檢測,以保障事件處理的冪等性。此時,事件的惟一標識即可以做爲事件去重的依據。
事件惟一標識,自己對領域建模影響不大,但對技術處理好處巨大。所以,將它做爲通用屬性進行管理。
咱們如何避免領域事件與處理者間的耦合呢?
一種簡單高效的方式即是使用觀察者模式,這種模式能夠在領域事件和外部組件間進行解耦。
爲了統一,咱們須要定義了一套接口和實現類,以基於觀察者模式,完成事件的發佈。
涉及接口和實現類以下:
接口或類 | 含義 |
---|---|
DomainEventPublisher | 用於發佈領域事件 |
DomainEventHandlerRegistry | 用於註冊 DomainEventHandler |
DomainEventBus | 擴展自 DomainEventPublisher 和 DomainEventHandlerRegistry 用於發佈和管理領域事件處理器 |
DefaultDomainEventBus | DomainEventBus 默認實現 |
DomainEventHandler | 用於處理領域事件 |
DomainEventSubscriber | 用於判斷是否接受領域事件 |
DomainEventExecutor | 用於執行領域事件處理器 |
使用實例如 DomainEventBusTest 所示:
public class DomainEventBusTest {
private DomainEventBus domainEventBus;
@Before
public void setUp() throws Exception {
this.domainEventBus = new DefaultDomainEventBus();
}
@After
public void tearDown() throws Exception {
this.domainEventBus = null;
}
@Test
public void publishTest(){
// 建立事件處理器
TestEventHandler eventHandler = new TestEventHandler();
// 註冊事件處理器
this.domainEventBus.register(TestEvent.class, eventHandler);
// 發佈事件
this.domainEventBus.publish(new TestEvent("123"));
// 檢測事件處理器是夠運行
Assert.assertEquals("123", eventHandler.data);
}
@Value
class TestEvent extends AbstractDomainEvent{
private String data;
}
class TestEventHandler implements DomainEventHandler<TestEvent>{
private String data;
@Override
public void handle(TestEvent event) {
this.data = event.getData();
}
}
}
複製代碼
在構建完發佈訂閱結構後,須要將其與領域模型進行關聯。領域模型如何獲取 Publisher,事件處理器如何進行訂閱。
比較經常使用的方案即是將 DomainEventBus 綁定到線程上下文。這樣,只要是同一調用線程均可以方便的獲取 DomainEventBus 對象。
具體的交互以下:
DomainEventBusHolder 用於管理 DomainEventBus。
public class DomainEventBusHolder {
private static final ThreadLocal<DomainEventBus> THREAD_LOCAL = new ThreadLocal<DomainEventBus>(){
@Override
protected DomainEventBus initialValue() {
return new DefaultDomainEventBus();
}
};
public static DomainEventPublisher getPubliser(){
return THREAD_LOCAL.get();
}
public static DomainEventHandlerRegistry getHandlerRegistry(){
return THREAD_LOCAL.get();
}
public static void clean(){
THREAD_LOCAL.remove();
}
}
複製代碼
Account 的 enable 直接使用 DomainEventBusHolder 進行發佈。
public class Account extends JpaAggregate {
public void enable(){
AccountEnabledEvent event = new AccountEnabledEvent(this);
DomainEventBusHolder.getPubliser().publish(event);
}
}
public class AccountEnabledEvent extends AbstractAggregateEvent<Long, Account> {
public AccountEnabledEvent(Account source) {
super(source);
}
}
複製代碼
AccountApplication 完成訂閱器註冊以及業務方法調用。
public class AccountApplication extends AbstractApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(AccountApplication.class);
@Autowired
private AccountRepository repository;
public void enable(Long id){
// 清理以前綁定的 Handler
DomainEventBusHolder.clean();
// 註冊 EventHandler
AccountEnableEventHandler enableEventHandler = new AccountEnableEventHandler();
DomainEventBusHolder.getHandlerRegistry().register(AccountEnabledEvent.class, enableEventHandler);
Optional<Account> accountOptional = repository.getById(id);
if (accountOptional.isPresent()) {
Account account = accountOptional.get();
// enable 使用 DomainEventBusHolder 直接發佈事件
account.enable();
repository.save(account);
}
}
class AccountEnableEventHandler implements DomainEventHandler<AccountEnabledEvent>{
@Override
public void handle(AccountEnabledEvent event) {
LOGGER.info("handle enable event");
}
}
}
複製代碼
先將事件緩存在實體中,在實體狀態成功持久化到存儲後,再進行事件發佈。
具體交互以下:
實例代碼以下:
public class Account extends JpaAggregate {
public void enable(){
AccountEnabledEvent event = new AccountEnabledEvent(this);
registerEvent(event);
}
}
複製代碼
Account 的 enable 方法,調用 registerEvent 對事件進行註冊。
@MappedSuperclass
public abstract class AbstractAggregate<ID> extends AbstractEntity<ID> implements Aggregate<ID> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAggregate.class);
@JsonIgnore
@QueryTransient
@Transient
@org.springframework.data.annotation.Transient
private final transient List<DomainEventItem> events = Lists.newArrayList();
protected void registerEvent(DomainEvent event) {
events.add(new DomainEventItem(event));
}
protected void registerEvent(Supplier<DomainEvent> eventSupplier) {
this.events.add(new DomainEventItem(eventSupplier));
}
@Override
@JsonIgnore
public List<DomainEvent> getEvents() {
return Collections.unmodifiableList(events.stream()
.map(eventSupplier -> eventSupplier.getEvent())
.collect(Collectors.toList()));
}
@Override
public void cleanEvents() {
events.clear();
}
private class DomainEventItem {
DomainEventItem(DomainEvent event) {
Preconditions.checkArgument(event != null);
this.domainEvent = event;
}
DomainEventItem(Supplier<DomainEvent> supplier) {
Preconditions.checkArgument(supplier != null);
this.domainEventSupplier = supplier;
}
private DomainEvent domainEvent;
private Supplier<DomainEvent> domainEventSupplier;
public DomainEvent getEvent() {
if (domainEvent != null) {
return domainEvent;
}
DomainEvent event = this.domainEventSupplier != null ? this.domainEventSupplier.get() : null;
domainEvent = event;
return domainEvent;
}
}
}
複製代碼
registerEvent 方法在 AbstractAggregate 中,registerEvent 方法將事件保存到 events 集合,getEvents 方法獲取全部事件,cleanEvents 方法清理緩存的事件。
Application 實例以下:
@Service
public class AccountApplication extends AbstractApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(AccountApplication.class);
@Autowired
private AccountRepository repository;
@Autowired
private DomainEventBus domainEventBus;
@PostConstruct
public void init(){
// 使用 Spring 生命週期註冊事件處理器
this.domainEventBus.register(AccountEnabledEvent.class, new AccountEnableEventHandler());
}
public void enable(Long id){
Optional<Account> accountOptional = repository.getById(id);
if (accountOptional.isPresent()) {
Account account = accountOptional.get();
// enable 將事件緩存在 account 中
account.enable();
repository.save(account);
List<DomainEvent> events = account.getEvents();
if (!CollectionUtils.isEmpty(events)){
// 成功持久化後,對事件進行發佈
this.domainEventBus.publishAll(events);
}
}
}
class AccountEnableEventHandler implements DomainEventHandler<AccountEnabledEvent>{
@Override
public void handle(AccountEnabledEvent event) {
LOGGER.info("handle enable event");
}
}
}
複製代碼
AccountApplication 的 init 方法完成事件監聽器的註冊,enable 方法在實體成功持久化後,將緩存的事件經過 DomainEventBus 實例 publish 出去。
一般狀況下,領域事件是由聚合的命令方法產生,並在命令方法執行成功後,進行事件的發佈。 有時,領域事件並非聚合中的命令方法產生的,而是由用戶所發生的請求產生。
此時,咱們須要將領域事件建模成一個聚合,而且擁有本身的資源庫。但,因爲領域事件表示的是過去發生的事情,所以資源庫只作追加操做,不能對事件進行修改和刪除功能。
例如,對用戶點擊事件進行發佈。
@Entity
@Data
public class ClickAction extends JpaAggregate implements DomainEvent {
@Setter(AccessLevel.PRIVATE)
private Long userId;
@Setter(AccessLevel.PRIVATE)
private String menuId;
public ClickAction(Long userId, String menuId){
Preconditions.checkArgument(userId != null);
Preconditions.checkArgument(StringUtils.isNotEmpty(menuId));
setUserId(userId);
setMenuId(menuId);
}
@Override
public String id() {
return String.valueOf(getId());
}
@Override
public Date occurredOn() {
return getCreateTime();
}
}
複製代碼
ClickAction 繼承自 JpaAggregate 實現 DomainEvent 接口,並重寫 id 和 occurredOn 方法。
@Service
public class ClickActionApplication extends AbstractApplication {
@Autowired
private ClickActionRepository repository;
@Autowired
private DomainEventBus domainEventBus;
public void clickMenu(Long id, String menuId){
ClickAction clickAction = new ClickAction(id, menuId);
clickAction.prePersist();
this.repository.save(clickAction);
domainEventBus.publish(clickAction);
}
}
複製代碼
ClickActionApplication 在成功保存 ClickAction 後,使用 DomainEventBus 對事件進行發佈。
由什麼組件向領域事件註冊訂閱器呢?大多數請求,由應用服務完成,有時也能夠由領域服務進行註冊。
因爲應用服務是領域模型的直接客戶,它是註冊領域事件訂閱器的理想場所,即在應用服務調用領域方法以前,就完成了對事件的訂閱。
基於 ThreadLocal 進行訂閱:
public void enable(Long id){
// 清理以前綁定的 Handler
DomainEventBusHolder.clean();
// 註冊 EventHandler
AccountEnableEventHandler enableEventHandler = new AccountEnableEventHandler();
DomainEventBusHolder.getHandlerRegistry().register(AccountEnabledEvent.class, enableEventHandler);
Optional<Account> accountOptional = repository.getById(id);
if (accountOptional.isPresent()) {
Account account = accountOptional.get();
// enable 使用 DomainEventBusHolder 直接發佈事件
account.enable();
repository.save(account);
}
}
複製代碼
基於實體緩存進行訂閱:
@PostConstruct
public void init(){
// 使用 Spring 生命週期註冊事件處理器
this.domainEventBus.register(AccountEnabledEvent.class, new AccountEnableEventHandler());
}
public void enable(Long id){
Optional<Account> accountOptional = repository.getById(id);
if (accountOptional.isPresent()) {
Account account = accountOptional.get();
// enable 將事件緩存在 account 中
account.enable();
repository.save(account);
List<DomainEvent> events = account.getEvents();
if (!CollectionUtils.isEmpty(events)){
// 成功持久化後,對事件進行發佈
this.domainEventBus.publishAll(events);
}
}
}
複製代碼
完成事件發佈後,讓咱們一塊兒看下事件處理。
咱們一般將領域事件用於維護模型的一致性。在聚合建模中有一個原則,就是在一個事務中,只能對一個聚合進行修改,由此產生的變化必須在獨立的事務中運行。
在這種狀況下,須要謹慎處理的事務的傳播性。
應用服務控制着事務。不要在事件通知過程當中修改另外一個聚合實例,由於這樣會破壞聚合的一大原則:在一個事務中,只能對一個聚合進行修改。
對於簡單場景,咱們可使用特殊的事務隔離策略對聚合的修改進行隔離。具體流程以下:
但,最佳方案是使用異步處理。及每個定義方都在各自獨立的事務中修改額外的聚合實例。
事件訂閱方不該該在另外一個聚合上執行命令方法,由於這樣將破壞「在單個事務中只修改單個聚合實例」的原則。全部聚合實例間的最終一致性必須經過異步方式處理。
詳見,異步處理領域事件。
批處理過程一般須要複雜的查詢,而且須要龐大的事務支持。若是在接收到領域事件時,系統就當即處理,業務需求不只獲得了更快的知足,並且杜絕了批處理操做。
在系統的非高峯時期,一般使用批處理進行一些系統的維護,好比刪除過時數據、建立新的對象、通知用戶、更新統計信息等。這些批處理每每須要複雜的查詢,並須要龐大的事務支持。
若是咱們監聽系統中的領域事件,在接收領域事件時,系統當即處理。這樣,本來批量集中處理的過程就被分散成許多小的處理單元,業務須要也能更快的知足,用戶能夠能夠及時的進行下一步操做。
對於單個限界上下文中的全部領域事件,爲它們維護一個事件存儲具備不少的好處。
對事件進行存儲能夠:
事件存儲是個比較大的課題,將有專門章節進行講解。
基於領域事件的限界上下文集成,主要由消息隊列和 REST 事件兩種模式。
在此,重心講解基於消息隊列的上下文集成。
在不一樣的上下文中採用消息系統時,咱們必須保證最終一致性。在這種狀況下,咱們至少須要在兩種存儲之間保存最終一致性:領域模型所使用的存儲和消息隊列所使用的持久化存儲。咱們必須保證在持久化領域模型時,對於的事件也已經成功發佈。若是兩種不一樣步,模型可能會處於不正確的狀態。
通常狀況下,有三種方式:
通常狀況下,第三種,是比較優雅的解決方案。
在一致性要求不高時,能夠經過領域事件訂閱器直接向消息隊列發送事件。具體流程以下:
對一致性要求高時,須要先將事件存儲,而後經過後臺線程加載並分發到消息隊列。具體流程以下:
領域事件能夠與異步工做流程協同,包括限界上下文間使用消息隊列進行異步通訊。固然,在同一個限界上下文中,也能夠啓動異步處理流程。
做爲事件的發佈者,不該關心是否執行異步處理。異常處理是由事件執行者決定。
DomainEventExecutor 提供對異步處理的支持。
DomainEventExecutor eventExecutor =
new ExecutorBasedDomainEventExecutor("EventHandler", 1, 100);
this.domainEventBus.register(AccountEnabledEvent.class,
eventExecutor,
new AccountEnableEventHandler());
複製代碼
異步處理,就意味着放棄數據庫事務的 ACID 特性,而選擇使用最終一致性。
使用領域事件時須要對事件進行區分,以免技術實現的問題。
認識內部事件和外部事件之間的區別相當重要。
通常狀況下,在典型的業務用例中,可能會有不少的內部事件,而只有一兩個外部事件。
內部事件存在於限界上下文內部,受限界上下文邊界保護。
內部事件被限制在單個有界上下文邊界內部,因此能夠直接引用領域對象。
public interface AggregateEvent<ID, A extends Aggregate<ID>> extends DomainEvent{
A source();
default A getSource(){
return source();
}
}
複製代碼
好比 AggregateEvent 中的 source 指向發佈該事件的聚合。
public class LikeSubmittedEvent extends AbstractAggregateEvent<Long, Like> {
public LikeSubmittedEvent(Like source) {
super(source);
}
public LikeSubmittedEvent(String id, Like source) {
super(id, source);
}
}
複製代碼
LikeSubmittedEvent 類直接引用 Like 聚合。
外部事件存在於限界上下文間,被多個上下文共享。
通常狀況下,外部事件,只做爲數據載體存在。經常採用平面結構,並公開全部屬性。
@Data
public class SubmittedEvent {
private Owner owner;
private Target target;
}
複製代碼
SubmittedEvent 爲扁平化結構,主要是對數據的封裝。
因爲外部事件被多個上下文共享,版本管理就顯得很是重要,以免重大更改對其服務形成影響。
領域事件是一種通用模式,它的本質是將領域概念添加到發佈-訂閱模式。
發佈-訂閱是比較成熟的設計模式,具備很高的通用性。所以,建議針對領域需求進行封裝。
好比直接使用 geekhalo-ddd 相關模塊。
定義領域事件:
@Value
public class LikeCancelledEvent extends AbstractAggregateEvent<Long, Like> {
public LikeCancelledEvent(Like source) {
super(source);
}
}
複製代碼
訂閱領域事件:
this.domainEventBus.register(LikeCancelledEvent.class, likeCancelledEvent->{
CanceledEvent canceledEvent = new CanceledEvent();
canceledEvent.setOwner(likeCancelledEvent.getSource().getOwner());
canceledEvent.setTarget(likeCancelledEvent.getSource().getTarget());
this.redisBasedQueue.pushLikeEvent(canceledEvent);
});
複製代碼
異步執行領域事件:
DomainEventExecutor eventExecutor =
new ExecutorBasedDomainEventExecutor("LikeEventHandler", 1, 100);
this.domainEventBus.register(LikeCancelledEvent.class,
eventExecutor,
likeCancelledEvent->{
CanceledEvent canceledEvent = new CanceledEvent();
canceledEvent.setOwner(likeCancelledEvent.getSource().getOwner());
canceledEvent.setTarget(likeCancelledEvent.getSource().getTarget());
this.redisBasedQueue.pushLikeEvent(canceledEvent);
});
複製代碼
內存總線簡單高效,同時支持同步、異步兩個處理方案,比較適合處理繁雜的內部事件;消息隊列雖然複雜,但擅長解決服務間通訊問題,適合處理外部事件。
理論上,只有在業務成功完成後,才應該對外發布事件。所以,將領域事件緩存在實體中,並在完成業務操做後將其進行發佈,是一種較好的解決方案。
相比,使用 ThreadLocal 管理訂閱器,並在事件 publish 時進行訂閱回調,事件緩存方案有明顯的優點。
IOC 容器爲咱們提供了不少使用功能,其中也包括髮布-訂閱功能,如 Spring。
一般狀況下,領域模型不該該直接依賴於 Spring 容器。所以,在領域中咱們仍然使用內存總線,爲其添加一個訂閱者,將內存總線中的事件轉發到 Spring 容器中。
class SpringEventDispatcher implements ApplicationEventPublisherAware {
@Autowired
private DomainEventBus domainEventBus;
private ApplicationEventPublisher eventPublisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.eventPublisher = applicationEventPublisher;
}
@PostConstruct
public void addListener(){
this.domainEventBus.register(event->true, event -> {this.eventPublisher.publishEvent(event);});
}
}
複製代碼
此時,咱們就能夠直接使用 Spring 的 EventListener 機制對領域事件進行處理。
@Component
public class RedisBasedQueueExporter {
@Autowired
private RedisBasedQueue redisBasedQueue;
@EventListener
public void handle(LikeSubmittedEvent likeSubmittedEvent){
SubmittedEvent submittedEvent = new SubmittedEvent();
submittedEvent.setOwner(likeSubmittedEvent.getSource().getOwner());
submittedEvent.setTarget(likeSubmittedEvent.getSource().getTarget());
this.redisBasedQueue.pushLikeEvent(submittedEvent);
}
@EventListener
public void handle(LikeCancelledEvent likeCancelledEvent){
CanceledEvent canceledEvent = new CanceledEvent();
canceledEvent.setOwner(likeCancelledEvent.getSource().getOwner());
canceledEvent.setTarget(likeCancelledEvent.getSource().getTarget());
this.redisBasedQueue.pushLikeEvent(canceledEvent);
}
}
複製代碼