RxJava(二):基礎知識

博客主頁java

1. Observable

RxJava 使用一般須要三步:數據庫

  1. 建立 Observable

Observable 字面意思是被觀察者,使用 RxJava 須要建立一個被觀察者,它會決定何時觸發事件以及觸發怎樣的事件。有點相似上游發送命令,能夠在這裏決定異步操做模塊的順序和異步操做模塊的次數。segmentfault

  1. 建立 Observer

Observer 即觀察者,它能夠在不一樣的線程中執行任務。這種模式能夠極大地簡化併發操做,由於它建立了一個處於待命狀態的觀察者哨兵,能夠在將來某個時刻響應 Observable 的通知,而不須要阻塞等待 Observable 發射數據。緩存

  1. 使用 subscribe() 進行訂閱

建立了 Observable 巳和 Observer 以後,咱們還須要使用 subscribe() 方法將它們鏈接起來,這樣整個上下游就能銜接起來實現鏈式調用。安全

Observable.just("Hello World!").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "accept: " + s);
    }
});

just() 是 RxJava 的建立操做符,用於建立一個 Observable,Consumer 是消費者,用於接收單個值。網絡

subscribe 有多個重載的方法。
併發

一個重載方法的版本,subscribe(onNext, onError, onComplete)框架

Observable.just("Hello World!").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error-> " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
Next-> Hello World!
Complete.

onComplete 是一個 Action 它與 Consumer 的區別以下:異步

  1. Action 無參數類型。
  2. Consumer :單一參數類型。

再來看一個重載方法的版本, subscribe(onNext, onError, onComplete, onSubscribe)ide

Observable.just("Hello World!").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error-> " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
}, new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Log.d(TAG, "subscribe");
    }
});

// 執行結果
subscribe
Next-> Hello World!
Complete.

從打印結果可知:先執行了onSubscribe 再執行了 onNext和onComplete

在RxJava 2 中, Observable 再也不支持訂閱 Subscriber ,而是須要使用 Observer 做爲觀察者

Observable.just("Hello World!").subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "Next-> " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Error-> " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
subscribe
Next-> Hello World!
Complete.

在RxJava 中, 被觀察者、觀察者、subscribe()方法三者缺一不可。只有使用了 subscribe(),被觀察者纔會開始發送數據.

RxJava 2 的5種觀察者模式:

5種觀察者模式的描述,只有 Flowable 支持背壓,若是有須要背壓的狀況,則必須使用 Flowable
l7DHSK.png

do 操做符

do 操做符能夠給 Observable 生命週期的各個階段加上一系列的回調監聽,當 Observable 執行到這個階段時,這些回調就會被觸發。在 RxJava 中包含了不少的 doXXX 操做符。

Observable.just("Hello World!")
        .doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "doOnNext-> " + s);
            }
        })
        .doAfterNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "doAfterNext-> " + s);
            }
        })
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnComplete.");
            }
        })
        // 訂閱以後回調方法
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, "doOnSubscribe");
            }
        })
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doAfterTerminate");
            }
        })
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doFinally");
            }
        })
        // Observable 每發射一個數據就會觸發這個回調,不只包括onNext,還包括onComplete和onError
        .doOnEach(new Consumer<Notification<String>>() {
            @Override
            public void accept(Notification<String> stringNotification) throws Exception {
                Log.d(TAG, "doOnEach-> "
                        + (stringNotification.isOnNext() ? "onNext"
                        : stringNotification.isOnComplete() ? "onComplete"
                        : "onError"));
            }
        })
        // 訂閱後能夠取消訂閱
        .doOnLifecycle(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                Log.d(TAG, "doOnLifecycle::onSubscribe-> " + disposable.isDisposed());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "doOnLifecycle::onDispose");
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "subscribe-> " + s);
            }
        });

// 執行結果
doOnSubscribe
doOnLifecycle::onSubscribe-> false
doOnNext-> Hello World!
doOnEach-> onNext
subscribe-> Hello World!
doAfterNext-> Hello World!
doOnComplete.
doOnEach-> onComplete
doFinally
doAfterTerminate


2. Hot Observable Cold Observable

Observable 的分類

在 RxJava 中, Observable 有 Hot 和 Cold 之分。

