首先要感謝 Season_zlc 的一系列RxJava2
的教程,關於上游、下游、水缸的類比,讓我對於整個RxJava2
的基本思想有了更加清晰的認識。你們有興趣的話必定要多看看,寫的通俗易懂,傳送門:給初學者的 RxJava 2.0 教程 (一) ,本文的思想都來源於它的一系列文章。java
文章比較長,爲了不耽誤你們的時間,先列出須要介紹的知識點: react
在開始學習以前,咱們先看一下最簡單的例子:android
dependencies {
//在build.gradle中,導入依賴。
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
複製代碼
Observable
+ Observer
的最簡單示例,這裏咱們在上游發送了四個onNext(String s)
事件以後,最後發送了一個onComplete()
事件。public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
複製代碼
onSubscribe
;onNext
;onComplete
。
在上面的例子中,涉及到了如下五個類:緩存
Observable
:上游。ObservableOnSubscribe
:上游的create
方法所接收的參數。ObservableEmitter
:上游事件的發送者。Observer
:下游的接收者。Disposable
:用於維繫上游、下游之間的聯繫。對於整個模型,能夠總結爲如下幾點:bash
RxJava2
簡單的來講,就是一個發送事件、接收事件的過程,咱們能夠將發送事件方類比做上游,而接收事件方類比做下游。Observable
,而下游對應Observer
。subscribe
方法。各關鍵元素的UML
圖以下: 網絡
用於 發出事件,它能夠分別發出onNext/onComplete/onError
事件:app
onNext
,下游也能夠接收無限個onNext
。onComplete/onError
後,上游onComplete/onError
後的事件將會繼續發送,可是下游在收到onComplete/onError
事件後再也不繼續接收事件。onComplete
或者onError
事件。onError
或者onComplete
切斷了上游和下游的聯繫,在聯繫切斷後上游再發送onError
事件就會報錯,onComplete
和onError
的調用狀況有如下幾種: (1) onComplete
能夠發送屢次,可是隻會收到一次回調。 (2) onError
只能夠發送一次,發送屢次會報錯。 (3) onComplete
以後不能夠發送onError
,不然會報錯。 (4) onError
以後能夠發送onComplete
,可是隻會收到onError
事件。onError
的參數不容許爲空。其繼承關係以下圖所示: ide
理解成爲 水管的機關,當調用它的dispose
方法時,將會將上游和下游之間的管道切斷,從而致使 下游接收不到事件。函數
Observer
的onSubscribe
回調中,會傳入一個Disposable
對象,下游能夠經過該對象的dispose()
方法主動切斷和上游的聯繫,在這以後上游的observableEmitter.isDisposed()
方法將返回true
。onComplete/onError
在內的任何事件,若此時上游再調用onError
方法發送事件,那麼將會報錯。咱們來模擬一下,在下游收到2
以後,經過Disposable
來切斷上游和下游之間的聯繫:工具
public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
if ("2".equals(s)) {
mDisposable.dispose();
}
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
複製代碼
最終的運行結果爲:
經過subscribe
肯定上游和下游的聯繫有如下幾種方法:
Consumer<T>
類Observer
類Action
類對於不使用Observer
類做爲形參的subscribe
函數,其實實現的功能和使用Observer
類做爲參數的方法相同,只不過它們是將Observer
的四個回調分解成形參,有參數的回調用Consumer<T>
代替,而沒有參數的則用Action
代替。
Observable
來發送事件,那麼這個上游就默認在主線程發送事件;而當咱們在下游建立一個Observer
來接收事件,那麼這個下游就默認在主線程中接收事件。subscribeOn
指定的是 上游發送事件 的線程,而observeOn
指定的是 下游接收事件 的線程。subscribeOn
只有第一次有效,而每調用一次observeOn
,那麼下游接收消息的線程就會切換一次。CompositeDisposable
能夠用來容納Disposable
對象,每當咱們獲得一個Disposable
對象時,就經過add
方法將它添加進入容器,在退出的時候,調用clear
方法,便可切斷全部的水管。Schedulers.io()
:表明IO
操做,一般用於網絡請求、文件讀寫等IO
密集型的操做。Schedulers.computation()
:表明CPU
密集型的操做,適用於大量計算。Schedulers.newThread()
:建立新的常規線程。AndroidSchedulers.mainThread()
:表明Android
的主線程。在鏈式調用當中,咱們能夠經過observeOn
方法屢次切換管道下游處理消息的線程,例以下面的代碼,咱們對下游進行了兩次線程的切換:
static void mapSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
observableEmitter.onNext("true");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
observableEmitter.onNext("false");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
observableEmitter.onComplete();
}
//1.指定了subscribe方法執行的線程,並進行第一次下游線程的切換,將其切換到新的子線程。
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
return "true".equals(s);
}
//2.進行第二次下游線程的切換,將其切換到主線程。
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Boolean aBoolean) {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
}
});
}
複製代碼
以上代碼的運行的結果爲:
Map
操做符的做用是對上游發送的每個事件應用一個函數,使得每一個事件按照函數的邏輯進行變換,經過Map
就能夠把上游發送的每個事件,轉換成Object
或者集合,其英文註釋爲:
map
的代碼爲例,能夠看到map
接收一個Function
類,它有兩個泛型變量,分別爲調用map
方法的Observable<T>
的<T>
泛型,和返回的Obervable<R>
的<R>
泛型。public static void mapVerify() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
});
Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
}
複製代碼
Function
爲一個接口:
map
函數調用完畢以後,將返回一個新的
Observable
,它的類型爲
ObservableMap
:
FlatMap
用於將一個發送事件的上游Observable
變換成多個發送事件的Observable
,而後將它們發送的事件合併,放進一個單獨的Observable
中,其註釋爲:
FlatMap
不保證不一樣水管之間事件的順序,若是須要保證順序,則須要使用contactMap
。static void flatMapSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromArray("a value of " + integer + ",b value of " + integer);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
複製代碼
和map
操做符相似,它也接收一個類型爲Function
的接口,只不過它的? extends R
參數類型換成了? extends Observable<? extends R>
。
前面咱們說到,flatMap
操做符不會保證下游接收事件的順序,下面,咱們就以一個例子來講明,在flatMap
的apply
函數中,咱們將一個事件轉換成兩個Observable
,而且加上了延時:
static void flatMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "flatMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "flatMapOrderSample emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "flatMapOrderSample emit 3");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "flatMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
複製代碼
能夠看到,最終的輸出結果和flatMap
收到事件的順序並不相同:
flatMap
換成
contactMap
:
static void contactMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(2);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "contactMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
複製代碼
最終的運行結果爲:
Zip
經過一個函數從多個Observable
每次各取出一個事件,合併成一個新的事件發送給下游。static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
});
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
複製代碼
此時的運行結果爲:
static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
}).subscribeOn(Schedulers.io());
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
複製代碼
運行結果爲:
「背壓」其實就是一種用於解決問題的工具,那麼咱們的問題又是什麼呢?
想必你們在不少文章中都聽過這個一句話:在RxJava2
中,Observable
不支持「背壓」,而Flowable
支持背壓。
關於Observable
不支持背壓,咱們應當從兩種狀況去考慮,即上游、下游是否位於相同的線程。
首先,咱們不調用observeOn
和subscribeOn
方法來改變上游、下游的工做線程,這樣,上游和下游就位於同一線程,同時,咱們在下游的處理函數中,每收到一個消息就休眠2000ms
,以模擬上游處理速度大於下游的場景。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
複製代碼
從下面的打印結果能夠看到,當「使用 Observable
,而且上游、下游位於相同線程」時,並不會出現消息堆積的狀況,由於上游發射完一條消息後,必需要等到下游處理完該消息,纔會發射一條新的消息。
接着,咱們採用subscribeOn
和observeOn
來使得上游和下游位於不一樣的工做線程,其它均和2.2
中相同。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
複製代碼
和2.2
中不一樣,當上遊和下游位於不一樣的工做線程,那麼上游發送消息時,不會考慮下游是否已經處理了以前的消息,它會直接發送,而這些發送的消息被存放在水缸當中,下游每處理完一條消息,就去水缸中取下一條數據,那麼隨着水缸中數據愈來愈多,那麼系統中的無用資源就會急劇增長。
咱們之因此說Observable
不支持「背壓」,就是在2.1
介紹的整個族譜中,沒有一個類,一種方法能讓下游通知上游說:不要再發消息到水缸裏了,我已經處理不過來了!
那是否是說Flowable
支持「背壓」,而Observable
不支持,那麼Observable
就要被取代了呢,其實否則,Flowable
對於「背壓」的支持是以性能爲代價的,咱們應當只在有可能出現2.3
中上游下游速率不匹配的問題時,纔去使用Flowable
,不然就應當使用Observable
,也就是知足兩點條件:
Flowable
和Subscriber
分別對應於以前討論的Observable
和Observer
,它們直接的鏈接仍然是經過subscribe
方法。Flowable
在設計的時候採用了 響應式拉取 的思想,當下遊調用了Subscription
的request
方法時,就代表了下游處理事件的能力,這樣上游就能夠根據這個值來控制事件發送的頻率,避免出現前面談到的上游發送太快,而下游處理太慢從而致使OOM
的發生。static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
sourceFlow.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
複製代碼
其類結構圖和Observable
幾乎徹底一致:
從上面的類圖能夠看出,Flowable
和Observable
最大的不一樣,就是在create
方法中,須要傳入額外的參數,它表示的是「背壓」的策略,這裏可選的值包括:
ERROR
BUFFER
DROP
LATEST
request(n)
的值,那麼會拋出MissingBackpressureException
異常。128
,只有當下遊調用request
時,才從水缸中取出事件發送給下游,若是水缸中事件的個數超過了128
,那麼也會拋出MissingBackpressureException
異常。下面這段代碼,咱們先將三個事件放入到水缸當中,以後每次調用request
方法就會從水缸當中取出一個事件發送給下游。
static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(1);
}
}
複製代碼
當上遊和下游位於不一樣的線程,每次經過Subscription
調用request
就會從水缸中取出一個事件,發送給下游:
BUFFER
策略時,至關於在上游放置了一個容量無限大的水缸,全部下游暫時沒法處理的消息都放在水缸當中,這裏再也不像ERROR
策略同樣,區分上游和下游是否位於同一線程。OOM
。static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(10);
}
}
static void flowBufferSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000;i ++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
複製代碼
在上面的例子中,咱們先把10000
條消息放入到水缸當中,以後經過Subscription
每次從水缸中取出10
條消息發送給下游,演示結果爲:
DROP
策略時,會把水缸沒法存放的事件丟棄掉,這裏一樣不會受到下游和下游是否處於同一個線程的限制。static void flowDropSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
複製代碼
咱們先往水缸中放入130
條消息,以後每次經過Subscription
取出60
條消息發送給下游,能夠看到,最後最多隻取到了第128
條消息,第129/130
條消息被丟棄了。
DROP
相似,當水缸沒法容納下消息時,會將它丟棄,可是除此以外,上游還會緩存最新的一條消息,實例以下:static void flowLatestSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
複製代碼
從下面的運行結果能夠看出,當取出最後一批數據的時候,上游除了收到存儲在水缸當中的數據,還額外收到了最後一條消息,也就是第130
條數據,這就是DROP
策略和LATEST
策略的區別: