事件總線方案實踐

liveData實現事件總線

目錄介紹

  • 01.EventBus使用原理
  • 02.RxBus使用原理
  • 03.爲什麼使用liveData
  • 04.LiveDataBus的組成
  • 05.LiveDataBus原理圖
  • 06.簡單的實現案例代碼
  • 07.遇到的問題和分析思路
  • 08.使用反射解決遇到問題
  • 09.使用postValue的bug
  • 10.如何發送延遲事件消息
  • 11.如何發送輪訓延遲事件
  • 12.避免類型轉換異常問題
  • 13.如何實現生命週期感知

00.事件開源庫

01.EventBus使用原理

  • 框架的核心思想,就是消息的發佈和訂閱,使用訂閱者模式實現,其原理圖大概以下所示,摘自網絡。java

    • image
  • 發佈和訂閱之間的依賴關係,其原理圖大概以下所示,摘自網絡。android

    • image
  • 訂閱/發佈模式和觀察者模式之間有着微弱的區別,我的以爲訂閱/發佈模式是觀察者模式的一種加強版。二者區別以下所示,摘自網絡。git

    • image摘自網絡
  • 具體使用能夠看demo代碼,demo開源地址

02.RxBus使用原理

  • RxBus不是一個庫,而是一個文件,實現只有短短30行代碼。RxBus自己不須要過多分析,它的強大徹底來自於它基於的RxJava技術。
  • 在RxJava中有個Subject類,它繼承Observable類,同時實現了Observer接口,所以Subject能夠同時擔當訂閱者和被訂閱者的角色,咱們使用Subject的子類PublishSubject來建立一個Subject對象(PublishSubject只有被訂閱後纔會把接收到的事件馬上發送給訂閱者),在須要接收事件的地方,訂閱該Subject對象,以後若是Subject對象接收到事件,則會發射給該訂閱者,此時Subject對象充當被訂閱者的角色。
  • 完成了訂閱,在須要發送事件的地方將事件發送給以前被訂閱的Subject對象,則此時Subject對象做爲訂閱者接收事件,而後會馬上將事件轉發給訂閱該Subject對象的訂閱者,以便訂閱者處理相應事件,到這裏就完成了事件的發送與處理。
  • 最後就是取消訂閱的操做了,RxJava中,訂閱操做會返回一個Subscription對象,以便在合適的時機取消訂閱,防止內存泄漏,若是一個類產生多個Subscription對象,咱們能夠用一個CompositeSubscription存儲起來,以進行批量的取消訂閱。
  • 具體使用能夠看demo代碼,demo開源地址

03.爲什麼使用liveData

  • 爲什麼使用liveDatagithub

    • LiveData具備的這種可觀察性和生命週期感知的能力,使其很是適合做爲Android通訊總線的基礎構件。在一對多的場景中,發佈消息事件後,訂閱事件的頁面只有在可見的時候纔會處理事件邏輯。
    • 使用者不用顯示調用反註冊方法。LiveData具備生命週期感知能力,因此LiveDataBus只須要調用註冊回調方法,而不須要顯示的調用反註冊方法。這樣帶來的好處不只能夠編寫更少的代碼,並且能夠徹底杜絕其餘通訊總線類框架(如EventBus、RxBus)忘記調用反註冊所帶來的內存泄漏的風險。
  • 該liveDataBus優點網絡

    • 1.該LiveDataBus的實現比較簡單,支持發送普通事件,也支持發送粘性事件;
    • 2.該LiveDataBus支持發送延遲事件消息,也能夠用做輪訓延遲事件(好比商城類項目某活動頁面5秒鐘刷一次接口數據),支持stop輪訓操做
    • 3.該LiveDataBus能夠減少APK包的大小,因爲LiveDataBus只依賴Android官方Android Architecture Components組件的LiveData;
    • 4.該LiveDataBus具備生命週期感知,這個是一個很大的優點。不須要反註冊,避免了內存泄漏等問題;
  • 關於liveData深度解析,能夠看我這篇博客:01.LiveData詳細分析

