很早以前有看過別人實現的 RxBus , 當初也只是隨意用用而已,沒有想過去研究。今天看到 brucezz 天哥在羣裏分享了一把,本身也加入了討論,下來還實踐了一把,因此想借此篇進入到源碼層,深入體驗下 RxBus 這輛 「蘭博基尼」 的設計美感和獨特魅力。html
本篇文章已受權微信公衆號 guolin_blog (郭霖)獨家發佈java
<!-- more -->git
RxBusgithub
關於簡單的實現和用法,這篇文章已經很好的說明了。
推薦先看看 RxBus 的簡單實現和用法。
地址在這裏:RxBus 的簡單實現
-
讓咱們看看這輛車到底用了些什麼?
Subject
SerializedSubject
PublishSubject
CompositeSubscription
這是 Subject 的中文解釋:
Subject能夠當作是一個橋樑或者代理,在某些ReactiveX實現中(如RxJava),它同時充當了Observer和Observable的角色。由於它是一個Observer,它能夠訂閱一個或多個Observable;又由於它是一個Observable,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。
因爲一個Subject訂閱一個Observable,它能夠觸發這個Observable開始發射數據(若是那個Observable是"冷"的--就是說,它等待有訂閱纔開始發射數據)。所以有這樣的效果,Subject能夠把原來那個"冷"的Observable變成"熱"的。
源碼:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> { protected Subject(OnSubscribe<R> onSubscribe) { super(onSubscribe); } public abstract boolean hasObservers(); public final SerializedSubject<T, R> toSerialized() { if (getClass() == SerializedSubject.class) { return (SerializedSubject<T, R>)this; } return new SerializedSubject<T, R>(this); }
Subject 只有兩個方法。
hasObservers()
方法的解釋是:
Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.
判斷 Subject 是否已經有 observers 訂閱了 有則返回 ture
toSerialized()
方法的解釋是:
Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.
包裝 Subject 後讓它能夠安全的在不一樣線程中調用各類方法
爲何這個方法後就能夠是線程安全了呢?
咱們看到 toSerialized()
返回了 SerializedSubject<T, R>
。咱們先到這裏打住,稍後咱們再看看該類作了什麼。
在 RxJava 裏有一個抽象類 Subject,既是 Observable 又是 Observer,能夠把 Subject 理解成一個管道或者轉發器,數據從一端輸入,而後從另外一端輸出。
Subject 有好幾種,這裏可使用最簡單的 PublishSubject
。訂閱以後,一旦數據從一端傳入,結果會裏馬上從另外一端輸出。
源碼裏給了用法例子:
PublishSubject<Object> subject = PublishSubject.create(); // observer1 will receive all onNext and onCompleted events subject.subscribe(observer1); subject.onNext("one"); subject.onNext("two"); // observer2 will only receive "three" and onCompleted subject.subscribe(observer2); subject.onNext("three"); subject.onCompleted();
官方文檔推薦咱們:
若是你把 Subject 看成一個 Subscriber 使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。
要避免此類問題,你能夠將 Subject 轉換爲一個 SerializedSubject ,相似於這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
因此咱們能夠看到在 RxBus 初始化的時候咱們作了這樣一件事情:
private final Subject<Object, Object> BUS; private RxBus() { BUS = new SerializedSubject<>(PublishSubject.create()); }
爲了保證多線程的調用中結果的肯定性,咱們按照官方推薦將 Subject 轉換成了一個 SerializedSubject 。
該類一樣是 Subject
的子類,這裏貼出該類的構造方法。
private final SerializedObserver<T> observer; private final Subject<T, R> actual; public SerializedSubject(final Subject<T, R> actual) { super(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> child) { actual.unsafeSubscribe(child); } }); this.actual = actual; this.observer = new SerializedObserver<T>(actual); }
咱們發現,Subject
最後轉化成了 SerializedObserver
.
When multiple threads are emitting and/or notifying they will be serialized by:
Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting
一次只會容許一個線程進行發送事物
若是其餘線程已經準備就緒,會通知給隊列
在發送事物中,不會持有任何鎖和阻塞任何線程
經過介紹能夠知道是經過 notifications
來進行併發處理的。
SerializedObserver 類中private final NotificationLite<T> nl = NotificationLite.instance();
重點看看 nl 在 onNext() 方法裏的使用:
@Override public void onNext(T t) { // 省略一些代碼 for (;;) { for (int i = 0; i < MAX_DRAIN_ITERATION; i++) { FastList list; synchronized (this) { list = queue; if (list == null) { emitting = false; return; } queue = null; } for (Object o : list.array) { if (o == null) { break; } // 這裏的 accept() 方法 try { if (nl.accept(actual, o)) { terminated = true; return; } } catch (Throwable e) { terminated = true; Exceptions.throwIfFatal(e); actual.onError(OnErrorThrowable.addValueAsLastCause(e, t)); return; } } } } }
知道哪裏具體調用了以後,咱們再仔細看看 NotificationLite
。
先來了解它究竟是什麼:
For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don't want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.
It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent.大體意思是:做爲一個單例類保持這種徹底不存在的安全類型的表象。
剛咱們在 SerializedObserver 的 onNext() 方法中看到 nl.accept(actual, o)
因此咱們再深刻到 accept()
方法中:
public boolean accept(Observer<? super T> o, Object n) { if (n == ON_COMPLETED_SENTINEL) { o.onCompleted(); return true; } else if (n == ON_NEXT_NULL_SENTINEL) { o.onNext(null); return false; } else if (n != null) { if (n.getClass() == OnErrorSentinel.class) { o.onError(((OnErrorSentinel) n).e); return true; } o.onNext((T) n); return false; } else { throw new IllegalArgumentException("The lite notification can not be null"); } }
Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
判斷 lite 通知類別,通知 observer 執行適當方法。
經過 NotificationLite 類圖能夠看到有三個標識
ON_NEXT_NULL_SENTINEL (onNext 標識)
ON_COMPLETED_SENTINEL (onCompleted 標識)
OnErrorSentinel (onError 標識)
與 Observer 回調一致。經過分析得知 accept()
就是經過標識來判斷,而後調用 Observer 相對應的方法。
RxBus 這輛"蘭博基尼"與 CompositeSubscription 車間搭配更好。
-
構造函數:
private Set<Subscription> subscriptions; private volatile boolean unsubscribed; public CompositeSubscription() { } public CompositeSubscription(final Subscription... subscriptions) { this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions)); }
內部是初始化了一個 HashSet ,按照哈希算法來存取集合中的對象,存取速度比較快,而且沒有重複對象。
因此咱們推薦在基類裏實例化一個 CompositeSubscription 對象,使用 CompositeSubscription 來持有全部的 Subscriptions ,而後在 onDestroy()或者 onDestroyView()裏取消全部的訂閱。
能力有限,文章錯誤還望指出,有任何問題都歡迎討論 :)
轉載請註明出處。
最後送上我女神 Gakki , 開心最好 ( ´͈v `͈ )◞。