上一篇講述了
EventBus
的整個執行流程, 本片則從細節處出發,探討下設計的精妙java
看代碼時,能夠看到不少地方都用到了緩存,如再註冊時, 根據class獲取全部帶註解的方法; 推送消息時,根據事件類型,獲取全部的超類集合緩存
如註冊時,一條完整的調用鏈安全
com.google.common.eventbus.SubscriberRegistry#register -> com.google.common.eventbus.SubscriberRegistry#findAllSubscribers -> com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethods -> subscriberMethodsCache.getUnchecked(clazz) -> com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethodsNotCached
TypeToken.of(concreteClass).getTypes().rawTypes()); // 咱們本身的實現, 一直到返回null爲止 clz.getSuperClass().getSuperClass(); // 獲取接口 clz.getInterfaces()
異步推送處理Event和同步處理主要的區別點是使用的 Dispatcher不一樣, 同步是使用 PerThreadQueuedDispatcher
, 異步是 LegacyAsyncDispatcher
異步
異步的消息分發ide
/** * Global event queue. */ private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } } private static final class EventWithSubscriber { private final Object event; private final Subscriber subscriber; private EventWithSubscriber(Object event, Subscriber subscriber) { this.event = event; this.subscriber = subscriber; } } }
同步的消息推送學習
/** * Per-thread queue of events to dispatch. */ private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } private static final class Event { private final Object event; private final Iterator<Subscriber> subscribers; private Event(Object event, Iterator<Subscriber> subscribers) { this.event = event; this.subscribers = subscribers; } } }
執行時, 在 AsyncEventBus
是在線程池中執行; 而 EventBus
則是直接執行, 實質上的執行器this
public static Executor directExecutor() { return DirectExecutor.INSTANCE; } /** See {@link #directExecutor} for behavioral notes. */ private enum DirectExecutor implements Executor { INSTANCE; @Override public void execute(Runnable command) { command.run(); } }
沒有訂閱者時, 拋一個 DeadEvent
google
訂閱者接收消息後的,執行異常時 (訂閱者之間的隔離)spa
final void dispatchEvent(final Object event) { executor.execute(new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
Guava的EventBus不支持定義訂閱者的順序,更談不上截斷線程