基於版本:Guava 22.0git
Wiki:EventBusgithub
0. EventBus簡介安全
提供了發佈-訂閱模型,能夠方便的在EventBus上註冊訂閱者,發佈者能夠簡單的將事件傳遞給EventBus,EventBus會自動將事件傳遞給相關聯的訂閱者。併發
支持同步/異步模式。異步
只能用於線程間通訊。ide
1. EventBus類圖工具
EventBus是同步實現oop
AsyncEventBus是異步實現post
2. 代碼實例學習
EventBus eventBus = new EventBus(); eventBus.register(new Object() { @Subscribe public void listen(Object subReport) throws Exception { System.out.println("receive object event!"); } @Subscribe public void listen(Integer subReport) throws Exception { System.out.println("receive integer event!"); } }); eventBus.post(Integer.valueOf(1));
這段代碼的輸出以下:
receive integer event!
receive object event!
能夠看到咱們聲明瞭一個EventBus,而後向其註冊了兩個訂閱者(含有Subscribe註解的方法),而後調用post方法向EventBus發佈了一條消息。
消息類型是Integer,因爲訂閱者的關注對象是Integer與Object,都與這條消息有關,因此兩個訂閱者都收到了通知。
可是這個發佈-訂閱模式是如何實現的呢?咱們下面來逐步分析一下EventBus的源碼。
3. EventBus.register()
EventBus.register /** * Registers all subscriber methods on {@code object} to receive events. * * @param object object whose subscriber methods should be registered. */ public void register(Object object) { subscribers.register(object); } SubscriberRegistry.register /** * Registers all subscriber methods on the given listener object. */ void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);//獲取傳入的listener中含有的全部的訂閱者 for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey();//訂閱的事件類型 Collection<Subscriber> eventMethodsInListener = entry.getValue();//對應的訂閱者自己 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);//線程安全的修改subscribers對象 } eventSubscribers.addAll(eventMethodsInListener); } } /** * Returns all subscribers for the given listener grouped by the type of event they subscribe to. 分析傳入的對象,遍歷其中全部含有Subscribe註解並且只含有一個參數的方法,而後將其包裝成訂閱者並返回。 */ private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) {//getAnnotatedMethods方法最終會調用到下面的SubscriberRegistry.getAnnotatedMethodsNotCached方法,這個方法會用反射處理傳入的clazz及其全部的父類,提取出含有Subscribe註解而且有且只有一個參數的方法 Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0];//獲取這個方法的惟一參數的Class methodsInListener.put(eventType, Subscriber.create(bus, listener, method));//將EventBus,訂閱者對象,訂閱者方法包裝一下並放入map,後續觸發事件的時候會用到 } return methodsInListener; } SubscriberRegistry.getAnnotatedMethodsNotCached private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();//連clazz的父類也會處理 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {//含有Subscribe註解,並且不是合成的方法 // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length);//方法必須有且只有一個參數 MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
從SubscriberRegistry.getAnnotatedMethods到SubscriberRegistry.getAnnotatedMethodsNotCached的跳轉會涉及到Guava的LoadingCache,其餘的邏輯都很是直觀。
register方法的大概流程以下:
a. 解析傳入的Object對象,用反射分析對應的class及其全部父類,找到全部用Subscribe註解修飾,並且只有一個參數的方法。由這個方法能夠生成訂閱者對象。
b. 解析這個方法的參數(訂閱者訂閱的事件類型)
c. 每一個EventBus在初始化時都會與一個SubscriberRegistry關聯,這個SubscriberRegistry內部維護了一個名爲subscribers的ConcurrentMap,這個ConcurrentMap的 key是EventBus關心的事件類型,value是訂閱了這些事件的訂閱者的集合,subscribers在後續發佈事件的流程中很是有用(能夠經過發佈的事件類型,找到這個事件所關聯的全部訂閱者,並通知這些相關的訂閱者)。如今用a,b中的信息來更新subscribers,因爲subscribers可能被併發操做,因此用到了ConcurrentMap.putIfAbsent方法以保證線程安全。
總之,調用register方法會更新subscribers,subscribers中含有事件與訂閱者的關聯關係。
4. EventBus.post()
EventBus.post public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);//獲取發佈事件所關聯的全部訂閱者(包括event的全部父類/接口所關聯的訂閱者) if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers);//用分派器觸發全部訂閱者的訂閱方法,分派器有同步/異步的不一樣實現 } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event));//若是這個事件沒有訂閱者,則將這個事件包裝爲DeadEvent事件,而後從新觸發post方法 } } SubscriberRegistry.getSubscribers /** * Gets an iterator representing an immutable snapshot of all subscribers to the given event at * the time this method is called. */ Iterator<Subscriber> getSubscribers(Object event) { ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());//flattenHierarchy方法會解析出event的class,及其全部父類/接口的class List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class<?> eventType : eventTypes) {//遍歷event關聯的全部class,找到這些class關聯的全部訂閱者,添加到subscriberIterators中 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); } SubscriberRegistry.flattenHierarchy /** * Flattens a class's type hierarchy into a set of {@code Class} objects including all * superclasses (transitively) and all interfaces implemented by these superclasses. */ @VisibleForTesting static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) { try { return flattenHierarchyCache.getUnchecked(concreteClass);//這裏會用Guava的TypeToken工具解析出concreteClass的class,以及全部父類/接口的class並返回 } catch (UncheckedExecutionException e) { throw Throwables.propagate(e.getCause()); } }
post方法的大概流程以下:
a. 用Guava的TypeToken工具解析Event的class,以及Event的全部父類/接口的class,全部訂閱了這些class的訂閱者後續都會收到通知
b. 調用EventBus關聯的Dispatcher的dispatcher方法,將Event發佈給全部關聯的訂閱者。
下一步咱們會解析Dispatcher,以及Dispatcher是如何實現EventBus的同步/異步語義的。
5. EventBus的默認Dispatcher
EventBus /** * Creates a new EventBus named "default". */ public EventBus() { this("default"); } /** * Creates a new EventBus with the given {@code identifier}. * * @param identifier a brief name for this bus, for logging purposes. Should be a valid Java * identifier. */ public EventBus(String identifier) { this( identifier, MoreExecutors.directExecutor(),//由當前線程直接運行提交任務的「線程池」 Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE); } Dispatcher.perThreadDispatchQueue() static Dispatcher perThreadDispatchQueue() { return new PerThreadQueuedDispatcher(); } Dispatcher.PerThreadQueuedDispatcher /** * Implementation of a {@link #perThreadDispatchQueue()} dispatcher. */ private static final class PerThreadQueuedDispatcher extends Dispatcher { // This dispatcher matches the original dispatch behavior of EventBus. /** * Per-thread queue of events to dispatch. */ private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } };//爲每一個調用post方法的工做線程維護一個發佈隊列,工做線程獨立操做這個隊列完成時間發佈流程,因此是線程安全的 /** * Per-thread dispatch state, used to avoid reentrant event dispatching. */ private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers));//將發佈事件+事件所關聯的全部訂閱者包裝成一個Event對象,並提交至發佈隊列 if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) {//嘗試從發佈隊列中poll元素 while (nextEvent.subscribers.hasNext()) {//遍歷發佈事件的全部訂閱者,向訂閱者派發事件,因爲EventBus默認使用的線程池是MoreExecutors.directExecutor(),因此實際上發佈者會串行並且同步的向事件的全部訂閱者派發事件,直到所有派發結束,post方法纔會返回,因此EventBus在默認狀況下是同步的。 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } private static final class Event { private final Object event; private final Iterator<Subscriber> subscribers; private Event(Object event, Iterator<Subscriber> subscribers) { this.event = event; this.subscribers = subscribers; } } } Subscriber.dispatchEvent /** * Dispatches {@code event} to this subscriber using the proper executor. */ final void dispatchEvent(final Object event) {//向訂閱者派發事件 executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event);//用反射調用訂閱的方法 } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); } Subscriber.invokeSubscriberMethod /** * Invokes the subscriber method. This method can be overridden to make the invocation * synchronized. */ @VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } }
同步語義的實現關鍵在於
a. 使用的線程池爲MoreExecutors.directExecutor(),這實際上不能算是一個真正的線程池,全部提交到這個池子裏的任務,都是由提交任務的線程自身來執行的。
b. EventBus爲每一個調用post方法的線程維護了一個發佈隊列,工做線程會將事件提交到這個私有隊列裏,而後逐個通知事件所關聯的訂閱者,因爲使用的線程池是MoreExecutors.directExecutor(),因此這個過程其實是徹底串行並且同步執行的,調用post方法的工做線程實際上會在將事件通知給全部訂閱者後纔會返回,從而實現了同步的EventBus語義。
6. AsyncEventBus的Dispatcher
AsyncEventBus public AsyncEventBus(Executor executor) { super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); Dispatcher.legacyAsync() static Dispatcher legacyAsync() { return new LegacyAsyncDispatcher(); } Dispatcher.LegacyAsyncDispatcher private static final class LegacyAsyncDispatcher extends Dispatcher { // This dispatcher matches the original dispatch behavior of AsyncEventBus. // // We can't really make any guarantees about the overall dispatch order for this dispatcher in // a multithreaded environment for a couple reasons: // // 1. Subscribers to events posted on different threads can be interleaved with each other // freely. (A event on one thread, B event on another could yield any of // [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.) // 2. It's possible for subscribers to actually be dispatched to in a different order than they // were added to the queue. It's easily possible for one thread to take the head of the // queue, immediately followed by another thread taking the next element in the queue. That // second thread can then dispatch to the subscriber it took before the first thread does. // // All this makes me really wonder if there's any value in queueing here at all. A dispatcher // that simply loops through the subscribers and dispatches the event to each would actually // probably provide a stronger order guarantee, though that order would obviously be different // in some cases. /** * Global event queue. */ private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue();//公用的,支持併發訪問的發佈隊列 @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) {//將Event+訂閱者包裝成EventWithSubscriber,放入發佈隊列中 queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event);//從發佈隊列中提取EventWithSubscriber,而後將事件發佈給訂閱者,這一過程是發送到AsyncEventBus初始化時提供的線程池裏執行的,因此是異步的。也就是說post方法返回的時候,可能發佈過程還沒有完成,還有訂閱者沒有收到消息。 } } private static final class EventWithSubscriber { private final Object event; private final Subscriber subscriber; private EventWithSubscriber(Object event, Subscriber subscriber) { this.event = event; this.subscriber = subscriber; } } }
異步語義實現的關鍵在於:
a. 發佈事件使用的線程池是建立AsyncEventBus時傳入的線程池,通常來講都是正常的含有多個工做線程的線程池,提交到線程池裏的任務能夠被異步執行
b. LegacyAsyncDispatcher中維護了一個公用的ConcurrentLinkedQueue,這個AsyncEventBus收到的全部Event都會被put到這個隊列中,put完Event的線程同時也會從這個隊列中poll Event,而後submit這些Event到線程池中,若是這個線程池是普通的含有工做線程的線程池,那麼submit完Event以後post方法便可當即返回,傳遞Event給訂閱者的任務會由線程池裏的工做線程完成。這樣就實現了異步語義。
總的來講,EventBus的源碼仍是比較清晰易懂的,實現手法也很是優雅,值得咱們學習。