設計模式 - 事件監聽者模式 - JDK & Spring & Guava 各有千秋

​ 何爲事件監聽者模式 ?java

​ 第一就是爲啥我強調事件二字 ,由於他是目標 . 在咱們開發中絕對見到過一堆後綴是 Listener的類, 這個就是監聽者模式, 監聽者模式是一種CS開發架構,很好的作了一種設計的解耦,監聽者註冊到一個郵局中,訂閱某種事件(提早說好了), 郵局會按需求發佈消息, 監聽者會及時收到消息來處理 . 其中整個Java開發環境中 , JDK已經幫咱們定義好了接口 , Spring就是基於JDK接口下實現的, Guava則是另外一種實現方式, 各有千秋 , 你們看看吧 , 究竟是回調好仍是阻塞, 仍是Guava這種方式呢, 他和觀察者模式有何區別呢 ? 咱們有機會在講spring

1. Java原生規範

1. EventObject

事件對象 , 他須要一個事件源 , 用構造函數傳遞的設計模式

public class EventObject implements java.io.Serializable {
    protected transient Object  source;

    public EventObject(Object source) {
        if (source == null)
            throw new IllegalArgumentException("null source");

        this.source = source;
    }
    ........... 其餘省略 
}
複製代碼

2. EventListener

事件監聽者,他是負責監聽事件的 , JAVA提供的是一個空接口, 讓咱們根絕需求寫安全

public interface EventListener {
}
複製代碼

3. 總結

咱們發現 java 提供的只提供了一個事件對象 ,和一個事件監聽器 ,因此須要咱們遵照這個規範去開發多線程

2. Java規範設計一個監聽者模式 - 基於回調模式

1. 事件源 - EventSource

通常狀況下 都會設置成一個 Object 類型的 , 不須要咱們去設計一個,爲了體現設計模式的角色,咱們就設計了一個架構

@ToString
@Setter
@Getter
public class EventSource {
    private String name;
    private String info;
}
複製代碼

2. 事件對象 - EventObject

這裏咱們繼承了 EventObject , 只是簡單的實現了一下 , 並無作過多的包裝app

public class CoreEventObject extends EventObject {
    public CoreEventObject(EventSource source) {
        super(source);
    }
}
複製代碼

3. 事件監聽者 - EventListener

這裏咱們真正的監聽者 , 通常狀況下都須要設計成一個 函數式接口 , 我這個是和Spring框架學習的 , 由於函數式接口才能體現回調 ,框架

@FunctionalInterface
public interface CoreEventListener<E extends CoreEventObject> extends EventListener {

    void onEventObject(E event);
}
複製代碼

4. 事件發佈者 - EventPublisher

事件發佈者 ,由於沒有發佈的事件對象, 哪來的監聽異步

public class EventPublisher<E extends CoreEventObject> {

    private CoreEventListener<E> listener;


    public EventPublisher(CoreEventListener<E> listener) {
        this.listener = listener;
    }

    
    public void publish(E object){
        System.out.println("發佈事件 : " + object);
        // 傳給 CoreEventListener
        listener.onEventObject(object);
    }
}
複製代碼

5. 測試Demo

public class TestDemo {
    public static void main(String[] args) {
        // 1. 建立一個事件發佈者
        EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() {
            @Override
            public void onEventObject(CoreEventObject event) {
                System.out.println("接收到事件源 : " + event.getSource() + " , 當前線程 : " + Thread.currentThread().getName()); 
            }
        });

        // 2. 發佈一個事件對象 
        publisher.publish(getCoreEventObject());
    }

    private static CoreEventObject getCoreEventObject(){
        ..... 此處省略 
        return eventObject;
    }
}
複製代碼

輸出結果 :ide

發佈事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)]
接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 當前線程 : main
複製代碼

咱們發現咱們成功的接收到了事件對象 和 事件源 , 這個就是鉤子函數的魅力 . 其實你只是作了一個事件發佈你無意觀察其餘的東西 , 只須要一個監聽者就能夠作到監聽了 , 這樣你的事件發佈 和 監聽 徹底就解耦了 .其實底層就是一個地址引用 .

3. 回調函數存在的問題 ?

1. 咱們的問題 ?

不少場景下,咱們的發佈事件和監聽事件徹底在兩個線程中,那麼咱們如何拿到事件對象呢 ?

若是咱們簡單使用一下 , 會這麼寫 ?

public class TestEventListener implements CoreEventListener<CoreEventObject> {

