RxJava 系列-3:使用 Subject

在這篇文章中,咱們會先分析一下 RxJava2 中的 Subject ;而後,咱們會使用 Subject 製做一個相似於 EventBus 的全局的通訊工具。git

在瞭解本篇文章的內容以前,你須要先了解 RxJava2 中的一些基本的用法,好比 Observable 以及背壓的概念,你能夠參考個人其餘兩篇文章來獲取這部份內容:《RxJava2 系列 (1):一篇的比較全面的 RxJava2 方法總結》《RxJava2 系列 (2):背壓和Flowable》github

一、Subject

1.1 Subject 的兩個特性

Subject 能夠同時表明 Observer 和 Observable,容許從數據源中屢次發送結果給多個觀察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 以外,全部的方法都是線程安全的。此外,你還可使用 toSerialized() 方法,也就是轉換成串行的,將這些方法設置成線程安全的。緩存

若是你已經瞭解了 Observable 和 Observer ,那麼也許直接看 Subject 的源碼定義會更容易理解:安全

public abstract class Subject<T> extends Observable<T> implements Observer<T> {

    // ...
}
複製代碼

從上面看出,Subject 同時繼承了 Observable 和 Observer 兩個接口,說明它既是被觀察的對象,同時又是觀察對象,也就是能夠生產、能夠消費、也能夠本身生產本身消費。因此,咱們能夠項下面這樣來使用它。這裏咱們用到的是該接口的一個實現 PublishSubject :bash

