EventBus源碼解析

前言

最近跟一位前輩聊了一下學習方法,聊了不少,也收穫了不少。從交流的過程當中前輩送給我一句話:「學以至用,格物致知」。聽完以後意識到以前的學習方法有很大的問題,在之後的學習中須要更多的和實踐相結合,作到學以至用。各位小夥伴們有什麼更好的學習方法嗎?歡迎留言交流。
git

簡介

本篇文章將會講一下EventBus相關的知識,EventBus各位已經都很熟悉了,在平常的開發中也有使用,咱們看一下官方是如何介紹EventBus的。
EventBus是適用於AndroidJava的開源庫,使用發佈者/訂閱者模式進行鬆散耦合。EventBus使中央通訊只需幾行代碼便可解耦類,從而簡化了代碼,消除了依賴關係並加快了應用程序的開發。 github

EventBus的優勢:

一、簡化了組件之間的通訊
二、將事件的發送者和接受者分離
三、避免了由於複雜且容易出錯的依賴性和生命週期形成的問題
四、體積小
...
面試

EventBus的功能:編程

一、簡單但功能強大: EventBus是一個微型庫,具備易於學習的API。可是,經過解耦組件,您的軟件體系結構可能會受益不淺:在使用事件時,訂閱者不瞭解發送者。
二、通過實戰測試: EventBus是最經常使用的Android庫之一:成千上萬的應用程序使用EventBus,其中包括很是流行的應用程序。超過十億的應用安裝說明了一切。
三、高性能:特別是在Android上,性能相當重要。對EventBus進行了概要分析和優化;可能使其成爲同類產品中最快的解決方案。
四、方便的基於註釋的API (不犧牲性能):只需將@Subscribe註釋放入您的訂戶方法中。因爲註釋的創建時間已創建索引,所以EventBus無需在應用程序運行時進行註釋反射,這在Android上很是慢。
五、Android主線程傳送:與UI交互時,EventBus能夠在主線程中傳送事件,而無論事件的發佈方式如何。
六、後臺線程傳遞:若是您的訂戶執行長時間運行的任務,EventBus也可使用後臺線程來避免UI阻塞。
七、事件和訂閱者繼承:在EventBus中,面向對象的範例適用於事件和訂閱者類。假設事件類AB的超類。類型B的已發佈事件也將被髮布給對A感興趣的訂戶。相似地,考慮訂戶類的繼承。
八、零配置: 您可使用代碼中任何位置均可用的默認EventBus實例當即開始使用。
九、可配置: 要調整EventBus到您的要求,您可使用構建器模式來調整其行爲。
設計模式

讀了這麼多官方文檔,能夠從裏面總結成一句話:EventBus就是牛X,用就完事了,不用你就Out了。數組

使用

本篇文章使用的是官方最新版本3.1.1
咱們按照官方的使用步驟寫一個小Demo緩存

第一步:定義事件

public class SayHelloEvent {
    private String message;

    public void sayHellow(String message) {
        this.message = message;
    }
}
複製代碼

這裏事件被定義成一個普通類,在類的內部建立一個字段,同時建立一個方法。服務器

第二步:準備訂閱者

@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(SayHelloEvent event) {
    String message = event.getMessage();
    ...
}

@Override
public void onStart() {
    super.onStart();
    EventBus.getDefault().register(this);
}
 
@Override
public void onStop() {
    EventBus.getDefault().unregister(this);
    super.onStop();
}
複製代碼

這裏須要在建立須要訂閱者是調用EventBus.getDefault().register(this)方法進行註冊,在適當的位置調用EventBus.getDefault().unregister(this)方法進行解註冊。同時建立一個接收事件的方法onMessageEvent方法,使用註解@Subscribe進行標示,同時生命事件接收的線程爲主線程。網絡

第三步:發送事件

EventBus.getDefault().post(new SayHelloEvent("Hello,EventBus!!!"));
複製代碼