04.LiveDataBus的組成

  • 消息: 消息能夠是任何的 Object,能夠定義不一樣類型的消息,如 Boolean、String。也能夠定義自定義類型的消息。
  • 消息通道: LiveData 扮演了消息通道的角色,不一樣的消息通道用不一樣的名字區分,名字是 String 類型的,能夠經過名字獲取到一個 LiveData 消息通道。
  • 消息總線: 消息總線經過單例實現,不一樣的消息通道存放在一個 HashMap 中。
  • 訂閱: 訂閱者經過 getChannel() 獲取消息通道,而後調用 observe() 訂閱這個通道的消息。
  • 發佈: 發佈者經過 getChannel() 獲取消息通道,而後調用 setValue() 或者 postValue() 發佈消息。

05.LiveDataBus原理圖

  • 爲了方便理解,LiveDataBus原理圖以下所示多線程

    • image
  • 訂閱和註冊的流程圖併發

    • image
  • 訂閱註冊原理圖app

    • image
  • 爲什麼用LiveDataBus替代EventBus和RxBus框架

    • LiveDataBus的實現極其簡單
    • LiveDataBus能夠減少APK包的大小,因爲LiveDataBus只依賴Android官方Android Architecture Components組件的LiveData。
    • LiveDataBus具備生命週期感知。

06.簡單的實現案例代碼

  • 我這裏先用最簡單的代碼實現liveDataBus,而後用一下,看一下會出現什麼問題,代碼以下所示:ide

    public final class LiveDataBus1 {
    
        private final Map<String, MutableLiveData<Object>> bus;
    
        private LiveDataBus1() {
            bus = new HashMap<>();
        }
    
        private static class SingletonHolder {
            private static final LiveDataBus1 DATA_BUS = new LiveDataBus1();
        }
    
        public static LiveDataBus1 get() {
            return SingletonHolder.DATA_BUS;
        }
    
        public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
            if (!bus.containsKey(target)) {
                bus.put(target, new MutableLiveData<>());
            }
            return (MutableLiveData<T>) bus.get(target);
        }
    
        public MutableLiveData<Object> getChannel(String target) {
            return getChannel(target, Object.class);
        }
    }
  • 那麼如何發送消息和接收消息呢,注意二者的key須要保持一致,不然沒法接收?具體代碼以下所示:

    //發送消息
    LiveDataBus1.get().getChannel("yc_bus").setValue(text);
    //接收消息
    LiveDataBus1.get().getChannel("yc_bus", String.class)
            .observe(this, new Observer<String>() {
                @Override
                public void onChanged(@Nullable String newText) {
                    // 更新數據
                    tvText.setText(newText);
                }
            });

07.遇到的問題和分析思路

  • 遇到的問題:

    • 1.LiveData 一時使用一時爽,爽完了以後咱們發現這個簡易的 LiveDataBus 存在一個問題,就是訂閱者會收到訂閱以前發佈的消息,相似於粘性消息。對於一個消息總線來講,這是不可接受的。
    • 2.屢次調用了 postValue() 方法,只有最後次調用的值會獲得更新。也就是此方法是有可能會丟失事件!

