EventBus 是 Guava 的事件處理機制,是觀察者模式(生產/消費模型)的一種實現。java
觀察者模式在咱們平常開發中使用很是普遍,例如在訂單系統中,訂單狀態或者物流信息的變動會向用戶發送APP推送、短信、通知賣家、買家等等;審批系統中,審批單的流程流轉會通知發起審批用戶、審批的領導等等。git
Observer模式也是 JDK 中自帶就支持的,其在 1.0 版本就已經存在 Observer,不過隨着 Java 版本的飛速升級,其使用方式一直沒有變化,許多程序庫提供了更加簡單的實現,例如 Guava EventBus、RxJava、EventBus 等github
EventBus 優勢編程
缺點跨域
若是須要分佈式使用仍是須要使用 MQ
網絡
Gradle併發
compile group: 'com.google.guava', name: 'guava', version: '29.0-jre'
Maven異步
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency>
引入依賴後,這裏咱們主要使用 com.google.common.eventbus.EventBus
類進行操做,其提供了 register
、unregister
、post
來進行註冊訂閱、取消訂閱和發佈消息分佈式
public void register(Object object); public void unregister(Object object); public void post(Object event);
1. 首先建立一個 EventBuside
EventBus eventBus = new EventBus();
2. 建立一個訂閱者
在 Guava EventBus 中,是根據參數類型進行訂閱,每一個訂閱的方法只能由一個參數,同時須要使用 @Subscribe
標識
class EventListener { /** * 監聽 Integer 類型的消息 */ @Subscribe public void listenInteger(Integer param) { System.out.println("EventListener#listenInteger ->" + param); } /** * 監聽 String 類型的消息 */ @Subscribe public void listenString(String param) { System.out.println("EventListener#listenString ->" + param); } }
3. 註冊到 EventBus 上併發布消息
EventBus eventBus = new EventBus(); eventBus.register(new EventListener()); eventBus.post(1); eventBus.post(2); eventBus.post("3");
運行結果爲
EventListener#listenInteger ->1 EventListener#listenInteger ->2 EventListener#listenString ->3
根據須要咱們能夠建立多個訂閱者完成訂閱信息,同時若是一個類型存在多個訂閱者,則全部訂閱方法都會執行
爲何說這麼作是同步的呢?
Guava Event 其實是使用線程池來處理訂閱消息的,經過源碼能夠看出,當咱們使用默認的構造方法建立 EventBus
的時候,其中 executor
爲 MoreExecutors.directExecutor()
,其具體實現中直接調用的 Runnable#run
方法,使其仍然在同一個線程中執行,因此默認操做仍然是同步的,這種處理方法也有適用的地方,這樣既能夠解耦也可讓方法在同一個線程中執行獲取同線程中的便利,好比事務的處理
EventBus 部分源碼
public class EventBus { private static final Logger logger = Logger.getLogger(EventBus.class.getName()); private final String identifier; private final Executor executor; private final SubscriberExceptionHandler exceptionHandler; private final SubscriberRegistry subscribers; private final Dispatcher dispatcher; public EventBus() { this("default"); } public EventBus(String identifier) { this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE); } public EventBus(SubscriberExceptionHandler exceptionHandler) { this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler); } EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.subscribers = new SubscriberRegistry(this); this.identifier = (String)Preconditions.checkNotNull(identifier); this.executor = (Executor)Preconditions.checkNotNull(executor); this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher); this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler); } }
DirectExecutor 部分源碼
enum DirectExecutor implements Executor { INSTANCE; private DirectExecutor() { } public void execute(Runnable command) { command.run(); } public String toString() { return "MoreExecutors.directExecutor()"; } }
經過上面的源碼,能夠看出只要將構造方法中的 executor 換成一個線程池實現便可, 同時 Guava EventBus 爲了簡化操做,提供了一個簡化的方案即 AsyncEventBus
EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
這樣便可實現異步使用
AsyncEventBus 源碼
public class AsyncEventBus extends EventBus { public AsyncEventBus(String identifier, Executor executor) { super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); } public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler); } public AsyncEventBus(Executor executor) { super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); } }
若是處理時發生異常應該如何處理? 在看源碼中,不管是 EventBus
仍是 AsyncEventBus
均可傳入自定義的 SubscriberExceptionHandler
該 handler 當出現異常時會被調用,我可能夠從參數 exception
獲取異常信息,從 context
中獲取消息信息進行特定的處理
其接口聲明爲
public interface SubscriberExceptionHandler { /** Handles exceptions thrown by subscribers. */ void handleException(Throwable exception, SubscriberExceptionContext context); }
在上面的基礎上,咱們能夠定義一些消息類型來實現不一樣消息的監聽和處理,經過實現 SubscriberExceptionHandler
來處理異常的狀況,不管時同步仍是異步都能遊刃有餘