事件的發表主要是調用EvenBus類的post方法,先來看下這個方法的實現
java
public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (postingState.isPosting) { return; } else { postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
currentPostingThreadState 是ThreadLocal類,用於同一個線程的數據的共享。當發佈事件時,會先把事件存儲在eventQueue事件隊列中。若是當前正在發佈事件,則不須要本身處理,由其餘程序來發布。併發
try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; }
經過不斷的循環從隊列中獲取每一個事件,調用postSingleEvent發送單個事件。再來看下postSingleEvent方法異步
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<? extends Object> eventClass = event.getClass(); List<Class<?>> eventTypes = findEventTypes(eventClass); boolean subscriptionFound = false; int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(clazz); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } subscriptionFound = true; } } if (!subscriptionFound) { Log.d(TAG, "No subscribers registered for event " + eventClass); if (eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
在subscriptionsByEventType中獲取監聽該事件的訂閱者列表,在發送給具體訂閱者時記錄事件和訂閱者到postingThreadState中,在經過postToSubscription方法通知到具體的訂閱者。
async
在來看下postToSubscription的實現函數
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case PostThread: invokeSubscriber(subscription, event); break; case MainThread: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BackgroundThread: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case Async: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
判斷訂閱者的類型判斷是PostThread、MainThread、BackgroundThread、仍是Async
oop
類型爲PostThread時(訂閱者將在發起事件的同一個線程中執)直接調用invokeSubscriber方法
post
void invokeSubscriber(Subscription subscription, Object event) throws Error { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); if (event instanceof SubscriberExceptionEvent) { // Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log Log.e(TAG, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass() + " threw an exception", cause); SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event; Log.e(TAG, "Initial event " + exEvent.causingEvent + " caused exception in " + exEvent.causingSubscriber, exEvent.throwable); } else { if (logSubscriberExceptions) { Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); } SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event, subscription.subscriber); post(exEvent); } } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
就直接利用反射調用訂閱者方法。
this
類型爲MainThread時(訂閱者將在主線程中執行),若是發起的線程是主線程,則直接調用invokeSubscriber方法。不然發送給mainThreadPoster。spa
mainThreadPoster就是一個Handler,裏面最主要的就是維護了一個隊列PendingPostQueue,這裏介紹一下EvenBus定義了兩個模型PendingPost和PendingPostQueue線程
PendingPost有一個靜態的列表pendingPostPoo,用於存儲PendingPost對象l,相似於線程池的做用,當事件發送完後,會把pendingPost的屬性制null,而pendingPost對象則存到pendingPostPool中,這樣下次要發送事件時就不須要再new一個pendingpost對象,而是pengdingPostPool中取出再附上事件和訂閱者。
PendingPostQueue,維護了一個鏈表,其中有兩個屬性 head 和tail 的pendingPost。
再回到mainThreadPoster的調用,把訂閱者和事件封裝成PendingPost對象,並加到PendingPostQueue隊列中,併發送message。mainThreadPoster接收到消息後會遍歷queue中的全部事件並調用EvenBus的invokeSubscriber方法通知給訂閱者。
類型爲BackgriundThread時(訂閱者將在一個後臺線程中執行,若是發送的線程不是主線程,則直接在發起時間的線程中執行,若是發送的線程是主線程,則會在惟一的一個後臺線程中按照順序來執行全部的後臺時間響應函數)
若是不是主線程則直接調用invokeSubscriber方法。若是是主線程發送給backgroundPoster。
backgroundPoster繼承於Runnable,EventBus定義的一個線程,把訂閱者和事件封裝成PendingPost對象後加入到PendingPostQueue中,再在線程中批量執行invokeSubscriber方法。
類型爲Async時(訂閱者將在另外一個異步線程中執行),發送給asyncPoster,asyncPoster與backgroundPoster基本同樣,也是繼承與Runnable,不過他每次只執行隊列中的一個事件而非批量執行。
EvenBus對於訂閱者能夠定義其類型是否爲sticky,若是是,則訂閱者在註冊的時候Evenbus就會發送給他上一次發送的事件。固然只有調用postSticky發送的事件纔會存儲其事件,用於發送給剛註冊的訂閱者。