7.1 先看第一個問題

  • 而後看一下LiveData的訂閱方法observe源碼

    • 看下面代碼可知道,LiveData 內部會將傳入參數包裝成 wrapper ,而後存在一個 Map 中,最後經過 LifeCycle 組件添加觀察者。
    // 註釋只能在主線程中調用該方法
    @MainThread
    public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
        // 當前綁定的組件(activity or fragment)狀態爲DESTROYED的時候, 則會忽視當前的訂閱請求
        if (owner.getLifecycle().getCurrentState() == DESTROYED) {
            // ignore
            return;
        }
        // 轉爲帶生命週期感知的觀察者包裝類
        LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
        ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
        // 對應觀察者只能與一個owner綁定,不然拋出異常
        if (existing != null && !existing.isAttachedTo(owner)) {
            throw new IllegalArgumentException("Cannot add the same observer"
                    + " with different lifecycles");
        }
        if (existing != null) {
            return;
        }
        // lifecycle註冊
        owner.getLifecycle().addObserver(wrapper);
    }
  • 緊接着,來看一下LiveData的更新數據方法

    • LiveData 更新數據方式有兩個,一個是 setValue() 另外一個是 postValue(),這兩個方法的區別是,postValue() 在內部會拋到主線程去執行更新數據,所以適合在子線程中使用;而 setValue() 則是直接更新數據。
    @MainThread
    protected void setValue(T value) {
        assertMainThread("setValue");
        // 這裏的 mVersion,它本問題關鍵,每次更新數據都會自增,默認值是 -1。
        mVersion++;
        mData = value;
        dispatchingValue(null);
    }
    • 跟進下 dispatchingValue() 方法,注意,這裏須要重點看considerNotify代碼:
    private void dispatchingValue(@Nullable ObserverWrapper initiator) {
        // mDispatchingValue的判斷主要是爲了解決併發調用dispatchingValue的狀況
        // 當對應數據的觀察者在執行的過程當中, 若有新的數據變動, 則不會再次通知到觀察者。因此觀察者內的執行不該進行耗時工做
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if (initiator != null) {
                // 等下重點看這裏的代碼
                considerNotify(initiator);
                initiator = null;
            } else {
                for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                    // 等下重點看這裏的代碼
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break;
                    }
                }
            }
        } while (mDispatchInvalidated);
        mDispatchingValue = false;
    }
    • 而後看一下considerNotify() 方法作了什麼,代碼以下所示,這裏有道詞典翻譯下注釋:
    private void considerNotify(ObserverWrapper observer) {
        if (!observer.mActive) {
            return;
        }
        // 檢查最新的狀態b4調度。也許它改變了狀態,但咱們尚未獲得事件。
        // 咱們仍是先檢查觀察者。活動,以保持它做爲活動的入口。
        // 所以,即便觀察者移動到一個活動狀態,若是咱們沒有收到那個事件,咱們最好不要通知一個更可預測的通知順序。
        if (!observer.shouldBeActive()) {
            observer.activeStateChanged(false);
            return;
        }
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //noinspection unchecked
        observer.mObserver.onChanged((T) mData);
    }
  • 爲什麼訂閱者會立刻收到訂閱以前發佈的最新消息?

    • 若是 ObserverWrapper 的 mLastVersion 小於 LiveData 的 mVersion,那麼就會執行的 onChange() 方法去通知觀察者數據已更新。而 ObserverWrapper.mLastVersion 的默認值是 -1, LiveData 只要更新過數據,mVersion 就確定會大於 -1,因此訂閱者會立刻收到訂閱以前發佈的最新消息!!

7.2 而後看一下第二個問題

  • 首先看一下postValue源代碼,以下所示:

    • 看代碼註釋中說,若是在多線程中同一個時刻,屢次調用了 postValue() 方法,只有最後次調用的值會獲得更新。也就是此方法是有可能會丟失事件!
    • postValue 只是把傳進來的數據先存到 mPendingData,ArchTaskExecutor.getInstance()獲取的是一個單利對象。而後往主線程拋一個 Runnable,在這個 Runnable 裏面再調用 setValue 來把存起來的值真正設置上去,並回調觀察者們。而若是在這個 Runnable 執行前屢次 postValue,其實只是改變暫存的值 mPendingData,並不會再次拋另外一個 Runnable。
    protected void postValue(T value) {
        boolean postTask;
        synchronized (mDataLock) {
            postTask = mPendingData == NOT_SET;
            mPendingData = value;
        }
        if (!postTask) {
            return;
        }
        ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
    }
    
    private final Runnable mPostValueRunnable = new Runnable() {
        @Override
        public void run() {
            Object newValue;
            synchronized (mDataLock) {
                newValue = mPendingData;
                mPendingData = NOT_SET;
            }
            //noinspection unchecked
            setValue((T) newValue);
        }
    };

