框架的核心思想,就是消息的發佈和訂閱,使用訂閱者模式實現,其原理圖大概以下所示,摘自網絡。java
發佈和訂閱之間的依賴關係,其原理圖大概以下所示,摘自網絡。android
訂閱/發佈模式和觀察者模式之間有着微弱的區別,我的以爲訂閱/發佈模式是觀察者模式的一種加強版。二者區別以下所示,摘自網絡。git
爲什麼使用liveDatagithub
該liveDataBus優點網絡
爲了方便理解,LiveDataBus原理圖以下所示多線程
訂閱和註冊的流程圖併發
訂閱註冊原理圖app
爲什麼用LiveDataBus替代EventBus和RxBus框架
我這裏先用最簡單的代碼實現liveDataBus,而後用一下,看一下會出現什麼問題,代碼以下所示:ide
public final class LiveDataBus1 { private final Map<String, MutableLiveData<Object>> bus; private LiveDataBus1() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus1 DATA_BUS = new LiveDataBus1(); } public static LiveDataBus1 get() { return SingletonHolder.DATA_BUS; } public <T> MutableLiveData<T> getChannel(String target, Class<T> type) { if (!bus.containsKey(target)) { bus.put(target, new MutableLiveData<>()); } return (MutableLiveData<T>) bus.get(target); } public MutableLiveData<Object> getChannel(String target) { return getChannel(target, Object.class); } }
那麼如何發送消息和接收消息呢,注意二者的key須要保持一致,不然沒法接收?具體代碼以下所示:
//發送消息 LiveDataBus1.get().getChannel("yc_bus").setValue(text); //接收消息 LiveDataBus1.get().getChannel("yc_bus", String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String newText) { // 更新數據 tvText.setText(newText); } });
遇到的問題:
而後看一下LiveData的訂閱方法observe源碼
// 註釋只能在主線程中調用該方法 @MainThread public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { // 當前綁定的組件(activity or fragment)狀態爲DESTROYED的時候, 則會忽視當前的訂閱請求 if (owner.getLifecycle().getCurrentState() == DESTROYED) { // ignore return; } // 轉爲帶生命週期感知的觀察者包裝類 LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer); ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper); // 對應觀察者只能與一個owner綁定,不然拋出異常 if (existing != null && !existing.isAttachedTo(owner)) { throw new IllegalArgumentException("Cannot add the same observer" + " with different lifecycles"); } if (existing != null) { return; } // lifecycle註冊 owner.getLifecycle().addObserver(wrapper); }
緊接着,來看一下LiveData的更新數據方法
@MainThread protected void setValue(T value) { assertMainThread("setValue"); // 這裏的 mVersion,它本問題關鍵,每次更新數據都會自增,默認值是 -1。 mVersion++; mData = value; dispatchingValue(null); }
private void dispatchingValue(@Nullable ObserverWrapper initiator) { // mDispatchingValue的判斷主要是爲了解決併發調用dispatchingValue的狀況 // 當對應數據的觀察者在執行的過程當中, 若有新的數據變動, 則不會再次通知到觀察者。因此觀察者內的執行不該進行耗時工做 if (mDispatchingValue) { mDispatchInvalidated = true; return; } mDispatchingValue = true; do { mDispatchInvalidated = false; if (initiator != null) { // 等下重點看這裏的代碼 considerNotify(initiator); initiator = null; } else { for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator = mObservers.iteratorWithAdditions(); iterator.hasNext(); ) { // 等下重點看這裏的代碼 considerNotify(iterator.next().getValue()); if (mDispatchInvalidated) { break; } } } } while (mDispatchInvalidated); mDispatchingValue = false; }
private void considerNotify(ObserverWrapper observer) { if (!observer.mActive) { return; } // 檢查最新的狀態b4調度。也許它改變了狀態,但咱們尚未獲得事件。 // 咱們仍是先檢查觀察者。活動,以保持它做爲活動的入口。 // 所以,即便觀察者移動到一個活動狀態,若是咱們沒有收到那個事件,咱們最好不要通知一個更可預測的通知順序。 if (!observer.shouldBeActive()) { observer.activeStateChanged(false); return; } if (observer.mLastVersion >= mVersion) { return; } observer.mLastVersion = mVersion; //noinspection unchecked observer.mObserver.onChanged((T) mData); }
爲什麼訂閱者會立刻收到訂閱以前發佈的最新消息?
首先看一下postValue源代碼,以下所示:
protected void postValue(T value) { boolean postTask; synchronized (mDataLock) { postTask = mPendingData == NOT_SET; mPendingData = value; } if (!postTask) { return; } ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable); } private final Runnable mPostValueRunnable = new Runnable() { @Override public void run() { Object newValue; synchronized (mDataLock) { newValue = mPendingData; mPendingData = NOT_SET; } //noinspection unchecked setValue((T) newValue); } };
能不能從Map容器mObservers中取到LifecycleBoundObserver,而後再更改version呢?答案是確定的,經過查看SafeIterableMap的源碼咱們發現有一個protected的get方法。所以,在調用observe的時候,咱們能夠經過反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version設置成和LiveData一致便可。
@Override public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); hook(observer); } private void hook(@NonNull Observer<T> observer) { try { Class<LiveData> classLiveData = LiveData.class; Field fieldObservers = classLiveData.getDeclaredField("mObservers"); fieldObservers.setAccessible(true); Object objectObservers = fieldObservers.get(this); Class<?> classObservers = objectObservers.getClass(); Method methodGet = classObservers.getDeclaredMethod("get", Object.class); methodGet.setAccessible(true); Object objectWrapperEntry = methodGet.invoke(objectObservers, observer); Object objectWrapper = null; if (objectWrapperEntry instanceof Map.Entry) { objectWrapper = ((Map.Entry) objectWrapperEntry).getValue(); } if (objectWrapper != null) { Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass(); Field fieldLastVersion = null; if (classObserverWrapper != null) { fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion"); fieldLastVersion.setAccessible(true); Field fieldVersion = classLiveData.getDeclaredField("mVersion"); fieldVersion.setAccessible(true); Object objectVersion = fieldVersion.get(this); fieldLastVersion.set(objectWrapper, objectVersion); } } } catch (Exception e){ e.printStackTrace(); } }
同時還須要注意,在實現MutableLiveData<T>自定義類BusMutableLiveData中,須要重寫這幾個方法。代碼以下所示:
/** * 在給定的觀察者的生命週期內將給定的觀察者添加到觀察者列表全部者。 * 事件是在主線程上分派的。若是LiveData已經有數據集合,它將被傳遞給觀察者。 * @param owner owner
*/ public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); } /** * 將給定的觀察者添加到觀察者列表中。這個調用相似於{@link LiveData#observe(LifecycleOwner, Observer)} * 和一個LifecycleOwner, which老是積極的。這意味着給定的觀察者將接收全部事件,而且永遠不會 被自動刪除。 * 您應該手動調用{@link #removeObserver(Observer)}來中止 觀察這LiveData。 * @param observer observer */ public void observeStickyForever(@NonNull Observer<T> observer) { super.observeForever(observer); } ```
首先看看MutableLiveData源代碼,以下所示,這裏重點展現測試數據案例
public void postValue(T value) { super.postValue(value); }
而後使用for循環,使用postValue發送100條消息事件,代碼以下所示:
public void postValueCountTest() { sendCount = 100; receiveCount = 0; ExecutorService threadPool = Executors.newFixedThreadPool(2); for (int i = 0; i < sendCount; i++) { threadPool.execute(new Runnable() { @Override public void run() { LiveDataBus2.get().getChannel(Constant.LIVE_BUS3).postValue("test_1_data"+sendCount); } }); } new Handler().postDelayed(new Runnable() { @Override public void run() { BusLogUtils.d("sendCount: " + sendCount + " | receiveCount: " + receiveCount); Toast.makeText(ThirdActivity4.this, "sendCount: " + sendCount + " | receiveCount: " + receiveCount, Toast.LENGTH_LONG).show(); } }, 1000); } //接收消息 LiveDataBus2.get() .getChannel(Constant.LIVE_BUS3, String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String s) { receiveCount++; BusLogUtils.d("接收消息--ThirdActivity4------yc_bus---1-"+s+"----"+receiveCount); } });
而後看一下打印日誌,是否是發現了什麼問題?發現根本沒有100條數據……
2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----1 2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----2 2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----3 2020-03-03 10:25:51.403 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----4
既然post是在子線程中發送消息事件,那麼可不可使用handler將它放到主線程中處理事件了,是能夠的,代碼以下所示
/** * 子線程發送事件
*/ @Override public void postValue(T value) { //注意,去掉super方法, //super.postValue(value); mainHandler.post(new PostValueTask(value)); } private BusWeakHandler mainHandler = new BusWeakHandler(Looper.getMainLooper()); private class PostValueTask implements Runnable { private T newValue; public PostValueTask(@NonNull T newValue) { this.newValue = newValue; } @Override public void run() { setValue(newValue); } } ```
能夠知道,經過postValue能夠在子線程發送消息,那麼發送延遲消息也十分簡單,代碼以下所示:
/** * 子線程發送事件
*/ @Override public void postValue(T value) { //注意,去掉super方法, //super.postValue(value); mainHandler.post(new PostValueTask(value)); } /** * 發送延遲事件 * @param value value * @param delay 延遲時間 */ @Override public void postValueDelay(T value, long delay) { mainHandler.postDelayed(new PostValueTask(value) , delay); //mainHandler.postAtTime(new PostValueTask(value) , delay); } ```
測試用例,延遲5秒鐘發送事件,代碼以下所示。具體能夠看demo鐘的案例!
LiveDataBus.get().with(Constant.LIVE_BUS4).postValueDelay("test_4_data",5000);
輪訓延遲事件,好比有的頁面須要實現,每間隔5秒鐘就刷新一次頁面數據,經常用於活動頁面。在購物商城這類需求很常見
@Override public void postValueInterval(final T value, final long interval) { mainHandler.postDelayed(new Runnable() { @Override public void run() { setValue(value); mainHandler.postDelayed(this,interval); } },interval); }
測試用例,輪訓延遲3秒鐘發送事件,代碼以下所示。具體能夠看demo鐘的案例!
LiveDataBus.get().with(Constant.LIVE_BUS5).postValueInterval("test_5_data",3000);
這裏遇到了一個問題,假若有多個頁面有這種輪訓發送事件的需求,顯然這個是實現不了的。那麼可不能夠把每一個輪訓runnable記錄一個名稱區別開來代碼更更改以下
/** * 發送延遲事件,間隔輪訓 * @param value value
*/ @Deprecated @Override public void postValueInterval(final T value, final long interval,@NonNull String taskName) { if(taskName.isEmpty()){ return; } IntervalValueTask intervalTask = new IntervalValueTask(value,interval); intervalTasks.put(taskName,intervalTask); mainHandler.postDelayed(intervalTask,interval); } private class IntervalValueTask implements Runnable { private T newValue; private long interval; public IntervalValueTask(T newValue, long interval) { this.newValue = newValue; this.interval = interval; } @Override public void run() { setValue(newValue); mainHandler.postDelayed(this,interval); } } ```
輪訓總不能夠一直持續下去吧,這個時候能夠添加一個手動關閉輪訓的方法。代碼以下所示:
/**
*/ @Deprecated @Override public void stopPostInterval(@NonNull String taskName) { IntervalValueTask intervalTask = intervalTasks.get(taskName); if(intervalTask!= null){ //移除callback mainHandler.removeCallbacks(intervalTask); intervalTasks.remove(taskName); } } ```
代碼以下所示
public class SafeCastObserver<T> implements Observer<T> { @NonNull private final Observer<T> observer; public SafeCastObserver(@NonNull Observer<T> observer) { this.observer = observer; } @Override public void onChanged(@Nullable T t) { //捕獲異常,避免出現異常以後,收不到後續的消息事件 try { //注意爲了不轉換出現的異常,try-catch捕獲 observer.onChanged(t); } catch (ClassCastException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
生命週期感知能力就是當在Android平臺的LifecycleOwner(如Activity)中使用的時候,只須要訂閱消息,而不須要取消訂閱消息。LifecycleOwner的生命週期結束的時候,會自動取消訂閱。這帶來了兩個好處: