發佈消息的業務方沒有限制,任何人,能夠在任何地方,任什麼時候間推送一條消息(或者說觸發一個自定義事件)java
代碼一覽異步
/** Posts the given event to the event bus. */ public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { // 標記,發送中時,就拒絕掉再次發的請求 postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
依賴的消息推送代碼以下async
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
這個表示訂閱者監聽的事件類(即訂閱者方法的參數類型)與事件類徹底一致時纔會接受請求ide
徹底匹配事件類,而後執行下面的邏輯oop
NoSubscriberEvent
事件出來Method.invoke()
便可private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { // 獲取事件類對應的全部訂閱者信息 subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; } private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { // xxx invokeSubscriber(subscription, event); // xxx } void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
看到上面的執行,同步執行回調,沒有采用Executor
,沒有使用線程池post
這個表示,訂閱者監聽的事件類只要是發送事件的超類or該類,就能夠接受請求,如你監聽一個Object的事件,則全部的消息推送,這種場景下你均可以接收學習
相比上面,多了一步就是獲取Event的全部超類丟入集合,遍歷這個集合,獲取全部類對應的訂閱者信息,執行回調方法便可測試
獲取超類的方法, 再第三篇小結中,咱們也說到了如何獲取超類,Guava是使用內部封裝的TypeToken.of(concreteClass).getTypes().rawTypes());
, 下面的使用則是以前提到的 clazz.getSuperclass()
, clazz.getInterfaces()
, 後面這個也是咱們最多見,也最容易想到的方法this
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */ private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes == null) { eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } }
上面貼出的代碼是實際的執行者,但在具體的執行者以前,是有一個方法,內部選擇不一樣的使用姿式來發消息以下線程
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
以 backgroundPoster.enqueue
做爲測試研究目標,其餘幾個設計思路沒什麼兩樣, 這個方法內部是
private final PendingPostQueue queue; public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }
首先是獲取 PendingPost
對象, 這個就是表示準備發佈的消息,塞入隊列,讓後把本類丟入 EventBus
的線程池來執行
上面的設計思路,一個是不一樣的類型,選擇不一樣消息發送機制,這個和簡單工程模式特別類似,你制定一些發送消息的規則,根據你的須要來選擇具體的規則來執行;