Hot Observable 不管有沒有觀察者進行訂閱,事件始終都會發生。當 Hot Observable 有多個訂閱者時(多個觀察者進行訂閱時) , Hot Observable 與訂閱者們的關係是一對多的關係,能夠與多個訂閱者共享信息。

Cold Observable 是隻有觀察者訂閱了,纔開始執行發射數據流的代碼。井且 Cold Observable 和 Observer 只能是一對一的關係 。當有多個不一樣的訂閱者時,消息是從新完整發送的。也就是說,對 Cold Observable ,有多個 Observer 的時候,它們各自的事件是獨立的。

Cold Hot 區別:

  1. 把Hot Observable 想象成一個廣播電臺,全部在此刻收聽的昕衆都會聽到同一首歌
  2. Cold Observable 一張音樂 CD ,人們能夠獨立購買並聽取它。

Cold Observable

Observable 的 just、creat、range、fromXXX 等操做符都能生成 Cold Observable

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};


Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread());

observable.subscribe(subscriber1);
observable.subscribe(subscriber2);

// 執行結果
    subscriber2-> 0
 subscriber1-> 0
 subscriber1-> 1
    subscriber2-> 1
 subscriber1-> 2
    subscriber2-> 2
    subscriber2-> 3
 subscriber1-> 3
 subscriber1-> 4
    subscriber2-> 4
 subscriber1-> 5
    subscriber2-> 5
    subscriber2-> 6
 subscriber1-> 6
 subscriber1-> 7
    subscriber2-> 7
    subscriber2-> 8
 subscriber1-> 8
 subscriber1-> 9
    subscriber2-> 9

subscriber1 和 subscriber2 結果並不必定是相同的,它們兩者是徹底獨立的。create 操做符建立的Observable 是 Cold Observable

Cold Observable 如何轉換成 Hot Observable

使用 publish ,生 ConnectableObservable

使用 publish 操做符 ,可讓 Cold Observable 轉換成 Hot Observable,它將原先 Observable
轉換 ConnectableObservable

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

Consumer<Long> subscriber3 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "      subscriber3-> " + aLong);
    }
};


ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread()).publish();

// 須要調用connect()才能真正執行
observable.connect();

// 訂閱
observable.subscribe(subscriber1);
observable.subscribe(subscriber2);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}
observable.subscribe(subscriber3);

// 執行結果
 subscriber1-> 0
    subscriber2-> 0
 subscriber1-> 1
    subscriber2-> 1
       subscriber3-> 1
 subscriber1-> 2
    subscriber2-> 2
       subscriber3-> 2
 subscriber1-> 3
    subscriber2-> 3
       subscriber3-> 3
 subscriber1-> 4
    subscriber2-> 4
       subscriber3-> 4
 subscriber1-> 5
    subscriber2-> 5
       subscriber3-> 5
 subscriber1-> 6
    subscriber2-> 6
       subscriber3-> 6
 subscriber1-> 7
    subscriber2-> 7
       subscriber3-> 7
 subscriber1-> 8
    subscriber2-> 8
       subscriber3-> 8
 subscriber1-> 9
    subscriber2-> 9
       subscriber3-> 9
 subscriber1-> 10
    subscriber2-> 10
       subscriber3-> 10
 subscriber1-> 11
    subscriber2-> 11
       subscriber3-> 11

多個訂閱的 subscriber (或者說觀察者)共享同一事件。在這裏,ConnectableObservable 是線程安全的。

使用 Subject/Processor

Subject 和 Processor 的做用相同。Processor 是 RxJava 2.x 新增的類,繼承自 Flowable, 支持背壓控制( Back Presure ),而 Subject 則不支持背壓控制。

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

Consumer<Long> subscriber3 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "      subscriber3-> " + aLong);
    }
};


Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread());

PublishSubject<Long> subject = PublishSubject.create();
observable.subscribe(subject);


observable.subscribe(subscriber1);
observable.subscribe(subscriber2);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}
subject.subscribe(subscriber3);

// 執行結果與上面使用 publish 操做符相同。

Subject 既是 Observable ,又是 Observer (Subscriber)。能夠從 Subject 源碼上看到 ,繼承自 Observable ,實現 Observer

