對於Android系統來講,消息傳遞是最基本的組件,每個App內的不一樣頁面,不一樣組件都在進行消息傳遞。消息傳遞既能夠用於Android四大組件之間的通訊,也可用於異步線程和主線程之間的通訊。對於Android開發者來講,常用的消息傳遞方式有不少種,從最先使用的Handler、BroadcastReceiver、接口回調,到近幾年流行的通訊總線類框架EventBus、RxBus。Android消息傳遞框架,總在不斷的演進之中。java
EventBus是一個Android事件發佈/訂閱框架,經過解耦發佈者和訂閱者簡化Android事件傳遞。EventBus能夠代替Android傳統的Intent、Handler、Broadcast或接口回調,在Fragment、Activity、Service線程之間傳遞數據,執行方法。android
EventBus最大的特色就是:簡潔、解耦。在沒有EventBus以前咱們一般用廣播來實現監聽,或者自定義接口函數回調,有的場景咱們也能夠直接用Intent攜帶簡單數據,或者在線程之間經過Handler處理消息傳遞。但不管是廣播仍是Handler機制遠遠不能知足咱們高效的開發。EventBus簡化了應用程序內各組件間、組件與後臺線程間的通訊。EventBus一經推出,便受到廣大開發者的推崇。git
如今看來,EventBus給Android開發者世界帶來了一種新的框架和思想,就是消息的發佈和訂閱。這種思想在其後不少框架中都獲得了應用。github
訂閱發佈模式定義了一種「一對多」的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知全部訂閱者對象,使它們可以自動更新本身的狀態。編程
RxBus不是一個庫,而是一個文件,實現只有短短30行代碼。RxBus自己不須要過多分析,它的強大徹底來自於它基於的RxJava技術。響應式編程(Reactive Programming)技術這幾年特別火,RxJava是它在Java上的實做。RxJava天生就是發佈/訂閱模式,並且很容易處理線程切換。因此,RxBus憑藉區區30行代碼,就敢挑戰EventBus江湖老大的地位。markdown
在RxJava中有個Subject類,它繼承Observable類,同時實現了Observer接口,所以Subject能夠同時擔當訂閱者和被訂閱者的角色,咱們使用Subject的子類PublishSubject來建立一個Subject對象(PublishSubject只有被訂閱後纔會把接收到的事件馬上發送給訂閱者),在須要接收事件的地方,訂閱該Subject對象,以後若是Subject對象接收到事件,則會發射給該訂閱者,此時Subject對象充當被訂閱者的角色。架構
完成了訂閱,在須要發送事件的地方將事件發送給以前被訂閱的Subject對象,則此時Subject對象做爲訂閱者接收事件,而後會馬上將事件轉發給訂閱該Subject對象的訂閱者,以便訂閱者處理相應事件,到這裏就完成了事件的發送與處理。app
最後就是取消訂閱的操做了,RxJava中,訂閱操做會返回一個Subscription對象,以便在合適的時機取消訂閱,防止內存泄漏,若是一個類產生多個Subscription對象,咱們能夠用一個CompositeSubscription存儲起來,以進行批量的取消訂閱。框架
RxBus有不少實現,如:異步
AndroidKnife/RxBus(https://github.com/AndroidKnife/RxBus) Blankj/RxBus(https://github.com/Blankj/RxBus)
其實正如前面所說的,RxBus的原理是如此簡單,咱們本身均可以寫出一個RxBus的實現:
public final class RxBus { private final Subject<Object, Object> bus; private RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); } private static class SingletonHolder { private static final RxBus defaultRxBus = new RxBus(); } public static RxBus getInstance() { return SingletonHolder.defaultRxBus; } /* * 發送 */ public void post(Object o) { bus.onNext(o); } /* * 是否有Observable訂閱 */ public boolean hasObservable() { return bus.hasObservers(); } /* * 轉換爲特定類型的Obserbale */ public <T> Observable<T> toObservable(Class<T> type) { return bus.ofType(type); } } 複製代碼
public final class RxBus2 { private final Subject<Object> bus; private RxBus2() { // toSerialized method made bus thread safe bus = PublishSubject.create().toSerialized(); } public static RxBus2 getInstance() { return Holder.BUS; } private static class Holder { private static final RxBus2 BUS = new RxBus2(); } public void post(Object obj) { bus.onNext(obj); } public <T> Observable<T> toObservable(Class<T> tClass) { return bus.ofType(tClass); } public Observable<Object> toObservable() { return bus; } public boolean hasObservers() { return bus.hasObservers(); } } 複製代碼
LiveData是Android Architecture Components提出的框架。LiveData是一個能夠被觀察的數據持有類,它能夠感知並遵循Activity、Fragment或Service等組件的生命週期。正是因爲LiveData對組件生命週期可感知特色,所以能夠作到僅在組件處於生命週期的激活狀態時才更新UI數據。
LiveData須要一個觀察者對象,通常是Observer類的具體實現。當觀察者的生命週期處於STARTED或RESUMED狀態時,LiveData會通知觀察者數據變化;在觀察者處於其餘狀態時,即便LiveData的數據變化了,也不會通知。
Android Architecture Components的核心是Lifecycle、LiveData、ViewModel 以及 Room,經過它能夠很是優雅的讓數據與界面進行交互,並作一些持久化的操做,高度解耦,自動管理生命週期,並且不用擔憂內存泄漏的問題。
public final class LiveDataBus { private final Map<String, MutableLiveData<Object>> bus; private LiveDataBus() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus DATA_BUS = new LiveDataBus(); } public static LiveDataBus get() { return SingletonHolder.DATA_BUS; } public <T> MutableLiveData<T> getChannel(String target, Class<T> type) { if (!bus.containsKey(target)) { bus.put(target, new MutableLiveData<>()); } return (MutableLiveData<T>) bus.get(target); } public MutableLiveData<Object> getChannel(String target) { return getChannel(target, Object.class); } } 複製代碼
短短二十行代碼,就實現了一個通訊總線的所有功能,而且還具備生命週期感知功能,而且使用起來也及其簡單:
註冊訂閱:
LiveDataBus.get().getChannel("key_test", Boolean.class) .observe(this, new Observer<Boolean>() { @Override public void onChanged(@Nullable Boolean aBoolean) { } }); 複製代碼
發送消息:
LiveDataBus.get().getChannel("key_test").setValue(true); 複製代碼
咱們發送了一個名爲"key_test",值爲true的事件。 這個時候訂閱者就會收到消息,並做相應的處理,很是簡單。
對於LiveDataBus的初版實現,咱們發現,在使用這個LiveDataBus的過程當中,訂閱者會收到訂閱以前發佈的消息。對於一個消息總線來講,這是不可接受的。不管EventBus或者RxBus,訂閱方都不會收到訂閱以前發出的消息。對於一個消息總線,LiveDataBus必需要解決這個問題。
怎麼解決這個問題呢?先分析下緣由:
當LifeCircleOwner的狀態發生變化的時候,會調用LiveData.ObserverWrapper的activeStateChanged函數,若是這個時候ObserverWrapper的狀態是active,就會調用LiveData的dispatchingValue。
在LiveData的dispatchingValue中,又會調用LiveData的considerNotify方法。
在LiveData的considerNotify方法中,紅框中的邏輯是關鍵,若是ObserverWrapper的mLastVersion小於LiveData的mVersion,就會去回調mObserver的onChanged方法。而每一個新的訂閱者,其version都是-1,LiveData一旦設置過其version是大於-1的(每次LiveData設置值都會使其version加1),這樣就會致使LiveDataBus每註冊一個新的訂閱者,這個訂閱者馬上會收到一個回調,即便這個設置的動做發生在訂閱以前。
對於這個問題,總結一下發生的核心緣由。對於LiveData,其初始的version是-1,當咱們調用了其setValue或者postValue,其vesion會+1;對於每個觀察者的封裝ObserverWrapper,其初始version也爲-1,也就是說,每個新註冊的觀察者,其version爲-1;當LiveData設置這個ObserverWrapper的時候,若是LiveData的version大於ObserverWrapper的version,LiveData就會強制把當前value推送給Observer。
明白了問題產生的緣由以後,咱們來看看怎麼才能解決這個問題。很顯然,根據以前的分析,只須要在註冊一個新的訂閱者的時候把Wrapper的version設置成跟LiveData的version一致便可。
那麼怎麼實現呢,看看LiveData的observe方法,他會在步驟1建立一個LifecycleBoundObserver,LifecycleBoundObserver是ObserverWrapper的派生類。而後會在步驟2把這個LifecycleBoundObserver放入一個私有Map容器mObservers中。不管ObserverWrapper仍是LifecycleBoundObserver都是私有的或者包可見的,因此沒法經過繼承的方式更改LifecycleBoundObserver的version。
那麼能不能從Map容器mObservers中取到LifecycleBoundObserver,而後再更改version呢?答案是確定的,經過查看SafeIterableMap的源碼咱們發現有一個protected的get方法。所以,在調用observe的時候,咱們能夠經過反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version設置成和LiveData一致便可。
對於非生命週期感知的observeForever方法來講,實現的思路是一致的,可是具體的實現略有不一樣。observeForever的時候,生成的wrapper不是LifecycleBoundObserver,而是AlwaysActiveObserver(步驟1),並且咱們也沒有機會在observeForever調用完成以後再去更改AlwaysActiveObserver的version,由於在observeForever方法體內,步驟3的語句,回調就發生了。
那麼對於observeForever,如何解決這個問題呢?既然是在調用內回調的,那麼咱們能夠寫一個ObserverWrapper,把真正的回調給包裝起來。把ObserverWrapper傳給observeForever,那麼在回調的時候咱們去檢查調用棧,若是回調是observeForever方法引發的,那麼就不回調真正的訂閱者。
public final class LiveDataBus { private final Map<String, BusMutableLiveData<Object>> bus; private LiveDataBus() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus DEFAULT_BUS = new LiveDataBus(); } public static LiveDataBus get() { return SingletonHolder.DEFAULT_BUS; } public <T> MutableLiveData<T> with(String key, Class<T> type) { if (!bus.containsKey(key)) { bus.put(key, new BusMutableLiveData<>()); } return (MutableLiveData<T>) bus.get(key); } public MutableLiveData<Object> with(String key) { return with(key, Object.class); } private static class ObserverWrapper<T> implements Observer<T> { private Observer<T> observer; public ObserverWrapper(Observer<T> observer) { this.observer = observer; } @Override public void onChanged(@Nullable T t) { if (observer != null) { if (isCallOnObserve()) { return; } observer.onChanged(t); } } private boolean isCallOnObserve() { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (stackTrace != null && stackTrace.length > 0) { for (StackTraceElement element : stackTrace) { if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) && "observeForever".equals(element.getMethodName())) { return true; } } } return false; } } private static class BusMutableLiveData<T> extends MutableLiveData<T> { private Map<Observer, Observer> observerMap = new HashMap<>(); @Override public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); try { hook(observer); } catch (Exception e) { e.printStackTrace(); } } @Override public void observeForever(@NonNull Observer<T> observer) { if (!observerMap.containsKey(observer)) { observerMap.put(observer, new ObserverWrapper(observer)); } super.observeForever(observerMap.get(observer)); } @Override public void removeObserver(@NonNull Observer<T> observer) { Observer realObserver = null; if (observerMap.containsKey(observer)) { realObserver = observerMap.remove(observer); } else { realObserver = observer; } super.removeObserver(realObserver); } private void hook(@NonNull Observer<T> observer) throws Exception { //get wrapper's version Class<LiveData> classLiveData = LiveData.class; Field fieldObservers = classLiveData.getDeclaredField("mObservers"); fieldObservers.setAccessible(true); Object objectObservers = fieldObservers.get(this); Class<?> classObservers = objectObservers.getClass(); Method methodGet = classObservers.getDeclaredMethod("get", Object.class); methodGet.setAccessible(true); Object objectWrapperEntry = methodGet.invoke(objectObservers, observer); Object objectWrapper = null; if (objectWrapperEntry instanceof Map.Entry) { objectWrapper = ((Map.Entry) objectWrapperEntry).getValue(); } if (objectWrapper == null) { throw new NullPointerException("Wrapper can not be bull!"); } Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass(); Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion"); fieldLastVersion.setAccessible(true); //get livedata's version Field fieldVersion = classLiveData.getDeclaredField("mVersion"); fieldVersion.setAccessible(true); Object objectVersion = fieldVersion.get(this); //set wrapper's version fieldLastVersion.set(objectWrapper, objectVersion); } } } 複製代碼
LiveDataBus.get() .with("key_test", String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String s) { } }); 複製代碼
LiveDataBus.get().with("key_test").setValue(s); 複製代碼
LiveDataBus的源碼能夠直接拷貝使用,也能夠前往做者的GitHub倉庫查看下載: https://github.com/JeremyLiao/LiveDataBus
本文提供了一個新的消息總線框架——LiveDataBus。訂閱者能夠訂閱某個消息通道的消息,發佈者能夠把消息發佈到消息通道上。利用LiveDataBus,不只能夠實現消息總線功能,並且對於訂閱者,他們不須要關心什麼時候取消訂閱,極大減小了由於忘記取消訂閱形成的內存泄漏風險。
海亮,美團高級工程師,2017年加入美團,目前主要負責美團輕收銀、美團收銀零售版等App的相關業務及模塊開發工做。