用Spring Cloud Stream實現Event Driven Architectures

  微服務的出現和原生的雲架構觸發了DDD,CQRS和Event Sourcing的復甦。全部這些方式實現的核心是Domain Event(領域事件), 它是分佈式系統中實現一整套DDD架構及實現最終一致性的主要機制。java

  Domain Events的通信改變了domain領域對象的狀態,好比Customer Created、Account Credited等。它們老是以過去時來表達,一個domain event呈現了一個特定的domain已經發生的行爲。git

  DDD推薦把一個總體系統拆分紅多個微服務。如何拆分咱們改日再談,可是假設一個遵循DDD來設計的金融系統來定義Customer和Account做爲一個Aggregates聚合。宏觀來看一個Aggregate聚合就是領域對象,由實體類和是對象組成,它定義了一個事務邊界。一個聚合封裝了業務邏輯來驗證業務規則和保持聚合內的一致性。聚合間經過發佈領域事件來觸發狀態的改變。CustomerCreated一個Customer聚合發佈的領域事件,AccountCreated是一個Account發佈的領域事件。Customer保存了它擁有的account,Account也許保存了歸屬的customers的引用。不管如何,每一個聚合均可能須要處理其餘聚合發佈的事件。在CQRS中,command service以聚合爲單位來更新狀態和發佈domain events,這些domain events被query service接收和處理來保持物化視圖的最終一致性。github

  domain event是一個表達發生了什麼並攜帶了須要修改的數據、時間戳、聚合ID,及其餘附加的元數據。在一個分佈式系統中,domain events以發佈訂閱的形式被髮布到一個MQ系統中。這容許任意多個遠程進程(微服務)來異步地訂閱接收和處理這些領域事件。一個CQRS+ES系統中監聽domain events的微服務可能須要訂閱並處理多種類型的domain event 事件,因此須要一些機制來分發每一類事件到指定的事件處理方法中。spring

  在java的世界中,咱們老是能夠在監聽消息中間件的事件時用switch case分別處理每一類事件。這只是開個玩笑,咱們固然能夠作的更好。事實上,現有的開源CQRS+ES框架好比Axon提供了一個@EventHandler註解來啓用事件總線進行基於入參類型的事件分發。使用Axon,CustomerCreatedEvent和AccountCreatedEvent是明確的java類型(Class),因此你能夠這麼寫:express

public class MyEventHandler {
   @EventHandler
   public void handle(CustomerCreatedEvent event) {
   ...
   }
 
   @EventHandler
   public void handle(AccountCreatedEvent event) {
   ...
   }
}

  Eventuate是另外一個CQRS+ES框架提供了類似的實現機制,示例編程

  以上的方式聽起來都遵循了面向對象編程。然而,這卻使分佈式系統的Java類互相依賴。最終domain events變成了分佈式系統中的共享類庫。在衆多微服務中依賴共享類庫產生了高耦合的缺點。分佈式系統中的這種類型的耦合必須被消除,這種高耦合是微服務架構中的一個反模式。舉個例子,若是你把domain events打包到一個公共的jar中被各個微服務共享依賴,當一個領域事件被新增或修改後,每個有依賴的微服務都必須從新部署,不論這個微服務依不依賴它訂閱的domain event類型。固然,每一個微服務能夠解碼每一個消息的有效負載到不一樣的本地定義的事件類型,但這須要大量重複性的工做。json

Handling Events with Spring Cloud Stream

  最近發佈的Spring Cloud Stream Chelsea release 介紹了一種原生的事件分發特性,支持事件驅動架構同時避免依賴共享的domain類型。Spring Cloud Stream提供了一個@StreamListener註解用來控制 序列化的載荷 做爲 方法的入參 並 執行方法,例如:架構

@StreamListener(Sink.INPUT)
public void handle(Foo foo){
...
}

將會自動的轉換一個經過kafka或Rabbit MQ(或其餘任何支持的消息中間件)傳輸的序列化的json載荷到一個‘Foo’對象中而後執行‘handle’方法。一般Spring Cloud Stream應用會爲每個channel聲明一個stream監聽器,監聽的channel綁定到其餘應用發佈數據的topic上。框架

  新的事件分發特性在@StreamListener上增長了condition屬性來使路由消息到多個監聽器成爲可能,condition的值是用SPEL表達式運算出來的一個boolean值。condition應用到傳入的消息上,可以計算任何消息載荷或特定的消息頭、或其組合。這提供了一種極其靈活的路由機制並不須要不通的事件類型定義類。例如,咱們定義一個帶String eventType屬性的Event類型,Spring Cloud Stream將提供開盒即用的功能:dom

@EnableBinding
class MyEventHandler{
    @StreamListener(target=Sink.INPUT, condition="payload.eventType=='CustomerCreatedEvent'")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message</span>
     }
 
     @StreamListener(target=Sink.INPUT, condition="payload.eventType=='AccountCreatedEvent'")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message</span>
     }
}

Custom Annotations

  這是一個提高,可是仍沒有到達咱們想要的可用程序。預想中的完美的效果,咱們想看到跟其餘的CQRS+ES框架類似效果的實現,咱們的目標效果若是:

@EnableEventHandling
class MyEventHandler{
    @EventHandler(eventType = "CustomerCreatedEvent")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
     }
 
     @EventHandler(eventType = "CustomerCreatedEvent")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
     }
}

  幸運的是,Sring容許咱們來小小的自定義它。Core Spring Framework已經對自定義註解有優秀的支持,因此你能夠很容易的自定義一個@EventHandler註解來替代@StreamListener。咱們能夠定義默認的目標channel到 ' Sink.INPUT ':

@StreamListener 
@Target({ElementType.METHOD}) 
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented public @interface EventHandler {
    @AliasFor(annotation=StreamListener.class, attribute="target")
    String value() default ""; 
 
    @AliasFor(annotation=StreamListener.class, attribute="target")
    String target() default Sink.INPUT; 
 
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String condition() default "";
}

如今咱們已經接近咱們要的目標效果了,可是仍然沒有徹底到達那個效果:

@EnableBinding
class MyEventHandler{
    @EventHandler(condition="payload.eventType=='CustomerCreatedEvent'")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
     }
 
     @EventHandler(condition="payload.eventType=='AccountCreatedEvent'")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
     }
}

  最後一步若是須要採用eventType的值來識別事件類型,須要一點附加的魔法。咱們規定,在每一個消息中提供一個eventType頭。一旦咱們採用了這個規約,condition表達式能夠轉化成模板來實現,只須要重載Spring Cloud Stream處理StreamListener註解的Bean Post Processor。重載的函數以下:

import static org.springframework.cloud.stream.config.BindingServiceConfiguration.STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME;

@Configuration
public class EventHandlerConfiguration {

    /*
	 * The SpEL expression used to allow the Spring Cloud Stream Binder to dispatch to methods
     * Annotated with @EventHandler
     */

	private static String eventHandlerSpelPattern = "payload.eventType=='%s'";

	/**
	 * Override the default {@link StreamListenerAnnotationBeanPostProcessor} to inject value of
	 * 'eventType' attribute into 'condition' expression.
	 *
	 * @return
	 */
	@Bean(name = STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME)
	public static BeanPostProcessor streamListenerAnnotationBeanPostProcessor() {
		return new StreamListenerAnnotationBeanPostProcessor() {
			@Override
			protected StreamListener postProcessAnnotation(StreamListener originalAnnotation, Method annotatedMethod) {
				Map<String, Object> attributes = new HashMap<>(
						AnnotationUtils.getAnnotationAttributes(originalAnnotation));
				if (StringUtils.hasText(originalAnnotation.condition())) {
					String spelExpression = String.format(eventHandlerSpelPattern, originalAnnotation.condition());
					attributes.put("condition", spelExpression);
				}
				return AnnotationUtils.synthesizeAnnotation(attributes, StreamListener.class, annotatedMethod);
			}
		};
	}

}

接下來,咱們能夠用自定義的EnableEventHandling註解來引入這個configuration:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EventHandlerConfig.class})
public @interface EnableEventHandling {
 
}

最終,咱們修改EventHandler註解,定義一個eventType屬性來作condition的別名:

@StreamListener
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface EventHandler {
    /**
     * The name of the binding target (e.g. channel) that the method subscribes to.
     * @return the name of the binding target.
     */
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String value() default "";
 
    /**
     * The name of the binding target (e.g. channel) that the method subscribes to.
     * @return the name of the binding target.
     */
    @AliasFor(annotation=StreamListener.class, attribute="target")
    String target() default Sink.INPUT;
 
    /**
     * A condition that must be met by all items that are dispatched to this method.
     * @return a SpEL expression that must evaluate to a {@code boolean} value.
     */
    @AliasFor(annotation=StreamListener.class, attribute="condition")
    String eventType() default "";
}

Summary

用Spring Cloud Stream和一些小小的Spring魔法,咱們已經實現了在相似CQRS+ES的EDA架構中處理domain events的註解驅動框架,咱們實現了面向事件的註解,對比已你看到過的CQRS+ES框架的:

@EnableEventHandling
class MyEventHandler{
    @EventHandler(eventType = "CustomerCreatedEvent")
    public void handleCustomerEvent(@Payload Event event) {
      // handle the message
     }
 
     @EventHandler(eventType = "CustomerCreatedEvent")
    public void handleAccountEvent(@Payload Event event) {
      // handle the message
     }
}

不像已有的CQRS+ES框架的是,咱們不依賴載荷的類型來路由事件。這意味着咱們避免了微服務架構中須要共享common數據類型的缺點。固然,若是你真正想要根據載荷類型來路由消息,咱們也能夠很容易的修改來實現。

 

原文連接:https://dturanski.wordpress.com/2017/03/26/spring-cloud-stream-for-event-driven-architectures/

水平有限,翻譯的可能不是很好,將就的看吧,但願有用。

項目源碼:https://github.com/dturanski/event-handler-annotation-demo

相關文章
相關標籤/搜索