Android源碼系列-解密EventBus

EventBus是什麼?

簡介

EventBus是Android和Java的開源庫,它使用發佈者/訂閱者模式進行鬆散耦合。EventBus使用了總線控制,可以用幾行代碼解耦類及簡化代碼,消除依賴關係,加速應用程序開發。java

下圖爲官方示例圖:git

官方示例圖

官網網址:EventBus官網github

Github地址:Github數據庫

特色

  • 簡化組件之間的通訊
  • 發送者和接收者高度解耦
  • 性能高效,社區活躍
  • 庫文件很小(<50KB)
  • 具備簡便配置線程、優先級等高級特性

EventBus怎麼用?

一、gradle引入設計模式

implementation 'org.greenrobot:eventbus:3.1.1'
複製代碼

二、註冊與取消註冊數組

//註冊
EventBus.getDefault().register(this);

//取消註冊
EventBus.getDefault().unregister(this);

複製代碼

三、事件定義與發送緩存

//定義
public class MessageEvent {
}
//發送
EventBus.getDefault().post(new MessageEvent());
複製代碼

四、事件接收bash

@Subscribe(threadMode = ThreadMode.POSTING)
    public void onEventMainThread(MessageEvent event) {
        // doSomeThing();
    };
複製代碼

EventBus核心執行流程是怎樣?

EventBus的使用包含註冊、取消註冊、事件定義發送及事件接收。當用戶進行註冊時,會經過反射獲取實例中定義事件方法集合,而後將事件方法集合及訂閱者加入到Map中,當執行post時,會根據事件類型,從集合中獲取對應的訂閱集合,經過配置的threadMode,使用對應的Poster調用訂閱者的事件,最後經過反射method的invoke執行事件。微信

關鍵類功能說明

說明
EventBus 總線調度類,getDefault()爲單例模式。內部持有訂閱者及事件集合。還有各事件的發送器。eventTypesCache(緩存全部粘性事件的集合)、subscriptionsByEventType(key爲事件,value爲訂閱者集合的Map)、typesBySubscriber(key爲訂閱者,value爲事件集合的Map)、currentPostingThreadState(ThreadLocal,當前線程的事件集合)、mainThreadPoster(主線程的post)、backgroundPoster(後臺線程的post)、asyncPoster(異步線程的post)、subscriberMethodFinder(獲取訂閱者中的事件)
SubscriberMethod 訂閱方法的類,包含Method、ThreadMode、priority等配置屬性
Subscription 訂閱者類,包含subscriber的Object實例、subscriberMethod
PostingThreadState 存儲當前線程的事件集合
SubscriberMethodFinder 用於獲取訂閱中的定義的事件接收方法
PendingPost subscription與event的包裝類,內部維護一個pendingPostPool,當池中有PendingPost實例,會進行復用
PendingPostQueue 內部維護了一個head、tail的PendingPost實例,提供enqueue及poll操做
HandlerPoster 用於處理主線程的事件執行
BackgroundPoster 用於處理後臺線程的事件執行
AsyncPoster 用於執行異步線程的事件執行

代碼執行流程

註冊-register

註冊主要是經過反射獲取訂閱者中定義的事件方法集合,將訂閱者和事件集合加入對應的Map,而後會判斷是否支持粘性事件,將以前發送的粘性事件緩存發送給訂閱者。框架

一、 源碼實現

