Android 使用 LiveData 實現 EventBus

緒論

本文是學習了大佬的文章後,本身去動手實踐後寫的一篇學習筆記。大佬的文章寫得比較好,我本身去寫未必描述得那麼清楚😂,因此本文不少地方都直接引用了大佬的文章。html

項目源碼:github.com/LinYaoTian/…java

Tip:閱讀本文最好對 Jetpack 的 LIfeCycle 和 LiveData 有初步的瞭解。android

引用:git

基本概念

對於 Android 系統來講,消息傳遞是最基本的組件,每個 App 內的不一樣頁面,不一樣的組件都在進行消息傳遞。github

消息傳遞既能夠用於 Android 四大組件以前的通訊,也能夠用於異步線程和主線程以前的通訊。多線程

對於 Android 開發者來講,常用的消息傳遞方式有不少種,從最先的 Handler、BroadcastReceiver 、接口回調,到最近幾年的流行的通訊總線類框架 EventBus、RxBus。架構

EventBus

說到 Android 的通訊總線類框架就不得不提到 EventBus 。併發

EventBus 是一個 Android 事件發佈/訂閱框架,經過解耦發佈者和訂閱者簡化 Android 事件傳遞。EventBus 能夠代替 Android 傳統的 Intent、Handler、Broadcast 或接口回調,在 Fragment、Activity、Service 線程之間傳遞數據,執行方法。app

EventBus 最大的特色就是:簡潔、解耦。框架

在沒有 EventBus 以前咱們一般用廣播來實現監聽,或者自定義接口函數回調,有的場景咱們也能夠直接用Intent攜帶簡單數據,或者在線程之間經過 Handler 處理消息傳遞。但不管是廣播仍是 Handler 機制遠遠不能知足咱們高效的開發。EventBus 簡化了應用程序內各組件間、組件與後臺線程間的通訊。EventBus 一經推出,便受到廣大開發者的推崇。

如今看來,EventBus 給 Android 開發者世界帶來了一種新的框架和思想,就是消息的發佈和訂閱。 這種思想在其後不少框架中都獲得了應用。

訂閱/發佈模式

訂閱發佈模式定義了一種「一對多」的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知全部訂閱者對象,使它們可以自動更新本身的狀態。

訂閱/發佈模式和觀察者模式二者很是類似,我的以爲訂閱/發佈模式是觀察者模式的一種加強版。

二者的區別有:

  • 觀察者模式下,觀察者和被觀察者以前直接存在依賴關係。
  • 而訂閱/發佈模式下,訂閱者和發佈者以前多了一個調度中心,避免了訂閱者和發佈者的依賴關係。

詳情能夠看這篇文章: 觀察者模式 vs 發佈-訂閱模式

LiveData

本文是經過 LiveData 去實現 EventBus ,這裏先介紹下 LiveData。

LiveData 如同它的名字同樣,是一個可觀察的數據持有者,和常規的 observable 不一樣,LiveData 是能夠具備生命週期感知的,這意味着它可以在 Activity、Fragment、Service 中正確的處理生命週期。

實際上 LiveData 並不能單獨起做用,它依賴於 Jectpack 的 LifeCycle 組件,由於 LifeCycle 已經在 Activity 的父類中被封裝好了,在咱們使用 LiveData 的過程當中是基本是無感知的,因此這裏就簡單提一下 LifeCycle 好了。

LifeCycle 的做用

  • 管理組件的生命週期。
  • 讓第三方業務能在本身內部就能拿到依賴的組件的生命週期,便於及時叫停,避免錯過執行時機。

想繼續瞭解的同窗能夠看下這兩篇文章:

下面繼續說 LiveData:

LiveData 的數據源通常是 ViewModel,也能夠是其餘能夠更新的 LiveData 組件。

通常咱們使用 LiveData 的 observe(),當數據更新後,LiveData 會通知它的全部活躍的觀察者。與 RxJava 不一樣的,LiveData 只會通知活躍的觀察者,例如 Activity 位於 Destroyed 狀態時是不活躍的,所以不會收到通知。

固然咱們也可使用 LiveData 的 observerForever() 方法進行訂閱,區別是 observerForever() 不會受到 Activity 等組件的生命週期的影響,只要數據更新就會收到通知。

LiveData 的簡單使用:

public class MainActivity extends AppCompatActivity {
   private static final String TAG="MainActivity";
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
      	// 建立 LiveData 對象
        MutableLiveData<String> mutableLiveData  = new MutableLiveData<>();
        // 開始訂閱
      	mutableLiveData.observe(this, new Observer<String>() {
            @Override
            public void onChanged(@Nullable final String s) {
                Log.d(TAG, "onChanged:"+s);
            }
        });
      	// 更新數據,最終會回調上面的 onChange() 方法
        mutableLiveData.postValue("Android進階三部曲");
    }
}
複製代碼

LiveData 的更多詳情能夠參考這篇文章Android Jetpack架構組件(四)一文帶你瞭解LiveData(使用篇)

經過 LiveData 實現的優勢是什麼?

EventBus 是業界知名的通訊類總線庫,但它也存在許多被人詬病的缺點:

  1. 須要手動的註冊和反註冊,稍不當心可能會形成內存泄露。
  2. 使用 EventBus 出錯時難以跟蹤出錯的事件源。
  3. 每一個事件都要定義一個事件類,容易形成類膨脹。

而經過 LiveData 實現的 LiveDataBus 的具備如下優勢:

  1. 具備生命週期感知能力,不用手動註冊和反註冊。
  2. 具備惟一的可信事件源。
  3. 以字符串區分每個事件,避免類膨脹。
  4. LiveData 爲 Android 官方庫,更加可靠。

原理

LiveDataBus 的組成

  • 消息: 消息能夠是任何的 Object,能夠定義不一樣類型的消息,如 Boolean、String。也能夠定義自定義類型的消息。
  • 消息通道: LiveData 扮演了消息通道的角色,不一樣的消息通道用不一樣的名字區分,名字是 String 類型的,能夠經過名字獲取到一個 LiveData 消息通道。
  • 消息總線: 消息總線經過單例實現,不一樣的消息通道存放在一個 HashMap 中。
  • 訂閱: 訂閱者經過 getChannel() 獲取消息通道,而後調用 observe() 訂閱這個通道的消息。
  • 發佈: 發佈者經過 getChannel() 獲取消息通道,而後調用 setValue() 或者 postValue() 發佈消息。

LiveDataBus 原理圖

簡單實現

經過 LiveData 咱們能夠很是簡單的實現一個事件發佈/訂閱框架:

public final class LiveDataBus {

    private final Map<String, MutableLiveData<Object>> bus;

    private LiveDataBus() {
        bus = new HashMap<>();
    }

    private static class SingletonHolder {
        private static final LiveDataBus DATA_BUS = new LiveDataBus();
    }

    public static LiveDataBus get() {
        return SingletonHolder.DATA_BUS;
    }

    public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
        if (!bus.containsKey(target)) {
            bus.put(target, new MutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(target);
    }

    public MutableLiveData<Object> getChannel(String target) {
        return getChannel(target, Object.class);
    }
}
複製代碼

沒錯,上面短短的 27 行代碼咱們就實現了一個事件發佈/訂閱框架!

問題一

LiveData 一時使用一時爽,爽完了以後咱們發現這個簡易的 LiveDataBus 存在一個問題,就是訂閱者會收到訂閱以前發佈的消息,相似於粘性消息。對於一個消息總線來講,粘性消息和非粘性消息都是必須支持的,下面咱們來看一下如何解決這個問題。

咱們先看一下爲何會出現這個問題:

LiveData 的訂閱方法

android.arch.lifecycle.LiveData

@MainThread
   public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
       assertMainThread("observe");
       // 當前綁定的組件(activity or fragment)狀態爲DESTROYED的時候, 則會忽視當前的訂閱請求
       if (owner.getLifecycle().getCurrentState() == DESTROYED) {
           return;
       }
       // 轉爲帶生命週期感知的觀察者包裝類
       LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
       ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
       // 對應觀察者只能與一個owner綁定
       if (existing != null && !existing.isAttachedTo(owner)) {
           throw new IllegalArgumentException("Cannot add the same observer"
                   + " with different lifecycles");
       }
       if (existing != null) {
           return;
       }
       // lifecycle註冊
       owner.getLifecycle().addObserver(wrapper);
   }
複製代碼

能夠看到,LiveData 內部會將咱們的傳入參數包裝成 wrapper ,而後存在一個 Map 中,最後經過 LifeCycle 組件添加觀察者。

LiveData 的更新數據方法

