本文主要按照以下例子展開:安全
//1. 新建bus對象,默認僅能在主線程上對消息進行調度 Bus bus = new Bus(); // maybe singleton //2. 新建類A(subscriber),answerAvailable()方法爲事件回調,在主線程上運行 class A { public A() { bus.register(this); } // 可見性爲public,僅有一個Event類型的參數 @Subscribe public void answerAvailable(AnswerAvailableEvent event) { // process event } } //3. 往bus投遞事件 bus.post(new AnswerAvailableEvent(42)); //4. 若是要A在註冊時立刻接收到一次回調,則能夠新建類B(Producer),produceAnswer() // 方法會在註冊subscriber時,對每一個訂閱了AnswerAvailableEvent方法發送事件 class B { public B() { bus.register(this); } //可見性爲public,不帶任何參數 @Produce public AnswerAvailableEvent produceAnswer() { return new AnswerAvailableEvent(); } }
首先來看看Bus bus = new Bus()
這一句,對應的源碼以下所示:app
public Bus() { this(DEFAULT_IDENTIFIER); } public Bus(String identifier) { this(ThreadEnforcer.MAIN, identifier); } public Bus(ThreadEnforcer enforcer, String identifier) { this(enforcer, identifier, HandlerFinder.ANNOTATED); } Bus(ThreadEnforcer enforcer, String identifier, HandlerFinder handlerFinder) { this.enforcer = enforcer; this.identifier = identifier; this.handlerFinder = handlerFinder; }
默認參數爲enforcer = ThreadEnforcer.MAIN,identifier = DEFAULT_IDENTIFIER,handlerFinder = HandlerFinder.ANNOTATED。咱們來看看這些參數是什麼意思。ide
ThreadEnforcer是一個接口,enforce()方法用於檢查當前線程是否爲指定的線程類型:函數
public interface ThreadEnforcer { ThreadEnforcer ANY = new ThreadEnforcer() { @Override public void enforce(Bus bus) { // Allow any thread. } }; ThreadEnforcer MAIN = new ThreadEnforcer() { @Override public void enforce(Bus bus) { if (Looper.myLooper() != Looper.getMainLooper()) { throw new IllegalStateException("Event bus " + bus + " accessed from non-main thread " + Looper.myLooper()); } } }; void enforce(Bus bus); }
不帶參數的構造函數bus()使用默認的ThreadEnforcer.MAIN,表示enforce()方法必須在主線程上執行。oop
identifier僅爲bus的名字,debug用。post
HandlerFinder用於在註冊/反註冊的時候查找Subscriber和Producer,後文會對其展開源碼級別的解析。不帶參數的構造函數bus()使用默認的HandlerFinder.ANNOTATED,表示使用註解來進行查找。this
除上述之外,bus類還有兩個成員變量handlersByType和producersByType:spa
private final ConcurrentMap<Class<?>, Set<EventHandler>> handlersByType = new ConcurrentHashMap<Class<?>, Set<EventHandler>>(); private final ConcurrentMap<Class<?>, EventProducer> producersByType = new ConcurrentHashMap<Class<?>, EventProducer>();
分別用於經過event的類型(class類型)來查找event handle和event producer。線程
以下所示,要A成爲訂閱者訂閱AnswerAvailableEvent,只需將其註冊到bus,而後使用@Subscribe註解標記回調方法便可。回調方法要求可見性爲public,有且僅有一個參數,類型爲訂閱的event。debug
class A { public A() { bus.register(this); } @Subscribe public void answerAvailable(AnswerAvailableEvent event) { // process event } }
首先看一下@Subscribe註解:
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Subscribe { }
RetentionPolicy.RUNTIME表示它是運行時的註解,ElementType.METHOD表示用於註解方法。
再看一下register流程:
public void register(Object object) { if (object == null) { throw new NullPointerException("Object to register must not be null."); } //1. 檢查當前線程是否符合ThreadEnforcer的設置 enforcer.enforce(this); //2. 默認狀況下,經過註解在object上找出全部Producer Map<Class<?>, EventProducer> foundProducers = handlerFinder.findAllProducers(object); for (Class<?> type : foundProducers.keySet()) { //2-1. 查一下object上的producer註冊的event是否已經被別人註冊過。 final EventProducer producer = foundProducers.get(type); EventProducer previousProducer = producersByType.putIfAbsent(type, producer); //checking if the previous producer existed if (previousProducer != null) { throw new IllegalArgumentException("Producer method for type " + type + " found on type " + producer.target.getClass() + ", but already registered by type " + previousProducer.target.getClass() + "."); } //2-2. 若是沒有註冊過,那麼找出對應event的handler,觸發一次回調。 Set<EventHandler> handlers = handlersByType.get(type); if (handlers != null && !handlers.isEmpty()) { for (EventHandler handler : handlers) { dispatchProducerResultToHandler(handler, producer); } } } //3. 找出object上用@Subscribe註解了的方法 Map<Class<?>, Set<EventHandler>> foundHandlersMap = handlerFinder.findAllSubscribers(object); for (Class<?> type : foundHandlersMap.keySet()) { Set<EventHandler> handlers = handlersByType.get(type); if (handlers == null) { //3-1. 該event是第一次註冊,那麼新建一個CopyOnWriteArraySet用來保存handler和event的對應關係(EventHandler) //concurrent put if absent Set<EventHandler> handlersCreation = new CopyOnWriteArraySet<EventHandler>(); handlers = handlersByType.putIfAbsent(type, handlersCreation); if (handlers == null) { handlers = handlersCreation; } } //3-2. 保存object中新增的event-handler對應關係。 final Set<EventHandler> foundHandlers = foundHandlersMap.get(type); if (!handlers.addAll(foundHandlers)) { throw new IllegalArgumentException("Object already registered."); } } //4. 檢查object上的event是否存在對應的Producer,有則觸發一次回調 for (Map.Entry<Class<?>, Set<EventHandler>> entry : foundHandlersMap.entrySet()) { Class<?> type = entry.getKey(); EventProducer producer = producersByType.get(type); if (producer != null && producer.isValid()) { Set<EventHandler> foundHandlers = entry.getValue(); for (EventHandler foundHandler : foundHandlers) { if (!producer.isValid()) { break; } if (foundHandler.isValid()) { dispatchProducerResultToHandler(foundHandler, producer); } } } } }
總的來講register作了三件事情:觸發新的Producer;註冊新的event-handler關係;觸發舊的Producer。另外有兩點要注意一下:
因爲在通常使用場景下,發送/處理event遠比註冊/反註冊操做頻繁,因此在保證線程安全的狀況下,使用CopyOnWriteArraySet做爲保存event和handler的容器,能夠大大提升效率。
CopyOnWrite容器在讀的時候不會加鎖,寫的時候先複製一份,寫完再替換原容器。若是容器正在寫操做時發生了讀操做(或者正在讀的時候發生了寫操做),讀操做的對象爲容器的快照(snapshot)。
因爲register方法沒有加鎖,因此在3-1中,儘管已經檢查了handlers是否存在,但仍需使用putIfAbsent來保存handler。
注意到bus經過HandlerFinder來查找object上的producer和subscriber,接下來看一下HandlerFinder的實現:
interface HandlerFinder { HandlerFinder ANNOTATED = new HandlerFinder() { @Override public Map<Class<?>, EventProducer> findAllProducers( Object listener) { return AnnotatedHandlerFinder.findAllProducers(listener); } @Override public Map<Class<?>, Set<EventHandler>> findAllSubscribers( Object listener) { return AnnotatedHandlerFinder.findAllSubscribers(listener); } }; Map<Class<?>, EventProducer> findAllProducers(Object listener); Map<Class<?>, Set<EventHandler>> findAllSubscribers(Object listener); }
其中findAllProducers方法返回某event type對應的EventProducer,findAllSubscribers返回某event type對應的EventHandler集合。先看一下EventProducer和EventHandler。
EventProducer是一個producer方法的包裝類,源碼以下:
class EventProducer { final Object target; private final Method method; private final int hashCode; private boolean valid = true; EventProducer(Object target, Method method) { if (target == null) { throw new NullPointerException( "EventProducer target cannot be null."); } if (method == null) { throw new NullPointerException( "EventProducer method cannot be null."); } this.target = target; this.method = method; method.setAccessible(true); // 提早計算hashcode,以防每次調用hash()時消耗資源 final int prime = 31; hashCode = ((prime + method.hashCode()) * prime) + target.hashCode(); } public boolean isValid() { return valid; } // 應在object unregister時調用 public void invalidate() { valid = false; } public Object produceEvent() throws InvocationTargetException { if (!valid) { throw new IllegalStateException(toString() + " has been invalidated and can no longer produce events."); } try { return method.invoke(target); } catch (IllegalAccessException e) { throw new AssertionError(e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } }
其中produceEvent方法用於得到event。能夠看出爲何Otto要求produce函數不能有參數。
與EventProducer相似,EventHandler是一個event handler方法(事件回調)的包裝類,源碼以下:
class EventHandler { private final Object target; private final Method method; private final int hashCode; private boolean valid = true; EventHandler(Object target, Method method) { if (target == null) { throw new NullPointerException( "EventHandler target cannot be null."); } if (method == null) { throw new NullPointerException( "EventHandler method cannot be null."); } this.target = target; this.method = method; method.setAccessible(true); // Compute hash code eagerly since we know it will be used frequently and we cannot estimate the runtime of the // target's hashCode call. final int prime = 31; hashCode = ((prime + method.hashCode()) * prime) + target.hashCode(); } public boolean isValid() { return valid; } public void invalidate() { valid = false; } public void handleEvent(Object event) throws InvocationTargetException { if (!valid) { throw new IllegalStateException(toString() + " has been invalidated and can no longer handle events."); } try { method.invoke(target, event); } catch (IllegalAccessException e) { throw new AssertionError(e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } }
其中handleEvent方法用於在object上調用handle方法(事件回調),傳入event對象。能夠看出爲何Otto要求event handler函數僅能有一個參數。
dispatchProducerResultToHandler方法用於將Producer產生的event分發給對應的handler。源碼以下所示:
private void dispatchProducerResultToHandler(EventHandler handler, EventProducer producer) { Object event = null; try { event = producer.produceEvent(); } catch(InvocationTargetException e) { throwRuntimeException("Producer " + producer + " threw an exception.", e); } if (event == null) { return; } dispatch(event, handler); } protected void dispatch(Object event, EventHandler wrapper) { try { wrapper.handleEvent(event); } catch(InvocationTargetException e) { throwRuntimeException("Could not dispatch event: " + event.getClass() + " to handler " + wrapper, e); } }
邏輯比較簡單,主要是使用了Producer的produceEvent()方法得到event對象後,調用EventHandler的handleEvent()方法。
Bus類的unregister方法用於解除目標對象和bus之間的關聯關係,包括對象上的producer方法,subscriber方法,源碼以下所示:
public void unregister(Object object) { if (object == null) { throw new NullPointerException("Object to unregister must not be null."); } //1. 檢查當前線程是否符合ThreadEnforcer的設置 enforcer.enforce(this); //2. 默認狀況下,經過註解在object上找出全部Producer,將其從producersByType中刪除並標記爲invalidate Map<Class<?>, EventProducer> producersInListener = handlerFinder.findAllProducers(object); for (Map.Entry<Class<?>, EventProducer> entry : producersInListener.entrySet()) { final Class<?> key = entry.getKey(); EventProducer producer = getProducerForEventType(key); EventProducer value = entry.getValue(); if (value == null || !value.equals(producer)) { throw new IllegalArgumentException( "Missing event producer for an annotated method. Is " + object.getClass() + " registered?"); } producersByType.remove(key).invalidate(); } //3. 默認狀況下,找出object上用@Subscribe註解了的handler,將其從event集合中刪除並標記爲invalidate Map<Class<?>, Set<EventHandler>> handlersInListener = handlerFinder.findAllSubscribers(object); for (Map.Entry<Class<?>, Set<EventHandler>> entry : handlersInListener.entrySet()) { Set<EventHandler> currentHandlers = getHandlersForEventType(entry.getKey()); Collection<EventHandler> eventMethodsInListener = entry.getValue(); if (currentHandlers == null || !currentHandlers.containsAll(eventMethodsInListener)) { throw new IllegalArgumentException( "Missing event handler for an annotated method. Is " + object.getClass() + " registered?"); } for (EventHandler handler : currentHandlers) { if (eventMethodsInListener.contains(handler)) { handler.invalidate(); } } currentHandlers.removeAll(eventMethodsInListener); } }
一次簡單的事件投遞操做以下所示:
bus.post(new AnswerAvailableEvent(42));
咱們來看一下post方法的源碼實現:
public void post(Object event) { if (event == null) { throw new NullPointerException("Event to post must not be null."); } //1. 檢查當前線程是否符合ThreadEnforcer的設置 enforcer.enforce(this); //2. 向上追溯event的全部父類 Set<Class<?>>dispatchTypes = flattenHierarchy(event.getClass()); //3. 當前event沒有註冊handler,則發送一個DeadEvent事件 boolean dispatched = false; for (Class<?>eventType: dispatchTypes) { Set<EventHandler> wrappers = getHandlersForEventType(eventType); if (wrappers != null && !wrappers.isEmpty()) { dispatched = true; for (EventHandler wrapper: wrappers) { //3-1 將事件和handler放到分發隊列裏 enqueueEvent(event, wrapper); } } } //4. 當前event沒有註冊handler,則發送一個DeadEvent事件 if (!dispatched && !(event instanceof DeadEvent)) { post(new DeadEvent(this, event)); } //5. 通知隊列進行分發操做 dispatchQueuedEvents(); }
注意幾點:
發送一個Event時,訂閱了Event父類的Subscriber方法也會被調用。
事件會被放到調用者所在線程的隊列裏依次分發。
下面分點進行詳述。
進行post操做時,首先會經過flattenHierarchy方法得到event的父類或者接口:
Set<Class<?>>flattenHierarchy(Class<?>concreteClass) { Set<Class<?>>classes = flattenHierarchyCache.get(concreteClass); if (classes == null) { Set<Class<?>>classesCreation = getClassesFor(concreteClass); classes = flattenHierarchyCache.putIfAbsent(concreteClass, classesCreation); if (classes == null) { classes = classesCreation; } } return classes; } private Set<Class<?>> getClassesFor(Class<?> concreteClass) { List<Class<?>> parents = new LinkedList<Class<?>>(); Set<Class<?>> classes = new HashSet<Class<?>>(); parents.add(concreteClass); //深度優先遍歷 while (!parents.isEmpty()) { Class<?> clazz = parents.remove(0); classes.add(clazz); Class<?> parent = clazz.getSuperclass(); if (parent != null) { parents.add(parent); } } return classes; }
從上可知flattenHierarchy()經過getClassesFor()利用深度優先遍歷導出了concreteClass的全部父類。
經過post方法投遞的event首先會放在當前線程所在的Dispatch Queue中,而後依次分發。Bus類有以下成員屬性:
private final ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>> eventsToDispatch = new ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>>() { @Override protected ConcurrentLinkedQueue<EventWithHandler> initialValue() { return new ConcurrentLinkedQueue<EventWithHandler>(); } };
eventsToDispatch是一個ThreadLocal對象,經過initialValue()方法,eventsToDispatch每次在新的線程上調用的時候都會生成新的ConcurrentLinkedQueue實例。event是經過enqueueEvent(event, wrapper)方法放到queue中的,下面看看enqueueEvent()的實現:
protected void enqueueEvent(Object event, EventHandler handler) { eventsToDispatch.get().offer(new EventWithHandler(event, handler)); }
offer()方法會會將EventWithHandler對象放到當前線程的queue的尾部。offer方法和add方法的區別在於,當沒法插入(例如空間不夠)的狀況發生時會發揮false,熱不是拋出異常。EventWithHandler類對event和handler的關係進行了簡單的包裝,實現以下:
static class EventWithHandler { final Object event; final EventHandler handler; public EventWithHandler(Object event, EventHandler handler) { this.event = event; this.handler = handler; } }
接下來看看dispatchQueuedEvents方法的實現:
protected void dispatchQueuedEvents() { // don't dispatch if we're already dispatching, that would allow reentrancy and out-of-order events. Instead, leave // the events to be dispatched after the in-progress dispatch is complete. //1. 不能重複分發,不然會致使event的分發次序混亂 if (isDispatching.get()) { return; } isDispatching.set(true); try { while (true) { //2. 依次取出EventWithHandler,並經過dispatch方法進行分發。 EventWithHandler eventWithHandler = eventsToDispatch.get().poll(); if (eventWithHandler == null) { break; } if (eventWithHandler.handler.isValid()) { dispatch(eventWithHandler.event, eventWithHandler.handler); } } } finally { isDispatching.set(false); } }
值得注意的是,全部subscribe方法拋出的異常都會在這裏捕獲,捕獲到異常之後event分發過程即中止,直到下一次在該線程上調用post爲止。
綜上,Otto的整體結構可用下圖表示:
+-------------------------+ |Bus(ThreadLocal) | | +--------------+ | | |EventProducers| | | | +-------+ | register +-------+ | | |Produce| <----+-------+Produce| | | +-------+ | | +-------+ | | +-------+ | | | | |Produce| | | | | +-------+ | | | +--------------+ | | | | | event | | | | post(event)| +-------v--------+ | +----------------> Dispatch Queue | | | +-------+--------+ | | | | | event | | | | | +------v------+ | | |EventHandlers| | | | +---------+ | | | | |Subscribe| | register +---------+ | | +---------+ <-----+-------+Subscribe| | | +---------+ | | +---------+ | | |Subscribe| | | | | +---------+ | | | +-------------+ | | | +-------------------------+