public void register(Object subscriber) {
        //獲取註冊的Object的Class
        Class<?> subscriberClass = subscriber.getClass();
        //經過subscriberMethodFinder獲取該Object中全部的訂閱方法(SubscriberMethod集合)
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            //遍歷事件集合
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }
    
  private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        //生成Subscription對象(SubscriberMethod的包裝類)
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //獲取subscriptionsByEventType集合(key爲事情,value爲訂閱集合)
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        //獲取typesBySubscriber集合(key爲訂閱者,value爲事件集合)
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        //將全部的事件,根據訂閱者key加入到事件集合中
        subscribedEvents.add(eventType);

        //事件方法是否支持(粘性事情)
        if (subscriberMethod.sticky) {
            //事件方法是否支持(繼承事件)
            if (eventInheritance) {
            
                //遍歷stickyEvents,找出全部繼承於事件的父事件
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        //給訂閱者發送以前緩存的粘性事件
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                        //給訂閱者發送以前緩存的粘性事件
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }
複製代碼

二、 流程圖

image

發送-post

一、 源碼實現

public void post(Object event) {
        //獲取當前線程的PostingThreadState
        PostingThreadState postingState = currentPostingThreadState.get();
        //獲取事件隊列postingState.eventQueue
        List<Object> eventQueue = postingState.eventQueue;
        //添加事件
        eventQueue.add(event);
        //是否正在發送
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            //循環獲取eventQueue中的事件
            try {
                while (!eventQueue.isEmpty()) {
                    //獲取集合數據並移除,而後發送事件
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
    
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //事件是否支持繼承
        if (eventInheritance) {
            //找出全部繼承事件
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            //遍歷集合
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //發送
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
           //發送
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
           //若是沒有訂閱者會發送一個NoSubscriberEvent事件
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
    
 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //根據事件獲取全部訂閱者subscriptions
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            //遍歷全部訂閱者
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //發送事件
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }
    
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        //根據對應的threadMode,使用對應的post進行事件的處理
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

 void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //最後經過反射執行事件方法
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
複製代碼

二、 流程圖

image

取消註冊-unregister

一、 源碼實現

public synchronized void unregister(Object subscriber) {
    //根據subscriber從typesBySubscriber獲取事件集合
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            //遍歷訂閱者的事件
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            //typesBySubscriber移除subscriber
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }
    
        private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        //根據eventType從subscriptionsByEventType獲取訂閱集合
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    //遍歷集合移除當前的subscriber
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }
複製代碼

二、 流程圖

image

EventBus如何識別類中定義的接收方法?

EventBus之因此如此流行,一個很重要的地方就是使用很是簡便。咱們只須要在類中定義好方法接收對應的事件、配置好相關的標註就能夠。那麼怎麼獲取到訂閱者的事件方法集合,就是EventBus設計的一個精髓的地方。經過上面的註冊方法,咱們知道主要是經過下面的方法來獲取,下面咱們主要分析一下具體的實現。

List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
複製代碼

接着看findSubscriberMethods的事件

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //定義了緩存Map,避免每次都執行反射獲取,提升性能
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        if (ignoreGeneratedIndex) {
            //經過反射獲取
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //經過索引獲取
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
複製代碼

最後會執行indUsingReflection去獲取,具體實現以下:

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
         //FindState 用來作訂閱方法的校驗和保存
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            //經過反射來得到訂閱方法信息
            findUsingReflectionInSingleClass(findState);
            //查找父類的訂閱方法
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }
複製代碼

關鍵的實如今findUsingReflectionInSingleClass方法中,實現以下:

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // //經過反射獲得方法數組
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        //遍歷Method
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                ////保證必須只有一個事件參數
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    //獲得註解
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        //校驗是否添加該方法
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            //實例化SubscriberMethod對象並添加
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

複製代碼

總結一下,EventBus是經過反射getDeclaredMethods()獲取類的方法集合,而後遍歷方法集合,將符合事件定義的方法(public、只有一個事件參數、有Subcribe的註解等)加入到集合中。從而達到識別訂閱者中定義的事件方法。

EventBus中的線程調度機制是怎麼樣的?

咱們知道,EventBus能夠經過定義threadMode來指定事件回調的執行線程。主要的配置以下:

  • ThreadMode: POSTING:默認的,在同一個線程中執行。

  • ThreadMode: MAIN :主線程執行

  • ThreadMode: MAIN_ORDERED: 主線程執行,不過須要排隊,若是前一個也是main_ordered 須要等前一個執行完成後才執行,在主線程中執行,能夠處理更新ui的操做。

  • ThreadMode: BACKGROUND :後臺進程,處理如保存到數據庫等操做。

  • ThreadMode: ASYNC :異步執行,另起線程操做。

經過上面的流程分析在post的過程當中,最後都是經過postToSubscription執行,在這裏面判斷了threadMode的類型,具體的實現以下:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        //根據對應的threadMode,使用對應的post進行事件的處理
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }
複製代碼

一、POSTING模式,直接執行invokeSubscriber。

二、MAIN模式,若是判斷當前線程是主線程則執行invokeSubscriber,不然會使用mainThreadPoster執行enqueue方法。mainThreadPoster爲HandlerPoster的實例,繼承了Handler,是使用了MainLooper進行建立,也就是其的handleMessage在主線程中執行。 咱們看具體的實現:

public void enqueue(Subscription subscription, Object event) {
        //構建PendingPost對象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
          //加入隊列
            queue.enqueue(pendingPost);
            //若是沒有激活hander
            if (!handlerActive) {
                handlerActive = true;
                //發送消息
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
    
      @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
               //循環獲取隊列中的全部pendingPost
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                          //若是已沒有數據,更新 handlerActive
                            handlerActive = false;
                            return;
                        }
                    }
                }
               //在主線程中實現事件的方法
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
複製代碼

HandlerPoster會經過主線程的Handler去執行隊列中的全部事件方法。

三、MAIN_ORDERED模式 優先主線程隊列執行

if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
複製代碼

四、BACKGROUND模式 若是再主線程則執行後臺線程執行,不然使用當前線程

if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                
複製代碼

這裏咱們主要看backgroundPoster的實現,BackgroundPoster繼承了Runnable,實現線程池執行run方法,經過executorRunning控制不會每次都啓動一個新任務。

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //加入隊列
            queue.enqueue(pendingPost);
            //變量標識,不要每次都執行
            if (!executorRunning) {
                executorRunning = true;
                //線程池執行
                eventBus.getExecutorService().execute(this);
            }
        }
    }

  @Override
    public void run() {
        try {
            try {
                while (true) {
                    //循環間隔1s獲取事件
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                //若是已沒有任務,更新變量
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //在異步線程執行了事件方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
複製代碼

backgroundPoster會開啓一個線程去執行當前全部隊列中的事件方法。

五、ASYNC 模式 主要使用了AsyncPoster,也繼承了run接口。實現以下:

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }
複製代碼

AsyncPoster每個事件都會開啓一個異步任務,經過線程池去執行。

總結一下 EventBus經過配置threadMode,控制事件在不一樣的線程中去執行。總歸有5種模式,分別爲POSTING、MAIN、MAIN_ORDERED、BACKGROUND、ASYNC。主要是經過HandlerPoster、backgroundPoster、AsyncPoster來實現線程的切換。

  • HandlerPoster會經過主線程的Handler開啓循環去執行隊列中的全部事件方法。
  • backgroundPoster會開啓一個線程循環執行當前全部隊列中的事件方法。
  • AsyncPoster每個事件都會開啓一個異步任務,經過線程池去執行。

EventBus的事件發送和接收的原理是什麼?

EventBus主要是經過觀察者模式來實現事件的發送和接收。使用register後,會將訂閱者及定義的事件接收方法加入到Map中,當在任意地方執行post時,會對事件類型進行匹配,找出全部的訂閱者,根據配置的threadMode,使用不一樣的poster經過反射去執行事件方法。當使用unregister後,會將訂閱者在Map中移除,進行取消註冊。

EventBus中代碼運用了那些設計模式,有什麼巧妙的設計?

一、單例模式

EventBus.getDefault()使用了懶漢的單例模式。

二、外觀模式

EventBus對外提供了統一的調度,屏蔽了內部的實現。

三、建造者模式

Event對象的建立使用EventBusBuilder進行建立,將複雜對象的建立和表示分離,調用者不須要知道複雜的建立過程,使用Build的相關方法進行配置建立對象。

四、策略模式

根據threadMode的設置,使用不一樣Poster的實現策略來執行事件方法

總結

一、框架的設計不在複雜而在精巧

二、使用反射和標註能夠簡化不少實現

三、EventBus的使用要注意避免大量的濫用,將致使邏輯的分散,出現問題後很難定位

推薦

Android源碼系列-解密OkHttp

Android源碼系列-解密Retrofit

Android源碼系列-解密Glide

Android源碼系列-解密EventBus

Android源碼系列-解密RxJava

Android源碼系列-解密LeakCanary

Android源碼系列-解密BlockCanary

關於

歡迎關注個人我的公衆號

微信搜索:一碼一浮生,或者搜索公衆號ID:life2code

image

相關文章
相關標籤/搜索