LiveData 更新數據方式有兩個,一個是 setValue() 另外一個是 postValue(),這兩個方法的區別是,postValue() 在內部會拋到主線程去執行更新數據,所以適合在子線程中使用;而 setValue() 則是直接更新數據。

這裏就只看下 setValue() 方法就行了

android.arch.lifecycle.LiveData

@MainThread
protected void setValue(T value) {
    assertMainThread("setValue");
    // 發送版本+1
    mVersion++;
    mData = value;
    // 信息分發
    dispatchingValue(null);
}
複製代碼

記住這裏的 mVersion,它本問題關鍵,每次更新數據都會自增,默認值是 -1。而後咱們跟進下 dispatchingValue() 方法:

android.arch.lifecycle.LiveData

void dispatchingValue(@Nullable ObserverWrapper initiator) {
        // mDispatchingValue的判斷主要是爲了解決併發調用dispatchingValue的狀況
        // 當對應數據的觀察者在執行的過程當中, 若有新的數據變動, 則不會再次通知到觀察者
        // 因此觀察者內的執行不該進行耗時工做
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if (initiator != null) {
              	// 這裏
                considerNotify(initiator);
                initiator = null;
            } else {
                for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                  	// 這裏
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break;
                    }
                }
            }
        } while (mDispatchInvalidated);
        mDispatchingValue = false;
    }

複製代碼

能夠看到,不管條件判斷,最終都會執行 considerNotify() 方法,因此咱們繼續跟進:

android.arch.lifecycle.LiveData

private void considerNotify(ObserverWrapper observer) {
        if (!observer.mActive) {
            return;
        }
        if (!observer.shouldBeActive()) {
            observer.activeStateChanged(false);
            return;
        }
  			// 判斷 version
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //noinspection unchecked
        observer.mObserver.onChanged((T) mData);
    }

複製代碼

終於到了最關鍵的時候了!!若是 ObserverWrapper 的 mLastVersion 小於 LiveData 的 mVersion,那麼就會執行的 onChange() 方法去通知觀察者數據已更新。

而 ObserverWrapper.mLastVersion 的默認值是 -1, LiveData 只要更新過數據,mVersion 就確定會大於 -1,因此訂閱者會立刻收到訂閱以前發佈的最新消息!!

問題二

咱們看一下 LiveData 的 postValue() 方法的註釋:

能夠看到,註釋中說,若是在多線程中同一個時刻,屢次調用了 postValue() 方法,只有最後次調用的值會獲得更新。也就是此方法是有可能會丟失事件!!!

看下源碼:

android.arch.lifecycle.LiveData

private final Runnable mPostValueRunnable = new Runnable() {
        @Override
        public void run() {
            Object newValue;
            synchronized (mDataLock) {
                newValue = mPendingData;
              	// 設置 mPendingData 爲 not_set
                mPendingData = NOT_SET;
            }
            setValue((T) newValue);
        }
    };

    protected void postValue(T value) {
        boolean postTask;
        synchronized (mDataLock) {
          	// 判斷 postTask 是否爲 not_set
            postTask = mPendingData == NOT_SET;
            mPendingData = value;
        }
        if (!postTask) {
            return;
        }
        ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
    }
複製代碼

從上面的源碼就能夠很容易看出,postValue 只是把傳進來的數據先存到 mPendingData,而後往主線程拋一個 Runnable,在這個 Runnable 裏面再調用 setValue 來把存起來的值真正設置上去,並回調觀察者們。而若是在這個 Runnable 執行前屢次 postValue,其實只是改變暫存的值 mPendingData,並不會再次拋另外一個 Runnable。

最終實現

明白了問題根源所在,咱們就好解決問題了。

解決這個問題的方案有多種,其中美團大佬Android消息總線的演進之路:用LiveDataBus替代RxBus、EventBus 使用的反射的方式修改 LiveData 中的 mVersion 值去實現。還有另外一個方案基於LiveData實現事件總線思路和方案,此方案是基於自定義觀察者包裝類,由於粘性消息最終會調用到 Observer#onChange() 方法,所以咱們自定義 Observer 包裝類,本身維護實際的訂閱消息數,來判斷是否須要觸發真正的 onChange() 方法。本文使用的是第二種方案。

第一步:自定義 Observer 包裝類

爲了業務的拓展性,這裏咱們定義先一個 Base 基類:

internal open class BaseBusObserverWrapper<T>(private val mObserver: Observer<in T>, private val mBusLiveData: BusLiveData<T>) : Observer<T> {

