基於LiveData實現事件總線思路和方案

前言

當前市面上, 比較經常使用的事件總線, 仍然是EventBusRxBus, 早期我曾經寫過EventBus源碼解析,這兩個框架不管是哪一個, 開發者都須要去考慮生命週期的處理.而美團給出了個解決方案, 經過LiveData來實現自帶生命週期感知能力的事件總線框架. 本篇咱們本身擼一個事件總線框架.html

LiveData的原理

咱們要用LiveData作事件總線, 總須要知道它是什麼, 爲何能夠用它來實現事件總線.java

LiveData可對數據進行觀測, 並具備生命週期感知能力, 這就意味着當liveData只會在生命週期處於活躍(inActive)的狀態下才會去執行觀測動做, 而他的能力賦予不能脫離LifeCycle的範圍.android

首先咱們能夠看下LiveData的UML圖, 便於對他有個大概的理解 git

這裏咱們須要注意的是, LiveData內維護的 mVersion表示的是發送信息的版本,每次發送一次信息, 它都會+1, 而 ObserverWrapper內維護的 mLastVersion爲訂閱觸發的版本號, 當訂閱動做生效的時候, 它的版本號會和發送信息的版本號同步.他們初始值都爲-1

訂閱

LiveData內部存在一個mObservers用來保存相關綁定的全部觀察者, 經過LiveData#observe以及LiveData#oberveForever方法, 咱們能夠進行訂閱動做.若是須要與生命週期綁定, 則須要傳入LifecycleOwner對象, 將咱們的LiveData數據觀測者(Observer)包裝註冊到生命週期的觀測者中, 得以接收到生命週期的變動, 並作出及時的對應更新活動, 咱們能夠看下LiveData的訂閱的方法代碼github

@MainThread
   public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
       assertMainThread("observe");
       // 當前綁定的組件(activity or fragment)狀態爲DESTROYED的時候, 則會忽視當前的訂閱請求
       if (owner.getLifecycle().getCurrentState() == DESTROYED) {
           return;
       }
       // 轉爲帶生命週期感知的觀察者包裝類
       LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
       ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
       // 對應觀察者只能與一個owner綁定
       if (existing != null && !existing.isAttachedTo(owner)) {
           throw new IllegalArgumentException("Cannot add the same observer"
                   + " with different lifecycles");
       }
       if (existing != null) {
           return;
       }
       // lifecycle註冊
       owner.getLifecycle().addObserver(wrapper);
   }
複製代碼

針對咱們須要監測生命週期的觀察者, LiveData將其包裝成了LifecycleBoundObserver對象, 它繼承於ObserverWrapper, 並最終實現了GenericLifecycleObserver接口, 經過實現GenericLifecycleObserver#onStateChanged方法獲取到生命週期狀態變動事件.併發

發送信息

LiveData#setValueLiveData#postValue的區別在於一個是在主線程發送信息, 而post是在子線程發送信息, post最終經過指定主線程的Handler執行調用setValue, 因此這裏主要看下LiveData#setValueapp

@MainThread
    protected void setValue(T value) {
        assertMainThread("setValue");
        // 發送版本+1
        mVersion++;
        mData = value;
        // 信息分發
        dispatchingValue(null);
    }
複製代碼

當調用setValue的時候, 就至關因而LiveData內部維護的可觀測數據發生變化, 則直接觸發事件分發框架

void dispatchingValue(@Nullable ObserverWrapper initiator) {
        // mDispatchingValue的判斷主要是爲了解決併發調用dispatchingValue的狀況
        // 當對應數據的觀察者在執行的過程當中, 若有新的數據變動, 則不會再次通知到觀察者
        // 因此觀察者內的執行不該進行耗時工做
        if (mDispatchingValue) {
            mDispatchInvalidated = true;
            return;
        }
        mDispatchingValue = true;
        do {
            mDispatchInvalidated = false;
            if (initiator != null) {
                considerNotify(initiator);
                initiator = null;
            } else {
                for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
                    considerNotify(iterator.next().getValue());
                    if (mDispatchInvalidated) {
                        break;
                    }
                }
            }
        } while (mDispatchInvalidated);
        mDispatchingValue = false;
    }
複製代碼

最終, 會走到considerNotify方法, 在保證觀察者活躍, 而且他的訂閱生效數小於發送數的狀況下, 最終觸發到咱們實現的觀察方法.ide

private void considerNotify(ObserverWrapper observer) {
        if (!observer.mActive) {
            return;
        }
        if (!observer.shouldBeActive()) {
            observer.activeStateChanged(false);
            return;
        }
        if (observer.mLastVersion >= mVersion) {
            return;
        }
        observer.mLastVersion = mVersion;
        //noinspection unchecked
        observer.mObserver.onChanged((T) mData);
    }