    private CoreEventObject object;

    @Override
    public void onEventObject(CoreEventObject object) {
 		// 賦值給成員變量 
        this.object = object;
    }

    // 獲取成員變量 
    public CoreEventObject getObject() {
        return object;
    }
}
複製代碼

測試一下 :

public class TestDemo {

    public static void main(String[] args) {

        TestEventListener listener = new TestEventListener();
        CoreEventObject object = listener.getObject();
        // 先去拿 ,後去發佈
        System.out.println(object.getSource());

        EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
        publisher.publish(getCoreEventObject());
    }


    private static CoreEventObject getCoreEventObject(){
			.... 
        return eventObject;
    }

}

複製代碼

輸出結果

Exception in thread "main" java.lang.NullPointerException
	at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28)
複製代碼

有些人就會說 , 你這不對哇 ,你固然拿不到了 ,由於人家還沒發佈了 ,可是在多線程 ,在解耦的狀況下 ,你哪知道對面什麼時候發佈結束了 , 你再去拿呢 ? 那就須要java的多線程知識了 ,Future 給咱們帶來了提醒 , 就是阻塞的思想 , 只有監聽者真正的收到對象 , 咱們才能去拿 .

2. 解決問題

瞭解過我前面提到的那一節 FutureTask是如何實現的 ,我以爲問題就迎刃而解了 .

public class TestEventListener implements CoreEventListener<CoreEventObject> {
    private CoreEventObject object;

    /** * 當 X = 0 ,表明 obj尚未初始化了 * 當 x = 1 , 表明 obj 以及初始化了 , 已經接收到了 */
    private static volatile int x = 0;

    @Override
    public void onEventObject(CoreEventObject object) {
        this.object = object;
        // 收到改爲 1
        x = 1;
    }

    public CoreEventObject getObject() {
        while (true) {
            if (x == 1) {
                break;
            }
        }
        // 拿到對象,再設置爲1
        x = 0;
        return object;
    }
}
複製代碼

因爲這個解決方案,會使得執行getObject() 的線程一直的阻塞下去,就是死循環下去,咱們必須一個線程去執行這個方法 ,

public class TestDemo {
    public static void main(String[] args) {
        TestEventListener listener = new TestEventListener();
        // 新建一個線程去接收
        Thread thread = new Thread(() -> {
            System.out.println("我開始接收對象 : " + System.currentTimeMillis());
            CoreEventObject object = listener.getObject();
            System.out.println("成功接收對象 : "+object.getSource());
        });
        thread.start();
		// 新建一個線程去發佈
        EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
        new Thread(()->{
            publisher.publish(getCoreEventObject());
        }).start();
    }

    private static CoreEventObject getCoreEventObject(){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        EventSource source = new EventSource();
        source.setName("事件源");
        source.setInfo("" + System.currentTimeMillis());
        return new CoreEventObject(source);
    }
}
複製代碼

輸出結果 :

我開始接收對象 : 1573282924555
發佈事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)]
成功接收對象 : EventSource(name=事件源, info=1573282925590)
複製代碼

咱們咱們接收的時候是在1573282924555的時間搓 , 而真正拿到的對象確實在1573282925590發佈的 , 這個就徹底在倆時間軸上,因此咱們成功的解決了問題 .

4. Spring 中的 ApplicationListener

1. ApplicationEvent

​ Class to be extended by all application events. Abstract as it doesn't make sense for generic events to be published directly.

​ 此類被全部的 application events 所繼承 。抽象的緣由是由於直接發佈這個ApplicationEvent是沒有意義的。

2. ApplicationListener

​ Interface to be implemented by application event listeners. Based on the standard java.util.EventListener interface for the Observer design pattern.

​ 這個接口被全部的 application event listeners.所實現 , 基於Java的 java.util.EventListener接口規範

3. 開始使用

咱們有一個需求就是 ,咱們有一個服務會從遠程不斷的去拉去配置信息 ,一旦有改變就會發布配置信息 .

1. Config - 事件源

@ToString
@Setter
@Getter
public class Config {
    private String namespace;
    private Map<String, Object> info;
}
複製代碼

2. ConfigEvent - 事件對象

// 這個註解,咱們是根據Spring源碼看到的 , 因此一致性,我就加了
@SuppressWarnings("serial")
public class ConfigEvent extends ApplicationEvent {

    public ConfigEvent(Config source) {
        super(source);
    }
}
複製代碼