08.使用反射解決遇到問題

  • 根據以前的分析,只須要在註冊一個新的訂閱者的時候把Wrapper的version設置成跟LiveData的version一致便可。
  • 能不能從Map容器mObservers中取到LifecycleBoundObserver,而後再更改version呢?答案是確定的,經過查看SafeIterableMap的源碼咱們發現有一個protected的get方法。所以,在調用observe的時候,咱們能夠經過反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version設置成和LiveData一致便可。

    @Override
    public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
        super.observe(owner, observer);
         hook(observer);
    }
    
    private void hook(@NonNull Observer<T> observer) {
        try {
            Class<LiveData> classLiveData = LiveData.class;
            Field fieldObservers = classLiveData.getDeclaredField("mObservers");
            fieldObservers.setAccessible(true);
            Object objectObservers = fieldObservers.get(this);
            Class<?> classObservers = objectObservers.getClass();
            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
            methodGet.setAccessible(true);
            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
            Object objectWrapper = null;
            if (objectWrapperEntry instanceof Map.Entry) {
                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
            }
            if (objectWrapper != null) {
                Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
                Field fieldLastVersion = null;
                if (classObserverWrapper != null) {
                    fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
                    fieldLastVersion.setAccessible(true);
                    Field fieldVersion = classLiveData.getDeclaredField("mVersion");
                    fieldVersion.setAccessible(true);
                    Object objectVersion = fieldVersion.get(this);
                    fieldLastVersion.set(objectWrapper, objectVersion);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
  • 同時還須要注意,在實現MutableLiveData<T>自定義類BusMutableLiveData中,須要重寫這幾個方法。代碼以下所示:

    /**
     * 在給定的觀察者的生命週期內將給定的觀察者添加到觀察者列表全部者。
     * 事件是在主線程上分派的。若是LiveData已經有數據集合,它將被傳遞給觀察者。
     * @param owner                                 owner
*/
public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
    super.observe(owner, observer);
}

/**
 * 將給定的觀察者添加到觀察者列表中。這個調用相似於{@link LiveData#observe(LifecycleOwner, Observer)}
 * 和一個LifecycleOwner, which老是積極的。這意味着給定的觀察者將接收全部事件,而且永遠不會 被自動刪除。
 * 您應該手動調用{@link #removeObserver(Observer)}來中止 觀察這LiveData。
 * @param observer                              observer
 */
public void observeStickyForever(@NonNull Observer<T> observer) {
    super.observeForever(observer);
}
```

09.使用postValue的bug

9.1 模擬經過發送多個postValue消息出現丟失問題

  • 首先看看MutableLiveData源代碼,以下所示,這裏重點展現測試數據案例

    public void postValue(T value) {
        super.postValue(value);
    }
  • 而後使用for循環,使用postValue發送100條消息事件,代碼以下所示:

    public void postValueCountTest() {
        sendCount = 100;
        receiveCount = 0;
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < sendCount; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    LiveDataBus2.get().getChannel(Constant.LIVE_BUS3).postValue("test_1_data"+sendCount);
                }
            });
        }
        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                BusLogUtils.d("sendCount: " + sendCount + " | receiveCount: " + receiveCount);
                Toast.makeText(ThirdActivity4.this, "sendCount: " + sendCount +
                        " | receiveCount: " + receiveCount, Toast.LENGTH_LONG).show();
            }
        }, 1000);
    }
    //接收消息
    LiveDataBus2.get()
            .getChannel(Constant.LIVE_BUS3, String.class)
            .observe(this, new Observer<String>() {
                @Override
                public void onChanged(@Nullable String s) {
                    receiveCount++;
                    BusLogUtils.d("接收消息--ThirdActivity4------yc_bus---1-"+s+"----"+receiveCount);
                }
            });
  • 而後看一下打印日誌,是否是發現了什麼問題?發現根本沒有100條數據……

    2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----1
    2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----2
    2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----3
    2020-03-03 10:25:51.403 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----4

9.2 修改後使用handler處理postValue消息

  • 既然post是在子線程中發送消息事件,那麼可不可使用handler將它放到主線程中處理事件了,是能夠的,代碼以下所示

    /**
     * 子線程發送事件
*/
@Override
public void postValue(T value) {
    //注意,去掉super方法,
    //super.postValue(value);
    mainHandler.post(new PostValueTask(value));
}

private BusWeakHandler mainHandler = new BusWeakHandler(Looper.getMainLooper());

private class PostValueTask implements Runnable {

    private T newValue;

    public PostValueTask(@NonNull T newValue) {
        this.newValue = newValue;
    }

    @Override
    public void run() {
        setValue(newValue);
    }
}
```
  • 而後再次使用for循環,發送100條消息事件,查看日誌。發現就會恰好有100條數據。代碼這裏就不展現了,跟上面測試代碼相似。

10.如何發送延遲事件消息

  • 能夠知道,經過postValue能夠在子線程發送消息,那麼發送延遲消息也十分簡單,代碼以下所示:

    /**
     * 子線程發送事件
*/
@Override
public void postValue(T value) {
    //注意,去掉super方法,
    //super.postValue(value);
    mainHandler.post(new PostValueTask(value));
}

/**
 * 發送延遲事件
 * @param value                                 value
 * @param delay                                 延遲時間
 */
@Override
public void postValueDelay(T value, long delay) {
    mainHandler.postDelayed(new PostValueTask(value) , delay);
    //mainHandler.postAtTime(new PostValueTask(value) , delay);
}
```
  • 測試用例,延遲5秒鐘發送事件,代碼以下所示。具體能夠看demo鐘的案例!

    LiveDataBus.get().with(Constant.LIVE_BUS4).postValueDelay("test_4_data",5000);

11.如何發送輪訓延遲事件

  • 輪訓延遲事件,好比有的頁面須要實現,每間隔5秒鐘就刷新一次頁面數據,經常用於活動頁面。在購物商城這類需求很常見

    @Override
    public void postValueInterval(final T value, final long interval) {
        mainHandler.postDelayed(new Runnable() {
            @Override
            public void run() {
                setValue(value);
                mainHandler.postDelayed(this,interval);
            }
        },interval);
    }
  • 測試用例,輪訓延遲3秒鐘發送事件,代碼以下所示。具體能夠看demo鐘的案例!

    LiveDataBus.get().with(Constant.LIVE_BUS5).postValueInterval("test_5_data",3000);
  • 這裏遇到了一個問題,假若有多個頁面有這種輪訓發送事件的需求,顯然這個是實現不了的。那麼可不能夠把每一個輪訓runnable記錄一個名稱區別開來代碼更更改以下

    /**
     * 發送延遲事件,間隔輪訓
     * @param value                                 value
*/
@Deprecated
@Override
public void postValueInterval(final T value, final long interval,@NonNull String taskName) {
    if(taskName.isEmpty()){
        return;
    }
    IntervalValueTask  intervalTask = new IntervalValueTask(value,interval);
    intervalTasks.put(taskName,intervalTask);
    mainHandler.postDelayed(intervalTask,interval);
}

private class IntervalValueTask implements Runnable {

    private T newValue;
    private long interval;

    public IntervalValueTask(T newValue, long interval) {
        this.newValue = newValue;
        this.interval = interval;
    }

    @Override
    public void run() {
        setValue(newValue);
        mainHandler.postDelayed(this,interval);
    }
}
```
  • 輪訓總不能夠一直持續下去吧,這個時候能夠添加一個手動關閉輪訓的方法。代碼以下所示:

    /**
*/
@Deprecated
@Override
public void stopPostInterval(@NonNull String taskName) {
    IntervalValueTask  intervalTask  = intervalTasks.get(taskName);
    if(intervalTask!= null){
        //移除callback
        mainHandler.removeCallbacks(intervalTask);
        intervalTasks.remove(taskName);
    }
}
```

12.避免類型轉換異常問題

  • 代碼以下所示

    public class SafeCastObserver<T> implements Observer<T> {
    
        @NonNull
        private final Observer<T> observer;
    
        public SafeCastObserver(@NonNull Observer<T> observer) {
            this.observer = observer;
        }
    
        @Override
        public void onChanged(@Nullable T t) {
            //捕獲異常,避免出現異常以後,收不到後續的消息事件
            try {
                //注意爲了不轉換出現的異常,try-catch捕獲
                observer.onChanged(t);
            } catch (ClassCastException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

13.如何實現生命週期感知

  • 生命週期感知能力就是當在Android平臺的LifecycleOwner(如Activity)中使用的時候,只須要訂閱消息,而不須要取消訂閱消息。LifecycleOwner的生命週期結束的時候,會自動取消訂閱。這帶來了兩個好處:

    • 能夠在任何位置訂閱消息,而不是必須在onCreate方法中訂閱
    • 避免了忘記取消訂閱引發的內存泄漏
  • 具體已經對lifecycle源碼做出了分析,具體能夠看我上一篇的博客。Lifecycle詳細分析

參考內容

事件總線開源庫:https://github.com/yangchong2...

相關文章
相關標籤/搜索