開始研究源碼的設計思路,從
Listener
註冊出發,EventBus
如何維護監聽者信息,到Publisher
發送消息,消息以怎樣的渠道分發給全部的Listener
, 順序如何保證,傳遞性如何保證,出現異常如何處理,找不到監聽者怎麼處理等等html
EventBus
這個類至關於一箇中轉站,
Publisher
調用它的post(Object)
來推送事件;而後將事件一次推送給註冊的Listener
java
在初始化s時, EventBus
對象會維護一個 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
實例, 這個就是維護訂閱關係的核心類web
註冊方法以下spring
/** * Registers all subscriber methods on {@code object} to receive events. * * @param object object whose subscriber methods should be registered. */ public void register(Object object) { subscribers.register(object); }
接着咱們看下這個類的具體實現緩存
SubscriberRegistry.java
數據結構
/** * All registered subscribers, indexed by event type. * * <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an * immutable snapshot of all current subscribers to an event without any locking. */ private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); /** * Registers all subscriber methods on the given listener object. */ void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull( subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
subscribers : - 對象初始化時建立 - 維護的是EventType
-> Listener
的映射關係,value爲一個集合,說明一個事件能夠推送給多個Listener
- 監聽者,能夠有能夠監聽多個不一樣類型的事件app
註冊流程: - 根據註冊的對象,將其中全部的回調方法都撈出來 - 將上步的結果塞入 subscribers
集合中; key爲 Listener
的類名less
註冊目的就是發佈消息後,
EventBus
能夠將這個Event
傳遞」Listener
(即訂閱方)ide
爲了實現上面的目的,若是要咱們本身實現,會怎麼作?post
@Subscribe
註解的方法撈出來Event
, 由於註冊的目的是爲了實現回調, 因此封裝一個類,包含這個Listener
對象的引用 + 要執行的方法上面註冊的實際實現和上面的步驟差很少
獲取全部包含註解的方法
實際的代碼以下
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument(parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
看下上面的實現,很是有意思的是,不只將改對象中的全部@Subscribe
註解的方法撈出來,連父類中的也不放過;就是這個 TypeToken.of(clazz).getTypes().rawTypes();
從上面的限定,也能夠看出,對於回調方法是有要求的: 有且只能有一個參數, checkArgument(parameterTypes.length == 1,xxx)
過濾重載的回調方法(這點比較有意思,搞了個Map, key由方法名+方法參數確認(MethodIdentifier
的equals方法重寫了), 而不是直接用集合的contains
方法, 請注意其中的區別)
method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()
這個判斷條件的後一個能夠參考http://www.xue163.com/2122/1/21224778.html
將上面的方法轉換爲Map, 看這個 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
Event.class
; value爲一個包含 Listener
, Method
, EventBus
實例的對象 Subscriber
將上面的map塞入subscribers
集合
subscribers
集合包含的是全部的 (事件 -> 監聽者回調集合)Event
-> Set<Listener.Method>
subscribers
的數據結構,其實能夠看到,一個Listener
對象,若是註冊屢次,最終的效果實際上是同樣的,這個監聽者,並不會被調用屢次; 若是一個Lisntener
類,有多個對象,則註冊後,每一個對象的回調都會執行到;
Subscriber
對象Subscriber
的 hashcode & equals 方法沒有重寫到此, 註冊完畢;註銷的方法和上面差很少,惟一的區別是最後一個是向 subscribers
塞數據,一個是從其中刪數據而已
題外話
若是咱們想獲取工程中全部包含某個註解的類能夠怎麼辦? - 若是是用spring的話, 能夠考慮 `ApplicationContext.getBeansWithAnnotation()` 獲取工程中,全部包含某個註解的方法,除了上面的主動註冊,有什麼其餘的方法?
###2. 推送事件
發佈方,調用
EventBus.post(Object)
方法實現消息的推送
正式開始以前,咱們能夠先預測一下,當發佈方調用了這個方法以後,會執行那些action
subscribers
中獲取出全部的監聽者,以及對應的回調方法, 放在一個集合中上面是正向的操做流程,接着一些異常狀況和邊界也須要考慮下
帶着上面的臆測,來實際看下EventBus
本身是怎麼玩的
/** * 將事件推送給全部的監聽者,無論監聽者是否正常處理,都是正確返回 * Posts an event to all registered subscribers. This method will return * successfully after the event has been posted to all subscribers, and * regardless of any exceptions thrown by subscribers. * * 若是一個事件沒有監聽者,且該事件不是 DeadEvent, 則轉爲 DeadEvent並從新推送 * <p>If no subscribers have been subscribed for {@code event}'s class, and * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a * DeadEvent and reposted. * * @param event event to post. */ public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
上面的解釋比較清楚, 基本上核心的推送就是 dispatcher.dispatch(event, eventSubscribers);
實際的使用的是 PerThreadQueuedDispatcher
推送代碼以下,邏輯比較清晰,將Event
塞入隊列, 而後將隊列中的全部消息依次推送給全部的訂閱者
/** * Per-thread queue of events to dispatch. */ private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; /** * Per-thread dispatch state, used to avoid reentrant event dispatching. */ private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
最終真正執行推送Event
的是這個方法 com.google.common.eventbus.Subscriber#dispatchEvent
/** * Dispatches {@code event} to this subscriber using the proper executor. */ final void dispatchEvent(final Object event) { executor.execute(new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); } /** * Invokes the subscriber method. This method can be overridden to make the invocation * synchronized. */ @VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } }
上面從源碼的角度,對整個流程順了一遍,下面的圖對幾個主要的類結構進行了抽取,並對上面的幾個方法進行了簡要的說明
圖一, 將上面說明的幾個類屬性 + 方法進行了說明
圖二, 對邏輯進行列舉
1.根據class,獲取全部超類集合 (EventBus的實際使用中,Event的超類集合都塞入了緩存,加快查詢速度)
TypeToken.of(concreteClass).getTypes().rawTypes());
2.獲取類中標有註解的方法
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz, Class annotationClz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(annotationClz) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); } private static final class MethodIdentifier { private final String name; private final List<Class<?>> parameterTypes; MethodIdentifier(Method method) { this.name = method.getName(); this.parameterTypes = Arrays.asList(method.getParameterTypes()); } @Override public int hashCode() { return Objects.hashCode(name, parameterTypes); } @Override public boolean equals(@Nullable Object o) { if (o instanceof MethodIdentifier) { MethodIdentifier ident = (MethodIdentifier) o; return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes); } return false; } }