設計模式之發佈訂閱模式(3) 深刻Spring Events事件驅動模型

以前文章中咱們講解了 發佈訂閱模式的核心概念 ,並經過 Redis的 Pub/Sub 命令 演示了其分佈式場景下的實現。相比後面要講到的 Guava EventBus,能夠說 Spring Events 的使用更加廣泛,其功能也更增強大。java

事件(Events)是框架中常常被忽略的、重要的功能,也是發佈/訂閱模式的一種常見實現。Spring框架自己就是事件驅動的。git

下面咱們就一塊兒看一下Spring容器中的事件驅動模型,而後一塊兒快速實現一個自定義的事件發佈和監聽,接着再分別探討一下同步和異步的事件監聽如何實現,再接着介紹一下如何定義監聽器的順序,最後提供一個基於SpEL(Spring Expression Language )實現的條件化的事件監聽機制。github

學完本節課程,將你會發現:spring

  1. 經過事件機制將代碼解耦,會讓本身的代碼很是乾淨,且擴展性極強。
  2. 異步的事件處理機制,能夠大大提升程序的響應速度,且內部的線程池會大大提升程序的併發效率。
  3. 條件化和泛型化的監聽器可讓你減小不少顯式的邏輯判斷,從而讓每一個事件監聽的原子性更強。

🚜 本文源碼Github地址 安全

Spring自己的事件驅動模型

Spring 容器與事件模型

Spring的事件機制主要提供了以下幾個接口和類:springboot

  • ApplicationContextEvent

Spring提供的事件抽象類,你能夠繼承它來實現自定義的事件。bash

  • ApplicationEventMulticaster

ApplicationEventMulticaster是一個事件廣播器, 它的做用是把Applicationcontext發佈的Event廣播給全部的監聽器。併發

  • ApplicationListener

ApplicationListener繼承自EventListener, 全部的監聽器都要實現這個接口。app

這個接口只有一個onApplicationEvent()方法, 該方法接受一個ApplicationEvent或其子類對象做爲參數, 在方法體中,能夠經過不一樣對Event類的判斷來進行相應的處理。框架

當事件觸發時全部的監聽器都會收到消息, 若是你須要對監聽器的接收順序有要求,但是實現該接口的一個實現SmartApplicationListener, 經過這個接口能夠指定監聽器接收事件的順序。

  • ApplicationContext

實現事件機制須要三個部分:事件源、事件和事件監聽器。 上面介紹的ApplicationEvent至關於事件, ApplicationListener至關於事件監聽器, 這裏的事件源說的就是ApplicationContext

ApplicationContext是Spring中的全局容器, 也叫"應用上下文", 它負責讀取bean的配置, 管理bean的加載, 維護bean之間的依賴關係, 也就是負責管理bean的整個生命週期。

ApplicationContext就是咱們平時所說的IOC容器。

  • ApplicationContextAware

當一個類實現了ApplicationContextAware接口以後,Aware接口的Bean在被初始以後,能夠取得一些相對應的資源,這個類能夠直接獲取 spring 配置文件中全部注入的bean對象。

Spring提供了不少以Aware結尾的接口,經過實現這些接口,你就得到了獲取Spring容器內資源的能力。

Spring自己實現了以下4個Event:

  • ContextStartedEvent (容器啓動)
  • ContextStoppedEvent (容器中止)
  • ContextClosedEvent (容器關閉)
  • ContextRefreshedEvent (容器刷新)

自定義 Spring Events 實現

自定義Spring的事件模型須要三個角色:事件(Event)、發佈者(Publisher)、監聽者(Listerner)。

自定義Event

下面自定義了一個註冊事件,這個Event的構造函數提供了兩個參數,一個是發佈源(source),一個是發佈的消息(message)。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/** * 註冊事件 * @author ijiangtao * @create 2019-05-02 12:59 **/
@Getter
public class RegisterEvent extends ApplicationEvent {

    private String message;

    public RegisterEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

}
複製代碼

自定義Publisher

下面提供了一個發佈自定義事件的發佈器,咱們經過ApplicationEventPublisher來把事件發佈出去。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * 註冊事件發佈器 * @author ijiangtao * @create 2019-05-02 13:01 **/
@Component
@Slf4j
public class RegisterEventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(final String message) {
        log.info("publis a RegisterEvent,message:{}", message + " time: " + LocalTime.now());
        RegisterEvent registerEvent = new RegisterEvent(this, message);
        applicationEventPublisher.publishEvent(registerEvent);
    }
}

複製代碼

自定義Listener

下面提供幾個自定義事件的監聽器,它們都實現了ApplicationListener<RegisterEvent>接口,同時爲了模擬處理事件的過程,這裏讓當前線程休眠了3秒。由於實現過程相似,這裏僅提供一個實現。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * 發送註冊成功郵件提醒 * * @author ijiangtao * @create 2019-05-02 13:07 **/
@Component
@Slf4j
public class SendRegisterEmailListener implements ApplicationListener<RegisterEvent> {
    @Override
    public void onApplicationEvent(RegisterEvent event) {
        try {
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
        log.info("SendRegisterEmailListener message: " + event.getMessage()+" time: "+ LocalTime.now());
    }

}
複製代碼

測試自定義事件機制

下面經過一個單元測試發佈了自定義事件。經過觀察log輸出,發現事件發佈之後,每一個監聽器都依次輸出了監聽日誌。

package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.common.RegisterEventPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/** * Spring Events * * @author ijiangtao * @create 2019-05-02 12:53 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringEventsCommonTests {

    @Autowired
    private RegisterEventPublisher registerEventPublisher;

    @Test
    public void test1(){
        registerEventPublisher.publish(" Danny is here.");
        try {
            Thread.sleep(10 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
    }

}
複製代碼

這樣,一個基於Spring Events的事件監聽器就實現了。

實現異步的 Spring Events

經過觀察日誌中打印的時間你會發現,上面註冊的全部監聽器,都是依次執行的,也就是Spring Events的事件處理默認是同步的。同步的事件監聽耗時比較長,須要等待上一個監聽處理結束,下一個監聽器才能執行。

那麼能不能改爲異步監聽呢?答案是確定的。下面介紹兩種實現方式。

配置

經過JDK提供的SimpleApplicationEventMulticaster將事件廣播出去,就能夠實現異步併發地讓多個監聽器同時執行事件監聽動做。

package net.ijiangtao.tech.designpattern.pubsub.spring.async.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/** * 異步事件監聽配置 * * @author ijiangtao * @create 2019-05-02 13:23 **/
@Configuration
public class AsynchronousSpringEventsConfig {

    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster  = new SimpleApplicationEventMulticaster();
        eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return eventMulticaster;
    }
}
複製代碼

經過SimpleApplicationEventMulticaster的源碼能夠看到它的multicastEvent方法會經過線程池併發執行事件發佈動做。

public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
     ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
     Iterator var4 = this.getApplicationListeners(event, type).iterator();

     while(var4.hasNext()) {
         ApplicationListener<?> listener = (ApplicationListener)var4.next();
         Executor executor = this.getTaskExecutor();
         if (executor != null) {
             executor.execute(() -> {
                 this.invokeListener(listener, event);
             });
         } else {
             this.invokeListener(listener, event);
         }
     }

 }
複製代碼

註解

經過註解的方式發佈事件,只須要在Listener上加上@Async,而且在發佈事件的地方加上@EnableAsync註解便可。

package net.ijiangtao.tech.designpattern.pubsub.spring.async.annotation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * 發送優惠券 * * @author ijiangtao * @create 2019-05-02 13:07 **/
@Component
@Slf4j
public class UserActionListenerAsyncAnnotation implements ApplicationListener<RegisterEvent> {

    @Async
    @Override
    public void onApplicationEvent(RegisterEvent event) {
        try {
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
        log.info("UserActionListener message: " + event.getMessage()+" time: "+ LocalTime.now());
    }

}
複製代碼
package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.async.annotation.RegisterEventPublisherAsyncAnnotation;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.junit4.SpringRunner;

/** * Spring Events * * @author ijiangtao * @create 2019-05-02 12:53 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableAsync
public class SpringEventsAsyncAnnotationTests {

    @Autowired
    private RegisterEventPublisherAsyncAnnotation registerEventPublisherAsyncAnnotation;

    @Test
    public void test2() {
        registerEventPublisherAsyncAnnotation.publish(" Danny is here (Async).");
        try {
            Thread.sleep(10 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
    }

}
複製代碼

實現 Smart Listener

經過實現SmartApplicationListener接口,能夠自定義監聽器的執行順序、支持的事件類型等。

package net.ijiangtao.tech.designpattern.pubsub.spring.smart;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/** * event * * @author ijiangtao * @create 2019-05-02 15:33 **/
@Getter
public class SmartEvent extends ApplicationEvent {

    private String message;

    public SmartEvent(Object source, String message) {
        super(source);
    }

}
複製代碼
package net.ijiangtao.tech.designpattern.pubsub.spring.smart;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

/** * SmartApplicationListener * * @author ijiangtao * @create 2019-05-02 15:32 **/
@Component
@Slf4j
public class CustomSmartApplicationListener1 implements SmartApplicationListener {


