Outlinejava
[TOC]app
在上一節中, 咱們找到了上下游流速不均衡從而致使BackPressureException出現的源頭 , 在這一節裏咱們將學習如何去治理它 . 可能不少看過其餘人寫的文章的朋友都會以爲只有Flowable
才能解決 , 因此你們對這個Flowable
都抱有很大的期許 , 其實吶 , 大家畢竟圖樣圖森破 , 今天咱們先拋開Flowable
, 僅僅依靠咱們本身的雙手和智慧
, 來看看咱們如何去治理 , 經過本節的學習以後咱們再來看Flowable
, 你會發現它其實並無想象中那麼牛叉, 它只是被其餘人過分神化了. ide
咱們接着來看上一節的這個例子:工具
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) { //無限循環發送事件
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});複製代碼
上一節中咱們看到了它的運行結果是直接爆掉了內存, 也明白它爲何就爆掉了內存, 那麼咱們能作些什麼, 才能不讓這種狀況發生呢. 學習
以前咱們說了, 上游發送的全部事件都放到水缸裏了, 因此瞬間水缸就滿了, 那咱們能夠只放咱們須要的事件到水缸裏呀, 只放一部分數據到水缸裏, 這樣不就不會溢出來了嗎, 所以, 咱們把上面的代碼修改一下:優化
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 10 == 0;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});複製代碼
在這段代碼中咱們增長了一個filter, 只容許能被10整除的事件經過, 再來看看運行結果:spa
能夠看到, 雖然內存依然在增加, 可是增加速度相比以前, 已經減小了太多了, 至少在我錄完GIF以前尚未爆掉內存, 你們能夠試着改爲能被100整除試試.3d
能夠看到, 經過減小進入水缸的事件數量的確能夠緩解上下游流速不均衡的問題, 可是力度還不夠, 咱們再來看一段代碼:code
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.sample(2, TimeUnit.SECONDS) //sample取樣
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});複製代碼
這裏用了一個sample
操做符, 簡單作個介紹, 這個操做符每隔指定的時間就從上游中取出一個事件發送給下游. 這裏咱們讓它每隔2秒取一個事件給下游, 來看看此次的運行結果吧:cdn
此次咱們能夠看到, 雖然上游仍然一直在不停的發事件, 可是咱們只是每隔必定時間
取一個放進水缸裏, 並無所有放進水缸裏, 所以此次內存僅僅只佔用了5M.
你們之後能夠出去吹牛逼了: 我曾經經過技術手段去優化一個程序, 最終使得內存佔用從300多M變成不到5M. ~(≧▽≦)/~
前面這兩種方法歸根到底其實就是減小放進水缸的事件的數量, 是以數量
取勝, 可是這個方法有個缺點
, 就是丟失了大部分的事件
.
那麼咱們換一個角度來思考, 既然上游發送事件的速度太快, 那咱們就適當減慢發送事件的速度, 從速度
上取勝, 聽上去不錯, 咱們來試試:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000); //每次發送完事件延時2秒
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "" + integer);
}
});複製代碼
此次咱們讓上游每次發送完事件後都延時了2秒, 來看看運行結果:
完美 ! 一切都是那麼完美 !
能夠看到, 咱們給上游加上延時了以後, 瞬間一頭髮情的公牛就變得跟只小綿羊同樣, 如此溫順, 如此平靜, 如此平穩的內存線, 美妙極了. 並且事件也沒有丟失
, 上游
經過適當的延時
, 不但減緩了
事件進入水缸的速度
, 也可讓下游
有充足的時間
從水缸裏取出事件來處理 , 這樣一來, 就不至於致使大量的事件涌進水缸, 也就不會OOM啦.
到目前爲止, 咱們沒有依靠任何其餘的工具, 就輕易解決了上下游流速不均衡的問題.
所以咱們總結一下, 本節中的治理的辦法就兩種:
你們必定沒忘記, 在上一節還有個Zip的例子, 這個例子也爆了咱們的內存, 現學現用, 咱們用剛學到的辦法來試試能不能懲奸除惡, 先來看看第一種辦法.
先來減小進入水缸的事件的數量:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //進行sample採樣
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.w(TAG, throwable);
}
});複製代碼
來試試運行結果吧:
哈哈, 成功了吧, 再來用第二種辦法試試.
此次咱們來減緩速度:
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000); //發送事件以後延時2秒
}
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.w(TAG, throwable);
}
});複製代碼
來看看運行結果吧:
果真也成功了, 這裏只打印出了下游收到的事件, 因此只有一個. 若是你對這個結果看不懂, 請自覺掉頭看前面幾篇文章.
經過本節的學習, 你們應該對如何處理上下游流速不均衡已經有了基本的認識了, 你們也能夠看到, 咱們並無使用Flowable
, 因此不少時候仔細去分析問題, 找到問題的緣由, 從源頭去解決纔是最根本的辦法. 後面咱們講到Flowable
的時候, 你們就會發現它其實沒什麼神祕的, 它用到的辦法和咱們本節所講的基本上是同樣的, 只是它稍微作了點封裝.
好了, 今天的教程就到這裏吧, 下一節中咱們就會來學習大家喜聞樂見的Flowable
.