在須要發送事件的地方調用EventBus.getDefault().post方法,將第一步建立的事件對象傳入便可。
到這裏EventBus的使用方法就算結束了,看起來挺簡單的,可是這裏面有不少須要注意的地方,彆着急,這些都會在代碼探究中進行講解。併發

知識點準備

在進行源碼分析以前,咱們要知道EventBus中的三個知識點:角色分配、線程模型和事件類型。

角色分配

Event

事件,它能夠是任意類型,EventBus會根據事件的類型進行全局發送。

Subscriber

事件訂閱者,也能夠稱之爲事件的接收者。在EventBus3.0以前,使用時必須以OnEvent開頭的幾種接收方法來接收事件。這些方法分別爲:onEventonEventMainThreadonEventBackgroundThreadonEventAsync,可是在3.0以後處理方法能夠自定義,可是要以註解@subscribe進行標示,同時指定線程模型,線程模型默認爲POSTING的方式。

Publisher

事件的發佈者,它能夠在任意線程中發佈事件。在使用時一般調用EventBus.getDefault()方法獲取一個EventBus對象,再經過鏈式編程調用post()方法進行事件發送。

線程模型

事件模型是指事件訂閱者所在線程和發佈事件者所在線程的關係。

POSTING

事件的訂閱和事件的發佈處於同一線程。這是默認設置。事件傳遞意味着最少的開銷,由於它徹底避免了線程切換。所以,對於已知在很是短的時間內完成而不須要主線程的簡單任務,這是推薦的模式。使用此模式的事件處理程序必須快速返回,以免阻塞多是主線程的發佈線程。

MAIN

Android上,用戶將在Android的主線程(UI線程)中被調用。若是發佈線程是主線程,則將直接調用訂閱方方法,從而阻塞發佈線程。不然,事件將排隊等待傳遞(非阻塞)。使用此模式的訂閱服務器必須快速返回,以免阻塞主線程。若是不在Android上,則行爲與POSTING相同。

MAIN_ORDERED

Android上,用戶將在Android的主線程(UI線程)中被調用。與{@link#MAIN}不一樣,事件將始終排隊等待傳遞。這確保了post調用是非阻塞的。

BACKGROUND

Android上,用戶將在後臺線程中被調用。若是發佈線程不是主線程,則將在發佈線程中直接調用訂閱方方法。若是發佈線程是主線程,則EventBus使用單個後臺線程,該線程將按順序傳遞其全部事件。使用此模式的訂閱服務器應嘗試快速返回,以免阻塞後臺線程。若是不在Android上,則始終使用後臺線程。

ASYNC

訂閱服務器將在單獨的線程中調用這始終獨立於發佈線程和主線程。發佈事件從不等待使用此模式的訂閱服務器方法。若是訂戶方法的執行可能須要一些時間(例如,用於網絡訪問),則應使用此模式。避免同時觸發大量長時間運行的異步訂閱服務器方法來限制併發線程的數量。EventBus使用線程池有效地重用來自已完成的異步訂閱服務器通知的線程。

事件類型

普通事件

普通事件是指已有的事件訂閱者可以收到事件發送者發送的事件,在事件發送以後註冊的事件接收者將沒法收到事件。發送普通事件能夠調用EventBus.getDefault().post()方法進行發送。

粘性事件

粘性事件是指,無論是在事件發送以前註冊的事件接收者仍是在事件發送以後註冊的事件接收者都可以收到事件。這裏於普通事件的區別之處在於事件接收處須要定義事件接收類型,它能夠經過@Subscribe(threadMode = xxx, sticky = true)的方式進行聲明;在事件發送時須要調用EventBus.getDefault().postSticky()方法進行發送。事件類型默認爲普通事件。

事件優先級

訂閱者優先級以影響事件傳遞順序。在同一傳遞線程ThreadMode中,優先級較高的訂閱者將在優先級較低的其餘訂閱者以前接收事件。默認優先級爲0。注意:優先級不影響具備不一樣ThreadMode的訂閱服務器之間的傳遞順序!

源碼分析

EventBus.getDefault()建立EventBus對象

在EventBus使用的過程當中,無論是註冊、解註冊或者是發送事件都會用到這個方法,因此咱們首先看一下這個方法中都是作了什麼。

public class EventBus {
    static volatile EventBus defaultInstance;
    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
    private final Map<Object, List<Class<?>>> typesBySubscriber;
        private final Map<Class<?>, Object> stickyEvents;
    ...
    // 一、單例設計模式返回EventBus對象
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    // 二、調用EventBus構造方法
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }
    ...
    public EventBus() {
        // 三、調用有參構造方法,傳入一個EventBusBuilder對象
        this(DEFAULT_BUILDER);
    }
    
    EventBus(EventBusBuilder builder) {
        //日誌
        logger = builder.getLogger();
        //這個集合能夠根據事件類型獲取訂閱者
        //key:事件類型,value:訂閱該事件的訂閱者集合
        subscriptionsByEventType = new HashMap<>();
        //訂閱者所訂閱的事件集合
        //key:訂閱者,value:該訂閱者訂閱的事件集合
        typesBySubscriber = new HashMap<>();
        //粘性事件集合
        //key:事件Class對象,value:事件對象
        stickyEvents = new ConcurrentHashMap<>();
        //Android主線程處理事件
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        //Background事件發送者
        backgroundPoster = new BackgroundPoster(this);
        //異步事件發送者
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //訂閱者訂閱事件查找對象
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }
}
複製代碼

這個方法內部首先經過單例模式建立一個EventBus對象,在建立EventBus時最終會調用它的有參構造函數,傳入一個EventBus.Builder對象。在這個有參構造函數內部對屬性進行初始化,其中有幾個比較重要的屬性:subscriptionsByEventTypetypesBySubscriberstickyEventssubscriberMethodFinder,這幾個屬性的做用已經在註釋中給出。

register(Object subscriber)

建立EventBus對象以後須要將事件訂閱者在EventBus中進行註冊。

public class EventBus {
    /** * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they * are no longer interested in receiving events. * <p/> * Subscribers have event handling methods that must be annotated by {@link Subscribe}. * The {@link Subscribe} annotation also allows configuration like {@link * ThreadMode} and priority. */
    public void register(Object subscriber) {
        // 一、經過反射獲取到訂閱者的Class對象
        Class<?> subscriberClass = subscriber.getClass();
        // 二、經過subscriberMethodFinder對象獲取訂閱者所訂閱事件的集合
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            // 三、遍歷集合進行註冊
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        // 四、獲取事件類型
        Class<?> eventType = subscriberMethod.eventType;
        // 五、封裝Subscription對象
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        // 六、經過事件類型獲取該事件的訂閱者集合
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        // 七、若是沒有訂閱者訂閱該事件
        if (subscriptions == null) {
            // 建立集合,存入subscriptionsByEventType集合中
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else { // 八、若是有訂閱者已經訂閱了該事件
            // 判斷這些訂閱者中是否有重複訂閱的現象
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
        int size = subscriptions.size();
        // 九、遍歷該事件的全部訂閱者
        for (int i = 0; i <= size; i++) {
            // 按照優先級高低進行插入,若是優先級最低,插入到集合尾部
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        
        // 十、獲取該事件訂閱者訂閱的全部事件集合
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        // 十一、將該事件加入到集合中
        subscribedEvents.add(eventType);
        
        // 十二、判斷該事件是不是粘性事件
        if (subscriberMethod.sticky) {
            if (eventInheritance) { // 1三、判斷事件的繼承性,默認是不可繼承
                // 1四、獲取全部粘性事件並遍歷,判斷繼承關係
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        // 1五、調用checkPostStickyEventToSubscription方法
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }
    
    private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {
            // 1六、若是粘性事件不爲空
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }


    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 1七、根據threadMode的類型去選擇是直接反射調用方法,仍是將事件插入隊列
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    // 1八、經過反射的方式調用
                    invokeSubscriber(subscription, event);
                } else {
                    // 1九、將粘性事件插入到隊列中
                    // 最後仍是會調用EventBus.invokeSubscriber(PendingPost pendingPost)方法。
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
    
    void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }
    
    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
}


public class SubscriberMethod {
    final Method method; // 處理事件的Method對象
    final ThreadMode threadMode; //線程模型
    final Class<?> eventType; //事件類型
    final int priority; //事件優先級
    final boolean sticky; //是不是粘性事件
    String methodString;
}

final class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
}
複製代碼

還記得上面代碼註釋2處經過調用subscriberMethodFinder.findSubscriberMethods(subscriberClass)方法,獲取到了訂閱者全部的訂閱事件集合。

class SubscriberMethodFinder {
    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
    private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    private static final int POOL_SIZE = 4;
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
    
    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 一、先從以前緩存的集合中獲取
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            // 二、若是以前緩存了,直接返回
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) { //ignoreGeneratedIndex通常爲false
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // 三、獲取全部訂閱方法集合
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            // 四、放入緩存集合中
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
    
    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        // 五、從數組中獲取FindState對象
        // 若是有直接返回,若是沒有建立一個新的FindState對象
        FindState findState = prepareFindState();
        // 六、根據事件訂閱者初始化findState
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 七、獲取subscriberInfo,初始化爲null
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                // 八、經過反射的方式獲取訂閱者中的Method
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
    
    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }
    
    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 九、訂閱者中全部聲明的方法,放入數組中
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // 十、獲取訂閱者中聲明的public方法,設置跳過父類
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        // 遍歷這些方法
        for (Method method : methods) {
            // 十一、獲取方法的修飾符:public、private等等
            int modifiers = method.getModifiers();
            // 十二、訂閱方法爲public同時不是abstract、static
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                // 1三、方法參數類型數組
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    // 1四、獲取方法的註解
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    // 1五、若是有註解
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        // 1六、將method和eventType放入到findState進行檢查
                        if (findState.checkAdd(method, eventType)) {
                            // 1七、獲取註解中的threadMode對象
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // 1八、新建一個SubscriberMethod對象,同時加入到findState中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }
    
    // 從findState中獲取訂閱者全部方法並釋放
    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        // 獲取訂閱者全部訂閱方法集合
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        // findState進行回收
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        // 返回集合
        return subscriberMethods;
    }
}
複製代碼

到這裏,註冊的流程差很少就走完了,能夠稍做休息,放鬆一下。

小結

在訂閱者註冊的過程當中主要進行了如下幾個步驟,詳細的步驟都在代碼的註釋中,你們能夠自行查看。

一、根據單例設計模式建立一個EventBus對象,同時建立一個EventBus.Builder對象對EventBus進行初始化,其中有三個比較重要的集合和一個SubscriberMethodFinder對象。
二、調用register方法,首先經過反射獲取到訂閱者的Class對象。
三、經過SubscriberMethodFinder對象獲取訂閱者中全部訂閱的事件集合,它先從緩存中獲取,若是緩存中有,直接返回;若是緩存中沒有,經過反射的方式去遍歷訂閱者內部被註解的方法,將這些方法放入到集合中進行返回。
四、遍歷第三步獲取的集合,將訂閱者和事件進行綁定。
五、在綁定以後會判斷綁定的事件是不是粘性事件,若是是粘性事件,直接調用postToSubscription方法,將以前發送的粘性事件發送給訂閱者。其實這也很好理解,在講粘性事件時說過,若是在粘性事件發送以前註冊的訂閱者,當發送粘性事件時,會接收到該事件;若是是粘性事件發送以後註冊的訂閱者,一樣也能接收到事件,緣由就在這裏。

EventBus.getDefault().post(Object event)

public class EventBus {
    ...
    public void post(Object event) {
        // 一、獲取當前線程的PostingThreadState,這是一個ThreadLocal對象
        PostingThreadState postingState = currentPostingThreadState.get();
        // 二、當前線程的事件集合
        List<Object> eventQueue = postingState.eventQueue;
        // 三、將要發送的事件加入到集合中
        eventQueue.add(event);

        // 查看是否正在發送事件
        if (!postingState.isPosting) {
            // 判斷是不是主線程
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                // 四、只要事件集合中還有事件,就一直髮送
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
    
    // currentPostingThreadState是包含了PostingThreadState的ThreadLocal對象
    // ThreadLocal是一個線程內部的數據存儲類,經過它能夠在指定的線程中存儲數據, 而且線程之間的數據是相互獨立的。
    // 其內部經過建立一個它包裹的泛型對象的數組,不一樣的線程對應不一樣的數組索引,每一個線程經過get方法獲取對應的線程數據。
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };
    
    // 每一個線程中存儲的數據
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>(); // 線程的事件隊列
        boolean isPosting; //是否正在發送中
        boolean isMainThread; //是否在主線程中發送
        Subscription subscription; //事件訂閱者和訂閱事件的封裝
        Object event; //事件對象
        boolean canceled; //是否被取消發送
    }
    
    ...
    
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        // 五、獲取事件的Class對象
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) { // eventInheritance通常爲true
            // 六、 找到當前的event的全部 父類和實現的接口 的class集合
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                // 七、遍歷集合發送單個事件
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
    
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            // 獲取事件集合
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
            if (eventTypes == null) { //若是爲空
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                while (clazz != null) {
                    eventTypes.add(clazz); //添加事件
                    addInterfaces(eventTypes, clazz.getInterfaces()); //添加當前事件的接口class
                    clazz = clazz.getSuperclass();// 獲取當前事件的父類
                }
                eventTypesCache.put(eventClass, eventTypes);
            }
            return eventTypes;
        }
    }
    
    //循環添加當前事件的接口class
    static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
        for (Class<?> interfaceClass : interfaces) {
            if (!eventTypes.contains(interfaceClass)) {
                eventTypes.add(interfaceClass);
                addInterfaces(eventTypes, interfaceClass.getInterfaces());
            }
        }
    }

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            // 八、根據事件獲取全部訂閱它的訂閱者
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            // 九、遍歷集合
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    // 十、將事件發送給訂閱者
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    // 十一、重置postingState
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
    
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        // 十二、根據訂閱方法的線程模式調用訂閱方法
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING: //默認類型,表示發送事件操做直接調用訂閱者的響應方法,不須要進行線程間的切換
                invokeSubscriber(subscription, event);
                break;
            case MAIN: //主線程,表示訂閱者的響應方法在主線程進行接收事件
                if (isMainThread) { //若是發送者在主線程
                    invokeSubscriber(subscription, event);//直接調用訂閱者的響應方法
                } else { //若是事件的發送者不是主線程
                    //添加到mainThreadPoster的隊列中去,在主線程中調用響應方法
                    mainThreadPoster.enqueue(subscription, event); 
                }
                break;
            case MAIN_ORDERED:// 主線程優先模式
                if (mainThreadPoster != null) {
                    
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    //若是不是主線程就在消息發送者的線程中進行調用響應方法
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    // 若是事件發送者在主線程,加入到backgroundPoster的隊列中,在線程池中調用響應方法
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    // 若是不是主線程,在事件發送者所在的線程調用響應方法
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                //這裏沒有進行線程的判斷,也就是說無論是否是在主線程中,都會在子線程中調用響應方法
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
    ...
}
複製代碼

小結

有關發送事件的分析先到這裏,咱們先來小結一下事件發送都是作了什麼。

一、獲取當前線程的事件集合,將要發送的事件加入到集合中。
二、經過循環,只要事件集合中還有事件,就一直髮送。
三、獲取事件的Class對象,找到當前的event的全部父類和實現的接口的class集合。遍歷這個集合,調用發送單個事件的方法進行發送。
四、根據事件獲取全部訂閱它的訂閱者集合,遍歷集合,將事件發送給訂閱者。
五、發送給訂閱者時,根據訂閱方法的線程模式調用訂閱方法,若是須要線程切換,則切換線程進行調用;不然,直接調用。

有關線程切換的問題會放到後面的下文中作一個單獨的主題進行探究。

EventBus.getDefault().postSticky(Object event)

接下來咱們來看一下發送粘性事件。

public class EventBus {
    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            // 一、將事件添加到粘性事件集合中
            stickyEvents.put(event.getClass(), event);
        }
        // 二、發送事件
        post(event);
    }
}
複製代碼

