JAVA | Guava EventBus 使用 發佈/訂閱模式

系列文章目錄



前言

EventBus 是 Guava 的事件處理機制,是觀察者模式(生產/消費模型)的一種實現。java

觀察者模式在咱們平常開發中使用很是普遍,例如在訂單系統中,訂單狀態或者物流信息的變動會向用戶發送APP推送、短信、通知賣家、買家等等;審批系統中,審批單的流程流轉會通知發起審批用戶、審批的領導等等。git

Observer模式也是 JDK 中自帶就支持的,其在 1.0 版本就已經存在 Observer,不過隨着 Java 版本的飛速升級,其使用方式一直沒有變化,許多程序庫提供了更加簡單的實現,例如 Guava EventBus、RxJava、EventBus 等github

1、爲何要用 Observer模式以及 EventBus 優勢 ?

EventBus 優勢編程

  • 相比 Observer 編程簡單方便
  • 經過自定義參數可實現同步、異步操做以及異常處理
  • 單進程使用,無網絡影響

缺點跨域

  • 只能單進程使用
  • 項目異常重啓或者退出不保證消息持久化

若是須要分佈式使用仍是須要使用 MQ網絡

2、EventBus 使用步驟

1. 引入庫

Gradle併發

compile group: 'com.google.guava', name: 'guava', version: '29.0-jre'

Maven異步

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

引入依賴後,這裏咱們主要使用 com.google.common.eventbus.EventBus 類進行操做,其提供了 registerunregisterpost 來進行註冊訂閱、取消訂閱和發佈消息分佈式

public void register(Object object);

public void unregister(Object object);

public void post(Object event);

2. 同步使用

1. 首先建立一個 EventBuside

EventBus eventBus = new EventBus();

2. 建立一個訂閱者

在 Guava EventBus 中,是根據參數類型進行訂閱,每一個訂閱的方法只能由一個參數,同時須要使用 @Subscribe 標識

class EventListener {

  /**
   * 監聽 Integer 類型的消息
   */
  @Subscribe
  public void listenInteger(Integer param) {
    System.out.println("EventListener#listenInteger ->" + param);
  }

  /**
   * 監聽 String 類型的消息
   */
  @Subscribe
  public void listenString(String param) {
    System.out.println("EventListener#listenString ->" + param);
  }
}

3. 註冊到 EventBus 上併發布消息

EventBus eventBus = new EventBus();

eventBus.register(new EventListener());

eventBus.post(1);
eventBus.post(2);
eventBus.post("3");

運行結果爲

EventListener#listenInteger ->1
EventListener#listenInteger ->2
EventListener#listenString ->3

根據須要咱們能夠建立多個訂閱者完成訂閱信息,同時若是一個類型存在多個訂閱者,則全部訂閱方法都會執行

爲何說這麼作是同步的呢?

Guava Event 其實是使用線程池來處理訂閱消息的,經過源碼能夠看出,當咱們使用默認的構造方法建立 EventBus 的時候,其中 executorMoreExecutors.directExecutor(),其具體實現中直接調用的 Runnable#run 方法,使其仍然在同一個線程中執行,因此默認操做仍然是同步的,這種處理方法也有適用的地方,這樣既能夠解耦也可讓方法在同一個線程中執行獲取同線程中的便利,好比事務的處理

EventBus 部分源碼

public class EventBus {
  private static final Logger logger = Logger.getLogger(EventBus.class.getName());
  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;
  private final SubscriberRegistry subscribers;
  private final Dispatcher dispatcher;

  public EventBus() {
    this("default");
  }

  public EventBus(String identifier) {
    this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
  }

  public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
  }

  EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
    this.subscribers = new SubscriberRegistry(this);
    this.identifier = (String)Preconditions.checkNotNull(identifier);
    this.executor = (Executor)Preconditions.checkNotNull(executor);
    this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
    this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
  }
}

DirectExecutor 部分源碼

enum DirectExecutor implements Executor {
  INSTANCE;

  private DirectExecutor() {
  }

  public void execute(Runnable command) {
    command.run();
  }

  public String toString() {
    return "MoreExecutors.directExecutor()";
  }
}

3. 異步使用

經過上面的源碼,能夠看出只要將構造方法中的 executor 換成一個線程池實現便可, 同時 Guava EventBus 爲了簡化操做,提供了一個簡化的方案即 AsyncEventBus

EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());

這樣便可實現異步使用

AsyncEventBus 源碼

public class AsyncEventBus extends EventBus {
  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }

  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
    super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
  }

  public AsyncEventBus(Executor executor) {
    super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
}

4. 異常處理

若是處理時發生異常應該如何處理? 在看源碼中,不管是 EventBus 仍是 AsyncEventBus 均可傳入自定義的 SubscriberExceptionHandler 該 handler 當出現異常時會被調用,我可能夠從參數 exception 獲取異常信息,從 context 中獲取消息信息進行特定的處理

其接口聲明爲

public interface SubscriberExceptionHandler {
  /** Handles exceptions thrown by subscribers. */
  void handleException(Throwable exception, SubscriberExceptionContext context);
}

總結

在上面的基礎上,咱們能夠定義一些消息類型來實現不一樣消息的監聽和處理,經過實現 SubscriberExceptionHandler 來處理異常的狀況,不管時同步仍是異步都能遊刃有餘

參考


白色兔子公衆號圖片

相關文章
相關標籤/搜索