    /** * 自定義支持的事件類型 * @param eventType * @return */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return eventType == SmartEvent.class;
    }

    /** * 定義支持的事件源類型 * @param sourceType * @return */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return sourceType == String.class;
    }

    /** * 自定義優先級別 * @return */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        log.info("CustomSmartApplicationListener {}",applicationEvent.getSource());
    }

}
複製代碼

條件化的事件監聽

有時候咱們但願一個監聽器但願監聽多個事件,例如一個系統安全監聽器(SecurityEventListener),能夠監聽各類系統安全問題(NetWorkSecurityEvent、SQLSecurityEvent、AuthorizationSecurityEvent,等等),這個是時候你可讓監聽器監聽這些Event的父類SecurityEvent,這樣你的監聽器就能夠監聽到全部該Event的子類型。

有時候咱們須要根據同一個事件拋出的消息的某個值來決定用哪一個監聽器來處理。例如SecurityEvent有個安全級別level屬性,你定義了5個level,每一個level都有不一樣的處理機制。按照傳統的實現方式須要經過條件判斷(if/else或者switch/case等)來實現,代碼的封裝性很差。這種狀況下,你能夠在你的Listener的監聽方法上增長@EventListener註解,並經過condition參數來指定過濾條件。例如 condition = "#event.success eq false")就是經過SpEL表示:當方法的參數event變量的success屬性等於false的時候才執行監聽方法。

下面咱們就演示一下實現過程。

提供泛型的Event基類:

package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.Getter;

/** * GenericSpringEvent * * @author ijiangtao * @create 2019-05-02 13:47 **/
@Getter
public class GenericSpringEvent<T> {

    private T what;

    protected boolean success;

    public GenericSpringEvent(T what, boolean success) {
        this.what = what;
        this.success = success;
    }

}
複製代碼

基於Event基類自定義Event實現

package net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout;

import lombok.Getter;
import net.ijiangtao.tech.designpattern.pubsub.spring.generic.GenericSpringEvent;

/**
 * GenericSpringEventCheckout
 *
 * @author ijiangtao
 * @create 2019-05-02 13:58
 **/
@Getter
public class GenericSpringEventCheckout extends GenericSpringEvent<Long> {

    private Long userId;

    public GenericSpringEventCheckout(Long userId, boolean success) {
        super(userId, success);
    }

}
複製代碼

提供條件化監聽器

監聽器監聽基類Event的全部字類,而且經過@EventListener註解和SpEL定義監聽的Event的過濾條件。

package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/** * @author ijiangtao * @create 2019-05-02 13:52 **/
@Component
@Slf4j
public class GenericSpringEventSuccessListenerLong {

    @EventListener(condition = "#event.success")
    public void handle(GenericSpringEvent<Long> event) {
        log.info("Handling generic event Success (conditional). {}",event.getWhat());
    }

}
複製代碼
package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/** * @author ijiangtao * @create 2019-05-02 13:52 **/
@Component
@Slf4j
public class GenericSpringEventFailListenerLong {

    @EventListener(condition = "#event.success eq false")
    public void handle(GenericSpringEvent<Long> event) {
        log.info("Handling generic event Fail (conditional). {}",event.getWhat());
    }

}
複製代碼

自定義事件發佈器

package net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * GenericSpringEventPublisher * * @author ijiangtao * @create 2019-05-02 13:55 **/
@Component
@Slf4j
public class GenericSpringEventPublisherCheckout {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(final Long userId, boolean success) {

        log.info("publis a GenericSpringEventPublisher, userId:{}", userId + " time: " + LocalTime.now());

        GenericSpringEventCheckout eventCheckout = new GenericSpringEventCheckout(userId, success);

        applicationEventPublisher.publishEvent(eventCheckout);
    }

}
複製代碼

單元測試

下面提供了一個測試方法,經過觀察日誌發現,不一樣的條件,觸發了不一樣的監聽器。

package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout.GenericSpringEventPublisherCheckout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationEvent;
import org.springframework.test.context.junit4.SpringRunner;

/** * Spring Events * * @author ijiangtao * @create 2019-05-02 12:53 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringEventsGenericTests {

    @Autowired
    private GenericSpringEventPublisherCheckout checkoutPubliser;


    @Test
    public void test1() {

        ApplicationEvent applicationEvent;
        checkoutPubliser.publish(101L, true);

        checkoutPubliser.publish(202L, false);
    }

}

複製代碼

Wechat-westcall

相關連接

相關文章
相關標籤/搜索