背壓(Backpressure)多是全部想要深刻運用RxJava的朋友必須理解的一個概念。java
關於它的介紹,我本意是想寫在RxJava2.0更新介紹的文章裏的,但是寫着寫着發現,要完整介紹這個概念須要花費的篇幅太長,剛好目前對於背壓的介紹文章比較少,因此決定單獨拿出來,自成一篇。而關於RxJava2.0的文章修改以後就會發出來和你們探討。react
若是對於RxJava不是很熟悉,那麼在這篇文章以前,我但願你們先看看那篇關於Rxjava最友好的文章,能夠幫助你們很順暢的瞭解RxJava。緩存
讓咱們先忘掉背壓(Backpressure)這個概念,從RxJava一個比較常見的工做場景提及。bash
RxJava是一個觀察者模式的架構,當這個架構中被觀察者(Observable)和觀察者(Subscriber)處在不一樣的線程環境中時,因爲者各自的工做量不同,致使它們產生事件和處理事件的速度不同,這就會出現兩種狀況:架構
下面咱們用代碼演示一下這種崩潰的場景:異步
//被觀察者在主線程中,每1ms發送一個事件
Observable.interval(1, TimeUnit.MILLISECONDS)
//.subscribeOn(Schedulers.newThread())
//將觀察者的工做放在新線程環境中
.observeOn(Schedulers.newThread())
//觀察者處理每1000ms才處理一個事件
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong);
}
});
複製代碼
在上面的代碼中,被觀察者發送事件的速度是觀察者處理速度的1000倍ide
這段代碼運行以後:post
...
Caused by: rx.exceptions.MissingBackpressureException
...
...
複製代碼
拋出MissingBackpressureException每每就是由於,被觀察者發送事件的速度太快,而觀察者處理太慢,並且你尚未作相應措施,因此報異常。學習
而這個MissingBackpressureException異常裏面就包含了Backpressure這個單詞,看來背壓確定和這種異常狀況有關係。spa
那麼背壓(Backpressure)究竟是什麼呢?
我這兩天翻閱了大量的中文和英文資料,我發現中文資料中,不少人對於背壓(Backpressure)的理解是有很大問題的,有的人把它看做一個須要避免的問題,或者程序的異常,有的人則乾脆避而不談,模棱兩可,着實讓人尷尬。
經過參考和對比大量的相關資料,我在這裏先對背壓(Backpressure)作一個明確的定義:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略
簡而言之,背壓是流速控制的一種策略。
須要強調兩點:
那麼咱們再回看上面的程序異常就很好理解了,就是當被觀察者發送事件速度過快的狀況下,咱們沒有作流速控制,致使了異常。
那麼背壓(Backpressure)策略具體是哪如何實現流速控制的呢?
首先咱們回憶以前那篇關於Rxjava最友好的文章,裏面其實提到,在RxJava的觀察者模型中,被觀察者是主動的推送數據給觀察者,觀察者是被動接收的。而響應式拉取則反過來,觀察者主動從被觀察者那裏去拉取數據,而被觀察者變成被動的等待通知再發送數據。
結構示意圖以下:
觀察者能夠根據自身實際狀況按需拉取數據,而不是被動接收(也就至關於告訴上游觀察者把速度慢下來),最終實現了上游被觀察者發送事件的速度的控制,實現了背壓的策略。
代碼實例以下:
//被觀察者將產生100000個事件
Observable observable=Observable.range(1,100000);
class MySubscriber extends Subscriber<T> {
@Override
public void onStart() {
//必定要在onStart中通知被觀察者先發送一個事件
request(1);
}
@Override
public void onCompleted() {
...
}
@Override
public void onError(Throwable e) {
...
}
@Override
public void onNext(T n) {
...
...
//處理完畢以後,在通知被觀察者發送下一個事件
request(1);
}
}
observable.observeOn(Schedulers.newThread())
.subscribe(MySubscriber);
複製代碼
在代碼中,傳遞事件開始前的onstart()中,調用了request(1),通知被觀察者先發送一個事件,而後在onNext()中處理完事件,再次調用request(1),通知被觀察者發送下一個事件....
注意在onNext()方法中,最好最後再調用request()方法.
若是你想取消這種backpressure 策略,調用quest(Long.MAX_VALUE)便可。
實際上,在上面的代碼中,你也能夠不須要調用request(n)方法去拉取數據,程序依然能完美運行,這是由於range --> observeOn,這一段中間過程自己就是響應式拉取數據,observeOn這個操做符內部有一個緩衝區,Android環境下長度是16,它會告訴range最多發送16個事件,充滿緩衝區便可。不過話說回來,在觀察者中使用request(n)這個方法可使背壓的策略表現得更加直觀,更便於理解。
若是你足夠細心,會發現,在開頭展現異常狀況的代碼中,使用的是interval這個操做符,可是在這裏使用了range操做符,爲何呢?
這是由於interval操做符自己並不支持背壓策略,它並不響應request(n),也就是說,它發送事件的速度是不受控制的,而range這類操做符是支持背壓的,它發送事件的速度能夠被控制。
那麼到底什麼樣的Observable是支持背壓的呢?
須要說明的時,Hot Observables 和cold Observables並非嚴格的概念區分,它只是對於兩類Observable形象的描述
其實也有建立了Observable以後調用諸如publish()方法就能夠開始發送事件的,這裏我們暫且忽略。
咱們通常使用的都是Cold Observable,除非特殊需求,纔會使用Hot Observable,在這裏,Hot Observable這一類是不支持背壓的,而是Cold Observable這一類中也有一部分並不支持背壓(好比interval,timer等操做符建立的Observable)。
懵逼了吧?
Tips: 都是Observable,結果有的支持背壓,有的不支持,這就是RxJava1.X的一個問題。在2.0中,這種問題已經解決了,之後談到2.0時再細說。
在那些不支持背壓策略的操做符中使用響應式拉取數據的話,仍是會拋出MissingBackpressureException。
那麼,不支持背壓的Observevable如何作流速控制呢?
就是雖然生產者產生事件的速度很快,可是把大部分的事件都直接過濾(浪費)掉,從而間接的下降事件發送的速度。
相關相似的操做符:Sample,ThrottleFirst.... 以sample爲例,
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
//這個操做符簡單理解就是每隔200ms發送裏時間點最近那個事件,
//其餘的事件浪費掉
.sample(200,TimeUnit.MILLISECONDS)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong);
}
});
複製代碼
這是以殺敵一千,自損八百的方式解決這個問題,由於拋棄了絕大部分的事件,而在咱們使用RxJava 時候,咱們本身定義的Observable產生的事件可能都是咱們須要的,通常來講不會拋棄,因此這種方案有它的缺陷。
就是雖然被觀察者發送事件速度很快,觀察者處理不過來,可是能夠選擇先緩存一部分,而後慢慢讀。
相關相似的操做符:buffer,window... 以buffer爲例,
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
//這個操做符簡單理解就是把100毫秒內的事件打包成list發送
.buffer(100,TimeUnit.MILLISECONDS)
.subscribe(new Action1<List<Long>>() {
@Override
public void call(List<Long> aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong.size());
}
});
複製代碼
對於不支持背壓的Observable除了使用上述兩類生硬的操做符以外,還有更好的選擇:onBackpressurebuffer,onBackpressureDrop。
下面,咱們以onBackpressureDrop爲例說說用法:
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onStart() {
Log.w("TAG","start");
// request(1);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.e("ERROR",e.toString());
}
@Override
public void onNext(Long aLong) {
Log.w("TAG","---->"+aLong);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
複製代碼
這段代碼的輸出:
W/TAG: start
W/TAG: ---->0
W/TAG: ---->1
W/TAG: ---->2
W/TAG: ---->3
W/TAG: ---->4
W/TAG: ---->5
W/TAG: ---->6
W/TAG: ---->7
W/TAG: ---->8
W/TAG: ---->9
W/TAG: ---->10
W/TAG: ---->11
W/TAG: ---->12
W/TAG: ---->13
W/TAG: ---->14
W/TAG: ---->15
W/TAG: ---->1218
W/TAG: ---->1219
W/TAG: ---->1220
...
複製代碼
之因此出現0-15這樣連貫的數據,就是是由於observeOn操做符內部有一個長度爲16的緩存區,它會首先請求16個事件緩存起來....
你可能會以爲這兩個操做符和上面講的過濾和緩存很相似,確實,功能上是有些相似,可是這兩個操做符提供了更多的特性,那就是能夠響應下游觀察者的request(n)方法了,也就是說,使用了這兩種操做符,可讓本來不支持背壓的Observable「支持」背壓了。
1, 本文以前對於Hot Observables和Cold observables的描述寫反了,是我太大意,目前已改正,大家如今看到的是正確的,感謝@jaychang0917的提醒。
講了這麼多終於要到尾聲了。
下面咱們總結一下:
這篇文章並非爲了讓你學習在RxJava1.0中使用背壓(若是你以前不瞭解背壓的話),由於在1.0中,背壓的設計並不十分完美。而是但願你對背壓有一個全面清晰的認識,對於它在RxJava1.0中的設計缺陷有所瞭解便可。由於這篇文章自己是爲了2.0作一個鋪墊,後續的文章中我會繼續談到背壓和使用背壓的正確姿式。