public static void main(String...args) {
    PublishSubject<Integer> subject = PublishSubject.create();
    subject.subscribe(System.out::println);

    Executor executor = Executors.newFixedThreadPool(5);
    Disposable disposable = Observable.range(1, 5).subscribe(i ->
            executor.execute(() -> {
                try {
                    Thread.sleep(i * 200);
                    subject.onNext(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
}
複製代碼

根據程序的執行結果,程序在第200, 400, 600, 800, 1000毫秒依次輸出了1到5的數字。ide

在這裏,咱們用 PublishSubject 建立了一個主題並對其監聽,而後在線程當中又通知該主題內容變化,整個過程咱們都只操做了 PublishSubject 一個對象。顯然,使用 Subject 咱們能夠達到對一個指定類型的值的結果進行監聽的目的——咱們把值改變以後對應的邏輯寫在 subscribe() 方法中,而後每次調用 onNext() 等方法通知結果以後就能夠自動調用 subscribe() 方法進行更新操做。工具

同時,由於 Subject 實現了 Observer 接口,而且在 Observable 等的 subscribe() 方法中存在一個以 Observer 做爲參數的方法(以下),因此,Subject 也是能夠做爲消費者來對事件進行消費的。post

public final void subscribe(Observer<? super T> observer) 
複製代碼

以上就是 Subject 的兩個主要的特性。測試

1.2 Subject 的實現類

在 RxJava2 ,Subject 有幾個默認的實現,下面咱們對它們之間的區別作簡單的說明:this

  1. AsyncSubject:只有當 Subject 調用 onComplete 方法時,纔會將 Subject 中的最後一個事件傳遞給全部的 Observer。好比,在下面的例子中,雖然在發送 "two" 的時候,observer 就進行了訂閱,可是隻有當 subject 調用了 onComplete() 方法的時候,observer 才收到了 "three" 這一個事件:

    AsyncSubject<String> subject = AsyncSubject.create();
     subject.onNext("one");
     subject.onNext("two");
     subject.subscribe(observer);
     subject.onNext("three");
     subject.onComplete();
    複製代碼
  2. BehaviorSubject:在建立 BehaviorSuject 的時候能夠經過靜態的工廠方法指定一個默認值數,也能夠不指定。當一個 Observer 使用了 subscribe() 方法對其進行訂閱的時候,它只能收到在訂閱以前發送出的最後一個結果(或者說最新的值),在這以前的結果是沒法被接收到的。好比,下面的例子中,新註冊的 observer 只能接收到 "one", "two" 和 "three",可是沒法接收到 "zero":

    BehaviorSubject<Object> subject = BehaviorSubject.create();
     subject.onNext("zero");
     subject.onNext("one");
     subject.subscribe(observer);
     subject.onNext("two");
     subject.onNext("three");
    複製代碼
  3. PublishSubject:不會改變事件的發送順序;在已經發送了一部分事件以後註冊的 Observer 不會收到以前發送的事件。好比,在下面的代碼中,observer1 會收到全部的 onNext() 和 onComplete() 發出的結果,可是 observer2 只能收到 "three" 和最終的 onComplete():

    PublishSubject<Object> subject = PublishSubject.create();
     // observer1 進行訂閱
     subject.subscribe(observer1);
     subject.onNext("one");
     subject.onNext("two");
     // observer2 進行訂閱
     subject.subscribe(observer2);
     subject.onNext("three");
     subject.onComplete();
    複製代碼
  4. ReplaySubject:不管何時註冊 Observer 均可以接收到任什麼時候候經過該 Observable 發射的事件。好比,在下面的代碼中,observer1 和 observer2 能夠收到在它們進行訂閱以前的全部的 onNext() 和 onCompete() 事件:

    ReplaySubject<Object> subject = ReplaySubject.create();
     subject.onNext("one");
     subject.onNext("two");
     subject.onNext("three");
     subject.onComplete();
     // observer1 和 observer2 進行訂閱
     subject.subscribe(observer1);
     subject.subscribe(observer2);
    複製代碼
  5. UnicastSubject:只容許一個 Observer 進行監聽,在該 Observer 註冊以前會將發射的全部的事件放進一個隊列中,並在 Observer 註冊的時候一塊兒通知給它。好比,在下面的例子中,當 observer1 進行訂閱的時候,會將 "one" "two" "three" 依次發送給 observer1,而當 observer2 進行訂閱的時候會拋出一個異常,由於只能有一個觀察者能夠訂閱:

    UnicastSubject<String> subject = UnicastSubject.create();
     subject.onNext("one");
     subject.onNext("two");
     subject.onNext("three");
     subject.subscribe(observer1);
     subject.subscribe(observer2);
    複製代碼

對比 PublishSubject 和 ReplaySubject,它們的區別在於新註冊的 Observer 是否可以收到在它註冊以前發送的事件。這個相似於 EventBus 中的 StickyEvent 即黏性事件,爲了說明這一點,咱們準備了下面兩段代碼:

private static void testPublishSubject() throws InterruptedException {
    PublishSubject<Integer> subject = PublishSubject.create();
    subject.subscribe(i -> System.out.print("(1: " + i + ") "));

    Executor executor = Executors.newFixedThreadPool(5);
    Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
        try {
            Thread.sleep(i * 200);
            subject.onNext(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }));

    Thread.sleep(500);
    subject.subscribe(i -> System.out.print("(2: " + i + ") "));

    Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}

private static void testReplaySubject() throws InterruptedException {
    ReplaySubject<Integer> subject = ReplaySubject.create();
    subject.subscribe(i -> System.out.print("(1: " + i + ") "));

    Executor executor = Executors.newFixedThreadPool(5);
    Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
        try {
            Thread.sleep(i * 200);
            subject.onNext(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }));

    Thread.sleep(500);
    subject.subscribe(i -> System.out.print("(2: " + i + ") "));

    Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
複製代碼

它們的輸出結果依次是

PublishSubject的結果:(1: 1) (1: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
ReplaySubject的結果: (1: 1) (1: 2) (2: 1) (2: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
複製代碼

從上面的結果對比中,咱們能夠看出前者與後者的區別在於新註冊的 Observer 並無收到在它註冊以前發送的事件。試驗的結果與上面的敘述是一致的。

其餘的測試代碼這不一併給出了,詳細的代碼能夠參考Github - Java Advanced

二、用 RxJava 打造 EventBus

2.1 打造 EventBus

清楚了 Subject 的概念以後,讓咱們來作一個實踐——用 RxJava 打造 EventBus。

咱們先考慮用一個全局的 PublishSubject 來解決這個問題,固然,這意味着咱們發送的事件不是黏性事件。不過,不要緊,只要這種實現方式搞懂了,用 ReplaySubject 作一個發送黏性事件的 EventBus 也非難事。

考慮一下,若是要實現這個功能咱們須要作哪些準備:

  1. 咱們須要發送事件並可以正確地接收到事件。 要實現這個目的並不難,由於 Subject 自己就具備發送和接收兩個能力,做爲全局的以後就具備了全局的註冊和通知的能力。所以,不論你在什麼位置發送了事件,任何訂閱的地方都能收到該事件。
  2. 首先,咱們要在合適的位置對事件進行監聽,並在合適的位置取消事件的監聽。若是咱們沒有在適當的時機釋放事件,會不會形成內存泄漏呢?這仍是有可能的。 因此,咱們須要對註冊監聽的觀察者進行記錄,並提供註冊和取消註冊的方法,給它們在指定的生命週期中進行調用。

好了,首先是全局的 Subject 的問題,咱們能夠實現一個靜態的或者單例的 Subject。這裏咱們選擇使用後者,因此,咱們須要一個單例的方式來使用 Subject:

public class RxBus {

private static volatile RxBus rxBus;

private final Subject<Object> subject = PublishSubject.create().toSerialized();

public static RxBus getRxBus() {
    if (rxBus == null) {
        synchronized (RxBus.class) {
            if(rxBus == null) {
                rxBus = new RxBus();
            }
        }
    }
    return rxBus;
}
複製代碼

}

這裏咱們應用了 DCL 的單例模式提供一個單例的 RxBus,對應一個惟一的 Subject. 這裏咱們用到了 Subject 的toSerialized(),咱們上面已經提到過它的做用,就是用來保證 onNext() 等方法的線程安全性。

另外,由於 Observalbe 自己是不支持背壓的,因此,咱們還須要將該 Observable 轉換成 Flowable 來實現背壓的效果:

public <T> Flowable<T> getObservable(Class<T> type){
    return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
複製代碼

這裏咱們用到的背壓的策略是BackpressureStrategy.BUFFER,它會緩存發射結果,直到有消費者訂閱了它。而這裏的ofType()方法的做用是用來過濾發射的事件的類型,只有指定類型的事件會被髮布。

而後,咱們須要記錄訂閱者的信息以便在適當的時機取消訂閱,這裏咱們用一個Map<String, CompositeDisposable>類型的哈希表來解決。這裏的CompositeDisposable用來存儲 Disposable,從而達到一個訂閱者對應多個 Disposable 的目的。CompositeDisposable是一個 Disposable 的容器,聲稱能夠達到 O(1) 的增、刪的複雜度。這裏的作法目的是使用註冊觀察以後的 Disposable 的 dispose() 方法來取消訂閱。因此,咱們能夠獲得下面的這段代碼:

public void addSubscription(Object o, Disposable disposable) {
    String key = String.valueOf(o.hashCode());
    if (disposableMap.get(key) != null) {
        disposableMap.get(key).add(disposable);
    } else {
        CompositeDisposable disposables = new CompositeDisposable();
        disposables.add(disposable);
        disposableMap.put(key, disposables);
    }
}

public void unSubscribe(Object o) {
    String key = String.valueOf(o.hashCode());
    if (!disposableMap.containsKey(key)){
        return;
    }
    if (disposableMap.get(key) != null) {
        disposableMap.get(key).dispose();
    }

    disposableMap.remove(key);
}
複製代碼

最後,對外提供一下 Subject 的訂閱和發佈方法,整個 EventBus 就製做完成了:

public void post(Object o){
    subject.onNext(o);
}

public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
    return getObservable(type)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(next,error);
}
複製代碼

2.2 測試效果

咱們只須要在最頂層的 Activity 基類中加入以下的代碼。這樣,咱們就不須要在各個 Activity 中取消註冊了。而後,就可使用這些頂層的方法來進行操做了。

protected void postEvent(Object object) {
    RxBus.getRxBus().post(object);
}

protected <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
    Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, LogUtils::d);
    RxBus.getRxBus().addSubscription(this, disposable);
}

protected <M> void addSubscription(Class<M> eventType, Consumer<M> action, Consumer<Throwable> error) {
    Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, error);
    RxBus.getRxBus().addSubscription(this, disposable);
}

@Override
protected void onDestroy() {
    super.onDestroy();
    RxBus.getRxBus().unSubscribe(this);
}
複製代碼

在第一個 Activity 中咱們對指定的類型的結果進行監聽:

addSubscription(RxMessage.class, rxMessage -> ToastUtils.makeToast(rxMessage.message));
複製代碼

而後,咱們在另外一個 Activity 中發佈事件:

postEvent(new RxMessage("Hello world!"));
複製代碼

這樣當第二個 Activity 中調用指定的發送事件的方法以後,第一個 Activity 就能夠接收到發射的事件了。

總結

好了,以上就是 Subject 的使用,若是要用一個詞來形容它的話,那麼只能是「自給自足」了。就是說,它同時作了 Observable 和 Observer 的工做,既能夠發射事件又能夠對事件進行消費,可謂身兼數職。它在那種想要對某個值進行監聽並處理的情形特別有用。由於它不須要你寫多個冗餘的類,只要它一個就完成了其餘兩個類來完成的任務,於是代碼更加簡潔。

RxJava 系列文章:

2018年10月26日修正

更正 RxBus 中的 addSubscription()unSubscribe() 兩個方法,在以前的版本中使用傳入的 Object 的類名做爲哈希表的鍵,現改成使用 Object 的哈希碼做爲哈希表的鍵:使用類名的時候存在一個問題,即若是在 Fragment 中使用 RxBus,而且同一類型的 Fragment 在多個地方使用,會致使其中一個 Fragment 取消訂閱的時候,全部同一類型的 Fragment 都取消訂閱。

相關文章
相關標籤/搜索