3. ConfigEventListener - 事件監聽者

@Component
public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {

    @Override
    public void onApplicationEvent(ConfigEvent event) {
        System.out.println("接收到更新信息 : " + event.getSource()+" , 當前線程 : "+Thread.currentThread().getName());
    }
	// 保證執行順序 , 多個 ConfigEventListener就須要實現這個接口
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
	
    // 初始化之後要作什麼 ? 
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("初始化當前ConfigEventListener");
    }
}
複製代碼

5 . ConfigServer - 配置中心服務

@Service
public class ConfigServer {
	
    // 注入applicationContext,由於只有他才能夠執行發佈事件
    @Autowired
    private ApplicationContext applicationContext;

	// 這個是開啓異步 ,後面會說到
    // @Async
    public void publishConfig(){
        // 須要發佈 --- > 改變的事件
         System.out.println("發佈事件成功 , 當前線程 : "+Thread.currentThread().getName());
        applicationContext.publishEvent(getChange());
    }
    
    
    public ConfigEvent getChange(){
        Config config = new Config();
        config.setNamespace("application");
        HashMap<String, Object> conf = new HashMap<>();
        conf.put("server.port", 8088);
        config.setInfo(conf);
        return  new ConfigEvent(config);
    }
}
複製代碼

6. 啓動測試

@SpringBootApplication
public class SpringListenerApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(SpringListenerApplication.class, args);
	}

	@Autowired
	private ConfigServer server;

	@Override
	public void run(String... args) throws Exception {
		server.publishConfig();
	}
}
複製代碼

輸出結果 :

.....
初始化當前ConfigEventListener
.... 
發佈事件成功 , 當前線程 : main
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : main
複製代碼

因此一個Spring-Boot的事件監聽仍是很簡單的 ,類比到 Spring一個道理,相信懂得人都知道 . 可是又一個問題是咱們的 發佈和監聽都是 main線程 ,很差吧 ,玩意有不少事件了 ?

4. 開啓異步發佈

須要兩個註解 @EnableAsync啓動Async功能 , 和 @Async某個方法使用異步執行

發佈事件成功 , 當前線程 : SimpleAsyncTaskExecutor-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : SimpleAsyncTaskExecutor-1
複製代碼

咱們發現就出現了線程池執行 , 這個理的線程池 ,是能夠進行配置的 , 只須要咱們顯式的注入下面這個SimpleAsyncTaskExecutor Bean 就能夠了

@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
    SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
    // 須要傳入一個 ThreadFactory實現類 , 因此看過我前面寫的文章應該會寫這個,好比 JUC- Executor那節
    executor.setThreadFactory(new MyThreadFactory("anthony"));
    return executor;
}
複製代碼

輸出結果 :

發佈事件成功 , 當前線程 : anthony-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : anthony-1
複製代碼

5 . 多個監聽器順序執行

你能夠跟我同樣選擇實現 ApplicationListenerOrdered ,或者你能夠直接實現 SmartApplicationListener都同樣的哈,沒有哪一個好哪一個很差

監聽器 一 :

@Component
public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {

    @Override
    public void onApplicationEvent(ConfigEvent event) {
        System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 當前線程 : "+Thread.currentThread().getName());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("初始化當前監聽器 : " + this.toString());
    }
}
複製代碼

監聽器 二 :

@Component
public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {

    @Override
    public void onApplicationEvent(ConfigEvent event) {
        System.out.println("ConfigEventListenerEnd 接收到更新信息 : " + event.getSource()+" , 當前線程 : "+Thread.currentThread().getName());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE-1;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("初始化當前監聽器 : " + this.toString());
    }
}
複製代碼

輸出結果 :

初始化當前監聽器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f
初始化當前監聽器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289
..... 
發佈事件成功 , 當前線程 : anthony-1
ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : anthony-1
ConfigEventListenerEnd  接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 當前線程 : anthony-1    
複製代碼

5. Guava 中的 EventBus

​ Guava的EventBus 就是一個很好的事件註冊發佈的管理工具 , 他屬於一種推送的模式 , 跟spring的很類似,

1. 依賴

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.0-jre</version>
</dependency>
複製代碼

2. 快速開始

