當前市面上, 比較經常使用的事件總線, 仍然是EventBus
和RxBus
, 早期我曾經寫過EventBus源碼解析,這兩個框架不管是哪一個, 開發者都須要去考慮生命週期的處理.而美團給出了個解決方案, 經過LiveData來實現自帶生命週期感知能力的事件總線框架. 本篇咱們本身擼一個事件總線框架.html
咱們要用LiveData
作事件總線, 總須要知道它是什麼, 爲何能夠用它來實現事件總線.java
LiveData可對數據進行觀測, 並具備生命週期感知能力, 這就意味着當liveData只會在生命週期處於活躍(inActive)的狀態下才會去執行觀測動做, 而他的能力賦予不能脫離LifeCycle的範圍.android
首先咱們能夠看下LiveData
的UML圖, 便於對他有個大概的理解 git
LiveData
內維護的
mVersion
表示的是發送信息的版本,每次發送一次信息, 它都會+1, 而
ObserverWrapper
內維護的
mLastVersion
爲訂閱觸發的版本號, 當訂閱動做生效的時候, 它的版本號會和發送信息的版本號同步.他們初始值都爲-1
LiveData
內部存在一個mObservers
用來保存相關綁定的全部觀察者, 經過LiveData#observe
以及LiveData#oberveForever
方法, 咱們能夠進行訂閱動做.若是須要與生命週期綁定, 則須要傳入LifecycleOwner
對象, 將咱們的LiveData數據觀測者(Observer)包裝註冊到生命週期的觀測者中, 得以接收到生命週期的變動, 並作出及時的對應更新活動, 咱們能夠看下LiveData的訂閱的方法代碼github
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
assertMainThread("observe");
// 當前綁定的組件(activity or fragment)狀態爲DESTROYED的時候, 則會忽視當前的訂閱請求
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
return;
}
// 轉爲帶生命週期感知的觀察者包裝類
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
// 對應觀察者只能與一個owner綁定
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
// lifecycle註冊
owner.getLifecycle().addObserver(wrapper);
}
複製代碼
針對咱們須要監測生命週期的觀察者, LiveData
將其包裝成了LifecycleBoundObserver
對象, 它繼承於ObserverWrapper
, 並最終實現了GenericLifecycleObserver
接口, 經過實現GenericLifecycleObserver#onStateChanged
方法獲取到生命週期狀態變動事件.併發
LiveData#setValue
和LiveData#postValue
的區別在於一個是在主線程發送信息, 而post是在子線程發送信息, post最終經過指定主線程的Handler執行調用setValue, 因此這裏主要看下LiveData#setValue
app
@MainThread
protected void setValue(T value) {
assertMainThread("setValue");
// 發送版本+1
mVersion++;
mData = value;
// 信息分發
dispatchingValue(null);
}
複製代碼
當調用setValue的時候, 就至關因而LiveData
內部維護的可觀測數據發生變化, 則直接觸發事件分發框架
void dispatchingValue(@Nullable ObserverWrapper initiator) {
// mDispatchingValue的判斷主要是爲了解決併發調用dispatchingValue的狀況
// 當對應數據的觀察者在執行的過程當中, 若有新的數據變動, 則不會再次通知到觀察者
// 因此觀察者內的執行不該進行耗時工做
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
複製代碼
最終, 會走到considerNotify
方法, 在保證觀察者活躍, 而且他的訂閱生效數小於發送數的狀況下, 最終觸發到咱們實現的觀察方法.ide
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}
複製代碼
要注意的是, LiveData#dispatchingValue
除了在咱們主動更新數據的時候會觸發, 在咱們的觀察者狀態變動(inactive->active)的時候, 也會通知到, 這就致使了LiveData
必然支持粘性事件組件化
class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver {
@Override
public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
removeObserver(mObserver);
return;
}
activeStateChanged(shouldBeActive());
}
}
private abstract class ObserverWrapper {
void activeStateChanged(boolean newActive) {
if (newActive == mActive) {
return;
}
// 當observer的狀態從active->inactive, 或者inactive->active的時候走如下流程
mActive = newActive;
boolean wasInactive = LiveData.this.mActiveCount == 0;
LiveData.this.mActiveCount += mActive ? 1 : -1;
if (wasInactive && mActive) {
onActive();
}
if (LiveData.this.mActiveCount == 0 && !mActive) {
// 當前liveData維護的觀察者都不活躍, 而且目前的觀察者也從active->inactive, 會觸發onInactive空方法
// 咱們能夠覆寫onInactive來判斷livedata全部觀察者失效時候的狀況, 好比釋放掉一些大內存對象
onInactive();
}
// 當observer是從inactive->active的時候
// 須要通知到觀察者
if (mActive) {
dispatchingValue(this);
}
}
}
複製代碼
咱們歸納下來, 關於LiveData
能夠了解以下:
LiveData
的觀察者能夠聯動生命週期, 也能夠不聯動LiveData
的觀察者只能與一個LifecycleOwner
綁定, 不然會拋出異常能夠看出, LiveData
自己就已經可觀測數據更新, 咱們經過維護一張eventName-LiveData的哈希表, 就能夠獲得一個基礎的事件總線
class LiveDataBus {
internal val liveDatas by lazy { mutableMapOf<String, LiveData<*>>() }
@Synchronized
private fun <T>bus(channel: String): LiveData<T>{
return liveDatas.getOrPut(channel){
LiveDataEvent<T>(channel)
} as LiveData<T>
}
fun <T> with(channel: String): LiveData<T>{
return bus(channel)
}
companion object{
private val INSTANCE by lazy { LiveDataBus() }
@JvmStatic
fun get() = INSTANCE
}
}
複製代碼
可是除了粘性事件之外, 咱們還須要非粘性事件的支持, 這裏有兩種作法.
美團是根據覆寫observe方法, 反射獲取ObserverWrapper.mLastVersion
, 在訂閱的時候使得初始化的ObserverWrapper.mLastVersion
等於LiveData.mVersion
, 使得粘性消息沒法經過實現(詳細能夠看下參考1的文章內容)
這裏我用了另一種作法,粘性消息最終會調到Observer#onChanged
, 那麼咱們就乾脆將其再進行一層包裝, 內部維護實際的訂閱消息數, 來判斷是否要觸發真正的onChanged
方法
internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>): Observer<T>{
// 新建觀察者包裝類的時候, 內部實際的version直接等於LiveData的version
private var mLastVersion = liveData.version
override fun onChanged(t: T) {
if(mLastVersion >= liveData.version){
return
}
mLastVersion = liveData.version
observer.onChanged(t)
}
}
複製代碼
咱們須要覆寫observe方法, 將咱們包裝的觀察者傳進去
internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>(){
@MainThread
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
super.observe(owner, ExternalObserverWrapper(observer, this, owner))
}
}
複製代碼
須要注意的是, LiveData維護的觀察者集合變爲咱們包裝後的觀察者集合後, 那麼對應的移除觀察者方法, 咱們也須要從新包裝傳入, 而且須要額外維護一份真正的觀察者和包裝後的觀察者的對應hash表對象, 並在觀察者被移除的時候刪除對應的內存對象, 防止內存泄漏的產生, 最終的代碼以下
internal class ExternalLiveData<T>(val key: String) : MutableLiveData<T>(){
internal var mObservers = mutableMapOf<Observer<in T>, ExternalObserverWrapper<T>>()
@MainThread
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
val exist = mObservers.getOrPut(observer){
LifecycleExternalObserver(observer, this, owner).apply {
mObservers[observer] = this
owner.lifecycle.addObserver(this)
}
}
super.observe(owner, exist)
}
@MainThread
override fun observeForever(observer: Observer<in T>) {
val exist = mObservers.getOrPut(observer){
AlwaysExternalObserver(observer, this).apply { mObservers[observer] = this }
}
super.observeForever(exist)
}
@MainThread
fun observeSticky(owner: LifecycleOwner, observer: Observer<in T>) {
super.observe(owner, observer)
}
@MainThread
fun observeStickyForever(observer: Observer<in T>){
super.observeForever(observer)
}
@MainThread
override fun removeObserver(observer: Observer<in T>) {
val exist = mObservers.remove(observer) ?: observer
super.removeObserver(exist)
}
@MainThread
override fun removeObservers(owner: LifecycleOwner) {
mObservers.iterator().forEach { item->
if(item.value.isAttachedTo(owner)){
mObservers.remove(item.key)
}
}
super.removeObservers(owner)
}
override fun onInactive() {
super.onInactive()
if(!hasObservers()){
// 當對應liveData沒有相關的觀察者的時候
// 就能夠移除掉維護的LiveData
LiveDataBus.get().liveDatas.remove(key)
}
}
}
internal open class ExternalObserverWrapper<T>(val observer: Observer<in T>, val liveData: ExternalLiveData<T>): Observer<T>{
private var mLastVersion = liveData.version
override fun onChanged(t: T) {
if(mLastVersion >= liveData.version){
return
}
mLastVersion = liveData.version
observer.onChanged(t)
}
open fun isAttachedTo(owner: LifecycleOwner) = false
}
/** * always active 的觀察者包裝類 * @param T * @constructor */
internal class AlwaysExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>):
ExternalObserverWrapper<T>(observer, liveData)
/** * 綁定生命週期的觀察者包裝類 * @param T * @property owner LifecycleOwner * @constructor */
internal class LifecycleExternalObserver<T>(observer: Observer<in T>, liveData: ExternalLiveData<T>, val owner: LifecycleOwner): ExternalObserverWrapper<T>(
observer,
liveData
), LifecycleObserver{
/** * 當綁定的lifecycle銷燬的時候 * 移除掉內部維護的對應觀察者 */
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy(){
liveData.mObservers.remove(observer)
owner.lifecycle.removeObserver(this)
}
override fun isAttachedTo(owner: LifecycleOwner): Boolean {
return owner == this.owner
}
}
複製代碼
正如美團後期討論的改進文章內所說, 當前的事件總線(不管是EventBus仍是LiveEventBus)都沒有對事件進行約束, 假如A同窗以"event1"字符串定義事件名併發送事件, 而B同窗勿寫成"eventl"字符串訂閱事件, 那麼這個事件就永遠都接收不到了. 另外當上遊刪除發送的事件相關代碼, 訂閱方也無從感知到. 基於此, 參考了Retrofit針對於請求的動態代理的作法, 將事件的定義由事件總線框架自己經過動態代理去實現
class LiveDataBus {
fun <E> of(clz: Class<E>): E {
if(!clz.isInterface){
throw IllegalArgumentException("API declarations must be interfaces.")
}
if(0 < clz.interfaces.size){
throw IllegalArgumentException("API interfaces must not extend other interfaces.")
}
return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _->
return@InvocationHandler get().with(
// 事件名以集合類名_事件方法名定義
// 以此保證事件的惟一性
"${clz.canonicalName}_${method.name}",
(method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
}) as E
}
}
複製代碼
開發者須要先定義一個事件, 才能夠對它進行相關的發送和訂閱的工做.
interface LiveEvents {
/** * 定義一個事件 * @return LiveEventObserver<Boolean> 事件類型 */
fun event1(): LiveEventObserver<Boolean>
fun event2(): LiveEventObserver<MutableList<String>>
}
複製代碼
而後開發者能夠經過如下方式去發送和訂閱
private fun sendEvent(){
LiveDataBus
.get()
.of(LiveEvents::class.java)
.event1()
.post(true)
}
private fun observe(){
LiveDataBus
.get()
.of(LiveEvents::class.java)
.event1()
.observe(this, Observer {
Log.i(LOG, it.toString())
})
}
複製代碼