Android進階系列之第三方庫知識點整理。java
知識點總結,整理也是學習的過程,若有錯誤,歡迎批評指出。web
上一篇:Rxjava2(一)、基礎概念及使用緩存
直接開整,上一篇基礎概念裏面說了,rxjava2
擴展於觀察者模式,咱們上篇的只是簡單的介紹了用Observable
來建立使用,其實rxjava2
給咱們提供了五種觀察者模式的建立方式。app
可以發射0或n個數據,並以成功或錯誤事件終止,在第一篇中已經舉例說明了,這裏就再也不詳細說明。異步
可以發射0或n個數據,並以成功或錯誤事件終止。 支持背壓,能夠控制數據源發射的速度。ide
咱們看到Observable
和Flowable
這兩個的區別就是後者支持背壓,那麼何爲背壓?post
背壓是一種現象,簡單來講就是在異步操做中,上游發送數據速度快於下游處理數據的速度,下游來不及處理,Buffer 溢出,致使事件阻塞,從而引發的各類問題,好比事件丟失,OOM等。學習
在rxjava1
中並不支持背壓,當出現事件阻塞時候,會直接拋出 MissingBackpressureException
異常,可是在rxjava2
中,提供了 Flowable
來建立被觀察者,經過Flowable
來處理背壓問題,咱們能夠簡單經過demo分析。spa
A:咱們上游模擬循環發送數據。線程
B:線程切換,異步操做。
C:下游每隔一秒獲取數據。
咱們Observable
建立,來模擬了背壓這個現象,咱們在上游模擬無限循環的發送數據,下游每次都休眠一秒再獲取數據,這樣確定會形成咱們前面提的問題,就是上游發送太他丫的快了,下游根本處理不過來,咱們先看結果。
看日誌,打印結果停留在了13就沒有繼續打印了?同時能夠看到程序已經崩了,是由於在rxjava2
中,Observable
並不支持背壓操做,遇到背壓問題,它並不會報錯,也不會拋MissingBackpressureException
異常,可是內存會一直飆高,最後致使內存不足程序直接掛掉。
能夠看到內存一直在往上飆,針對背壓這種現象,rxjava2
中提出用 Flowable
來處理。
下面由淺入深,慢慢揭開Flowable
的神祕面紗。
咱們先用Flowable
建立一個基本的demo:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onNext("事件一");
LogUtil.d(TAG + "--subscribe 發送事件一");
emitter.onNext("事件二");
LogUtil.d(TAG + "--subscribe 發送事件二");
emitter.onNext("事件三");
LogUtil.d(TAG + "--subscribe 發送事件三");
emitter.onNext("事件四");
LogUtil.d(TAG + "--subscribe 發送事件四");
emitter.onComplete();
LogUtil.d(TAG + "--subscribe 發送完成");
}
}, BackpressureStrategy.ERROR) // 這裏須要傳入背壓策略,跟線程池裏面飽和策略相似,當緩存區存滿時候採起的處理策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()) // 線程切換,異步操做
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
// 決定觀察者能接收多少個事件,多餘事件放入緩存區
// Flowable 默認緩存區大小爲128,即最大能存放128個事件
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t.getLocalizedMessage());
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
能夠看到
Flowable
建立和Observable
基本差很少,只是在create
方法中多傳入BackpressureStrategy.ERROR
這麼一個背壓策略,這個後面會詳講。在
onSubscribe
的回調中,參數變成了Subscription
,咱們能夠經過這個參數,讓觀察者本身設置要接收多少個事件,若是發送的事件大於觀察者設置接收的事件,多餘事件將會存入Flowable
緩存區中。
Flowable
緩存區隊列大小隻能存放128個事件,若是超過,就會報異常。
結果:
發送四個事件,觀察者經過
Subscription.request(3)
設置只接收三個事件,因此下游只接收三個,剩下一個放入Flowable
緩存區中。
若是咱們觀察者不設置Subscription.request(x)
,即不接收事件,被觀察者仍然會發送事件,並存入緩存區中,觀察者能夠動態調用Subscription.request(x)
方法來獲取事件。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int x = 0; x <= 10; x++) {
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
emitter.onNext(x + "事件");
}
}
}, BackpressureStrategy.ERROR)
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
subscription = s;
// s.request(3); 這裏不指定觀察者接收事件個數
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t.getLocalizedMessage());
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
動態獲取
findViewById(R.id.bt_get_event).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
if (subscription != null) {
LogUtil.d(TAG + "--onClick");
subscription.request(4);
}
}
});
複製代碼
能夠看到咱們觀察者一開始並無指定接收多少個事件,而是經過外接點擊事件,來動態設置接收事件個數,咱們看結果,當點擊觸發後,咱們收到了最早存入隊列的四個事件。
結果:
咱們前面提到,Flowable
默認的緩存區隊列大小爲128,即只能存放上游發送的128個事件,若是上游發送的事件超過128,就須要咱們指定相應的背壓策略來作不一樣的處理,BackpressureStrategy
爲咱們提供了五種背壓策略。
整理以下:
策略 | 做用 |
---|---|
MISSING | 當緩存區大小存滿(128),被觀察者仍然繼續發送下一個事件時,拋出異常MissingBackpressureException , 提示緩存區滿了 |
ERROR | 當緩存區大小存滿(128)(默認緩存區大小128),被觀察者仍然繼續發送下一個事件時,直接拋出異常MissingBackpressureException |
BUFFER | 當緩存區大小存滿(128),被觀察者仍然繼續發送下一個事件時,緩存區大小設置無限大, 即被觀察者可無限發送事件,但其實是存放在緩存區 |
DROP | 當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時, 超過緩存區大小(128)的事件會被所有丟棄 |
LATEST | 當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時,只保存最新/最後發送的事件, 其餘超過緩存區大小(128)的事件會被所有丟棄 |
當緩存區大小存滿(128),被觀察者仍然繼續發送下一個事件時,拋出異常MissingBackpressureException
, 提示緩存區滿了
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 發送129個事件,模擬超出緩存區
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
}
}
}, BackpressureStrategy.MISSING) // 使用BackpressureStrategy.MISSING背壓策略
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
咱們使用BackpressureStrategy.MISSING背壓策略,觀察者接收request(Integer.MAX_VALUE),此值也爲推薦值。
結果:
咱們看到,當發送了128個事件後,再發送第129個事件時候,拋了MissingBackpressureException
異常,並且咱們設置了觀察者接收也未接收到數據,說明是先存入緩存區隊列,再發送,當緩存區中拋異常後,就中止了onNext()
事件,咱們能夠驗證一下,當設置被觀察者發送128
事件。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// ******* 發送128個事件 ********
for (int x = 0; x < 128; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
}
}
}, BackpressureStrategy.MISSING)
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
就是在上面demo的基礎上,改了發送的事件個數,上游發送128個事件,恰好爲緩存區大小,並不拋異常。
結果:
咱們看到程序沒有拋異常,而且正常打印了緩存區中的128個數據(從0開始),能夠印證兩點
一、緩存區大小確實爲128
二、先存入緩存區後再獲取(若是異常,
onNext
直接不調用)
當緩存區大小存滿(128)(默認緩存區大小128),被觀察者仍然繼續發送下一個事件時,直接拋出異常MissingBackpressureException
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 發送129個事件,模擬超出緩存區
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
}
}
}, BackpressureStrategy.ERROR)
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
使用 BackpressureStrategy.ERROR 背壓策略
結果:
跟Missing同樣,直接拋了MissingBackpressureException
異常且下游未接收到數據,同理,若是上游發送數據小於等於128,正常發送和接收。
當緩存區大小存滿(128),被觀察者仍然繼續發送下一個事件時,緩存區大小設置無限大, 即被觀察者可無限發送事件,但其實是存放在緩存區。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 發送129個事件,模擬超出緩存區
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
}
}
}, BackpressureStrategy.BUFFER)
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
使用 BackpressureStrategy.BUFFER 背壓策略
更改緩存區大小,不作限制。
結果:
能夠看到,咱們發送的129個事件所有發送且接收到了。
當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時, 超過緩存區大小(128)的事件會被所有丟棄
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 發送129個事件,模擬超出緩存區
for (int x = 0; x < 129; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
}
}
}, BackpressureStrategy.DROP)
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
使用 BackpressureStrategy.DROP 背壓策略
丟掉大於緩存區的事件。
結果:
結果很明瞭,並無拋異常同時也正常打印了,可是超過緩存區的那個事件被拋棄,並無獲取到。
當緩存區大小存滿,被觀察者仍然繼續發送下一個事件時,只保存最新/最後發送的事件, 其餘超過緩存區大小(128)的事件會被所有丟棄
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 發送150個事件
for (int x = 0; x < 150; x++) {
emitter.onNext(x + "事件");
LogUtil.d(TAG + "--subscribe 發送了" + x + "個事件");
}
}
}, BackpressureStrategy.LATEST)
// 線程切換,異步操做
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
使用 BackpressureStrategy.LATEST 背壓策略
發送了150個事件
當超出128時,會保存最新的一個事件,即會接收129個事件。
結果:
咱們能夠看到,觀察者端接收到129個數據,分別爲緩存區內數據,加上最新/最後一條數據,中間數據均被丟棄。
前面說過,背壓前提是異步操做下,在同步下,咱們並不會有背壓一說,由於在同一個線程,發送數據後老是要等下游處理了纔會發送第二條數據,不會存在緩衝區,以下:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
LogUtil.d(TAG + "--subscribe 發送事件一");
emitter.onNext("事件一");
LogUtil.d(TAG + "--subscribe 發送事件二");
emitter.onNext("事件二");
LogUtil.d(TAG + "--subscribe 發送事件三");
emitter.onNext("事件三");
LogUtil.d(TAG + "--subscribe 發送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
結果:
能夠看到,事件都是順序執行,發送一條接收一條,而後再執行下一條。
可是,咱們可能會遇到這個一個狀況,當上遊發送了四條數據,可是下游只接收三條?咱們改一下demo以下:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
LogUtil.d(TAG + "--subscribe 發送事件一");
emitter.onNext("事件一");
LogUtil.d(TAG + "--subscribe 發送事件二");
emitter.onNext("事件二");
LogUtil.d(TAG + "--subscribe 發送事件三");
emitter.onNext("事件三");
LogUtil.d(TAG + "--subscribe 發送事件四");
emitter.onNext("事件四");
LogUtil.d(TAG + "--subscribe 發送完成");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.d(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
能夠看到,被觀察者發送了四個事件,可是觀察者只接收了三條。
結果:
能夠看到,一樣拋了MissingBackpressureException
異常
這裏可使用BUFFER的背壓策略來處理,可是咱們爲了說明觀察者反向控制被觀察者,咱們採用以下方案:
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// 經過emitter.requested()獲取觀察者設置的接收的事件數目
long requested = emitter.requested();
LogUtil.d(TAG + "--subscribe 觀察者設置接收的事件數目:" + requested);
for (int x = 0; x < requested; x++) {
LogUtil.d(TAG + "--subscribe 發送事件" + x);
emitter.onNext("發送事件" + x);
}
LogUtil.d(TAG + "--subscribe 發送完成");
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
LogUtil.d(TAG + "--onSubscribe");
// 設置觀察者接收事件數目爲3
s.request(3);
}
@Override
public void onNext(String s) {
LogUtil.d(TAG + "--onNext 接收到:" + s);
}
@Override
public void onError(Throwable t) {
LogUtil.e(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.d(TAG + "--onComplete");
}
});
複製代碼
咱們在
subscribe
中經過emitter.requested()
獲取觀察者中設置的接收事件數目,來動態的發送數據,這樣就避免了上下游數據不一樣步問題。
結果:
咱們前面都是經過create來建立Flowable
,能夠在Create
第二個參數中傳入相應的背壓策略,Flowable
全部的操做符都支持背壓,可是經過操做符建立的背壓策略默認爲BackpressureStrategy.ERROR,咱們能夠經過
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
三種方式來指定相應的背壓策略。
Flowable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
subscription = s;
s.request(Long.MAX_VALUE); //默承認以接收Long.MAX_VALUE個事件
}
@Override
public void onNext(Long aLong) {
LogUtil.i(TAG + "--onNext aLong=" + aLong);
try {
// 延時一秒接收
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
LogUtil.e(TAG + "--onError error=" + t);
}
@Override
public void onComplete() {
LogUtil.i(TAG + "--onComplete");
}
});
複製代碼
這裏咱們經過 interval
來建立Flowable
,能夠看到下游每一毫秒發送一條數據,下游一秒處理一條,上游明顯快於下游,處理不過來數據放入緩存池中,當緩存池中隊列滿時,就會拋異常,由於其默認的背壓策略爲BackpressureStrategy.ERROR
結果:
咱們能夠經過onBackpressureXXX
其指定相應的背壓策略。
結果:
當咱們指定背壓策略爲BUFFER後,能夠看到並無異常拋出,程序一直在打印輸出。
只發射單個數據或錯誤事件。
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
// 只能發送onSuccess或者onError,發射多條數據,只接受第一條
emitter.onSuccess("Success");
emitter.onError(new NullPointerException(""));
}
}).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
LogUtil.d(TAG + "--onSubscribe");
}
@Override
public void onSuccess(String s) {
LogUtil.d(TAG + "--onSuccess s=" + s);
}
@Override
public void onError(Throwable e) {
LogUtil.e(TAG + "--onError error=" + e.getMessage());
}
});
複製代碼
SingleEmitter
發射器只能發送一條onSuccess
或者onError
數據,若是發射器發射多條數據,觀察者只能接收到第一條數據。
結果:
不發射數據,只處理 onComplete 和 onError 事件。
方法
onComplete
與onError
只可調用一個,同時調用,第一個生效。
可以發射0或者1個數據,要麼成功,要麼失敗。有點相似於Optional。
onSuccess
方法一次訂閱只能發送一次。方法
onComplete
與onError
只可調用一個,同時調用,第一個生效。