複製代碼

要注意的是, LiveData#dispatchingValue除了在咱們主動更新數據的時候會觸發, 在咱們的觀察者狀態變動(inactive->active)的時候, 也會通知到, 這就致使了LiveData必然支持粘性事件組件化

class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver {
  @Override
       public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
           if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
               removeObserver(mObserver);
               return;
           }
           activeStateChanged(shouldBeActive());
       }
}
private abstract class ObserverWrapper {
  void activeStateChanged(boolean newActive) {
            if (newActive == mActive) {
                return;
            }
            // 當observer的狀態從active->inactive, 或者inactive->active的時候走如下流程
            mActive = newActive;
            boolean wasInactive = LiveData.this.mActiveCount == 0;
            LiveData.this.mActiveCount += mActive ? 1 : -1;
            if (wasInactive && mActive) {
                onActive();
            }
            if (LiveData.this.mActiveCount == 0 && !mActive) {
              // 當前liveData維護的觀察者都不活躍, 而且目前的觀察者也從active->inactive, 會觸發onInactive空方法
              // 咱們能夠覆寫onInactive來判斷livedata全部觀察者失效時候的狀況, 好比釋放掉一些大內存對象
                onInactive();
            }
            // 當observer是從inactive->active的時候
            // 須要通知到觀察者
            if (mActive) {
                dispatchingValue(this);
            }
        }
}
複製代碼

原理總結

咱們歸納下來, 關於LiveData能夠了解以下:

  1. LiveData的觀察者能夠聯動生命週期, 也能夠不聯動
  2. LiveData的觀察者只能與一個LifecycleOwner綁定, 不然會拋出異常
  3. 當觀察者的active狀態變動的時候
  4. active->inactive : 若是LiveCycler通知OnDestroy, 則移除對應的觀察者, 切當全部觀察者都非活躍的狀態下時, 會觸發onInactive
  5. inactive->active: 會通知觀察者最近的數據更新(粘性消息)
  6. 除了觀察者狀態變動時, 會接收到數據更新的通知外, 還有一種就是在活躍的狀況下, 經過開發者主動更新數據, 會接收到數據更新的通知.

基於LiveData的事件總線的實現

能夠看出, LiveData自己就已經可觀測數據更新, 咱們經過維護一張eventName-LiveData的哈希表, 就能夠獲得一個基礎的事件總線

class LiveDataBus {
    internal val liveDatas by lazy { mutableMapOf<String, LiveData<*>>() }

    @Synchronized
    private fun <T>bus(channel: String): LiveData<T>{
        return liveDatas.getOrPut(channel){
            LiveDataEvent<T>(channel)
        } as LiveData<T>
    }

    fun <T> with(channel: String): LiveData<T>{
        return bus(channel)
    }

    companion object{
        private val INSTANCE by lazy { LiveDataBus() }
        @JvmStatic
        fun get() = INSTANCE
    }
}
複製代碼

可是除了粘性事件之外, 咱們還須要非粘性事件的支持, 這裏有兩種作法.

美團是根據覆寫observe方法, 反射獲取ObserverWrapper.mLastVersion, 在訂閱的時候使得初始化的ObserverWrapper.mLastVersion等於LiveData.mVersion, 使得粘性消息沒法經過實現(詳細能夠看下參考1的文章內容)

這裏我用了另一種作法,粘性消息最終會調到Observer#onChanged, 那麼咱們就乾脆將其再進行一層包裝, 內部維護實際的訂閱消息數, 來判斷是否要觸發真正的onChanged方法

internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>): Observer<T>{
    // 新建觀察者包裝類的時候, 內部實際的version直接等於LiveData的version
    private var mLastVersion = liveData.version
    override fun onChanged(t: T) {
        if(mLastVersion >= liveData.version){
            return
        }
        mLastVersion = liveData.version
        observer.onChanged(t)
    }
}
複製代碼

咱們須要覆寫observe方法, 將咱們包裝的觀察者傳進去

internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>(){
    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        super.observe(owner, ExternalObserverWrapper(observer, this, owner))
    }

}
複製代碼

須要注意的是, LiveData維護的觀察者集合變爲咱們包裝後的觀察者集合後, 那麼對應的移除觀察者方法, 咱們也須要從新包裝傳入, 而且須要額外維護一份真正的觀察者和包裝後的觀察者的對應hash表對象, 並在觀察者被移除的時候刪除對應的內存對象, 防止內存泄漏的產生, 最終的代碼以下

internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>(){
    internal var mObservers = mutableMapOf<Observer<in T>, ExternalObserverWrapper<T>>()

    @MainThread
    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
        val exist = mObservers.getOrPut(observer){
            LifecycleExternalObserver(observer, this, owner).apply {
                mObservers[observer] = this
                owner.lifecycle.addObserver(this)
            }
        }
        super.observe(owner, exist)
    }

    @MainThread
    override fun observeForever(observer: Observer<in T>) {
        val exist = mObservers.getOrPut(observer){
            AlwaysExternalObserver(observer, this).apply { mObservers[observer] = this }
        }
        super.observeForever(exist)
    }

    @MainThread
    fun observeSticky(owner: LifecycleOwner, observer: Observer<in T>) {
        super.observe(owner, observer)
    }

    @MainThread
    fun observeStickyForever(observer: Observer<in T>){
        super.observeForever(observer)
    }

    @MainThread
    override fun removeObserver(observer: Observer<in T>) {
        val exist = mObservers.remove(observer) ?: observer
        super.removeObserver(exist)
    }

    @MainThread
    override fun removeObservers(owner: LifecycleOwner) {
        mObservers.iterator().forEach { item->
            if(item.value.isAttachedTo(owner)){
                mObservers.remove(item.key)
            }
        }
        super.removeObservers(owner)
    }

    override fun onInactive() {
        super.onInactive()
        if(!hasObservers()){
            // 當對應liveData沒有相關的觀察者的時候
            // 就能夠移除掉維護的LiveData
            LiveDataBus.get().liveDatas.remove(key)
        }
    }
}

internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>): Observer<T>{

    private var mLastVersion = liveData.version
    override fun onChanged(t: T) {
        if(mLastVersion >= liveData.version){
            return
        }
        mLastVersion = liveData.version
        observer.onChanged(t)
    }

    open fun isAttachedTo(owner: LifecycleOwner) = false
}

/** * always active 的觀察者包裝類 * @param T * @constructor */
internal class AlwaysExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>):
    ExternalObserverWrapper<T>(observer, liveData)

/** * 綁定生命週期的觀察者包裝類 * @param T * @property owner LifecycleOwner * @constructor */
internal class LifecycleExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>, val owner: LifecycleOwner): ExternalObserverWrapper<T>(
    observer,
    liveData
), LifecycleObserver{
    /** * 當綁定的lifecycle銷燬的時候 * 移除掉內部維護的對應觀察者 */
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun onDestroy(){
        liveData.mObservers.remove(observer)
        owner.lifecycle.removeObserver(this)
    }

    override fun isAttachedTo(owner: LifecycleOwner): Boolean {
        return owner == this.owner
    }
}
複製代碼

事件的約束

正如美團後期討論的改進文章內所說, 當前的事件總線(不管是EventBus仍是LiveEventBus)都沒有對事件進行約束, 假如A同窗以"event1"字符串定義事件名併發送事件, 而B同窗勿寫成"eventl"字符串訂閱事件, 那麼這個事件就永遠都接收不到了. 另外當上遊刪除發送的事件相關代碼, 訂閱方也無從感知到. 基於此, 參考了Retrofit針對於請求的動態代理的作法, 將事件的定義由事件總線框架自己經過動態代理去實現

class LiveDataBus {
  fun <E> of(clz: Class<E>): E {
        if(!clz.isInterface){
            throw IllegalArgumentException("API declarations must be interfaces.")
        }
        if(0 < clz.interfaces.size){
            throw IllegalArgumentException("API interfaces must not extend other interfaces.")
        }
        return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _->
            return@InvocationHandler get().with(
                // 事件名以集合類名_事件方法名定義
                // 以此保證事件的惟一性
                "${clz.canonicalName}_${method.name}",
                (method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
        }) as E
    }
}
複製代碼

開發者須要先定義一個事件, 才能夠對它進行相關的發送和訂閱的工做.

interface LiveEvents {
    /** * 定義一個事件 * @return LiveEventObserver<Boolean> 事件類型 */
    fun event1(): LiveEventObserver<Boolean>
    fun event2(): LiveEventObserver<MutableList<String>>
}

複製代碼

而後開發者能夠經過如下方式去發送和訂閱

private fun sendEvent(){
        LiveDataBus
            .get()
            .of(LiveEvents::class.java)
            .event1()
            .post(true)
    }

private fun observe(){
    LiveDataBus
        .get()
        .of(LiveEvents::class.java)
        .event1()
        .observe(this, Observer {
            Log.i(LOG, it.toString())
        })
}
複製代碼

參考

  1. Android消息總線的演進之路:用LiveDataBus替代RxBus、EventBus
  2. Android組件化方案及組件消息總線modular-event實戰
  3. 用LiveData實現一個事件總線
相關文章
相關標籤/搜索