EventBus源碼分析

首先從訂閱開始java

EventBus.getDefault().register(this);
複製代碼

register方法會獲取傳入的object對象的class,經過findSubscriberMethods方法來查找這個class中訂閱的方法,以下緩存

public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
複製代碼

findSubscriberMethods方法實現以下,其中有一個ConcurrentHashMap類型的靜態對象METHOD_CACHE,是用來緩存對應類的訂閱方法的,以便後續再次訂閱時不用從新去findMethods,能夠直接從緩存中讀取。bash

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
複製代碼

查找訂閱方法經過ignoreGeneratedIndex字段分爲兩種方式app

第一種findUsingReflection是經過反射來查找,找到被@Subscribe註解修飾的方法,而且根據具體的註解以及方法參數生成一個SubscriberMethod對象:ssh

findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
複製代碼

第二種findUsingInfo是經過apt的方式,提早找到訂閱的方法,能夠避免經過反射查找方法帶來的耗時。async

具體使用方法:在gradle配置apt,rebuild項目,會生成一個註解方法索引類,在EventBusBuilder中經過addIndex方法新建一個該類的對象傳入便可。ide

這邊還有一個問題,對於子類重寫父類的訂閱方法如何處理。在上面的兩種方式中在查找完子類的訂閱方法後都會繼續去查找父類的訂閱方法,都經過一個叫作checkAdd的方法進行支撐,該方法返回true表示能夠添加到訂閱方法的集合中去。post

boolean checkAdd(Method method, Class<?> eventType) {
    // 2 level check: 1st level with event type only (fast), 2ndlevelwith complete signature when required.
    // Usually a subscriber doesn't have methods listening to thesameevent type.
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method)existing,eventType)) {
                // Paranoia check
                throw new IllegalStateException();
            }
            // Put any non-Method object to "consume" theexistingMethod
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}
複製代碼

checkAdd中設置了兩種檢查方式,第一種是經過eventType也就是訂閱方法的入參來檢查,這種方式比較快,只須要看下以前有沒有這種入參的方法就能夠了。註釋中也指出了一般一個類不會有多個入參相同的訂閱方法。gradle

第二種是經過checkAddWithMethodSignature方法來檢查,源碼以下:ui

private boolean checkAddWithMethodSignature(Method method,Class<?>eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld=subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null||methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        // Revert the put, old class is further down theclasshierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}
複製代碼

經過method以及eventType來生成一個key,來存儲方法所在的類。其中methodClassOld == null ||methodClassOld.isAssignableFrom(methodClass)這個判斷條件對應着兩種狀況,methodClassOld == null說明是入參相同可是方法名不一樣的方法正在被添加,直接返回true就能夠了methodClassOld.isAssignableFrom(methodClass)這個條件是爲了過濾掉父類被子類重寫的方法,前面說過了查找訂閱方法是從子類開始遍歷的,此時若是子類重寫了父類的訂閱方法,那麼methodClassOld對應的是子類,methodClass對應的是父類,顯然這個判斷就會爲false,以後進入下面的else分支return false,也就是忽略掉父類被子類重寫的方法。

查找訂閱方法基本就這麼點,查找完畢以後須要執行訂閱操做,也就是register方法中的subscribe(subscriber, subscriberMethod);操做,直接看下該方法的實現:

private void subscribe(Object subscriber, SubscriberMethodsubscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber,subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions =subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " +subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    List<Class<?>> subscribedEvents =typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventTypehave to be considered.
            // Note: Iterating over all events may be inefficientwith lots of sticky events,
            // thus data structure should be changed to allow a moreefficient lookup
            // (e.g. an additional map storing sub classes of superclasses: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries =stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscriptin, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription,stickyEvent);
        }
    }
}
複製代碼

subscriptionsByEventType這個Map是將eventType做爲key保存其所對應的訂閱方法的集合。該方法將剛查找到的方法添加到對應的集合中去,添加時有這樣一層判斷i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority這表示這個集合裏的方法會按照所設定的優先級進行排序。緊接着又出現了個MaptypesBySubscriber將訂閱者做爲key保存一個Class的集合,暫時看不出有啥用,就先無論,最後再檢查下是否是粘性事件,若是是粘性事件就根據所保存的粘性事件來執行該方法。eventInheritance也是在bulider中設置的,若是爲true則會考慮事件的繼承性,若是如今有eventType爲正在訂閱的方法的eventType的子類的粘性事件存在,那麼這個粘性事件也會被正在訂閱的方法接收到,直接說可能比較繞,舉個栗子,如今我有兩個事件,其中一個是另外一個的子類,而且有兩個粘性訂閱方法,以下:

class EventMessage {
  
    }

    class SubEventMessage extends EventMessage {

    }
    
    @Subscribe(sticky =  true)
    public void onEvent(EventMessage message) {
        // do something
    }

    @Subscribe(sticky =  true)
    public void onSubEvent(SubEventMessage message) {
        // do something
    }
複製代碼

當執行register時,若是內存中存在着一個類型爲SubEventMessage的事件,那麼訂閱的時候onEvent方法會被執行,入參是內存中類型爲SubEventMessage的事件。

如今register大體就分析完了,再來看下unregister方法:

public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
複製代碼

unregister方法十分簡單,typesBySubscriber是剛纔在進行訂閱的時候不知道用來幹什麼的Map,如今知道是在取消訂閱時用到的,這個Map將訂閱者做爲key,將其全部的訂閱方法的eventType存入到對應的List中去,取消訂閱時將這個List取出來,遍歷去移除對應的訂閱方法,具體實如今unsubscribeByEventType中,也十分簡單,就不贅述了。

訂閱和取消訂閱都看過了,還差個發送事件,發送事件分爲postpostSticky兩種,先看post

public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
複製代碼

currentPostingThreadState是個ThreadLocal,而後從中取出當前線程的postingState,也就是說每一個線程都會維護一個本身的posting狀態,以後會有個循環將事件隊列清空,經過postSingleEvent方法來進一步處理:

private void postSingleEvent(Object event, PostingThreadStatepostingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event,postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event,postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered forevent " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass !=NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}
複製代碼

一樣是經過eventInheritance來判斷是否要涉及eventType的父類,以後再經過postSingleEventForEventType方法的返回值來獲得該事件是否被處理,若是沒有被處理,那麼會返回false進入下一個分支,logNoSubscriberMessagessendNoSubscriberEvents都是在builder中傳入的,前者用於沒有訂閱者處理事件時打印日誌,後者用於沒有訂閱者處理事件時發送一個NoSubscriberEvent類型的事件,因此具體是怎麼處理事件的還要繼續看postSingleEventForEventType方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
複製代碼

postSingleEventForEventType方法從subscriptionsByEventType中去獲取對應事件類型的全部訂閱者,若是沒有訂閱者就返回false表示事件沒有被處理,不然就遍歷全部的訂閱者,經過postToSubscription方法來處理事件,接着往裏看:

private void postToSubscription(Subscription subscription, Objectevent, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster notdecoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " +subscription.subscriberMethod.threadMode);
    }
}
複製代碼

在這個方法內終於看到經過區分註解中的threadMode來區分不一樣的處理方式了,先來看下這幾種threadMode分別表明什麼意思:

Mode 含義
POSTING 在當前線程執行
MAIN 在主線程執行
MAIN_ORDERED 在主線程有序執行
BACKGROUND 在後臺線程執行
ASYNC 在新的線程執行

能夠看到有幾個差很少,那具體有什麼區別呢?直接從代碼裏看,先說明幾個東西,invokeSubscriber就是直接調用訂閱方法,還有幾個後綴爲poster的變量暫時先理解爲調用了enqueue方法後,訂閱方法就會在某個時間被執行,後面再詳細講。

如今能夠看代碼了,POSTING沒什麼好說的,直接調用invokeSubscriber,也就是說在調用eventBus.post的線程執行。

MAINMAIN_ORDERED都是在主線程執行,後者的ORDERED體如今什麼地方呢,先看下MAIN的分支,其中經過mainThreadPoster.enqueue插入的事件會在主線程執行,判斷當前線程是不是主線程來決定直接調用訂閱方法仍是經過mainThreadPoster來發布,這裏應該沒什麼疑惑的,主要是MAIN_ORDERED

if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster notdecoupled from subscriber
                invokeSubscriber(subscription, event);
            }
複製代碼