    private val mLastVersion = mBusLiveData.version

    private val TAG = "BaseBusObserverWrapper"

    override fun onChanged(t: T?) {
        Logger.d(TAG,"msg receiver = " + t.toString())
        if (mLastVersion >= mBusLiveData.version){
            // LiveData 的版本號沒有更新過,說明並無新數據,只是由於
            // 當前 Observer 的版本號比 LiveData 低致使的調用 onChange()
            return
        }
        try {
            mObserver.onChanged(t)
        }catch (e:Exception){
            Logger.e(TAG,"error on Observer onChanged() = " + e.message)
        }
    }

    open fun isAttachedTo(owner: LifecycleOwner) = false

}
複製代碼

這裏咱們保存了 LiveData 的 mVersion 值,每次執行 onChange() 時都先判斷一些 LiveData 是否更新過數據,若是沒有則不執行觀察者的 Observer.onChange() 方法。

而後定義兩個子類,BusLifecycleObserver 和 BusAlwaysActiveObserver ,其中 BusLifecycleObserver 用於 LiveData#observer() 方法訂閱的事件,而 BusAlwaysActiveObserver 用於 LiveData#observerForever() 方法訂閱的事件。

internal class BusLifecycleObserver<T>(private val observer: Observer<in T>, private val owner: LifecycleOwner, private val liveData: BusLiveData<T>)
    : BaseBusObserverWrapper<T>(observer,liveData),LifecycleObserver{

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy(){
        liveData.removeObserver(observer)
        owner.lifecycle.removeObserver(this)
    }
}
複製代碼

對於 BusLifecycleObserver,在生命週期組件處於 Destroyed 時,須要把觀察者移除。

internal class BusAlwaysActiveObserver<T>(private val mObserver: Observer<in T>, private val mBusLiveData: BusLiveData<T>)
    : BaseBusObserverWrapper<T>(mObserver, mBusLiveData)
複製代碼

對於 BusAlwaysActiveObserver,觀察者不受組件生命週期的影響,所以不須要在組件 Destroyed 時移除。

第二步:自定義 LiveData 類

注意:由於 LiveData 的 getVersion() 是包訪問級別的!因此 BusLiveData 必須定義到與 LiveData 同一個包內,即 androidx.lifecycle 包下,所以須要你本身建立一個同名的包並將 BusLiveData 放到裏面。

若是你的項目工程沒有引入 androidx,也可使用 v7 包下的 android.arch.lifecycle,方法同理,不過要注意的是 android.arch.lifecycle 包下 LiveData 的方法參數的泛型是沒有型變的,所以直接複製這裏代碼會有點問題,須要你本身根據編譯器的提示修改下。

class BusLiveData<T>(private val mKey:String) : MutableLiveData<T>() {

    private val TAG = "BusLiveData"

    private val mObserverMap: MutableMap<Observer<in T>, BaseBusObserverWrapper<T>> = mutableMapOf()

    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        val exist = mObserverMap.getOrPut(observer,{
            BusLifecycleObserver(observer,owner,this).apply {
                mObserverMap[observer] = this
                owner.lifecycle.addObserver(this)
            }
        })
        super.observe(owner, exist)
        Logger.d(TAG,"observe() called with: owner = [$owner], observer = [$observer]")
    }

    @MainThread
    override fun observeForever(observer: Observer<in T>) {
        super.observeForever(observer)
        val exist = mObserverMap.getOrPut(observer ,{
            BusAlwaysActiveObserver(observer,this).apply {
                mObserverMap[observer] = this
            }
        })
        super.observeForever(exist)
        Logger.d(TAG, "observeForever() called with: observer = [$observer]")
    }

    @MainThread
    fun observeSticky(owner: LifecycleOwner, observer: Observer<T>) {
        super.observe(owner, observer)
        Logger.d(TAG, "observeSticky() called with: owner = [$owner], observer = [$observer]")
    }

    @MainThread
    fun observeStickyForever(observer: Observer<T>){
        super.observeForever(observer)
        Logger.d(TAG, "observeStickyForever() called with: observer = [$observer]")
    }

    @MainThread
    override fun removeObserver(observer: Observer<in T>) {
        val exist = mObserverMap.remove(observer) ?: observer
        super.removeObserver(exist)
        Logger.d(TAG, "removeObserver() called with: observer = [$observer]")
    }

    @MainThread
    override fun removeObservers(owner: LifecycleOwner) {
        mObserverMap.iterator().forEach {
            if (it.value.isAttachedTo(owner)) {
                mObserverMap.remove(it.key)
            }
        }
        super.removeObservers(owner)
        Logger.d(TAG, "removeObservers() called with: owner = [$owner]")
    }
  	
  	override fun postValue(value: T) {
       mMainHandler.post {
           setValue(value)
       }
    }

    @MainThread
    override fun onInactive() {
        super.onInactive()
        if (!hasObservers()) {
            // 當 LiveData 沒有活躍的觀察者時,能夠移除相關的實例
            LiveDataBusCore.getInstance().mBusMap.remove(mKey)
        }
        Logger.d(TAG, "onInactive() called")
    }

    @MainThread
    public override fun getVersion(): Int {
        return super.getVersion()
    }


}
複製代碼

