EventBus 1.0.1版本源碼分析及一步一步自實現過程

什麼是EventBus

EventBus是經過使用發佈者/訂閱者模式來實現解耦的Android和Java開源庫。在Android開發中一般使用EventBus實現Activities, Fragments, Threads, Services等組件之間的通訊。但EventBus不能實現跨進程間的通訊。java

圖文表達

源碼剖析

  • UML類圖 android

  • 源碼分析 EventBus.java數組

    package de.greenrobot.event;
    
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    import android.os.Handler;
    import android.os.Looper;
    import android.os.Message;
    import android.util.Log;
    
    /**
     * 
     Class based event bus, optimized for Android. By default, subscribers will handle events in methods named "onEvent".
     *
     * @author Markus Junginger, greenrobot
     */
    public class EventBus {
        /**
         * Log tag, apps may override it.
         */
        public static String TAG = "Event";
    
        private static final EventBus defaultInstance = new EventBus();//默認實例
    
        public enum ThreadMode {
            /**
             * Subscriber will be called in the same thread, which is posting the event.
             */
            PostThread,//發佈事件所在線程訂閱
            /**
             * Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */ MainThread,//主線程訂閱 /* BackgroundThread */ } //表示某個類中同一個方法名的全部重載方法。(也就是一個類中全部的訂閱方法),緩存,目的:提升性能 private static final Map<String, List<Method>> methodCache = new HashMap<String, List<Method>>(); //保存事件類型的全部的事件(包括父類和接口),懶加載的,緩存,目的:了提升性能 private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>(); //一個事件類型的全部訂閱者:發送事件時使用到(時間複雜度爲:O(1)) //用CopyOnWriteArrayList是爲了讀取是線程安全的。 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; //一個訂閱對象的全部訂閱事件: // 訂閱(register)時是加入 // 取消訂閱(unregister)時使用:獲取當前訂閱對象的全部訂閱事件,而後在訂閱事件中將這個訂閱者刪除。 private final Map<Object, List<Class<?>>> typesBySubscriber; //每一個線程的事件隊列 private final ThreadLocal<List<Object>> currentThreadEventQueue = new ThreadLocal<List<Object>>() { @Override protected List<Object> initialValue() { return new ArrayList<Object>(); } }; //每一個線程的隊列是否在運轉中 private final ThreadLocal<BooleanWrapper> currentThreadIsPosting = new ThreadLocal<BooleanWrapper>() { @Override protected BooleanWrapper initialValue() { return new BooleanWrapper(); } }; private String defaultMethodName = "onEvent"; private PostViaHandler mainThreadPoster; public static EventBus getDefault() { return defaultInstance; } public EventBus() { subscriptionsByEventType = new HashMap<Class<?>, CopyOnWriteArrayList<Subscription>>(); typesBySubscriber = new HashMap<Object, List<Class<?>>>(); mainThreadPoster = new PostViaHandler(Looper.getMainLooper()); } public void register(Object subscriber) {//註冊 register(subscriber, defaultMethodName, ThreadMode.PostThread);//默認方法名,即onEvent, } public void registerForMainThread(Object subscriber) { register(subscriber, defaultMethodName, ThreadMode.MainThread); } //這裏應該加同步synchronized public void register(Object subscriber, String methodName, ThreadMode threadMode) {//傳入訂閱者,遍歷查找其訂閱的全部事件 List<Method> subscriberMethods = findSubscriberMethods(subscriber.getClass(), methodName); for (Method method : subscriberMethods) { Class<?> eventType = method.getParameterTypes()[0];//參數的類型 //查找的時候是根據參數的類型來肯定訂閱者的 subscribe(subscriber, method, eventType, threadMode); } } //若是當前類含有父類,是否查找父類中的全部含指定名稱的方法 private List<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {//查找某個類中全部指定名稱的方法 String key = subscriberClass.getName() + '.' + methodName; //類名+方法名爲Key //key相同的同時進入,會出現兩次建立 List<Method> subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key);//方法名稱同樣的存在多個重載方法 } if (subscriberMethods != null) { return subscriberMethods; } //同時進入 subscriberMethods = new ArrayList<Method>(); Class<?> clazz = subscriberClass; HashSet<Class<?>> eventTypesFound = new HashSet<Class<?>>();// 同一個類參數相同的方法只加入一次(就是重寫的方法) while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {//過濾掉系統類 // Skip system classes, this just degrades performance break; } Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { if (method.getName().equals(methodName)) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) {//參數爲1個 if (eventTypesFound.add(parameterTypes[0])) { // Only add if not already found in a sub class subscriberMethods.add(method); } } } } clazz = clazz.getSuperclass();//查找父類,也就是父類方法也會被調用 } if (subscriberMethods.isEmpty()) {//沒有訂閱方法,拋出異常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { synchronized (methodCache) { methodCache.put(key, subscriberMethods); } return subscriberMethods; } } public void register(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件類型 register(subscriber, defaultMethodName, ThreadMode.PostThread, eventType, moreEventTypes); } public void registerForMainThread(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件類型註冊 register(subscriber, defaultMethodName, ThreadMode.MainThread, eventType, moreEventTypes); } public synchronized void register(Object subscriber, String methodName, ThreadMode threadMode, Class<?> eventType, Class<?>... moreEventTypes) { Class<?> subscriberClass = subscriber.getClass(); Method method = findSubscriberMethod(subscriberClass, methodName, eventType); subscribe(subscriber, method, eventType, threadMode); for (Class<?> anothereventType : moreEventTypes) { method = findSubscriberMethod(subscriberClass, methodName, anothereventType); subscribe(subscriber, method, anothereventType, threadMode); } } //應該在對象lock中調用 private void subscribe(Object subscriber, Method subscriberMethod, Class<?> eventType, ThreadMode threadMode) { //調用方:register(Object subscriber, String methodName, ThreadMode threadMode)沒有加鎖 //併發的問題:多個線程同時進入,不一樣頁面都含有同一事件 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); subscriptionsByEventType.put(eventType, subscriptions);//事件類型爲key,訂閱者爲value,一對多 } else { for (Subscription subscription : subscriptions) {//同一事件的訂閱者,屢次設置 if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } subscriberMethod.setAccessible(true); Subscription subscription = new Subscription(subscriber, subscriberMethod, threadMode); subscriptions.add(subscription); List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);//訂閱者爲key,訂閱的事件爲value,一對多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } /** * Class.getMethod is slow on Android 2.3 (and probably other versions), so use getDeclaredMethod and go up in the * class hierarchy if neccessary. */ private Method findSubscriberMethod(Class<?> subscriberClass, String methodName, Class<?> eventType) { Class<?> clazz = subscriberClass; while (clazz != null) { try { return clazz.getDeclaredMethod(methodName, eventType); } catch (NoSuchMethodException ex) { clazz = clazz.getSuperclass(); } } throw new RuntimeException("Method " + methodName + " not found in " + subscriberClass + " (must have single parameter of event type " + eventType + ")"); } /** * Unregisters the given subscriber for the given event classes. */ public synchronized void unregister(Object subscriber, Class<?>... eventTypes) { if (eventTypes.length == 0) { throw new IllegalArgumentException("Provide at least one event class"); } List<Class<?>> subscribedClasses = typesBySubscriber.get(subscriber); if (subscribedClasses != null) { for (Class<?> eventType : eventTypes) { unubscribeByEventType(subscriber, eventType); subscribedClasses.remove(eventType); } if (subscribedClasses.isEmpty()) { typesBySubscriber.remove(subscriber); } } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } /** * Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */ private void unubscribeByEventType(Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } /** * Unregisters the given subscriber from all event classes. */ public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);//刪除全部的訂閱事件 if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unubscribeByEventType(subscriber, eventType);//刪除事件類型 } typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } //post(Object event) //post(Object event) /** * Posts the given event to the event bus. */ public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每一個線程一個隊列,保證同一個線程發送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//當前線程隊列正在運轉 return; } else {//沒有運轉開始運轉 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //開始發送事件 } } finally { isPosting.value = false; } } } private void postSingleEvent(Object event) throws Error { List<Class<?>> eventTypes = findEventTypes(event.getClass()); //找到全部的事件類型(須要給父事件和接口事件也發送) boolean subscriptionFound = false; int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) {//只能保證Map的原子性 subscriptions = subscriptionsByEventType.get(clazz); } if (subscriptions != null) { for (Subscription subscription : subscriptions) {//由於是CopyOnWriteArrayList,因此寫入時讀取是線程安全的 if (subscription.threadMode == ThreadMode.PostThread) {//在發送線程中執行 postToSubscribtion(subscription, event); } else if (subscription.threadMode == ThreadMode.MainThread) {//在主線程中執行 mainThreadPoster.enqueue(event, subscription);//在主線程中執行 } else { throw new IllegalStateException("Unknown thread mode: " + subscription.threadMode); } } subscriptionFound = true; } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } //查找全部的事件類型:當前類型和全部父類型,全部接口,包括父類型 // /** * Finds all Class objects including super classes and interfaces. */ private List<Class<?>> findEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); //緩存中存在,直接返回 if (eventTypes == null) { eventTypes = new ArrayList<Class<?>>(); Class<?> clazz = eventClass; while (clazz != null) {//事件類型,父事件 eventTypes.add(clazz); //自己是一個事件類型 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } } /** * Recurses through super interfaces. */ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } static void postToSubscribtion(Subscription subscription, Object event) throws Error { try { subscription.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); if (cause instanceof Error) { throw (Error) cause; } } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } } final static class Subscription { final Object subscriber; final Method method; final ThreadMode threadMode; Subscription(Object subscriber, Method method, ThreadMode threadMode) { this.subscriber = subscriber; this.method = method; this.threadMode = threadMode; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; // Super slow (improve once used): http://code.google.com/p/android/issues/detail?id=7811 return subscriber == otherSubscription.subscriber && method.equals(otherSubscription.method); } else { return false; } } @Override public int hashCode() { // Check performance once used return subscriber.hashCode() + method.hashCode(); } } /** * For ThreadLocal, much faster to set than storing a new Boolean. */ final static class BooleanWrapper {//提升性能,使用Boolean,修改值時須要每次set boolean value; } final static class PostViaHandler extends Handler {//主線程發送隊列 PostViaHandler(Looper looper) { super(looper); } void enqueue(Object event, Subscription subscription) { PendingPost pendingPost = PendingPost.obtainPendingPost(event, subscription); //待發送對象 Message message = obtainMessage(); message.obj = pendingPost; if (!sendMessage(message)) {//發送失敗:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); postToSubscribtion(subscription, event); } } } 複製代碼

    PendingPost.java緩存

    package de.greenrobot.event;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import de.greenrobot.event.EventBus.Subscription;
    
    final class PendingPost {//使用了享元模式
        private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
    
        Object event;
        Subscription subscription;
    
        private PendingPost(Object event, Subscription subscription) {
            this.event = event;
            this.subscription = subscription;
        }
    
        static PendingPost obtainPendingPost(Object event, Subscription subscription) {
            synchronized (pendingPostPool) {
                int size = pendingPostPool.size();
                if (size > 0) {
                    PendingPost pendingPost = pendingPostPool.remove(size - 1);
                    pendingPost.event = event;
                    pendingPost.subscription = subscription;
                    return pendingPost;
                }
            }
            return new PendingPost(event, subscription);
        }
    
        static void releasePendingPost(PendingPost pendingPost) {
            synchronized (pendingPostPool) {
                pendingPostPool.add(pendingPost);
            }
        }
    
    }
    複製代碼
  • 優勢安全

    1.在做者的註釋中(Class based event bus[指Google guava庫中的 event bus], optimized for Android. By default, subscribers will handle events in methods named "onEvent".)能夠看到,爲了優化性能,棄用了註解,使用固定或指定訂閱方法的方式,性能優化

    2.輕巧,不依賴Context。一些處理細節上也作了相應的優化,如:使用BooleanWrapper代替Boolean,這樣就不須要每次更改值都調用ThreadLocal的set方法;PendingPost的建立採用了享元模式提升了獲取其實例的性能,不須要頻繁建立和銷燬;利用CopyOnWriteArrayList在寫入安全的狀況下提升了讀取性能。bash

    3.能指定在主線程中訂閱事件。由於一般狀況下,咱們會在非UI線程中發送事件來更新UI,而更新UI須要在主線程中執行。併發

  • 不足app

    1.爲了性能最求性能的極致,失去了訂閱方法的靈活性,只能指定onEvent方法才能監聽,或者註冊時調用指定方法名,沒有使用註解那麼靈活。ide

    2.訂閱者接收事件的線程不能根據訂閱事件設置。由於常常咱們在一個類中會訂閱多個事件,而每一個事件基本上是沒有相關性的,在哪一個線程上訂閱是根據事件自己來決定的。

    三、強制使用事件的傳遞性,如:咱們發送某個事件,那麼這個事件全部的父類及接口都會被強制觸發,最好有設置開關,讓用戶決定。

  • 解讀

    • 如下代碼爲什麼須要使用事件隊列,直接發送事件給訂閱者不是更快更直接嗎?
      public void post(Object event) {//String Object
          List<Object> eventQueue = currentThreadEventQueue.get(); //每一個線程一個隊列,保證同一個線程發送事件的有序性
          eventQueue.add(event);
      
          BooleanWrapper isPosting = currentThreadIsPosting.get();
          if (isPosting.value) {//當前線程隊列正在運轉
              //在同一個線程中何時會走到這裏呢?若是走不到這裏那麼這個隊列就沒有意義
              return;
          } else {//沒有運轉開始運轉
              isPosting.value = true;
              try {
                  while (!eventQueue.isEmpty()) {
                      postSingleEvent(eventQueue.remove(0)); //開始發送事件
                  }
              } finally {
                  isPosting.value = false;
              }
          }
      }
      
      若是咱們能找到隊列正在運轉的狀況下,那麼隊列就有意義,能保證事件處理的有序性。
      
      舉例:
      public class RefreshEvent {}
      public class Activity1 extends AppCompatActivity{
          @Override
          protected void onCreate(Bundle savedInstanceState) {
              super.onCreate(savedInstanceState);
              setContentView(R.layout.activity_main);
              EventBus.getDefault().register(this);
          }
      
          @Override
          protected void onDestroy() {
              EventBus.getDefault().unregister(this);
              super.onDestroy();
          }
          
          public void onEvent(TestEvent event) {
              EventBus.getDefault().post(new RefreshEvent());
          }
      }
      
      public class ActivityB extends AppCompatActivity{
          @Override
          protected void onCreate(Bundle savedInstanceState) {
              super.onCreate(savedInstanceState);
              setContentView(R.layout.activity_main);
              EventBus.getDefault().register(this);
          }
      
          @Override
          protected void onDestroy() {
              EventBus.getDefault().unregister(this);
              super.onDestroy();
          }
          
          public void onEvent(TestEvent event) {
              
          }
      }
      
      咱們在後臺業務方法中發送了刷新事件:
      EventBus.getDefault().post(new RefreshEvent());
      
      上面的例子中:一、一個事件被多個訂閱者訂閱;二、訂閱者收到事件後再次發送相同事件。
      
      在這種狀況下,若是咱們不使用隊列,那麼在同一個線程中,先發送的事件訂閱者就可能會被後收到。
      這裏使用ThreadLocal+隊列,就很巧妙地維護了在同一個線程中事件處理的有序性。
      
      複製代碼
  • bug

    public void register(Object subscriber, String methodName, ThreadMode threadMode) 方法存在線程安全的問題,除非調用者都加了鎖,但代碼中最經常使用的調用public void register(Object subscriber, String methodName, ThreadMode threadMode)卻沒有加鎖。

自實現

  • 第一步,實現主邏輯:

    • 消息的訂閱:register(Object subscriber)
    • 消息的發佈:post(Object event)
    • 取消訂閱:unregister(Object subscriber)

    EventBus.java 代碼

    package org.hjb.eventbus;
    
    import android.util.Log;
    
    import androidx.annotation.NonNull;
    
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class EventBus {
    
        private static final String TAG = "EventBus";
    
        private static volatile EventBus mDefaultEventBus;
    
        private static final String DEFAULT_METHOD_NAME = "onEvent";
    
        //key:訂閱事件
        //value:訂閱者信息,訂閱者信息包括:訂閱對象,訂閱方法
        private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
    
        private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() {
            @Override
            protected ArrayList<Object> initialValue() {
                return new ArrayList<>();
            }
        };
    
        private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() {
            @Override
            protected Boolean initialValue() {
                return Boolean.FALSE;
            }
        };
    
        public EventBus() {
        }
    
        public static EventBus getDefault() {
            if (mDefaultEventBus == null) {
                synchronized (EventBus.class) {
                    if (mDefaultEventBus == null) {
                        mDefaultEventBus = new EventBus();
                    }
                }
            }
    
            return mDefaultEventBus;
        }
    
        /**
         * 訂閱
         *
         * @param subscriber 訂閱者
         */
        public synchronized void register(@NonNull Object subscriber) {//訂閱
            //保存當前訂閱者的全部訂閱事件
            //由於查詢是經過訂閱事件來查找的,因此使用map來保存訂閱數據
    
            //查找當前訂閱對象的全部訂閱方法
            ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME);
    
            for (Method method : methods) {
                //訂閱事件
                Class<?> eventType = method.getParameterTypes()[0];
    
                CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
                if (subscriptions == null) {
                    subscriptions = new CopyOnWriteArrayList<>();
                    subscriptionsByEventType.put(eventType, subscriptions);
                } else { //須要判斷是否重複添加(同一個事件已經加過訂閱者),拋出異常
    
                    for (Subscription subscription : subscriptions) {
                        if (subscription.subscriber == subscriber) {
                            throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
                        }
                    }
    
                }
    
                method.setAccessible(true);//防止私有方法不能訪問
                subscriptions.add(new Subscription(subscriber, method));
    
            }
        }
    
        /**
         * 須要查找當前對象全部帶onEvent的重載方法,包括父類
         * <p>
         * 注意點:
         * 一、覆蓋的方法不要重複添加
         * 二、過濾掉系統類,java/javax/android開頭的
         * 三、只查找單個參數的方法
         * 四、私有方法也查找
         *
         * @param subscriber 訂閱者
         * @return 全部訂閱對象
         */
        private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) {
    
            ArrayList<Method> subscriberMethods = new ArrayList<>();
    
            Class<?> clazz = subscriber.getClass();
    
            while (clazz != null) {
                //過濾系統類
                String name = clazz.getName();
                if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
                    break;
                }
                //查找目標方法
                for (Method method : clazz.getDeclaredMethods()) {
    
                    if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
    
                        int i = 0, size = subscriberMethods.size();
    
                        for (; i < size; i++) {
                            if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
                                break;
                            }
                        }
    
                        if (i == size) {//表示不存在,加入
                            subscriberMethods.add(method);
                        }
    
                    }
                }
    
                clazz = clazz.getSuperclass();
    
            }
    
            return subscriberMethods;
        }
    
        /**
         * 發送事件
         * <p>
         * 須要給全部訂閱者發送事件
         * <p>
         * 注意:咱們要發送的不只是當前對象的訂閱者,同時包括當前父類對象和接口對象的訂閱者。
         * <p>
         * 解決有有序性的問題:咱們須要保證同一個線程中
         *
         * @param event 事件
         */
        public void post(Object event) {
    
            List<Object> eventQueue = currentThreadEventQueue.get();
            eventQueue.add(event);
    
            Boolean isPosting = currentThreadIsPosting.get();
            if (!isPosting.booleanValue()) {
                currentThreadIsPosting.set(Boolean.TRUE);
    
                while (!eventQueue.isEmpty()) {
                    postEvent(eventQueue.remove(0));
                }
    
                currentThreadIsPosting.set(Boolean.FALSE);
            } else {
                Log.i("hejunbin", "正在運轉");
            }
        }
    
        /**
         * 發送事件
         *
         * @param event 待發送事件
         */
        private void postEvent(Object event) {
            //一、找到當前對象的全部事件類型
            ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass());
    
            //二、找到此事件全部的訂閱者
            boolean subscriptionFound = false; //是否找到了訂閱者
    
            for (Class<?> eventType : eventTypes) {
    
                CopyOnWriteArrayList<Subscription> subscriptions;
    
                synchronized (this) {
                    subscriptions = subscriptionsByEventType.get(eventType);
                }
    
                if (subscriptions != null && subscriptions.size() > 0) {
                    subscriptionFound = true;
    
                    for (Subscription subscription : subscriptions) {
                        try {
                            subscription.method.invoke(subscription.subscriber, event);
                        } catch (Exception e) {
                            Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause());
                        }
                    }
    
                }
    
            }
    
            if (!subscriptionFound) {
                Log.d(TAG, "No subscripers registered for event " + event.getClass());
            }
        }
    
        /**
         * 找到當前對象的全部事件類型(包括父類和接口)
         *
         * @param eventClass 事件類型
         * @return 全部事件類型
         */
        private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
    
            ArrayList<Class<?>> eventTypes = new ArrayList<>();
    
            Class<?> clazz = eventClass;
    
            while (clazz != null) {
                eventTypes.add(clazz);
                //接口自己自己存在層級
                addInterfaces(eventTypes, clazz.getInterfaces());
                clazz = clazz.getSuperclass();
            }
    
            return eventTypes;
        }
    
        private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
            for (Class<?> interfaceClass : interfaces) {
                if (!eventTypes.contains(interfaceClass)) {
                    eventTypes.add(interfaceClass);
                    addInterfaces(eventTypes, interfaceClass.getInterfaces());
                }
            }
        }
    
        /**
         * 取消訂閱
         *
         * @param subscriber 訂閱者
         */
        public synchronized void unregister(Object subscriber) {//取消訂閱
    
            for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) {
                int size = entry.getValue().size();
    
                for (int i = 0; i < size; i++) {
                    if (entry.getValue().get(i).subscriber == subscriber) {
                        entry.getValue().remove(i);
                        i--;
                        size--;
                    }
                }
            }
    
        }
    
        /**
         * 訂閱對象
         */
        static class Subscription {
            Object subscriber;
    
            Method method;
    
            public Subscription(Object subscriber, Method method) {
                this.subscriber = subscriber;
                this.method = method;
            }
        }
    }
    
    複製代碼
  • 能夠在主線程中訂閱事件

    在Android利用Handler的機制,將事件發送到主隊列中執行。

    由於須要將訂閱信息(Subscription)和事件(event)都須要傳遞,而Handler中的Message只能傳遞單個Object,因此本身建立了PendingPost包裝類。

    EventBus.java代碼

    package org.hjb.eventbus;
    
    
    import android.os.Handler;
    import android.os.Looper;
    import android.os.Message;
    import android.util.Log;
    
    import androidx.annotation.MainThread;
    import androidx.annotation.NonNull;
    
    import java.lang.reflect.Method;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class EventBus {
    
        private static final String TAG = "EventBus";
    
        private static volatile EventBus mDefaultEventBus;
    
        private static final String DEFAULT_METHOD_NAME = "onEvent";
    
        public enum ThreadMode {
            PostThread, //post所在線程
            MainThread,//主線程
        }
    
        //key:訂閱事件
        //value:訂閱者信息,訂閱者信息包括:訂閱對象,訂閱方法
        private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
    
        private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() {
            @Override
            protected ArrayList<Object> initialValue() {
                return new ArrayList<>();
            }
        };
    
        private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() {
            @Override
            protected Boolean initialValue() {
                return Boolean.FALSE;
            }
        };
    
        private MainThreadHandler mainThreadHandler;
    
        public EventBus() {
            mainThreadHandler = new MainThreadHandler();
        }
    
        public static EventBus getDefault() {
            if (mDefaultEventBus == null) {
                synchronized (EventBus.class) {
                    if (mDefaultEventBus == null) {
                        mDefaultEventBus = new EventBus();
                    }
                }
            }
    
            return mDefaultEventBus;
        }
    
        /**
         * 訂閱
         *
         * @param subscriber 訂閱者
         */
        public synchronized void register(@NonNull Object subscriber) {//訂閱
            subscribe(subscriber, ThreadMode.PostThread);
        }
    
        public synchronized void registerForMainThread(Object subscriber) {
            subscribe(subscriber, ThreadMode.MainThread);
        }
    
        public synchronized void register(Object subscriber, ThreadMode mode) {
            subscribe(subscriber, mode);
        }
    
        private void subscribe(Object subscriber, ThreadMode mode) {//須要在線程安全的方法中調用
            //保存當前訂閱者的全部訂閱事件
            //由於查詢是經過訂閱事件來查找的,因此使用map來保存訂閱數據
    
            //查找當前訂閱對象的全部訂閱方法
            ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME);
    
            for (Method method : methods) {
                //訂閱事件
                Class<?> eventType = method.getParameterTypes()[0];
    
                CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
                if (subscriptions == null) {
                    subscriptions = new CopyOnWriteArrayList<>();
                    subscriptionsByEventType.put(eventType, subscriptions);
                } else { //須要判斷是否重複添加(同一個事件已經加過訂閱者),拋出異常
    
                    for (Subscription subscription : subscriptions) {
                        if (subscription.subscriber == subscriber) {
                            throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
                        }
                    }
    
                }
    
                method.setAccessible(true);//防止私有方法不能訪問
                subscriptions.add(new Subscription(subscriber, method, mode));
    
            }
        }
    
    
        /**
         * 須要查找當前對象全部帶onEvent的重載方法,包括父類
         * <p>
         * 注意點:
         * 一、覆蓋的方法不要重複添加
         * 二、過濾掉系統類,java/javax/android開頭的
         * 三、只查找單個參數的方法
         * 四、私有方法也查找
         *
         * @param subscriber 訂閱者
         * @return 全部訂閱對象
         */
        private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) {
    
            ArrayList<Method> subscriberMethods = new ArrayList<>();
    
            Class<?> clazz = subscriber.getClass();
    
            while (clazz != null) {
                //過濾系統類
                String name = clazz.getName();
                if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
                    break;
                }
                //查找目標方法
                for (Method method : clazz.getDeclaredMethods()) {
    
                    if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
    
                        int i = 0, size = subscriberMethods.size();
    
                        for (; i < size; i++) {
                            if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
                                break;
                            }
                        }
    
                        if (i == size) {//表示不存在,加入
                            subscriberMethods.add(method);
                        }
    
                    }
                }
    
                clazz = clazz.getSuperclass();
    
            }
    
            return subscriberMethods;
        }
    
        /**
         * 發送事件
         * <p>
         * 須要給全部訂閱者發送事件
         * <p>
         * 注意:咱們要發送的不只是當前對象的訂閱者,同時包括當前父類對象和接口對象的訂閱者。
         * <p>
         * 解決有有序性的問題:咱們須要保證同一個線程中
         *
         * @param event 事件
         */
        public void post(Object event) {
    
            List<Object> eventQueue = currentThreadEventQueue.get();
            eventQueue.add(event);
    
            Boolean isPosting = currentThreadIsPosting.get();
            if (!isPosting.booleanValue()) {
                currentThreadIsPosting.set(Boolean.TRUE);
    
                while (!eventQueue.isEmpty()) {
                    postEvent(eventQueue.remove(0));
                }
    
                currentThreadIsPosting.set(Boolean.FALSE);
            } else {
                Log.i("hejunbin", "正在運轉");
            }
        }
    
        /**
         * 發送事件
         *
         * @param event 待發送事件
         */
        private void postEvent(Object event) {
            //一、找到當前對象的全部事件類型
            ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass());
    
            //二、找到此事件全部的訂閱者
            boolean subscriptionFound = false; //是否找到了訂閱者
    
            for (Class<?> eventType : eventTypes) {
    
                CopyOnWriteArrayList<Subscription> subscriptions;
    
                synchronized (this) {
                    subscriptions = subscriptionsByEventType.get(eventType);
                }
    
                if (subscriptions != null && subscriptions.size() > 0) {
                    subscriptionFound = true;
    
                    for (Subscription subscription : subscriptions) {
    
                        if (subscription.mode == ThreadMode.PostThread) {
                            postEvent(subscription, event);
                        } else if (subscription.mode == ThreadMode.MainThread) {//在主線程中發送
                            mainThreadHandler.enqueue(subscription, event);
                        } else {//非法
                            throw new IllegalStateException("Unknown thread mode: " + subscription.mode);
                        }
    
                    }
    
                }
    
            }
    
            if (!subscriptionFound) {
                Log.d(TAG, "No subscripers registered for event " + event.getClass());
            }
        }
    
        private static void postEvent(Subscription subscription, Object event) {
    
            try {
                subscription.method.invoke(subscription.subscriber, event);
            } catch (Exception e) {
                Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause());
            }
        }
    
    
        /**
         * 找到當前對象的全部事件類型(包括父類和接口)
         *
         * @param eventClass 事件類型
         * @return 全部事件類型
         */
        private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
    
            ArrayList<Class<?>> eventTypes = new ArrayList<>();
    
            Class<?> clazz = eventClass;
    
            while (clazz != null) {
                eventTypes.add(clazz);
                //接口自己自己存在層級
                addInterfaces(eventTypes, clazz.getInterfaces());
                clazz = clazz.getSuperclass();
            }
    
            return eventTypes;
        }
    
        private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
            for (Class<?> interfaceClass : interfaces) {
                if (!eventTypes.contains(interfaceClass)) {
                    eventTypes.add(interfaceClass);
                    addInterfaces(eventTypes, interfaceClass.getInterfaces());
                }
            }
        }
    
        /**
         * 取消訂閱
         *
         * @param subscriber 訂閱者
         */
        public synchronized void unregister(Object subscriber) {//取消訂閱
    
            for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) {
                int size = entry.getValue().size();
    
                for (int i = 0; i < size; i++) {
                    if (entry.getValue().get(i).subscriber == subscriber) {
                        entry.getValue().remove(i);
                        i--;
                        size--;
                    }
                }
            }
    
        }
    
        /**
         * 訂閱對象
         */
        static class Subscription {
            Object subscriber;
    
            Method method;
    
            ThreadMode mode;
    
            public Subscription(Object subscriber, Method method, ThreadMode mode) {
                this.subscriber = subscriber;
                this.method = method;
                this.mode = mode;
            }
        }
    
        static class MainThreadHandler extends Handler {
    
            MainThreadHandler() {
                super(Looper.getMainLooper());
            }
    
            void enqueue(Subscription subscription, Object event) {
                Message message = Message.obtain();
                message.obj = new PendingPost(subscription, event);
                //如今有兩個對象須要傳遞,但Message只能傳遞單個對象,而Java沒有元祖類型,因此須要本身建立對象或者放入數組中
    
                if (!sendMessage(message)) {//發送失敗:usually because the looper processing the message queue is exiting.
                    throw new RuntimeException("Could not send handler message");
                }
            }
    
            @Override
            public void handleMessage(Message msg) {
                PendingPost pendingPost = (PendingPost) msg.obj;
                postEvent(pendingPost.subscription, pendingPost.event);
            }
        }
    }
    
    複製代碼

    PendingPost.java代碼

    package de.greenrobot.event;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import de.greenrobot.event.EventBus.Subscription;
    
    final class PendingPost {
        Object event;
        Subscription subscription;
    
        PendingPost(Object event, Subscription subscription) {
            this.event = event;
            this.subscription = subscription;
    }
    複製代碼
  • 性能優化

    • findSubscriberMethods 每次都要循環遍歷查找,能夠第一次查找後緩存起來,下次直接從緩存中獲取。(咱們常常在進入頁面時register,在返回頁面時unregister,當第二次進入時就可使用緩存中獲取了)

      實現代碼爲:

      private static final Map<String, ArrayList<Method>> methodCache = new HashMap<>();
       /**
       * 須要查找當前對象全部帶onEvent的重載方法,包括父類
       * <p>
       * 注意點:
       * 一、覆蓋的方法不要重複添加
       * 二、過濾掉系統類,java/javax/android開頭的
       * 三、只查找單個參數的方法
       * 四、私有方法也查找
       *
       * @param subscriberClass 訂閱者類
       * @return 全部訂閱對象
       */
      private ArrayList<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {
      
          String key = subscriberClass.getName() + '.' + methodName; //類名+方法名爲Key
      
          ArrayList<Method> subscriberMethods = methodCache.get(key);
      
          if (subscriberMethods != null) {
              return subscriberMethods;
          }
      
          synchronized (methodCache) {
      
              subscriberMethods = methodCache.get(key);
      
              if (subscriberMethods != null) {
                  return subscriberMethods;
              }
      
      
              subscriberMethods = new ArrayList<>();
      
              Class<?> clazz = subscriberClass;
      
              while (clazz != null) {
                  //過濾系統類
                  String name = clazz.getName();
                  if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) {
                      break;
                  }
                  //查找目標方法
                  for (Method method : clazz.getDeclaredMethods()) {
      
                      if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) {
      
                          int i = 0, size = subscriberMethods.size();
      
                          for (; i < size; i++) {
                              if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) {
                                  break;
                              }
                          }
      
                          if (i == size) {//表示不存在,加入
                              subscriberMethods.add(method);
                          }
      
                      }
                  }
      
                  clazz = clazz.getSuperclass();
      
              }
      
              if (subscriberMethods.isEmpty()) {//沒有訂閱方法,拋出異常
                  throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName);
              } else {
                  methodCache.put(key, subscriberMethods);
                  return subscriberMethods;
              }
          }
      
      }
      複製代碼
    • findEventTypes 每次發送事件都會調用,而內部實現也須要循環遍歷,也能夠經過緩存來提升性能。

      修改代碼爲:

      private static final Map<Class<?>, ArrayList<Class<?>>> eventTypesCache = new HashMap<>();
      
      /**
       * 找到當前對象的全部事件類型(包括父類和接口)
       *
       * @param eventClass 事件類型
       * @return 全部事件類型
       */
      private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) {
      
          ArrayList<Class<?>> eventTypes = eventTypesCache.get(eventClass);
      
          if (eventTypes != null) {
              return eventTypes;
          }
      
          synchronized (eventTypesCache) {
      
              eventTypes = eventTypesCache.get(eventClass);
      
              if (eventTypes != null) {
                  return eventTypes;
              }
      
              eventTypes = new ArrayList<>();
      
              Class<?> clazz = eventClass;
      
              while (clazz != null) {
                  eventTypes.add(clazz);
                  //接口自己自己存在層級
                  addInterfaces(eventTypes, clazz.getInterfaces());
                  clazz = clazz.getSuperclass();
              }
      
              eventTypesCache.put(eventClass, eventTypes);
      
              return eventTypes;
          }
      }
      複製代碼
    • unregister方法,無論有沒有訂閱過,都須要雙層遍歷,時間複雜度爲O(n2),能夠創建以subscriber爲key的哈希表,將時間複雜度降爲O(n)。

      修改代碼爲:

      //存儲一個訂閱對象的全部訂閱事件(key:訂閱對象,value:全部訂閱事件)
      private final HashMap<Object, List<Class<?>>> eventTypesBySubscriber = new HashMap<>();
      
      private void subscribe(Object subscriber, ThreadMode mode) {//須要在線程安全的方法中調用
          //保存當前訂閱者的全部訂閱事件
          //由於查詢是經過訂閱事件來查找的,因此使用map來保存訂閱數據
      
          //查找當前訂閱對象的全部訂閱方法
          ArrayList<Method> methods = findSubscriberMethods(subscriber.getClass(), DEFAULT_METHOD_NAME);
      
          for (Method method : methods) {
              //訂閱事件
              Class<?> eventType = method.getParameterTypes()[0];
      
              CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
              if (subscriptions == null) {
                  subscriptions = new CopyOnWriteArrayList<>();
                  subscriptionsByEventType.put(eventType, subscriptions);
              } else { //須要判斷是否重複添加(同一個事件已經加過訂閱者),拋出異常
      
                  for (Subscription subscription : subscriptions) {
                      if (subscription.subscriber == subscriber) {
                          throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
                      }
                  }
      
              }
      
              method.setAccessible(true);//防止私有方法不能訪問
              subscriptions.add(new Subscription(subscriber, method, mode));
      
              //訂閱對象的所訂閱的數據類型
              List<Class<?>> subscribedEvents = eventTypesBySubscriber.get(subscriber);//訂閱者爲key,訂閱的事件爲value,一對多
              if (subscribedEvents == null) {
                  subscribedEvents = new ArrayList<>();
                  eventTypesBySubscriber.put(subscriber, subscribedEvents);
              }
              subscribedEvents.add(eventType);
      
          }
      }
      
      /**
       * 取消訂閱
       *
       * @param subscriber 訂閱者
       */
      public synchronized void unregister(Object subscriber) {//取消訂閱
      
          List<Class<?>> eventTypes = eventTypesBySubscriber.get(subscriber);
          if (eventTypes == null) {
              Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
              return;
          }
      
          for (Class<?> eventType : eventTypes) { //雖然這裏也是循環,可是這裏的只是單個訂閱者中的事件,而不是全局的事件的遍歷
              CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
              if (subscriptions != null) {
                  int size =subscriptions.size();
                  for (int i = 0; i < size; i++) {
                      if (subscriptions.get(i).subscriber == subscriber) {
                          subscriptions.remove(i);
                          i--;
                          size--;
                      }
                  }
              }
          }
          eventTypesBySubscriber.remove(subscriber);
      }
      
      複製代碼
    • PendingPost使用享元模式(重複利用對象)

      修改代碼爲:

      package org.hjb.eventbus;
      
      import java.util.ArrayList;
      import java.util.List;
      
      import org.hjb.eventbus.EventBus.Subscription;
      
      public class PendingPost { //使用了享元模式
      
          private static final List<PendingPost> pendingPostPool = new ArrayList<>(0);
      
          Subscription subscription;
      
          Object event;
      
          private PendingPost(EventBus.Subscription subscription, Object event) {
              this.subscription = subscription;
              this.event = event;
          }
      
          static PendingPost obtain(Subscription subscription, Object event) {
              synchronized (pendingPostPool) {//看池子裏是否有對象,若是有,使用池子裏但對象
      //            if (!pendingPostPool.isEmpty()) {
      //                PendingPost pendingPost = pendingPostPool.remove(0);
      //                pendingPost.subscription = subscription;
      //                pendingPost.event = event;
      //                return pendingPost;
      //            }
      
                  //上面註釋的代碼使用第一個元素,應該使用最後一個元素,
                  //由於使用的是數組存儲,若是remove第一個的話,那麼每次取出的操做都涉及到數組的搬移操做時間複雜度爲O(n)
                  //若是刪除最後一個元素,不涉及搬移操做時間複雜度爲O(1)
                  //因此應該寫成
                  int size = pendingPostPool.size();
                  if (size > 0) {
                      PendingPost pendingPost = pendingPostPool.remove(size - 1);
                      pendingPost.subscription = subscription;
                      pendingPost.event = event;
      
                      return pendingPost;
                  }
              }
      
              return new PendingPost(subscription, event);//沒有,建立
          }
      
          void recycle() {
              synchronized (pendingPostPool) {
                  pendingPostPool.add(this);
              }
          }
      }
      
      複製代碼

      使用時也須要修改,代碼爲:

      static class MainThreadHandler extends Handler {
      
          MainThreadHandler() {
              super(Looper.getMainLooper());
          }
      
          void enqueue(Subscription subscription, Object event) {
              Message message = Message.obtain();
              //如今有兩個對象須要傳遞,但Message只能傳遞單個對象,而Java沒有元祖類型,因此須要本身建立對象或者放入數組中
              message.obj = PendingPost.obtain(subscription, event);
              
              if (!sendMessage(message)) {//發送失敗:usually because the looper processing the message queue is exiting.
                  throw new RuntimeException("Could not send handler message");
              }
          }
      
          @Override
          public void handleMessage(Message msg) {
              PendingPost pendingPost = (PendingPost) msg.obj;
              postEvent(pendingPost.subscription, pendingPost.event);
              pendingPost.recycle();//回收
          }
      }
      複製代碼

總結

咱們能夠看出,1.0.1版本仍是有些bug和不足的,後面我將分析2.x和3.x版本。

相關文章
相關標籤/搜索