小結

發送粘性事件作了兩件事:

一、將粘性事件加入到EventBus對象的粘性事件集合中,當有新的訂閱者進入後,若是該訂閱者訂閱了該粘性事件,能夠直接發送給訂閱者。
二、將粘性事件發送給已有的事件訂閱者。

EventBus.getDefault().unregister(Object subscriber)

前面已經把訂閱者註冊和消息的發送進行了探究,接下來咱們再來看一下解註冊的方法。

public class EventBus {
    ...
    public synchronized void unregister(Object subscriber) {
        // 一、獲取訂閱者訂閱的全部事件
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            // 二、遍歷集合
            for (Class<?> eventType : subscribedTypes) {
                // 三、將該訂閱者的從訂閱該事件的全部訂閱者集合中移除
                unsubscribeByEventType(subscriber, eventType);
            }
            // 四、將訂閱者從集合中移除
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
    
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        // 獲取該事件的全部訂閱者
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            // 遍歷集合
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                // 將訂閱者從集合中移除
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }
    ...
}
複製代碼

小結

解註冊一共就作了兩件事:

一、獲取訂閱者的全部訂閱方法,遍歷這些方法。而後拿到每一個方法對應的全部訂閱者集合,將訂閱者從集合中移除。
二、移除訂閱者中全部的訂閱方法。