// Subject源碼
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
  // ...省略
}

Subject 做爲觀察者,能夠訂閱目標 Cold Observable ,使對方開始發送事件。同時它又做爲 Observable 轉發或者發送新的事件,讓 Cold Observable 藉助 Subject 轉換爲 Hot Observable

Subject 井不是線程安全的,若是想要其線程安全,須要調用 toSerialized() 方法(在RxJava 1.x 中還能夠用 SerializedSubject 代替 Subject ,可是在 RxJava 2.x 以後 SerializedSubject 再也不是 public class)

然而,不少基於 EventBus 改造的 RxBus 並無這麼作。這樣的作法是很是危險的,會遇到併發的狀況。

Hot Observable 如何轉換成 Cold Observable

ConnectableObservable 的 refCount 操做符

在 ReactiveX 官網的解釋是:make a Connectable Observable behave like an ordinary Observable

RefCount 操做符把一個可鏈接的 Observable 鏈接和斷開的過程自動化了。它操做一個可鏈接的Observable 返回一個普通的 Observable 。當第一個訂閱者/觀察者訂閱這個 Observable 時,RefCount 鏈接到下層的可鏈接 Observable。 RefCount 跟蹤有多少個觀察者訂閱它,直到最後一個觀察者完成,才斷開與下層可鏈接 Observable 的鏈接。

若是全部的訂閱者/觀察者都取消訂閱了,則數據流中止:若是從新訂閱,則從新開始數據流。

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread()).publish();

connectableObservable.connect();

Observable<Long> observable = connectableObservable.refCount();

Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

disposable1.dispose();
disposable2.dispose();

Log.d(TAG, "從新開始數據流");

disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);

// 執行結果
 subscriber1-> 0
    subscriber2-> 0
 subscriber1-> 1
    subscriber2-> 1
 從新開始數據流
 subscriber1-> 0
    subscriber2-> 0
 subscriber1-> 1
    subscriber2-> 1

若是不是全部的訂閱者/觀察者都取消了訂閱 ,而只是部分取消,則部分的訂閱者/觀察者從新開始訂閱時,不會從頭開始數據流

Consumer<Long> subscriber1 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "subscriber1-> " + aLong);
    }
};

Consumer<Long> subscriber2 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "   subscriber2-> " + aLong);
    }
};

Consumer<Long> subscriber3 = new Consumer<Long>() {

    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "      subscriber3-> " + aLong);
    }
};

ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(final ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        emitter.onNext(aLong);
                    }
                });
    }
}).observeOn(Schedulers.newThread()).publish();

connectableObservable.connect();

Observable<Long> observable = connectableObservable.refCount();

Disposable disposable1 = observable.subscribe(subscriber1);
Disposable disposable2 = observable.subscribe(subscriber2);
observable.subscribe(subscriber3);

try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

disposable1.dispose();
disposable2.dispose();

Log.d(TAG, "subscriber1 和 subscriber2 從新訂閱");

disposable1 = observable.subscribe(subscriber1);
disposable2 = observable.subscribe(subscriber2);

// 執行結果
 subscriber1-> 0
    subscriber2-> 0
       subscriber3-> 0
 subscriber1-> 1
    subscriber2-> 1
       subscriber3-> 1
 subscriber1 和 subscriber2 從新訂閱
       subscriber3-> 2
 subscriber1-> 2
    subscriber2-> 2
       subscriber3-> 3
 subscriber1-> 3
    subscriber2-> 3
       subscriber3-> 4
 subscriber1-> 4
    subscriber2-> 4

subscriber1 和 subscriber2 先取消了訂閱,subscriber3 井沒有取消訂閱。以後,subscriber1 和 subscriber2 又從新訂閱。最終 subscriber一、subscriber二、subscriber3 的值保持一致

Observable share 操做符

share 操做符封裝了 publish().refCount()調用,能夠看其源碼

public final Observable<T> share() {
    return publish().refCount();
}

3. Flowable

在 RxJava 2.x 中,Observable 再也不支持背壓(Back Pressure),而改由 Flowable 來支持非阻塞式的背壓。 Flowable 是 RxJava 2.x 新增的被觀察者, Flowable 能夠當作 Observable 新的實現,它支持背壓,同時實現 Reactive Streams 的 Publisher 接口。 Flowable 全部的操做符強制支持背壓,不過 Flowable 中的操做符大多與 Observable 相似。

