從 RxBus 這輛蘭博基尼深刻進去

很早以前有看過別人實現的 RxBus , 當初也只是隨意用用而已,沒有想過去研究。今天看到 brucezz 天哥在羣裏分享了一把,本身也加入了討論,下來還實踐了一把,因此想借此篇進入到源碼層,深入體驗下 RxBus 這輛 「蘭博基尼」 的設計美感和獨特魅力。html

本篇文章已受權微信公衆號 guolin_blog (郭霖)獨家發佈java

<!-- more -->git

RxBus

準備

關於簡單的實現和用法,這篇文章已經很好的說明了。

推薦先看看 RxBus 的簡單實現和用法。

地址在這裏:RxBus 的簡單實現

解剖

    • -

讓咱們看看這輛車到底用了些什麼?

  • Subject

  • SerializedSubject

  • PublishSubject

  • CompositeSubscription

從 Subject 開始發車

官方解釋

這是 Subject 的中文解釋:

Subject能夠當作是一個橋樑或者代理,在某些ReactiveX實現中(如RxJava),它同時充當了Observer和Observable的角色。由於它是一個Observer,它能夠訂閱一個或多個Observable;又由於它是一個Observable,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。

因爲一個Subject訂閱一個Observable,它能夠觸發這個Observable開始發射數據(若是那個Observable是"冷"的--就是說,它等待有訂閱纔開始發射數據)。所以有這樣的效果,Subject能夠把原來那個"冷"的Observable變成"熱"的。

Subject 源碼

源碼:

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
    protected Subject(OnSubscribe<R> onSubscribe) {
        super(onSubscribe);
    }

    public abstract boolean hasObservers();

    public final SerializedSubject<T, R> toSerialized() {
        if (getClass() == SerializedSubject.class) {
            return (SerializedSubject<T, R>)this;
        }
        return new SerializedSubject<T, R>(this);
    }

Subject 只有兩個方法。

hasObservers()方法的解釋是:

Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.

判斷 Subject 是否已經有 observers 訂閱了 有則返回 ture

toSerialized() 方法的解釋是:

Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.

包裝 Subject 後讓它能夠安全的在不一樣線程中調用各類方法

爲何這個方法後就能夠是線程安全了呢?

咱們看到 toSerialized() 返回了 SerializedSubject<T, R> 。咱們先到這裏打住,稍後咱們再看看該類作了什麼。

PublishSubject 解釋

在 RxJava 裏有一個抽象類 Subject,既是 Observable 又是 Observer,能夠把 Subject 理解成一個管道或者轉發器,數據從一端輸入,而後從另外一端輸出。

Subject 有好幾種,這裏可使用最簡單的 PublishSubject。訂閱以後,一旦數據從一端傳入,結果會裏馬上從另外一端輸出。

源碼裏給了用法例子:

PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onCompleted events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");
  // observer2 will only receive "three" and onCompleted
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onCompleted();

串行化

官方文檔推薦咱們:

若是你把 Subject 看成一個 Subscriber 使用,注意不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能致使同時(非順序)調用,這會違反Observable協議,給Subject的結果增長了不肯定性。

要避免此類問題,你能夠將 Subject 轉換爲一個 SerializedSubject ,相似於這樣:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

因此咱們能夠看到在 RxBus 初始化的時候咱們作了這樣一件事情:

private final Subject<Object, Object> BUS;

    private RxBus() {
        BUS = new SerializedSubject<>(PublishSubject.create());
    }

爲了保證多線程的調用中結果的肯定性,咱們按照官方推薦將 Subject 轉換成了一個 SerializedSubject 。

SerializedSubject

該類一樣是 Subject 的子類,這裏貼出該類的構造方法。

private final SerializedObserver<T> observer;
    private final Subject<T, R> actual;

    public SerializedSubject(final Subject<T, R> actual) {
        super(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> child) {
                actual.unsafeSubscribe(child);
            }

        });
        this.actual = actual;
        this.observer = new SerializedObserver<T>(actual);
    }

咱們發現,Subject 最後轉化成了 SerializedObserver.

SerializedObserver

When multiple threads are emitting and/or notifying they will be serialized by:

Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting

一次只會容許一個線程進行發送事物

若是其餘線程已經準備就緒,會通知給隊列
在發送事物中,不會持有任何鎖和阻塞任何線程

經過介紹能夠知道是經過 notifications 來進行併發處理的。

SerializedObserver 類中
private final NotificationLite<T> nl = NotificationLite.instance();

重點看看 nl 在 onNext() 方法裏的使用:

@Override
    public void onNext(T t) {
   // 省略一些代碼
        for (;;) {
            for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
                FastList list;
                synchronized (this) {
                    list = queue;
                    if (list == null) {
                        emitting = false;
                        return;
                    }
                    queue = null;
                }
                for (Object o : list.array) {
                    if (o == null) {
                        break;
                    }
                    // 這裏的 accept() 方法
                    try {
                        if (nl.accept(actual, o)) {
                            terminated = true;
                            return;
                        }
                    } catch (Throwable e) {
                        terminated = true;
                        Exceptions.throwIfFatal(e);
                        actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                        return;
                    }
                }
            }
        }
    }

NotificationLite

知道哪裏具體調用了以後,咱們再仔細看看 NotificationLite

先來了解它究竟是什麼:

For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don't want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.
It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent.

大體意思是:做爲一個單例類保持這種徹底不存在的安全類型的表象。

剛咱們在 SerializedObserver 的 onNext() 方法中看到 nl.accept(actual, o)
因此咱們再深刻到 accept() 方法中:

public boolean accept(Observer<? super T> o, Object n) {
        if (n == ON_COMPLETED_SENTINEL) {
            o.onCompleted();
            return true;
        } else if (n == ON_NEXT_NULL_SENTINEL) {
            o.onNext(null);
            return false;
        } else if (n != null) {
            if (n.getClass() == OnErrorSentinel.class) {
                o.onError(((OnErrorSentinel) n).e);
                return true;
            }
            o.onNext((T) n);
            return false;
        } else {
            throw new IllegalArgumentException("The lite notification can not be null");
        }
    }

Unwraps the lite notification and calls the appropriate method on the {@link Observer}.

判斷 lite 通知類別,通知 observer 執行適當方法。

經過 NotificationLite 類圖能夠看到有三個標識

  • ON_NEXT_NULL_SENTINEL (onNext 標識)

  • ON_COMPLETED_SENTINEL (onCompleted 標識)

  • OnErrorSentinel (onError 標識)

與 Observer 回調一致。經過分析得知 accept() 就是經過標識來判斷,而後調用 Observer 相對應的方法。

CompositeSubscription

RxBus 這輛"蘭博基尼"與 CompositeSubscription 車間搭配更好。

    • -

構造函數:

private Set<Subscription> subscriptions;
    private volatile boolean unsubscribed;

    public CompositeSubscription() {
    }

    public CompositeSubscription(final Subscription... subscriptions) {
        this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
    }

內部是初始化了一個 HashSet ,按照哈希算法來存取集合中的對象,存取速度比較快,而且沒有重複對象。

因此咱們推薦在基類裏實例化一個 CompositeSubscription 對象,使用 CompositeSubscription 來持有全部的 Subscriptions ,而後在 onDestroy()或者 onDestroyView()裏取消全部的訂閱。

參考文章

熄火休息

能力有限,文章錯誤還望指出,有任何問題都歡迎討論 :)

轉載請註明出處。

最後送上我女神 Gakki , 開心最好 ( ´͈v `͈ )◞。

相關文章
相關標籤/搜索