說在前面java
本文轉自「天河聊技術」微信公衆號spring
事件驅動模型設計是一種優雅的程序設計方式,實現有不少,原理都是發佈與訂閱,觀察者設計模式實現,java自帶的實現、spring ioc的事件驅動模型,還有guava的實現,今天介紹guava eventbus的源碼實現,看過這篇文章你本身也能夠實現實現一套了。數據庫
guava event源碼解析設計模式
先上一個demo實現,瞭解車的原理以前先上去感覺下緩存
/** * 事件 * weifeng.jiang 2018-06-11 19:06 */ public class HelloEvent { private String message; public HelloEvent(String message) { this.message = message; } public String getMessage() { return message; } }
/** * 訂閱者 * weifeng.jiang 2018-06-11 19:11 */ public class EventListener { @Subscribe public void listen(HelloEvent helloEvent){ System.out.println(helloEvent.getMessage()); } }
/** * 客戶端 * weifeng.jiang 2018-06-11 19:12 */ public class Main { public static void main(String[] args) { // 建立事件總線 EventBus eventBus = new EventBus("test"); // 建立訂閱者 EventListener listener = new EventListener(); // 註冊訂閱者 eventBus.register(listener); // 發佈事件 eventBus.post(new HelloEvent("asdasd")); eventBus.post(new HelloEvent("asdasdasdas")); eventBus.post(new HelloEvent("asdasdasdasd")); } }
實現原理架構圖服務器
怎麼成爲一個訂閱者接受事件呢微信
接受事件的對象應有一個public方法,用@Subscribe註解標記這個方法,將接受事件對象傳遞給EventBus實例的register(Object)方法,參考圖一和圖三架構
怎麼發佈事件呢併發
只須要調用EventBus實例的post方法,參考圖三異步
guava eventbus事件總線有兩種,同步的實現EventBus,異步的實現AsyncEventBus,若是訂閱者在接收事件後進行長時間的邏輯處理,好比和數據庫交互,這時候就須要用異步事件了,若是是簡單處理,同步實現就能夠。
這裏以EventBus事件總線同步實現爲例進行源碼解析。
成爲訂閱者的源碼實現
和@Subscribe註解配合使用的還有一個@AllowConcurrentEvents註解,這個註解是能夠容許事件併發的執行,看下建立訂閱者對象的源碼實現,以下
/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 爲監聽器上的方法建立一個訂閱服務器。*/ static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
能夠容許併發事件,在這個類中
@VisibleForTesting static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); } @Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { super.invokeSubscriberMethod(event); } } } }
執行事件的時候是同步實現。
事件總線訂閱源碼實現
com.google.common.eventbus.SubscriberRegistry#register
void register(Object listener) { // 查找全部訂閱者,維護了一個key是事件類型,value是定訂閱這個事件類型的訂閱者集合的一個map Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { // 獲取事件類型 Class<?> eventType = entry.getKey(); // 獲取這個事件類型的訂閱者集合 Collection<Subscriber> eventMethodsInListener = entry.getValue(); // 從緩存中按事件類型查找訂閱者集合 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { // 從緩存中取不到,更新緩存 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
事件和訂閱事件的訂閱者集合是在com.google.common.eventbus.SubscriberRegistry這裏維護的
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
到這裏,訂閱者已經準備好了,準備接受事件了。
發佈事件源碼實現
com.google.common.eventbus.EventBus#post
public void post(Object event) { // 獲取事件的訂閱者集合 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 轉發事件 dispatcher.dispatch(event, eventSubscribers); // 若是不是死亡事件,從新包裝成死亡事件從新發布 } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
Iterator<Subscriber> getSubscribers(Object event) { // 獲取事件類型類的超類集合 ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class<?> eventType : eventTypes) { // 獲取事件類型的訂閱者集合 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }
事件轉發器有三種實現
第一種是當即轉發,實時性比較高,其餘兩種都是隊列實現。
執行訂閱方法都是異步實現
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
說到最後
本次源碼解析到這裏,僅供參考。