​ 主要的對象就是 , EventBus 事件總線, 他是管理全部監聽者(或者叫作訂閱者) ,經過EventBus#register 或者 EventBus#unRegister 來管理的 , 同時監聽者要有監聽的事件 , 這裏是基於方法級別的 (注意方法只能有一個參數,就是監聽的事件), 須要在方法上加上 @Subscribe 註解來表示監聽 , EventBus能夠經過 EventBus#post 方法來發布事件 , 對應類型的監聽者就會收到 . 同時EventBus能夠處理異常

public class QuicklyStart {

    public static void main(String[] args) {
        // 建立一個 事件總線
        EventBus bus = new EventBus(new SubscriberExceptionHandler() {
            @Override
            public void handleException(Throwable exception, SubscriberExceptionContext context) {
                // 處理訂閱者異常信息
                System.out.println("異常信息 : "+exception.getMessage() + ", 異常事件 : " + context.getEvent());
            }
        });

        // 註冊你的監聽器 , 其實更加準確來講是訂閱者 , 他屬於一種發佈訂閱模式
        bus.register(new EventListener());

        // 事件總線發佈事件
        bus.post("sb");

        // 事件總線發佈事件
        bus.post(new Event("hello Guava"));

    }


}

/** * 事件源 */
class Event {

    String msg;

    public Event(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "Event{" + "msg='" + msg + '\'' + '}';
    }
}

/** * 監聽器 */
class EventListener {

    /** * {@link Subscribe} 一個這個表明一個訂閱者,EventBus會將符合的事件發佈到對應的訂閱者上 , 可是不支持java的基本數據類型, int 之類的 * * @param event */
    @Subscribe
    public void onEvent(Event event) {
        System.out.println("當前線程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
    }


    @Subscribe
    public void onStringEvent(String event) {
        error(); // 模擬異常
        System.out.println("當前線程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
    }

    private void error() {
        int i = 1 / 0;
    }
}
複製代碼

輸出 :

異常信息 : / by zero, 異常事件 : sb
當前線程 : main, 接收到事件 : Event{msg='hello Guava'}
複製代碼

3. 基本原理

其實很簡單 , 第一註冊的時候 :

/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
  // 其實就是註解掃描 , 而後把元信息都給整出來
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
  Class<?> eventType = entry.getKey();
  Collection<Subscriber> eventMethodsInListener = entry.getValue();
    // 查找有沒有 
  CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

    // 沒有建立一個對象
  if (eventSubscribers == null) {
    CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
    eventSubscribers =
        MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
  }

    // 添加進去 ,實際上是 Subscriber對象 , 把method信息, 對象信息所有封裝進去了
  eventSubscribers.addAll(eventMethodsInListener);
}
}
複製代碼

第二就是 發佈

public void post(Object event) {
      // 根據事件獲取對應的 Subscriber
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 發佈出去
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }



@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
  checkNotNull(event);
  checkNotNull(subscribers);
    // 獲取當前線程的隊列 ,用ThreadLocal維護的線程安全, 實際上是爲了安全
  Queue<Event> queueForThread = queue.get();
    // 建立一個事件對象
  queueForThread.offer(new Event(event, subscribers));

  if (!dispatching.get()) {
    dispatching.set(true);
    try {
      Event nextEvent;
      while ((nextEvent = queueForThread.poll()) != null) {
        while (nextEvent.subscribers.hasNext()) {
            // 處理
          nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
        }
      }
    } finally {
      dispatching.remove();
      queue.remove();
    }
  }
}


 /** Dispatches {@code event} to this subscriber using the proper executor. */
  final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
                // 關鍵點 ,就是這 
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
                // 出現異常直接就 ... 調用異常回調方法了
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }



  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
        // 原來如此 , .........method.invok 真.... 因此能夠抓取異常
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }
複製代碼

因此這個玩意很簡單 , 原理一看就分析出來了

6. 總結

​ 咱們發現咱們的本身實現的監聽者和 SpringGuava 這倆種實現有啥區別 , 無非就是咱們本身實現的監聽者模式, 對於 listener 的管理,沒有作 , 咱們只是一個 Publisher 一個 Listener,一對一的關係 , 這樣子就很很差, 100個監聽者就須要100個發佈者 , 不符合設計模式的原則 , 因此參考Guava,咱們發現他無非作的就是一個對於Listener 的管理 , 可是有一個細節但願你們知道, 對於監聽者模式 , 萬一事件發佈失敗了 , 咱們如何知道, 因此Guava 至少幫咱們作了 , 他不是基於回調機制的, 而是使用了JavaMethod#invoke ,看需求而定吧 , 只不過回調更加輕量級,

相關文章
相關標籤/搜索