EventBus是如何進行線程間切換的?

因爲事件的發送牽扯到前程的切換,在平時的面試中被提到的概率會比較大,因此做爲一個單獨的主題來進行探究。

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
複製代碼

這個方法中根據ThreadMode類型來調用訂閱者的訂閱方法,有關ThreadMode的解釋已經在前面給出,若是不太清楚的能夠看一下。
這個方法中涉及到的前程切換的地方有兩個:mainThreadPosterbackgroundPoster,有關ASYNC模式下是如何工做的,這裏也會進行探究。

mainThreadPoster.enqueue

mainThreadPosterEventBus進行初始化時建立的,咱們來看一下。

...
    private final MainThreadSupport mainThreadSupport;
    private final Poster mainThreadPoster;
    ...
    EventBus(EventBusBuilder builder) {
        ...
        // 一、建立mainThreadSupport
        mainThreadSupport = builder.getMainThreadSupport();
        // 二、建立mainThreadPoster
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        ...
    }
複製代碼

這裏建立了兩個對象:mainThreadSupport和mainThreadPoster,咱們先來看一下mainThreadSupport是什麼。

MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
            // 獲取主線程Looper對象,這裏又可能爲空
            Object looperOrNull = getAndroidMainLooperOrNull();
            // 根據主線程Looper對象返回AndroidHandlerMainThreadSupport對象
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }
    
    Object getAndroidMainLooperOrNull() {
        try {
            // 返回主線程Looper
            return Looper.getMainLooper();
        } catch (RuntimeException e) {
            return null;
        }
    }
    
public interface MainThreadSupport {
    boolean isMainThread();
    Poster createPoster(EventBus eventBus);
    
    class AndroidHandlerMainThreadSupport implements MainThreadSupport {
        private final Looper looper;
        public AndroidHandlerMainThreadSupport(Looper looper) {
            // 根據外部傳入的looper對象進行本地初始化
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            // 判斷是否爲主線程
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            // 當調用createPoster方法時建立一個HandlerPoster對象
            return new HandlerPoster(eventBus, looper, 10);
        }
    }
}
複製代碼

咱們看到建立mainThreadSupport對象其實是根據主線程的Looper對象來建立的,MainThreadSupport其實是一個接口,因此這裏返回的是它的實現類AndroidHandlerMainThreadSupport對象。
接下來讓咱們看一下mainThreadPoster對象,在建立mainThreadPoster對象時,調用了mainThreadSupport.createPoster方法,因爲MainThreadSupport是一個接口,因此其實是調用了它的實現類對象的createPoster方法,在方法的內部建立了一個HandlerPoster對象並返回,咱們看一下HandlerPoster

//這個類繼承自Handler而且實現了Poster接口
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        //這裏傳入的是主線程的Looper對象,因此這個Handler對象是主線程的Handler
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        //建立一個事件隊列
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //根據傳入的參數封裝一個PendingPost對象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //將PendingPost加入到隊列中
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                // 調用sendMessage,發送事件回到主線程
                // 最終會調用下面的handleMessage方法
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            //無限循環
            while (true) {
                PendingPost pendingPost = queue.poll();
                //進行兩次檢查,確保pendingPost不爲null,若是爲null直接跳出循環
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //使用反射的方法調用訂閱者的訂閱方法。
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}
複製代碼