mainThreadPoster不爲空時,經過mainThreadPoster來發布事件,爲空時直接調用訂閱方法,說好的在主線程調用呢?這裏註釋也說明了是不正確的,實際上mainThreadPoster爲空自己就是種異常狀況,具體能夠看下它的初始化過程,這裏就不細說了。因此下面的else分支就先無論了,那麼爲何說經過mainThreadPoster發佈的事件就是「有序」的呢,實際上mainThreadPoster內部實現是個handler,能夠將事件post到主線程中去執行,因此說是有序的,這裏簡單說明下緣由:

主線程維護着一個消息隊列,循環從裏面取出消息來處理,咱們知道能夠經過viewpost方法來獲取它繪製完成以後的寬高,緣由是post方法裏的事件會被插入到消息隊列的尾部,而viewmeasure,layout,draw都在新插入的消息的前面,因此當post的方法執行時,view確定已經繪製好了。

handler經過sendMessage發送的消息也會被插入到主線程消息隊列的尾部,這就是「有序」,好比如今有一個ImageView,在它的onMeasure中去發佈一個事件,若是訂閱方法的模式是MAIN那麼會在onMeasure中調用訂閱方法,而若是模式是MAIN_ORDERED那麼會在ImageView繪製完成後調用訂閱方法。

再來看下BACKGROUNDASYNC的區別:

case BACKGROUND:
    if (isMainThread) {
        backgroundPoster.enqueue(subscription, event);
    } else {
        invokeSubscriber(subscription, event);
    }
    break;
case ASYNC:
    asyncPoster.enqueue(subscription, event);
    break;
複製代碼

其中backgroundPosterasyncPoster都會開啓一個新線程來執行訂閱方法,暫時當成是同樣的就行,那麼區別就是BACKGROUND模式若是在子線程post一個事件,那麼會直接在該線程調用訂閱方法,只有在主線程post事件纔會開啓一個新線程。而ASYNC模式,不論是在哪post事件,都會開啓一個新線程來調用訂閱方法。

最後再看下幾個poster基本上就看完了,幾個poster都實現了同一個接口Poster

interface Poster {

    /** * Enqueue an event to be posted for a particular subscription. * * @param subscription Subscription which will receive the event. * @param event Event that will be posted to subscribers. */
    void enqueue(Subscription subscription, Object event);
}
複製代碼

能夠看到裏面只有一個須要實現的方法enqueue,是用來插入事件的,這個接口被三個類實現,分別是HandlerPosterBackgroundPosterAsyncPoster,上面的mainThreadPoster對應的就是HandlerPoster,這三個類中都有個類型爲PendingPostQueue的成員變量,這是個事件隊列,具體實現就不看了,這個隊列提供了入隊和出隊的方法。

先看下HandlerPosterenqueue方法:

public class HandlerPoster extends Handler implements Poster {
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
}
複製代碼

HandlerPoster繼承了Handler,調用enqueue方法後會向事件隊列中插入一個事件,而後將標記位handlerActive設置爲true表示正在處理事件,而後調用sendMessage發送消息通知處理事件。PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);這行是用來獲取一個消息隊列的Node用來插入到隊列中去,EventBus維護着一個pool用來保存閒置的Node當有須要時從中取出一個給事件使用,pool不夠用時纔會new新的Node出來,具體能夠看下PendingPost,這樣作的好處是能夠避免頻繁建立對象帶來的開銷。

再看下HandlerPosterhandleMessage方法:

public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
複製代碼

首先會記錄下開始處理事件的時間,而後從事件隊列中取出事件,若是爲空就將handlerActive設置爲false直接return了,若是不爲空,就調用eventBus.invokeSubscriber(pendingPost);來調用訂閱方法,執行完後,再看下時間,若是超出了規定的時間那麼從新發送一條消息,本次消息處理結束,等下次輪到本身的時候再處理事件,畢竟不能一直處理隊列裏的事件而阻塞了主線程,若是沒有超出規定事件,那麼說明還能夠有事件能夠處理下一個事件,就會再次進入循環。

BackgroundPosterAsyncPoster其實和HandlerPoster差很少,只是沒有用Handler而是用了線程池去處理事件,具體就不看了。

對了,還有個發送粘性事件:

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriberwants to remove immediately
    post(event);
}
複製代碼

就是在stickyEvents這個map裏存一下。

好了,完了。

相關文章
相關標籤/搜索