(1) 使用 Observable 較好的場景

  • 通常處理最大不超過 1000 條數據,而且幾乎不會出現內存溢出
  • GUI 鼠標事件,基本不會背壓(能夠結合 sampling/debouncing 操做)
  • 處理同步流

(2) 使用 Flowable 較好的場景

  • 處理以某種方式產生超過 lOKB 的元素;
  • 文件讀取與分析
  • 讀取數據庫記錄,也是一個阻塞的和基於拉取模式
  • 網絡 I/O 流
  • 建立一個響應式非阻塞接口

4. Single、Completable 和 Maybe

一般狀況下 ,若是想要使用 RxJava, 首先會想到的是使用 Observable ,若是考慮到背壓的狀況,則在 RxJava 2.x 下會使用 Flowable 。除 Observable 和 Flowable 外,在 RxJava 2.x 中還有3種類型的被觀察者 Single、Completable 和 Maybe

Single

從 SingleEmitter 的源碼能夠看出,Single 只有 onSuccess 和 onError 事件

public interface SingleEmitter<T> {

    void onSuccess(@NonNull T t);

    void onError(@NonNull Throwable t);

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    boolean tryOnError(@NonNull Throwable t);
}

其中, onSuccess 用於發射數據(在 Observable/Flowable 中使用 onNext() 來發射數據),並且只能發射一個數據,後面即便再發射數據也不會作任何處理。

Single 的 SingleObserver 中只有 onSuccess 和 onError ,並無 onComplete 。這也是Single 與其餘4種被觀察者之間的最大區別。

Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> emitter) throws Exception {
        emitter.onSuccess("test");
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "onSuccess-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "onError-> " + throwable.getMessage());
    }
});

// 執行結果
onSuccess-> test

上面 Observer 中有兩個 Consumer, 還能夠進一步簡化:

Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> emitter) throws Exception {
        emitter.onSuccess("test");
    }
}).subscribe(new BiConsumer<String, Throwable>() {
    @Override
    public void accept(String s, Throwable throwable) throws Exception {
        Log.d(TAG, "onSuccess-> " + s);
    }
});

Single 能夠經過 toXXX 方法轉換成 Observable、Flowable、Completable、Maybe

Completable

Completable 在建立後,不會發射任何數據。從 CompletableEmitter 的源碼中能夠看到。

public interface CompletableEmitter {

    void onComplete();

    void onError(@NonNull Throwable t);

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    boolean tryOnError(@NonNull Throwable t);
}

Completable 只有 onComplete 和 onError 事件,同時 Completable 井沒有 map、flatMap 等操做符,它的操做符比起 Observable/Flowable 要少得多

能夠經過企fromXXX 操做符來建立一個 Completable

Completable.fromAction(new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Hello World!");
    }
}).subscribe();

// 執行結果
Hello World!

Completable 常常結合 andThen 操做符使用。

Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(CompletableEmitter emitter) throws Exception {
        try {
            Log.d(TAG, "--------------");
            TimeUnit.SECONDS.sleep(1);
            emitter.onComplete();
        } catch (InterruptedException e) {
            emitter.onError(e);
        }
    }
}).andThen(Observable.range(1, 5))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next-> " + integer);
            }
        });

// 執行結果
18:19:37.943  --------------
18:19:38.947  Next-> 1
18:19:38.947  Next-> 2
18:19:38.947  Next-> 3
18:19:38.948  Next-> 4
18:19:38.948  Next-> 5

emitter.onComplete() 執行完成以後,代表 Completable 經執行完畢,接下來執行 andThen 裏的操做。

在 Completable 中, andThen 有多個重載的方法,正好對應了5種被觀察者的類型。

Completable andThen(CompletableSource next)

<T> Observable<T> andThen(ObservableSource<T> next)

<T> Maybe<T> andThen(MaybeSource<T> next)

<T> Flowable<T> andThen(Publisher<T> next)

<T> Single<T> andThen(SingleSource<T> next)

