用LiveDataBus替代RxBus、EventBus——Android消息總線的演進之路

背景

對於Android系統來講,消息傳遞是最基本的組件,每個App內的不一樣頁面,不一樣組件都在進行消息傳遞。消息傳遞既能夠用於Android四大組件之間的通訊,也可用於異步線程和主線程之間的通訊。對於Android開發者來講,常用的消息傳遞方式有不少種,從最先使用的Handler、BroadcastReceiver、接口回調,到近幾年流行的通訊總線類框架EventBus、RxBus。Android消息傳遞框架,總在不斷的演進之中。java

從EventBus提及

EventBus是一個Android事件發佈/訂閱框架,經過解耦發佈者和訂閱者簡化Android事件傳遞。EventBus能夠代替Android傳統的Intent、Handler、Broadcast或接口回調,在Fragment、Activity、Service線程之間傳遞數據,執行方法。android

EventBus最大的特色就是:簡潔、解耦。在沒有EventBus以前咱們一般用廣播來實現監聽,或者自定義接口函數回調,有的場景咱們也能夠直接用Intent攜帶簡單數據,或者在線程之間經過Handler處理消息傳遞。但不管是廣播仍是Handler機制遠遠不能知足咱們高效的開發。EventBus簡化了應用程序內各組件間、組件與後臺線程間的通訊。EventBus一經推出,便受到廣大開發者的推崇。git

如今看來,EventBus給Android開發者世界帶來了一種新的框架和思想,就是消息的發佈和訂閱。這種思想在其後不少框架中都獲得了應用。github

eventbus

圖片摘自EventBus GitHub主頁

發佈/訂閱模式

訂閱發佈模式定義了一種「一對多」的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知全部訂閱者對象,使它們可以自動更新本身的狀態。編程

發佈/訂閱模式

RxBus的出現

RxBus不是一個庫,而是一個文件,實現只有短短30行代碼。RxBus自己不須要過多分析,它的強大徹底來自於它基於的RxJava技術。響應式編程(Reactive Programming)技術這幾年特別火,RxJava是它在Java上的實做。RxJava天生就是發佈/訂閱模式,並且很容易處理線程切換。因此,RxBus憑藉區區30行代碼,就敢挑戰EventBus江湖老大的地位。markdown

RxBus原理

在RxJava中有個Subject類,它繼承Observable類,同時實現了Observer接口,所以Subject能夠同時擔當訂閱者和被訂閱者的角色,咱們使用Subject的子類PublishSubject來建立一個Subject對象(PublishSubject只有被訂閱後纔會把接收到的事件馬上發送給訂閱者),在須要接收事件的地方,訂閱該Subject對象,以後若是Subject對象接收到事件,則會發射給該訂閱者,此時Subject對象充當被訂閱者的角色。架構

完成了訂閱,在須要發送事件的地方將事件發送給以前被訂閱的Subject對象,則此時Subject對象做爲訂閱者接收事件,而後會馬上將事件轉發給訂閱該Subject對象的訂閱者,以便訂閱者處理相應事件,到這裏就完成了事件的發送與處理。app

最後就是取消訂閱的操做了,RxJava中,訂閱操做會返回一個Subscription對象,以便在合適的時機取消訂閱,防止內存泄漏,若是一個類產生多個Subscription對象,咱們能夠用一個CompositeSubscription存儲起來,以進行批量的取消訂閱。框架

RxBus有不少實現,如:異步

