本篇文章已受權微信公衆號 YYGeeker
獨家發佈轉載請標明出處react
CSDN學院課程地址緩存
- RxJava2從入門到精通-初級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-中級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-進階篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-源碼分析篇:edu.csdn.net/course/deta…
背壓的概念是在平時業務開發時較爲常見,大多數是針對高併發的業務,背壓是必須考慮的因素之一。在異步場景中,因爲數據流的發射速度高於數據流的接收速度,就會致使數據不能及時處理,從而致使數據流的阻塞。背壓所要作的事情就是主動控制數據流發射的速度bash
在RxJava2.0中,推出了Flowable用來支持背壓,去除了Observable對背壓的支持,下面在背壓策略的講解中,咱們都使用Flowable做爲咱們的響應類型。在使用背壓時,只須要在create()
方法中第二個參數添加背壓策略便可微信
FlowableSubscriber
,那麼須要經過s.request(Long.MAX_VALUE)
去主動請求上游的數據項。若是遇到背壓報錯的時候,FlowableSubscriber
默認已經將錯誤try-catch,並經過onError()
進行回調,程序並不會崩潰Consumer
,那麼不須要主動去請求上游數據,默認已經調用了s.request(Long.MAX_VALUE)
。若是遇到背壓報錯、且對Throwable的Consumer
沒有new出來,則程序直接崩潰public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
}
複製代碼
MISSING表示OnNext事件沒有任何緩存和丟棄,下游要處理任何溢出,能夠理解爲至關於沒有指定背壓策略。Flowable至關於沒有指定背壓策略能夠將下游要處理任何溢出理解爲,上游發射的數據未獲得處理,就會緩存起來,當緩存容量達到128時,再增長一個未處理的數據項,就會拋出MissingBackpressureException,且帶有隊列已經滿了的友好提示。這裏就比如一個大水缸,當水注滿的時候,它就會把蓋子蓋上,不讓你再繼續注水了
併發
這裏咱們模擬上游發送速度高於下游數據流的處理速度,在數據處理的時候加上
Thread.sleep(1000)
異步
public void missing() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
複製代碼
輸出ide
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
複製代碼
ERROR表示在下游沒法跟上時,會拋出MissingBackpressureException。能夠將下游沒法跟上理解爲,上游發射的數據未獲得處理,就會緩存起來,當緩存容量達到128時,再增長一個未處理的數據項,就會拋出MissingBackpressureException。這裏比如一個大水缸,當水注滿的時候,它會把水缸撐破了,直接破裂
高併發
public void error() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
複製代碼
輸出源碼分析
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
複製代碼
上游不斷的發出onNext請求,直到下游處理完,上游發射的數據項的緩存池是無限大的,程序也不會拋出錯誤,可是要注意程序OOM的現象,由於緩存越大,佔用的內存就越多。例子中發射129個數據項,然而程序並無崩潰,只會一直讀取緩存池的數據項,直到數據項被處理完。這裏就是一個無限大的水缸
ui
背壓策略除了BUFFER策略的緩存池是無限大以外,其餘默認的緩存池都是128
public void buffer() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
複製代碼
輸出
onNext=0
onNext=1
onNext=2
......
onNext=998
onNext=999
複製代碼
會在下游跟不上速度時,把onNext的值丟棄,簡單的說就是,超過緩存區大小(128)的數據項都會被丟棄。例子中經過發射800個數據項,那麼咱們只會收到0-127的數據項。若是咱們再次調用request()
,這時候取到的數據就是上一次request()後的128個數據。這裏比如一個大水缸,當水注滿的時候,水仍是在繼續的流,一旦有request調用的時候,它就會去取出水缸裏的全部水,這時候水缸就是空的,但水一直在流,因此水缸立刻又會被注滿,這個時候就要等request再次取出水缸裏的水
public void drop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
複製代碼
輸出
onNext=0
onNext=1
onNext=2
......
onNext=127
複製代碼
LATEST與Drop策略同樣,若是超過緩存池容量大小的數據項都會被丟棄。不一樣的是,無論緩存池的狀態如何,LATEST都會將最後一條數據強行放入緩存池中。這裏的水缸容納下了最後一滴水
public void latest() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
複製代碼
輸出
onNext=0
onNext=1
......
onNext=126
onNext=127
onNext=999
複製代碼