代碼比較短和簡單,要點就是,對於非粘性消息,即 observer() 和 observerForever() ,咱們須要使用自定義的包裝類包裝處理。對於粘性消息,則直接使用 LiveData 默認實現便可。

同時重寫 postValue() 方法,內部經過 MainHandler.post() 的方式去執行 setValue() 去避免 postValue() 會丟失數據的問題。

第三步:定義管理類

internal class LiveDataBusCore {

    companion object{

        @JvmStatic
        private val defaultBus = LiveDataBusCore()

        @JvmStatic
        fun getInstance() = defaultBus
    }

    internal val mBusMap : MutableMap<String, BusLiveData<*>> by lazy {
        mutableMapOf<String, BusLiveData<*>>()
    }

    fun <T> getChannel(key: String) : BusLiveData<T> {
        return mBusMap.getOrPut(key){
            BusLiveData<T>(key)
        } as BusLiveData<T>
    }
}
複製代碼

第四步:定義入口類

class LiveDataBus {

    companion object{

        @JvmStatic
        @Synchronized
        fun <T> getSyn(key: String) : BusLiveData<T>{
            return get(key)
        }
      
        @JvmStatic
        fun <T> get(key: String) : BusLiveData<T>{
            return LiveDataBusCore.getInstance().getChannel(key)
        }
      
        private fun <T> get(key: String, type: Class<T>) : BusLiveData<T> {
            return LiveDataBusCore.getInstance().getChannel(key)
        }

        @JvmStatic
        fun <E> of(clz: Class<E>): E {
            require(clz.isInterface) { "API declarations must be interfaces." }
            require(clz.interfaces.isEmpty()) { "API interfaces must not extend other interfaces." }
            return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _->
                return@InvocationHandler get(
                        // 事件名以集合類名_事件方法名定義
                        // 以此保證事件的惟一性
                        "${clz.canonicalName}_${method.name}",
                        (method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
            }) as E
        }
    }


}
複製代碼

使用:

class TestLiveDataBusActivity : AppCompatActivity() {
  
    companion object{
        private const val TAG = "TestLiveDataBusActivity"
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_test_live_data_bus)

        LiveDataBus.get<String>("testObserver").observe(this, Observer<String> {
            Log.d(TAG, "testObserver = $it")
            test_observer_id.text = it
        })
      	
      	LiveDataBus.get<String>("testObserver").postValue("new value")
    }
}
複製代碼

LiveDataBus 還提供了一個 of() 方法,用於提供用於事件約束,什麼是時間約束呢?就是說咱們的如今觀察者和發佈者獲取消息渠道的 key 是一個字符串,在使用的過程當中可能會出現,發佈者的 key 是 itO 而觀察者的 key 誤輸入成 it0 的狀況,因此這裏咱們能夠模仿 Retrofit 請求動態代理的作法,在使用的過程當中,咱們須要先定義一個接口:

interface TestLiveEvents {
    fun event1(): MutableLiveData<String>
}
複製代碼

使用:

fun main() {
    LiveDataBus
            .of(TestLiveEvents::class.java)
            .event1()
            .postValue("new value")
}
複製代碼

總結

藉助於 Android 官方提供的 LiveData ,咱們能夠很是方便地實現本身的 LiveDataBus,所有文件也就只有以上幾個類,同時咱們還避免了 EventBus 的許多缺點!

LiveDataBus 的源碼:github.com/LinYaoTian/…

若是文章有任何問題,歡迎在評論區指正。

相關文章
相關標籤/搜索