Completable 也能夠經過 toXXX 方法轉換成 Observable、Flowable、Single 以及 Maybe

Maybe

Maybe 是 RxJava 2.x 以後纔有的新類型,能夠當作是 Single 和 Completable 的結合。

Maybe 建立以後, MaybeEmitter 和 SingleEmitter 樣,並無 onNext() 方法,一樣須要 onSuccess() 方法來發射數據。

Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> emitter) throws Exception {
        emitter.onSuccess("Hello World!");
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Success-> " + s);
    }
});

// 執行結果
Success-> Hello World!

Maybe 也只能發射 0 或者 1 個數據,即便發射多個數據, 後面發射的數據也不會處理

emitter.onSuccess("Hello World!");
 emitter.onSuccess("Hello World!2");

// 執行結果仍然是
Success-> Hello World!

若是 MaybeEmitter 先調用了 onComplete(),即便後面再調用 onSuccess(),也不會發射任何數據。

Maybe.create(new MaybeOnSubscribe<String>() {
    @Override
    public void subscribe(MaybeEmitter<String> emitter) throws Exception {
        emitter.onComplete();
        emitter.onSuccess("Hello World!");
        emitter.onSuccess("Hello World!2");

    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Success-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
Complete.

Maybe 在沒有數據發射時,subscribe 會調用 MaybeObserver 的 onComplete()。若是 Maybe 有數據發射或者調用了 onError(),則不會執行 MaybeObserver 的 onComplete()。

也能夠將 Maybe 轉換成 Observable、Flowable、Single,只需相應地調用 toObservable()、toFlowable()、 toSingle()便可。

5. Subject 和 Processor

Subject 是一種特殊的存在

Subject 既是 Observable ,又是 Observer。官網稱能夠將 Subject 看做一個橋樑或者代理

Subject 的分類

Subject 包含 4 種類型,分別是 AsyncSubject、BehaviorSubject、ReplaySubject、PublishSubject

AsyncSubject

Observer 會接 AsyncSubject 的 onComplete() 以前的最後一個數據

AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("AsyncSubject#1");
subject.onNext("AsyncSubject#2");
subject.onComplete();
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        // 出現異常纔會輸出
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("AsyncSubject#3");
subject.onNext("AsyncSubject#4");

// 執行結果
Next-> AsyncSubject#2
Complete.

修改一下上面代碼,將subject.onComplete();放到最後,執行結果

Next-> AsyncSubject#4
 Complete.

subject.onComplete() 必需要調用纔會開始發送數據,不然觀察者將不接收任何數據。

BehaviorSubject

Observer 會先接收到 BehaviorSubject 被訂閱以前的最後一個數據,再接收訂閱以後發射過來的數據。若是 BehaviorSubject 被訂閱以前沒有發送任何數據,則會發送一個默認數據。

BehaviorSubject<String> subject = BehaviorSubject.createDefault("BehaviorSubject#1");
// 訂閱
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("BehaviorSubject#2");
subject.onNext("BehaviorSubject#3");

// 執行結果
 Next-> BehaviorSubject#1
 Next-> BehaviorSubject#2
 Next-> BehaviorSubject#3

BehaviorSubject#1 是默認值。

修改一下上面的代碼,在訂閱以前發送數據 subject.onNext("BehaviorSubject#4"); 執行結果以下:

Next-> BehaviorSubject#4
 Next-> BehaviorSubject#2
 Next-> BehaviorSubject#3

丟棄了默認值,而發射了 BehaviorSubject#4。由於 BehaviorSubject 每次只會發射調用
subscribe() 方法以前的最後一個事件和調用 subscribe() 方法以後的事件.

BehaviorSubject 還能夠緩存最近一次發出信息的數據.

ReplaySubject

ReplaySubject 會發射全部來自原始 Observable 的數據給觀察者,不管它們是什麼時候訂閱的.

ReplaySubject<String> subject = ReplaySubject.create();
subject.onNext("ReplaySubject#1");
subject.onNext("ReplaySubject#2");
// 訂閱
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("ReplaySubject#3");
subject.onNext("ReplaySubject#4");

// 執行結果
 Next-> ReplaySubject#1
 Next-> ReplaySubject#2
 Next-> ReplaySubject#3
 Next-> ReplaySubject#4

若是將create() 修改成 createWithSize(1),表示只緩存訂閱前最後發送的一條數據。執行結果以下:

Next-> ReplaySubject#2
 Next-> ReplaySubject#3
 Next-> ReplaySubject#4

這個執行結果與 BehaviorSubject 的相同。可是從併發的角度來看, ReplaySubject 在處理併發subscribe() 和 onNext() 時會更加複雜。

ReplaySubject 除了能夠限制緩存數據的數量,還能限制緩存的時間,使用 createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) 便可。

PublishSubject

Observer 只接收 PublishSubject 被訂閱以後發送的數據

PublishSubject<String> subject = PublishSubject.create();
subject.onNext("PublishSubject#1");
subject.onNext("PublishSubject#2");
subject.onComplete();

// 訂閱
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("PublishSubject#3");
subject.onNext("PublishSubject#4");

// 執行結果
 Complete.

由於 subject 在訂閱以前己經執行了 onComplete() 方法,因此沒法發射數據。修改一下,將 onComplete() 方法放在最後。執行結果以下:

Next-> PublishSubject#3
 Next-> PublishSubject#4
 Complete.

4 個 Subject 的特性總結:

可能錯過的事件

Subject 做爲一個 Observable 時,能夠不停地調用 onNext() 來發送事件,直至遇到 onComplete() 纔會結束。

PublishSubject<String> subject = PublishSubject.create();
// 訂閱
subject.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next-> " + s);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error->" + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

subject.onNext("PublishSubject#1");
subject.onNext("PublishSubject#2");
subject.onComplete();

// 執行結果
 Next-> PublishSubject#1
 Next-> PublishSubject#2
 Complete.

若是使用 subscribeOn 操做符將 subject 切換到 I/O 線程,則可使用 Thread.sleep(2000) 讓主線程休眠 2s

PublishSubject<String> subject = PublishSubject.create();
// 訂閱
subject.subscribeOn(Schedulers.io()) // 切換到I/O線程
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next-> " + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error->" + throwable.getMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "Complete.");
            }
        });