mainThreadPoster.enqueue咱們已經探究完畢,其實仍是運行了Handler機制。

backgroundPoster.enqueue

backgroundPoster仍是在EventBus初始化時一併初始化

public class EventBus {
    ...
    private final BackgroundPoster backgroundPoster;
    //線程池對象
    private final ExecutorService executorService;
    EventBus(EventBusBuilder builder) {
        ...
        backgroundPoster = new BackgroundPoster(this);
        ...
    }
    ...
}

public class EventBusBuilder {
    // 建立線程池
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}

public class Executors {
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
}
複製代碼

這裏同時初始化了一個線程池對象,這個線程池會在後面用到。 看一下BackgroundPoster的內部結構。

final class BackgroundPoster implements Runnable, Poster {
    private final PendingPostQueue queue;
    private final EventBus eventBus;
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        //初始化隊列
        queue = new PendingPostQueue();
    }
    
    public void enqueue(Subscription subscription, Object event) {
        //封裝PendingPost對象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //將PendingPost對象加入到隊列中
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                //這裏使用到以前初始化的線程池
                eventBus.getExecutorService().execute(this);
            }
        }
    }
    
    //線程池的執行回調
    @Override
    public void run() {
        try {
            try {
                //無限循環
                while (true) {
                    //獲取隊列中的PendingPost,進行雙重檢查
                    //若是爲null直接返回,結束循環
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //使用反射的方式調用訂閱者的訂閱方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}
複製代碼

從上面的代碼中咱們能夠看到backgroundPoster.enqueue其實是使用了線程池的方式。

asyncPoster.enqueue

首先咱們仍是看一下asyncPoster的內部結構。

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
}
複製代碼

從它的內部結構來看有一種似曾相識的感受,沒錯它的內部結構至關於BackgroundPoster的簡化版,可是原理仍是同樣的,也是使用了線程池。

總結

到這裏EventBus的探究算是告一段落,關鍵步驟都在文中代碼的註釋中給出,若是有哪裏寫的不對的地方,懇請你們批評指正,感謝。最後給你們留一個問題:EventBus有哪些不足之處?

參考資料

EventBus官網

相關文章
相關標籤/搜索