本文是學習了大佬的文章後,本身去動手實踐後寫的一篇學習筆記。大佬的文章寫得比較好,我本身去寫未必描述得那麼清楚😂,因此本文不少地方都直接引用了大佬的文章。html
項目源碼:github.com/LinYaoTian/…java
Tip:閱讀本文最好對 Jetpack 的 LIfeCycle 和 LiveData 有初步的瞭解。android
引用:git
對於 Android 系統來講,消息傳遞是最基本的組件,每個 App 內的不一樣頁面,不一樣的組件都在進行消息傳遞。github
消息傳遞既能夠用於 Android 四大組件以前的通訊,也能夠用於異步線程和主線程以前的通訊。多線程
對於 Android 開發者來講,常用的消息傳遞方式有不少種,從最先的 Handler、BroadcastReceiver 、接口回調,到最近幾年的流行的通訊總線類框架 EventBus、RxBus。架構
說到 Android 的通訊總線類框架就不得不提到 EventBus 。併發
EventBus 是一個 Android 事件發佈/訂閱框架,經過解耦發佈者和訂閱者簡化 Android 事件傳遞。EventBus 能夠代替 Android 傳統的 Intent、Handler、Broadcast 或接口回調,在 Fragment、Activity、Service 線程之間傳遞數據,執行方法。app
EventBus 最大的特色就是:簡潔、解耦。框架
在沒有 EventBus 以前咱們一般用廣播來實現監聽,或者自定義接口函數回調,有的場景咱們也能夠直接用Intent攜帶簡單數據,或者在線程之間經過 Handler 處理消息傳遞。但不管是廣播仍是 Handler 機制遠遠不能知足咱們高效的開發。EventBus 簡化了應用程序內各組件間、組件與後臺線程間的通訊。EventBus 一經推出,便受到廣大開發者的推崇。
如今看來,EventBus 給 Android 開發者世界帶來了一種新的框架和思想,就是消息的發佈和訂閱。 這種思想在其後不少框架中都獲得了應用。
訂閱發佈模式定義了一種「一對多」的依賴關係,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知全部訂閱者對象,使它們可以自動更新本身的狀態。
訂閱/發佈模式和觀察者模式二者很是類似,我的以爲訂閱/發佈模式是觀察者模式的一種加強版。
二者的區別有:
本文是經過 LiveData 去實現 EventBus ,這裏先介紹下 LiveData。
LiveData 如同它的名字同樣,是一個可觀察的數據持有者,和常規的 observable 不一樣,LiveData 是能夠具備生命週期感知的,這意味着它可以在 Activity、Fragment、Service 中正確的處理生命週期。
實際上 LiveData 並不能單獨起做用,它依賴於 Jectpack 的 LifeCycle 組件,由於 LifeCycle 已經在 Activity 的父類中被封裝好了,在咱們使用 LiveData 的過程當中是基本是無感知的,因此這裏就簡單提一下 LifeCycle 好了。
LifeCycle 的做用
想繼續瞭解的同窗能夠看下這兩篇文章:
下面繼續說 LiveData:
LiveData 的數據源通常是 ViewModel,也能夠是其餘能夠更新的 LiveData 組件。
通常咱們使用 LiveData 的 observe()
,當數據更新後,LiveData 會通知它的全部活躍的觀察者。與 RxJava 不一樣的,LiveData 只會通知活躍的觀察者,例如 Activity 位於 Destroyed 狀態時是不活躍的,所以不會收到通知。
固然咱們也可使用 LiveData 的 observerForever() 方法進行訂閱,區別是 observerForever() 不會受到 Activity 等組件的生命週期的影響,只要數據更新就會收到通知。
LiveData 的簡單使用:
public class MainActivity extends AppCompatActivity {
private static final String TAG="MainActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 建立 LiveData 對象
MutableLiveData<String> mutableLiveData = new MutableLiveData<>();
// 開始訂閱
mutableLiveData.observe(this, new Observer<String>() {
@Override
public void onChanged(@Nullable final String s) {
Log.d(TAG, "onChanged:"+s);
}
});
// 更新數據,最終會回調上面的 onChange() 方法
mutableLiveData.postValue("Android進階三部曲");
}
}
複製代碼
LiveData 的更多詳情能夠參考這篇文章Android Jetpack架構組件(四)一文帶你瞭解LiveData(使用篇)
EventBus 是業界知名的通訊類總線庫,但它也存在許多被人詬病的缺點:
而經過 LiveData 實現的 LiveDataBus 的具備如下優勢:
經過 LiveData 咱們能夠很是簡單的實現一個事件發佈/訂閱框架:
public final class LiveDataBus {
private final Map<String, MutableLiveData<Object>> bus;
private LiveDataBus() {
bus = new HashMap<>();
}
private static class SingletonHolder {
private static final LiveDataBus DATA_BUS = new LiveDataBus();
}
public static LiveDataBus get() {
return SingletonHolder.DATA_BUS;
}
public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
if (!bus.containsKey(target)) {
bus.put(target, new MutableLiveData<>());
}
return (MutableLiveData<T>) bus.get(target);
}
public MutableLiveData<Object> getChannel(String target) {
return getChannel(target, Object.class);
}
}
複製代碼
沒錯,上面短短的 27 行代碼咱們就實現了一個事件發佈/訂閱框架!
LiveData 一時使用一時爽,爽完了以後咱們發現這個簡易的 LiveDataBus 存在一個問題,就是訂閱者會收到訂閱以前發佈的消息,相似於粘性消息。對於一個消息總線來講,粘性消息和非粘性消息都是必須支持的,下面咱們來看一下如何解決這個問題。
咱們先看一下爲何會出現這個問題:
android.arch.lifecycle.LiveData
@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
assertMainThread("observe");
// 當前綁定的組件(activity or fragment)狀態爲DESTROYED的時候, 則會忽視當前的訂閱請求
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
return;
}
// 轉爲帶生命週期感知的觀察者包裝類
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
// 對應觀察者只能與一個owner綁定
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
// lifecycle註冊
owner.getLifecycle().addObserver(wrapper);
}
複製代碼
能夠看到,LiveData 內部會將咱們的傳入參數包裝成 wrapper ,而後存在一個 Map 中,最後經過 LifeCycle 組件添加觀察者。
LiveData 更新數據方式有兩個,一個是 setValue() 另外一個是 postValue(),這兩個方法的區別是,postValue() 在內部會拋到主線程去執行更新數據,所以適合在子線程中使用;而 setValue() 則是直接更新數據。
這裏就只看下 setValue() 方法就行了
android.arch.lifecycle.LiveData
@MainThread
protected void setValue(T value) {
assertMainThread("setValue");
// 發送版本+1
mVersion++;
mData = value;
// 信息分發
dispatchingValue(null);
}
複製代碼
記住這裏的 mVersion,它本問題關鍵,每次更新數據都會自增,默認值是 -1。而後咱們跟進下 dispatchingValue() 方法:
android.arch.lifecycle.LiveData
void dispatchingValue(@Nullable ObserverWrapper initiator) {
// mDispatchingValue的判斷主要是爲了解決併發調用dispatchingValue的狀況
// 當對應數據的觀察者在執行的過程當中, 若有新的數據變動, 則不會再次通知到觀察者
// 因此觀察者內的執行不該進行耗時工做
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
// 這裏
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
// 這裏
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}
複製代碼
能夠看到,不管條件判斷,最終都會執行 considerNotify() 方法,因此咱們繼續跟進:
android.arch.lifecycle.LiveData
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
// 判斷 version
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}
複製代碼
終於到了最關鍵的時候了!!若是 ObserverWrapper 的 mLastVersion 小於 LiveData 的 mVersion,那麼就會執行的 onChange() 方法去通知觀察者數據已更新。
而 ObserverWrapper.mLastVersion 的默認值是 -1, LiveData 只要更新過數據,mVersion 就確定會大於 -1,因此訂閱者會立刻收到訂閱以前發佈的最新消息!!
咱們看一下 LiveData 的 postValue() 方法的註釋:
能夠看到,註釋中說,若是在多線程中同一個時刻,屢次調用了 postValue() 方法,只有最後次調用的值會獲得更新。也就是此方法是有可能會丟失事件!!!
看下源碼:
android.arch.lifecycle.LiveData
private final Runnable mPostValueRunnable = new Runnable() {
@Override
public void run() {
Object newValue;
synchronized (mDataLock) {
newValue = mPendingData;
// 設置 mPendingData 爲 not_set
mPendingData = NOT_SET;
}
setValue((T) newValue);
}
};
protected void postValue(T value) {
boolean postTask;
synchronized (mDataLock) {
// 判斷 postTask 是否爲 not_set
postTask = mPendingData == NOT_SET;
mPendingData = value;
}
if (!postTask) {
return;
}
ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable);
}
複製代碼
從上面的源碼就能夠很容易看出,postValue 只是把傳進來的數據先存到 mPendingData,而後往主線程拋一個 Runnable,在這個 Runnable 裏面再調用 setValue 來把存起來的值真正設置上去,並回調觀察者們。而若是在這個 Runnable 執行前屢次 postValue,其實只是改變暫存的值 mPendingData,並不會再次拋另外一個 Runnable。
明白了問題根源所在,咱們就好解決問題了。
解決這個問題的方案有多種,其中美團大佬Android消息總線的演進之路:用LiveDataBus替代RxBus、EventBus 使用的反射的方式修改 LiveData 中的 mVersion 值去實現。還有另外一個方案基於LiveData實現事件總線思路和方案,此方案是基於自定義觀察者包裝類,由於粘性消息最終會調用到 Observer#onChange() 方法,所以咱們自定義 Observer 包裝類,本身維護實際的訂閱消息數,來判斷是否須要觸發真正的 onChange() 方法。本文使用的是第二種方案。
爲了業務的拓展性,這裏咱們定義先一個 Base 基類:
internal open class BaseBusObserverWrapper<T>(private val mObserver: Observer<in T>, private val mBusLiveData: BusLiveData<T>) : Observer<T> {
private val mLastVersion = mBusLiveData.version
private val TAG = "BaseBusObserverWrapper"
override fun onChanged(t: T?) {
Logger.d(TAG,"msg receiver = " + t.toString())
if (mLastVersion >= mBusLiveData.version){
// LiveData 的版本號沒有更新過,說明並無新數據,只是由於
// 當前 Observer 的版本號比 LiveData 低致使的調用 onChange()
return
}
try {
mObserver.onChanged(t)
}catch (e:Exception){
Logger.e(TAG,"error on Observer onChanged() = " + e.message)
}
}
open fun isAttachedTo(owner: LifecycleOwner) = false
}
複製代碼
這裏咱們保存了 LiveData 的 mVersion 值,每次執行 onChange() 時都先判斷一些 LiveData 是否更新過數據,若是沒有則不執行觀察者的 Observer.onChange() 方法。
而後定義兩個子類,BusLifecycleObserver 和 BusAlwaysActiveObserver ,其中 BusLifecycleObserver 用於 LiveData#observer() 方法訂閱的事件,而 BusAlwaysActiveObserver 用於 LiveData#observerForever() 方法訂閱的事件。
internal class BusLifecycleObserver<T>(private val observer: Observer<in T>, private val owner: LifecycleOwner, private val liveData: BusLiveData<T>)
: BaseBusObserverWrapper<T>(observer,liveData),LifecycleObserver{
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy(){
liveData.removeObserver(observer)
owner.lifecycle.removeObserver(this)
}
}
複製代碼
對於 BusLifecycleObserver,在生命週期組件處於 Destroyed 時,須要把觀察者移除。
internal class BusAlwaysActiveObserver<T>(private val mObserver: Observer<in T>, private val mBusLiveData: BusLiveData<T>)
: BaseBusObserverWrapper<T>(mObserver, mBusLiveData)
複製代碼
對於 BusAlwaysActiveObserver,觀察者不受組件生命週期的影響,所以不須要在組件 Destroyed 時移除。
注意:由於 LiveData 的 getVersion() 是包訪問級別的!因此 BusLiveData 必須定義到與 LiveData 同一個包內,即 androidx.lifecycle
包下,所以須要你本身建立一個同名的包並將 BusLiveData 放到裏面。
若是你的項目工程沒有引入 androidx,也可使用 v7 包下的 android.arch.lifecycle
,方法同理,不過要注意的是 android.arch.lifecycle
包下 LiveData 的方法參數的泛型是沒有型變的,所以直接複製這裏代碼會有點問題,須要你本身根據編譯器的提示修改下。
class BusLiveData<T>(private val mKey:String) : MutableLiveData<T>() {
private val TAG = "BusLiveData"
private val mObserverMap: MutableMap<Observer<in T>, BaseBusObserverWrapper<T>> = mutableMapOf()
@MainThread
override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {
val exist = mObserverMap.getOrPut(observer,{
BusLifecycleObserver(observer,owner,this).apply {
mObserverMap[observer] = this
owner.lifecycle.addObserver(this)
}
})
super.observe(owner, exist)
Logger.d(TAG,"observe() called with: owner = [$owner], observer = [$observer]")
}
@MainThread
override fun observeForever(observer: Observer<in T>) {
super.observeForever(observer)
val exist = mObserverMap.getOrPut(observer ,{
BusAlwaysActiveObserver(observer,this).apply {
mObserverMap[observer] = this
}
})
super.observeForever(exist)
Logger.d(TAG, "observeForever() called with: observer = [$observer]")
}
@MainThread
fun observeSticky(owner: LifecycleOwner, observer: Observer<T>) {
super.observe(owner, observer)
Logger.d(TAG, "observeSticky() called with: owner = [$owner], observer = [$observer]")
}
@MainThread
fun observeStickyForever(observer: Observer<T>){
super.observeForever(observer)
Logger.d(TAG, "observeStickyForever() called with: observer = [$observer]")
}
@MainThread
override fun removeObserver(observer: Observer<in T>) {
val exist = mObserverMap.remove(observer) ?: observer
super.removeObserver(exist)
Logger.d(TAG, "removeObserver() called with: observer = [$observer]")
}
@MainThread
override fun removeObservers(owner: LifecycleOwner) {
mObserverMap.iterator().forEach {
if (it.value.isAttachedTo(owner)) {
mObserverMap.remove(it.key)
}
}
super.removeObservers(owner)
Logger.d(TAG, "removeObservers() called with: owner = [$owner]")
}
override fun postValue(value: T) {
mMainHandler.post {
setValue(value)
}
}
@MainThread
override fun onInactive() {
super.onInactive()
if (!hasObservers()) {
// 當 LiveData 沒有活躍的觀察者時,能夠移除相關的實例
LiveDataBusCore.getInstance().mBusMap.remove(mKey)
}
Logger.d(TAG, "onInactive() called")
}
@MainThread
public override fun getVersion(): Int {
return super.getVersion()
}
}
複製代碼
代碼比較短和簡單,要點就是,對於非粘性消息,即 observer() 和 observerForever() ,咱們須要使用自定義的包裝類包裝處理。對於粘性消息,則直接使用 LiveData 默認實現便可。
同時重寫 postValue() 方法,內部經過 MainHandler.post() 的方式去執行 setValue() 去避免 postValue() 會丟失數據的問題。
internal class LiveDataBusCore {
companion object{
@JvmStatic
private val defaultBus = LiveDataBusCore()
@JvmStatic
fun getInstance() = defaultBus
}
internal val mBusMap : MutableMap<String, BusLiveData<*>> by lazy {
mutableMapOf<String, BusLiveData<*>>()
}
fun <T> getChannel(key: String) : BusLiveData<T> {
return mBusMap.getOrPut(key){
BusLiveData<T>(key)
} as BusLiveData<T>
}
}
複製代碼
class LiveDataBus {
companion object{
@JvmStatic
@Synchronized
fun <T> getSyn(key: String) : BusLiveData<T>{
return get(key)
}
@JvmStatic
fun <T> get(key: String) : BusLiveData<T>{
return LiveDataBusCore.getInstance().getChannel(key)
}
private fun <T> get(key: String, type: Class<T>) : BusLiveData<T> {
return LiveDataBusCore.getInstance().getChannel(key)
}
@JvmStatic
fun <E> of(clz: Class<E>): E {
require(clz.isInterface) { "API declarations must be interfaces." }
require(clz.interfaces.isEmpty()) { "API interfaces must not extend other interfaces." }
return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _->
return@InvocationHandler get(
// 事件名以集合類名_事件方法名定義
// 以此保證事件的惟一性
"${clz.canonicalName}_${method.name}",
(method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass)
}) as E
}
}
}
複製代碼
使用:
class TestLiveDataBusActivity : AppCompatActivity() {
companion object{
private const val TAG = "TestLiveDataBusActivity"
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test_live_data_bus)
LiveDataBus.get<String>("testObserver").observe(this, Observer<String> {
Log.d(TAG, "testObserver = $it")
test_observer_id.text = it
})
LiveDataBus.get<String>("testObserver").postValue("new value")
}
}
複製代碼
LiveDataBus 還提供了一個 of() 方法,用於提供用於事件約束,什麼是時間約束呢?就是說咱們的如今觀察者和發佈者獲取消息渠道的 key 是一個字符串,在使用的過程當中可能會出現,發佈者的 key 是 itO 而觀察者的 key 誤輸入成 it0 的狀況,因此這裏咱們能夠模仿 Retrofit 請求動態代理的作法,在使用的過程當中,咱們須要先定義一個接口:
interface TestLiveEvents {
fun event1(): MutableLiveData<String>
}
複製代碼
使用:
fun main() {
LiveDataBus
.of(TestLiveEvents::class.java)
.event1()
.postValue("new value")
}
複製代碼
藉助於 Android 官方提供的 LiveData ,咱們能夠很是方便地實現本身的 LiveDataBus,所有文件也就只有以上幾個類,同時咱們還避免了 EventBus 的許多缺點!
LiveDataBus 的源碼:github.com/LinYaoTian/…
若是文章有任何問題,歡迎在評論區指正。