subject.onNext("PublishSubject#1");
subject.onNext("PublishSubject#2");
subject.onComplete();

try {
    Thread.sleep(2000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

// 執行結果
 Complete.

沒有打印 PublishSubject#1 和 PublishSubject#2 ???

由於 subject 發射元素的線程被指派到了 I/O 線程,此時 I/O 線程正在初始化還沒起來,subject 發射前,PublishSubject#1 和 PublishSubject#2 這兩個元素還在主線程中,而在從主線程往 I/O 線程轉發的過程當中,因爲 I/O 線程尚未起來,因此就被丟棄了。此時,不管 Thread 睡了多少秒, PublishSubject#1 和 PublishSubject#2 都不會被打印出來。

其實,解決方法很簡單,使用 Observable.create() 來替代 subject ,它容許爲每一個訂閱者精確控制事件的發迭,這樣就不會少打印 PublishSubject#1 和 PublishSubject#2

使用 PublishSubject 實現簡化的 RxBus

下面的代碼實現了一個簡化版本的 Android EventBus ,在這裏使用了 PublishSubject 。由於事件總線是基於發佈/訂閱模式實現的,若是某一事件在多個 Activity/Fragment 中被訂閱,則在 App 的任意地方一旦發佈該事件,多個訂閱的地方均能收到這一事件(在這裏,訂閱事件的 Activity/Fragment 不能被損壞,一旦被損壞就不能收到事件),這很符合 Hot Observable 的特性。因此,咱們使用 PublishSubject ,考慮到多錢程的狀況,還須要使用 Subject 的 toSerialized() 方法。

public class RxBus {
    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    private Subject<Object> mBus;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public <T> Observable<T> toObservable(Class<T> clazz) {
        return mBus.ofType(clazz);
    }

    public void post(Object object) {
        mBus.onNext(object);
    }
}

在這裏, Subject 的 toSerialized(), 使用 SerializedSubject 包裝了原先的 Subject

public final Subject<T> toSerialized() {
    if (this instanceof SerializedSubject) {
        return this;
    }
    return new SerializedSubject<T>(this);
}

這個版本的 EventBus 較簡單,井沒有考慮背壓的狀況,由於在 RxJava 2.x 中, SuSubject 已經再也不支持背壓了。若是要增長背壓的處理,可使用 Processor,須要將 PublishSubject 改爲 PublishProcessor,對應的 Observable 須要改爲 Flowable

使用 BehaviorSubject 實現預加載

預加載能夠很好地提升程序的用戶體驗。每當用戶處於弱網絡時,打開 App 極可能會出現一片空白或者一直在 loading,此時用戶必定很煩躁。而若是可以預先加載一些數據,例如上一次打開 App 時保存的數據,那麼必定會有效提高 App 的用戶體驗

下面是藉助 BehaviorSubject 的特性來實現一個簡單的預加載類 RxPreLoader

public class RxPreLoader<T> {

    // 可以緩存訂閱以前的最新數據
    private BehaviorSubject<T> dataSubject;
    private Disposable disposable;

    public RxPreLoader(T defaultValue) {
        dataSubject = BehaviorSubject.createDefault(defaultValue);
    }

    // 發送事件
    public void publish(T data) {
        dataSubject.onNext(data);
    }

    // 訂閱事件
    public Disposable subscribe(Consumer<? super T> onNext) {
        disposable = dataSubject.subscribe(onNext);
        return disposable;
    }

    // 取消訂閱
    public void disposable() {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
            disposable = null;
        }
    }

    // 獲取緩存數據的Subject
    public BehaviorSubject<T> getCacheDataSubject() {
        return dataSubject;
    }

    // 直接獲取最近的一條數據
    public T getLastCacheData() {
        return dataSubject.getValue();
    }

}

Processor

Processor 和 Subject 的做用相同。 Processor 是 RxJava2.0 新增的功能,它是一個接口,繼承自 Subscriber 和 Publisher,可以支持背壓(Back Pressure)控制。這是 Processor 和 Subject 最大區別。

其實, Publisher 和 Processor 都是 Reactive Streams 的接口, Reactive Streams 項目提供了一個非阻塞異步流處理的背壓標準。 RxJava 2.0 己經基於 Reactive Streams 庫進行重寫 ,包括 Single、Completable ,都是基於 Reactive Streams 規範重寫的, Flowable 實現了 Reactive Streams Publisher 接口等。

Reactive Streams 的目標是管制流數據在跨越異步邊界進行流數據交換,能夠認爲是將元素傳遞到另外一個線程或線程池,同時確保在接收端不是被迫緩衝任意數量的數據。換句話說,背壓是該模型的重要組成部分,經過設置協調線程之間的隊列大小進行限制 。當各異步模型之間採用同步通訊時會削弱異步帶來的好處,所以必須採起謹慎措施,強制徹底無阻塞反應流能在系統的各個方面都作到異步實施。

Reactive Streams 規範的主要目標:

  • 經過異步邊界( Asynchronous Boundary )來解糯系統組件 。解禍的先決條件,分離事件/數據流的發送方和接收方的資源使用
  • 爲背壓處理定義一種模型。流處理的理想範式是將數據從發佈者推送到訂閱者,這樣發佈者就能夠快速發佈數據,同時經過壓力處理來確保速度更快的發佈者不會對速度較慢的訂閱者形成過載。背壓處理經過使用流控制來確保操做的穩定性並能實現優雅降級,從而提供彈性能力。

Reactive Streams JVM接口 由如下四個接口組成:

  • Publisher:消息發佈者
  • Subscriber :消息訂閱者
  • Subscription: 一個訂閱
  • Processor: Publisher + Subscriber 的結合體

RxJava 2.0 中使用 Processor 來處理背壓。同時,在新發布的 Java 9 中也已經引入 Reactive
Streams 的思想,具體來講是構建在 java.util.concurrent.Flow 容器中,包含了四個接口類。

RxJava 的 Subject 是一種特殊的存在,它的靈活性在使用時也會伴隨着風險,如果沒有用好則可能會錯過事件。注意,Subject 不是線程安全的。固然不少開源框架都在使用 Subject,例如大名鼎鼎的 RxLifecycle 就使用了 BehaviorSubject

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)

相關文章
相關標籤/搜索