AndroidKnife/RxBus(https://github.com/AndroidKnife/RxBus) Blankj/RxBus(https://github.com/Blankj/RxBus)

其實正如前面所說的,RxBus的原理是如此簡單,咱們本身均可以寫出一個RxBus的實現:

基於RxJava1的RxBus實現:

public final class RxBus {

    private final Subject<Object, Object> bus;

    private RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    private static class SingletonHolder {
        private static final RxBus defaultRxBus = new RxBus();
    }

    public static RxBus getInstance() {
        return SingletonHolder.defaultRxBus;
    }

    /* * 發送 */
    public void post(Object o) {
        bus.onNext(o);
    }

    /* * 是否有Observable訂閱 */
    public boolean hasObservable() {
        return bus.hasObservers();
    }

    /* * 轉換爲特定類型的Obserbale */
    public <T> Observable<T> toObservable(Class<T> type) {
        return bus.ofType(type);
    }
}
複製代碼

基於RxJava2的RxBus實現:

public final class RxBus2 {

    private final Subject<Object> bus;

    private RxBus2() {
        // toSerialized method made bus thread safe
        bus = PublishSubject.create().toSerialized();
    }

    public static RxBus2 getInstance() {
        return Holder.BUS;
    }

    private static class Holder {
        private static final RxBus2 BUS = new RxBus2();
    }

    public void post(Object obj) {
        bus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return bus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return bus;
    }

    public boolean hasObservers() {
        return bus.hasObservers();
    }
}
複製代碼

引入LiveDataBus的想法

從LiveData談起

LiveData是Android Architecture Components提出的框架。LiveData是一個能夠被觀察的數據持有類,它能夠感知並遵循Activity、Fragment或Service等組件的生命週期。正是因爲LiveData對組件生命週期可感知特色,所以能夠作到僅在組件處於生命週期的激活狀態時才更新UI數據。

LiveData須要一個觀察者對象,通常是Observer類的具體實現。當觀察者的生命週期處於STARTED或RESUMED狀態時,LiveData會通知觀察者數據變化;在觀察者處於其餘狀態時,即便LiveData的數據變化了,也不會通知。

LiveData的優勢

  • UI和實時數據保持一致 由於LiveData採用的是觀察者模式,這樣一來就能夠在數據發生改變時得到通知,更新UI。
  • 避免內存泄漏 觀察者被綁定到組件的生命週期上,當被綁定的組件銷燬(destroy)時,觀察者會馬上自動清理自身的數據。
  • 不會再產生因爲Activity處於stop狀態而引發的崩潰 例如:當Activity處於後臺狀態時,是不會收到LiveData的任何事件的。
  • 不須要再解決生命週期帶來的問題 LiveData能夠感知被綁定的組件的生命週期,只有在活躍狀態纔會通知數據變化。
  • 實時數據刷新 當組件處於活躍狀態或者從不活躍狀態到活躍狀態時老是能收到最新的數據。
  • 解決Configuration Change問題 在屏幕發生旋轉或者被回收再次啓動,馬上就能收到最新的數據。

談一談Android Architecture Components

Android Architecture Components的核心是Lifecycle、LiveData、ViewModel 以及 Room,經過它能夠很是優雅的讓數據與界面進行交互,並作一些持久化的操做,高度解耦,自動管理生命週期,並且不用擔憂內存泄漏的問題。

  • Room 一個強大的SQLite對象映射庫。
  • ViewModel 一類對象,它用於爲UI組件提供數據,在設備配置發生變動時依舊能夠存活。
  • LiveData 一個可感知生命週期、可被觀察的數據容器,它能夠存儲數據,還會在數據發生改變時進行提醒。
  • Lifecycle 包含LifeCycleOwer和LifecycleObserver,分別是生命週期全部者和生命週期感知者。

Android Architecture Components的特色

  • 數據驅動型編程 變化的永遠是數據,界面無需更改。
  • 感知生命週期,防止內存泄漏。
  • 高度解耦 數據,界面高度分離。
  • 數據持久化 數據、ViewModel不與UI的生命週期掛鉤,不會由於界面的重建而銷燬。

重點:爲何使用LiveData構建數據通訊總線LiveDataBus

使用LiveData的理由

  • LiveData具備的這種可觀察性和生命週期感知的能力,使其很是適合做爲Android通訊總線的基礎構件。
  • 使用者不用顯示調用反註冊方法。 因爲LiveData具備生命週期感知能力,因此LiveDataBus只須要調用註冊回調方法,而不須要顯示的調用反註冊方法。這樣帶來的好處不只能夠編寫更少的代碼,並且能夠徹底杜絕其餘通訊總線類框架(如EventBus、RxBus)忘記調用反註冊所帶來的內存泄漏的風險。

爲何要用LiveDataBus替代EventBus和RxBus

  • LiveDataBus的實現及其簡單 相對EventBus複雜的實現,LiveDataBus只須要一個類就能夠實現。
  • LiveDataBus能夠減少APK包的大小 因爲LiveDataBus只依賴Android官方Android Architecture Components組件的LiveData,沒有其餘依賴,自己實現只有一個類。做爲比較,EventBus JAR包大小爲57kb,RxBus依賴RxJava和RxAndroid,其中RxJava2包大小2.2MB,RxJava1包大小1.1MB,RxAndroid包大小9kb。使用LiveDataBus能夠大大減少APK包的大小。
  • LiveDataBus依賴方支持更好 LiveDataBus只依賴Android官方Android Architecture Components組件的LiveData,相比RxBus依賴的RxJava和RxAndroid,依賴方支持更好。
  • LiveDataBus具備生命週期感知 LiveDataBus具備生命週期感知,在Android系統中使用調用者不須要調用反註冊,相比EventBus和RxBus使用更爲方便,而且沒有內存泄漏風險。

LiveDataBus的設計和架構

LiveDataBus的組成

  • 消息 消息能夠是任何的Object,能夠定義不一樣類型的消息,如Boolean、String。也能夠定義自定義類型的消息。
  • 消息通道 LiveData扮演了消息通道的角色,不一樣的消息通道用不一樣的名字區分,名字是String類型的,能夠經過名字獲取到一個LiveData消息通道。
  • 消息總線 消息總線經過單例實現,不一樣的消息通道存放在一個HashMap中。
  • 訂閱 訂閱者經過getChannel獲取消息通道,而後調用observe訂閱這個通道的消息。
  • 發佈 發佈者經過getChannel獲取消息通道,而後調用setValue或者postValue發佈消息。

LiveDataBus原理圖

LiveDataBus原理圖

LiveDataBus的實現

第一個實現:

public final class LiveDataBus {

    private final Map<String, MutableLiveData<Object>> bus;

    private LiveDataBus() {
        bus = new HashMap<>();
    }

    private static class SingletonHolder {
        private static final LiveDataBus DATA_BUS = new LiveDataBus();
    }

    public static LiveDataBus get() {
        return SingletonHolder.DATA_BUS;
    }

    public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
        if (!bus.containsKey(target)) {
            bus.put(target, new MutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(target);
    }

    public MutableLiveData<Object> getChannel(String target) {
        return getChannel(target, Object.class);
    }
}
複製代碼

短短二十行代碼,就實現了一個通訊總線的所有功能,而且還具備生命週期感知功能,而且使用起來也及其簡單:

註冊訂閱:

LiveDataBus.get().getChannel("key_test", Boolean.class)
        .observe(this, new Observer<Boolean>() {
            @Override
            public void onChanged(@Nullable Boolean aBoolean) {
            }
        });
複製代碼

發送消息:

LiveDataBus.get().getChannel("key_test").setValue(true);
複製代碼

咱們發送了一個名爲"key_test",值爲true的事件。 這個時候訂閱者就會收到消息,並做相應的處理,很是簡單。

問題出現

對於LiveDataBus的初版實現,咱們發現,在使用這個LiveDataBus的過程當中,訂閱者會收到訂閱以前發佈的消息。對於一個消息總線來講,這是不可接受的。不管EventBus或者RxBus,訂閱方都不會收到訂閱以前發出的消息。對於一個消息總線,LiveDataBus必需要解決這個問題。

問題分析

怎麼解決這個問題呢?先分析下緣由:

當LifeCircleOwner的狀態發生變化的時候,會調用LiveData.ObserverWrapper的activeStateChanged函數,若是這個時候ObserverWrapper的狀態是active,就會調用LiveData的dispatchingValue。

code

在LiveData的dispatchingValue中,又會調用LiveData的considerNotify方法。

code

在LiveData的considerNotify方法中,紅框中的邏輯是關鍵,若是ObserverWrapper的mLastVersion小於LiveData的mVersion,就會去回調mObserver的onChanged方法。而每一個新的訂閱者,其version都是-1,LiveData一旦設置過其version是大於-1的(每次LiveData設置值都會使其version加1),這樣就會致使LiveDataBus每註冊一個新的訂閱者,這個訂閱者馬上會收到一個回調,即便這個設置的動做發生在訂閱以前。

code

問題緣由總結

對於這個問題,總結一下發生的核心緣由。對於LiveData,其初始的version是-1,當咱們調用了其setValue或者postValue,其vesion會+1;對於每個觀察者的封裝ObserverWrapper,其初始version也爲-1,也就是說,每個新註冊的觀察者,其version爲-1;當LiveData設置這個ObserverWrapper的時候,若是LiveData的version大於ObserverWrapper的version,LiveData就會強制把當前value推送給Observer。

如何解決這個問題

明白了問題產生的緣由以後,咱們來看看怎麼才能解決這個問題。很顯然,根據以前的分析,只須要在註冊一個新的訂閱者的時候把Wrapper的version設置成跟LiveData的version一致便可。

那麼怎麼實現呢,看看LiveData的observe方法,他會在步驟1建立一個LifecycleBoundObserver,LifecycleBoundObserver是ObserverWrapper的派生類。而後會在步驟2把這個LifecycleBoundObserver放入一個私有Map容器mObservers中。不管ObserverWrapper仍是LifecycleBoundObserver都是私有的或者包可見的,因此沒法經過繼承的方式更改LifecycleBoundObserver的version。

那麼能不能從Map容器mObservers中取到LifecycleBoundObserver,而後再更改version呢?答案是確定的,經過查看SafeIterableMap的源碼咱們發現有一個protected的get方法。所以,在調用observe的時候,咱們能夠經過反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version設置成和LiveData一致便可。

code

對於非生命週期感知的observeForever方法來講,實現的思路是一致的,可是具體的實現略有不一樣。observeForever的時候,生成的wrapper不是LifecycleBoundObserver,而是AlwaysActiveObserver(步驟1),並且咱們也沒有機會在observeForever調用完成以後再去更改AlwaysActiveObserver的version,由於在observeForever方法體內,步驟3的語句,回調就發生了。

code

那麼對於observeForever,如何解決這個問題呢?既然是在調用內回調的,那麼咱們能夠寫一個ObserverWrapper,把真正的回調給包裝起來。把ObserverWrapper傳給observeForever,那麼在回調的時候咱們去檢查調用棧,若是回調是observeForever方法引發的,那麼就不回調真正的訂閱者。

LiveDataBus最終實現

public final class LiveDataBus {

    private final Map<String, BusMutableLiveData<Object>> bus;

    private LiveDataBus() {
        bus = new HashMap<>();
    }

    private static class SingletonHolder {
        private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
    }

    public static LiveDataBus get() {
        return SingletonHolder.DEFAULT_BUS;
    }

    public <T> MutableLiveData<T> with(String key, Class<T> type) {
        if (!bus.containsKey(key)) {
            bus.put(key, new BusMutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(key);
    }

    public MutableLiveData<Object> with(String key) {
        return with(key, Object.class);
    }

    private static class ObserverWrapper<T> implements Observer<T> {

        private Observer<T> observer;

        public ObserverWrapper(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void onChanged(@Nullable T t) {
            if (observer != null) {
                if (isCallOnObserve()) {
                    return;
                }
                observer.onChanged(t);
            }
        }

        private boolean isCallOnObserve() {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (stackTrace != null && stackTrace.length > 0) {
                for (StackTraceElement element : stackTrace) {
                    if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
                            "observeForever".equals(element.getMethodName())) {
                        return true;
                    }
                }
            }
            return false;
        }
    }

    private static class BusMutableLiveData<T> extends MutableLiveData<T> {

        private Map<Observer, Observer> observerMap = new HashMap<>();

        @Override
        public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
            super.observe(owner, observer);
            try {
                hook(observer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void observeForever(@NonNull Observer<T> observer) {
            if (!observerMap.containsKey(observer)) {
                observerMap.put(observer, new ObserverWrapper(observer));
            }
            super.observeForever(observerMap.get(observer));
        }

        @Override
        public void removeObserver(@NonNull Observer<T> observer) {
            Observer realObserver = null;
            if (observerMap.containsKey(observer)) {
                realObserver = observerMap.remove(observer);
            } else {
                realObserver = observer;
            }
            super.removeObserver(realObserver);
        }

        private void hook(@NonNull Observer<T> observer) throws Exception {
            //get wrapper's version
            Class<LiveData> classLiveData = LiveData.class;
            Field fieldObservers = classLiveData.getDeclaredField("mObservers");
            fieldObservers.setAccessible(true);
            Object objectObservers = fieldObservers.get(this);
            Class<?> classObservers = objectObservers.getClass();
            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
            methodGet.setAccessible(true);
            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
            Object objectWrapper = null;
            if (objectWrapperEntry instanceof Map.Entry) {
                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
            }
            if (objectWrapper == null) {
                throw new NullPointerException("Wrapper can not be bull!");
            }
            Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
            Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
            fieldLastVersion.setAccessible(true);
            //get livedata's version
            Field fieldVersion = classLiveData.getDeclaredField("mVersion");
            fieldVersion.setAccessible(true);
            Object objectVersion = fieldVersion.get(this);
            //set wrapper's version
            fieldLastVersion.set(objectWrapper, objectVersion);
        }
    }
}
複製代碼

註冊訂閱

LiveDataBus.get()
        .with("key_test", String.class)
        .observe(this, new Observer<String>() {
            @Override
            public void onChanged(@Nullable String s) {
            }
        });
複製代碼

發送消息:

LiveDataBus.get().with("key_test").setValue(s);
複製代碼

源碼說明

LiveDataBus的源碼能夠直接拷貝使用,也能夠前往做者的GitHub倉庫查看下載: https://github.com/JeremyLiao/LiveDataBus

總結

本文提供了一個新的消息總線框架——LiveDataBus。訂閱者能夠訂閱某個消息通道的消息,發佈者能夠把消息發佈到消息通道上。利用LiveDataBus,不只能夠實現消息總線功能,並且對於訂閱者,他們不須要關心什麼時候取消訂閱,極大減小了由於忘記取消訂閱形成的內存泄漏風險。

做者介紹

海亮,美團高級工程師,2017年加入美團,目前主要負責美團輕收銀、美團收銀零售版等App的相關業務及模塊開發工做。

相關文章
相關標籤/搜索