總線(Bus)通常指計算機各類功能部件之間傳送信息的公共通訊幹線,而EventBus則是事件源(publisher)向訂閱方(subscriber)發送訂閱事件的總線,它解耦了觀察者模式中訂閱方和事件源之間的強依賴關係。
圖片來源:git
下面以guava 19版本的EventBus的源碼進行分析。EventBus有三個操做:註冊Listener--register(Object Listener),註銷Listener--unregister(Object Listener),發佈Event--post(Object event)。在19版本以前,listener的註冊和註銷,事件的發佈的工做都是由EventBus完成,在18版本以後,EventBus把Listener的註冊和註銷的工做委託給SubscriberRegistry, 把事件發佈的工做委託給Dispatcher來完成,這樣的修改職責分明,代碼結構更加清晰了。在併發方面也有大的修改,之前對維護某類事件和其感興趣的Subscriber的操做用了讀寫鎖來控制對容器的操做,19版本及以後用了併發容器ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>>避免了對鎖的使用。
在觀察者模式中,事件源中會維護一個Listener的列表,並且向這個事件源註冊的Listener通常只會收到一類事件的通知,若是Listener對多個不一樣類的事件感興趣,則須要向多個事件源註冊。EventBus是怎樣實現Listener一次註冊,可以知道Listener對那些事件感興趣的,進而在有某類事件發生時通知到Listener的呢?答案在SubscriberRegistry這個類中。在SubscriberRegister有一個實例屬性ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers,它維護了某個事件類型和對其感興趣的Subscriber的列表。
register(Object listener)的工做就是找出這個Listener對哪些事件感興趣,而後把這種事件類型和該Listener構建成的Subscriber加到subscribers中。unregister的過程和register相似,只從subscribers刪掉Listener感興趣的事件。下面咱們分別看看18版本和19版本register,主要的區別就是一個是加鎖的版本,一個用的是併發容器。
18版本:github
/** * Registers all subscriber methods on {@code object} to receive events. * Subscriber methods are selected and classified using this EventBus's * {@link SubscriberFindingStrategy}; the default strategy is the * {@link AnnotatedSubscriberFinder}. * * @param object object whose subscriber methods should be registered. */ public void register(Object object) { Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object); subscribersByTypeLock.writeLock().lock(); try { subscribersByType.putAll(methodsInListener); } finally { subscribersByTypeLock.writeLock().unlock(); } }
19版本:緩存
/** * Registers all subscriber methods on the given listener object. */ void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull( subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
SubscriberRegister經過reflection找出該Listener對象(包括其父類)哪些Method用@Subscriber註解了,用@Subscriber註解的方法表示當某件事件發生時,但願收到事件通知。在@Subscriber註解的方法中只能包含一個參數那就是Event,不然會出錯。在reflection的時候用LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache緩存了該Listener Class和對應的Method,加快了後面對同一類Listener進行register的效率。用MethodIdentifier做爲Map的key來判別Method的是否相等。
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument(parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
在構建Subscriber的時候根據方法是否有@AllowConcurrentEvents,分爲同步和併發執行method。併發
Dispatcher發佈事件的時候有三種模式: 1. ImmediateDispatcher,來了一個事件則通知對這個事件感興趣的訂閱者。 2. LegacyAsyncDispatcher,會有一個全局的隊列ConcurrentLinkedQueue<EventWithSubscriber> queue保存EventWithSubscriber(事件和subscriber),若是被不一樣的線程poll 不能保證在queue隊列中的event是有序發佈的。 3. PerThreadQueuedDispatcher,在同一個線程post的Event,執行的順序是有序的。用ThreadLocal<Queue<Event>> queue來實現每一個線程post的Event是有序的,在把事件添加到queue後會有一個ThreadLocal<Boolean> dispatching來判斷當前線程是否正在分發,若是正在分發,則此次添加的event不會立刻進行分發而是等到dispatching的值爲false才進行。這樣作的緣由是爲了防止同一個事件被重複分發。個人理解是這樣的:若是沒有dispatching這個狀態變量,在ThreadA中EventA發佈了,ListenerA收到了,ListenerA進行處理,在處理的過程當中若是又觸發了EventA的發佈,若是該線程不結束則會陷入到一種循環中去。
通常的應用的場景就是在用觀察者模式的地方就能夠用EventBus進行替代。結合Spring的使用過程以下:ide