Android技能樹系列:java
Android基礎知識git
Android技能樹 — 動畫小結github
Android技能樹 — Android存儲路徑及IO操做小結網絡
數據結構基礎知識ide
算法基礎知識
Rx系列相關
Android技能樹 — Rxjava取消訂閱小結(1):自帶方式
Android技能樹 — Rxjava取消訂閱小結(2):RxLifeCycle
如今不少項目都在使用Rxjava了,對於RxJava的使用,估計都很熟悉了,可是不少人在使用RxJava的時候容易產生內存泄漏問題,好比咱們在用RxJava配合Retrofit的時候,發出請求出去,拿到數據後咱們可能會去刷新界面,可是若是這時候網絡比較差,返回比較慢,而咱們的Activity這時候關閉了,那RxJava當拿到返回的數據的時候去刷新界面就會報空指針異常了。因此咱們當Activity關閉的時候,咱們這時候若是RxJava還沒執行完,咱們應該取消訂閱。
經常使用的主要三種方式:(按照⭐️推薦從低到高來介紹)
本文主要講解RxLifeCycle方式。
這裏確定不會簡單的介紹如何使用RxLifeCycle,github上面已經寫得很清楚了,RxLifecycle github連接,咱們主要是看具體的實現原理。
簡單使用:
假設咱們的Activity是繼承RxActivity (PS: 不是必定要繼承的,只是說明一種使用狀況,具體能夠看GitHub)
//手動設定解除訂閱的時間:(ps:這裏設爲onPause的時候解除訂閱)
myObservable
.compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscrbe();
/**
自動設定解除訂閱的時間:
(ps:好比你是在onStart時候訂閱,則自動會在onPause時候解除,
若是在onCreate時候訂閱,則會自動在onDestory時候解除)
*/
myObservable
.compose(this.bindToLifecycle())
.subscribe();
複製代碼
在介紹RxLifeCycle以前,先介紹一些基礎知識,加深你們的理解。
咱們知道在RxBus中咱們使用的是Subject ,由於它既能夠是觀察者又是被觀察者。而Subject有不少種類:子類有PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject、SerializedSubject。
具體每種的區別能夠看:RxJava中常見的幾種Subject
這裏咱們主要講解BehaviorSubject。
Subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
大意是BehaviorSubject會發送離訂閱最近的上一個值,沒有上一個值的時候會發送默認值(若是有的話)。
正好上面講到了Subject,順帶提一下冷熱Observable。和RxLifeCycle關係不大,可是能夠當了解,不想看的能夠跳過 1. 2 基礎知識。
所謂的冷熱和咱們單例模式中的餓漢式和飽漢式有一點點像,冷Observable須要有訂閱者的時候纔開始發射數據(有點像飽漢式),熱Observable並非必定須要訂閱者纔開始發射數據(有點像餓漢式)。
PS: 你們也能夠具體參考文章擁抱 RxJava(三):關於 Observable 的冷熱,常見的封裝方式以及誤區,一些圖片及說明我這邊也直接引用該文章。
咱們常見的工廠方法提供的都是Cold Observable,包括just(),fromXX,create(),interval(),defer()。 他們有訂閱者的時候纔會發射數據,而且他們的共同點是當你有多個Subscriber的時候,他們的事件是獨立的。
Observable interval = Observable.interval(1,TimeUnit.SECONDS);
複製代碼
不一樣於Cold Observable, Hot Observable是共享數據的。對於Hot Observable的全部subscriber,他們會在同一時刻收到相同的數據。咱們一般使用publish()操做符來將Cold Observable變爲Hot。或者咱們在RxBus中經常用到的Subjects 也是Hot Observable。
而Hot Observable不須要有訂閱者,只須要調用connect()
方法就會開始發送數據,這時候當其餘訂閱這個Observable的時候,並不會從頭開始接受數據。
而經常使用的Hot Observable 是 ConnectableObservable。
咱們能夠看到takeUtil操做符的功能: 在第二個Observable發射一個內容或終止後放棄第一個Observable發射的內容。
因此咱們立刻就能夠想到假設第一個是咱們的網絡請求接口的Observable , 而後經過takeUntil綁定了一個其餘的Observable , 好比咱們是要在onDestory時候取消訂閱,那隻須要在onDestory方法裏面使第二個Observable發送一個內容便可。
就如同字面意思,起到過濾做用,你寫一個條件,只有符合條件的發送信息纔會被接收到。
observable.filter(new Predicate<R>() {
@Override
public boolean test(R r) throws Exception {
//根據過濾條件,來決定返回是false/true
return false/true;
}
});
複製代碼
這個方法在Rxjava 1 裏面叫作asObservable() 。可能不少人沒用過,主要仍是用在Subject。
好比你寫了一個Subject,你想暴露出去一個接口讓別人使用,你可能會這麼寫:
public class Test {
//假設是BehaviorSubject
private BehaviorSubject subject = BehaviorSubject.create();
//把Observable這塊方面經過方法分享出去,可是又不想整個Subject都分享出去。
public Observable getObservable(){
return ((Observable) subject);
}
//好比你調用play方法,按照要求只能發送1,2,3
public void play(){
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
}
}
複製代碼
可是這麼寫沒啥卵用,只要獲取後強制轉換就能夠:
//又能夠發送相關數據
((BehaviorSubject) getObservable()).onNext(99999);
複製代碼
因此這時候須要使用asObservable方法了:這實際上只是將您的主題封裝在一個可觀察的對象中,這使得消費代碼沒法將其轉換回主題,asObservable是隱藏實現細節的一種防護機制。
//改變暴露的方法:
public Observable getObservable(){
return subject.asObservable();
}
//這時候就算你強轉也沒用,會報錯,由於這時候經過asObservable獲取到的對象已經不是Subject對象了。
((BehaviorSubject) getObservable()).onNext(99999);
複製代碼
而在Rxjava 2 中只是把這個asObservable 方法改爲了 hide方法而已。用法相同。
Tramsformer有不少種:ObservableTransformer,FlowableTransformer,SingleTransformer,MaybeTransformer,CompletableTransformer。
咱們這裏已ObservableTransformer爲例:
ObservableTransformer其實能夠理解爲Observable 轉換器:能夠經過它將一種類型的Observable轉換成另外一種類型的Observable。
好比日常時候每一個observable咱們都須要寫上這段代碼::
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(XXXXX);
複製代碼
明明知道大部分的observable要使用的是
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
複製代碼
因此有些人就會想到我寫一個方法:
public Observable defaultSet(Observable ob){
return ob.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
複製代碼
固然這麼寫是沒問題,可是你每一個請求都要求用defaultSet方法包住了:
defaultSet(observable)
.subScribe(XXXXXX);
複製代碼
沒有了鏈式調用的清爽了,因此這時候ObservableTransformer 就出現了:
public class Transformer {
public static <T> ObservableTransformer<T, T> switchSchedulers() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
}
複製代碼
這時候咱們只須要調用:
observable.compose(Transformer.<Object>switchSchedulers()).subScribe(XXXX);
複製代碼
因此咱們知道了,咱們想把一個Observable轉變成另一個Observable可使用ObservableTransformer。
兩個Observable發射,合併每一個Observable發射的最新內容,而後發出去,看下面的圖片就很清楚。
take操做符:
只發出Observable發出的前n個item。
skip操做符: 壓制Observable發出的前n個item。
經過對每一個item應用函數來轉換Observable發出的item
在Observable發射數據時,有時發送onError通知,致使觀察者不能正常接收數據。但是,有時咱們但願對Observable發射的onError通知作出響應或者從錯誤中恢復。
具體主要有三種不一樣操做符來實現:
具體描述能夠參考:RxJava之錯誤處理
咱們已Activity中取消訂閱爲例:
RxActivity.java(代碼說明具體查看源碼裏面的備註):
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
//建立一個BehaviorSubject,用來作takeUntil中的第二個Observable,讓其在覈實的生命週期發送信息。
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
@Override
@NonNull
@CheckResult
public final Observable<ActivityEvent> lifecycle() {
//使用hide()方法把這個subject暴露出去
return lifecycleSubject.hide();
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
@Override
@CallSuper
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleSubject.onNext(ActivityEvent.CREATE);
}
@Override
@CallSuper
protected void onStart() {
super.onStart();
//在onStart時候發送信息
lifecycleSubject.onNext(ActivityEvent.START);
}
@Override
@CallSuper
protected void onResume() {
super.onResume();
//在onResume時候發送信息
lifecycleSubject.onNext(ActivityEvent.RESUME);
}
@Override
@CallSuper
protected void onPause() {
//在onPause時候發送信息
lifecycleSubject.onNext(ActivityEvent.PAUSE);
super.onPause();
}
@Override
@CallSuper
protected void onStop() {
//在onStop時候發送信息
lifecycleSubject.onNext(ActivityEvent.STOP);
super.onStop();
}
@Override
@CallSuper
protected void onDestroy() {
//在onDestroy時候發送信息
lifecycleSubject.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
複製代碼
同時咱們也注意到一個小細節: 在onCreate , onStart , onResume的時候,都是先調用super.XXX, 而後再用subject 發送相關Event;可是在 onPause , onStop , onDestory 裏面倒是先用subject 發送相關Event,而後再調用super.XXXX。爲啥會有這個區別。由於通常取消訂閱都是在onPause,onStop,onDestory情形下,因此優先先取消訂閱,再去執行系統本身的操做。好比onDestory,先去取消訂閱,再去執行super.onDestory方法。
咱們先來說解手動設定某個生命週期做爲取消訂閱,咱們知道主要是使用:
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
複製代碼
咱們經過上面的基礎知識,應該知道咱們的目的是把咱們本身的Obsevable和RxActivity裏面的BehaviorSubject經過takeUntil綁定在一塊兒,由於RxActivity裏面全部的生命週期都發送了相應的ActivityEvent事件,因此咱們須要使用filter來過濾掉不是咱們關心的生命週期事件 ,最後經過ObservableTransformer來把咱們的Observable進行轉換成這個合成好的《Observable & BehaviorSubject》。
因此咱們總體思路知道了,咱們來具體看bindUntilEvent源碼:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bindUntilEvent(@Nonnull final Observable<R> lifecycle, @Nonnull final R event) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(event, "event == null");
return bind(takeUntilEvent(lifecycle, event));
}
複製代碼
能夠看到主要是bind和 takeUntilEvent二個方法,咱們先看takeUntilEvent方法:
private static <R> Observable<R> takeUntilEvent(final Observable<R> lifecycle, final R event) {
return lifecycle.filter(new Predicate<R>() {
@Override
public boolean test(R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(event);
}
});
}
複製代碼
由於咱們前面提過,咱們在生命週期中都會讓subject發送相應的ActivityEvent事件,因此咱們這裏只是把這個subject經過filter過濾,而後只發送咱們指定的生命週期。
咱們再來看bind方法,這時候就知道bind方法的目的是爲了幫咱們的Observable和這個已經使用過filter的subject進行綁定並返回:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull final Observable<R> lifecycle) {
return new LifecycleTransformer<>(lifecycle);
}
複製代碼
返回了LifecycleTransformer:
public final class LifecycleTransformer<T> implements
ObservableTransformer<T, T>,
FlowableTransformer<T, T>,
SingleTransformer<T, T>,
MaybeTransformer<T, T>,
CompletableTransformer
複製代碼
咱們能夠看到LifecycleTransformer實現了不少Transformer,由於這樣咱們使用Observable或者Single等均可以來進行轉換。好比咱們是Observable,那咱們就會調用LifecycleTransformer裏面實現的的ObservableTransformer對應的apply方法:
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(observable);
}
複製代碼
咱們看到果真調用了takeUntil,把咱們的Observable經過takeUntil與已經處理好指定ActivityEvent的subject進行綁定。
最終咱們只須要:
myObservable.compose(bindUntilEvent(ActivityEvent.PAUSE));
複製代碼
自動取消訂閱代碼:
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
複製代碼
咱們能夠看到,大體其實和手動指定生命週期的是同樣的,惟一的區別就是咱們要根據咱們設置訂閱事件的生命週期推算出相對於的取消訂閱生命週期。
咱們來看bindActivity源碼:
@NonNull
@CheckResult
public static <T> LifecycleTransformer<T> bindActivity(@NonNull final Observable<ActivityEvent> lifecycle) {
return bind(lifecycle, ACTIVITY_LIFECYCLE);
}
複製代碼
仍是老樣子,bind最後確定是返回一個LifecycleTransformer:
@Nonnull
@CheckReturnValue
public static <T, R> LifecycleTransformer<T> bind(@Nonnull Observable<R> lifecycle,
@Nonnull final Function<R, R> correspondingEvents) {
checkNotNull(lifecycle, "lifecycle == null");
checkNotNull(correspondingEvents, "correspondingEvents == null");
return bind(takeUntilCorrespondingEvent(lifecycle.share(), correspondingEvents));
}
複製代碼
先看takeUntilCorrespondingEvent方法:
private static <R> Observable<Boolean> takeUntilCorrespondingEvent(final Observable<R> lifecycle,
final Function<R, R> correspondingEvents) {
return Observable.combineLatest(
lifecycle.take(1).map(correspondingEvents),
lifecycle.skip(1),
new BiFunction<R, R, Boolean>() {
@Override
public Boolean apply(R bindUntilEvent, R lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
.onErrorReturn(Functions.RESUME_FUNCTION)
.filter(Functions.SHOULD_COMPLETE);
}
複製代碼
咱們先來看combineLatest裏面的二個Observable:
lifecycle.take(1).map(correspondingEvents):
好比咱們在oncreate裏面註冊了訂閱,咱們這時候就要告訴系統咱們要在onDestory裏面進行取消訂閱,因此咱們要先take(1)獲取第一個(由於onstart,onresume等都會發送相應的ActivityEvent),而後經過map操做符來轉換成相對的ActivityEvent:private static final Function<ActivityEvent, ActivityEvent> ACTIVITY_LIFECYCLE =
new Function<ActivityEvent, ActivityEvent>() {
@Override
public ActivityEvent apply(ActivityEvent lastEvent) throws Exception {
switch (lastEvent) {
case CREATE:
return ActivityEvent.DESTROY;
case START:
return ActivityEvent.STOP;
case RESUME:
return ActivityEvent.PAUSE;
case PAUSE:
return ActivityEvent.STOP;
case STOP:
return ActivityEvent.DESTROY;
case DESTROY:
throw new OutsideLifecycleException("Cannot bind to Activity lifecycle when outside of it.");
default:
throw new UnsupportedOperationException("Binding to " + lastEvent + " not yet implemented");
}
}
};
複製代碼
因此總結就是第一個Observable用來記錄咱們等會要在那個生命週期去取消訂閱。
因此總結第二個Observable用來實時發送生命週期的事件。
而後經過combineLatest把二個綁定一塊兒,這時候就會在指定的生命週期時候就會發送true,其他時候發送false,最後配合filter操做符,只有在true的時候才能發送便可。這樣最終經過takeUntil再把咱們的Observable綁定在一塊兒,而後這時候這裏發送true的時候,咱們的Observable就會取消訂閱了。
有些人會問,爲何我使用了RxLifeCycle,就算到了相應生命週期了,仍是會調用onComplete方法,由於有些人可能在這個方法裏面有相應邏輯處理代碼。由於RxLifeCycle主要使用的是takeUntil,因此最後仍是會執行onComplete,若是想取消訂閱的時候不調用這個,仍是能夠直接使用原生的Disposable來進行取消訂閱。
Why Not RxLifecycle?。這文章的做者就是 RxLifeCycle 的做者 ,說了使用RxLifeCycle會遇到一些窘境 ,而是推薦了AutoDispose: Automatic binding+disposal of RxJava 2 streams.,這是Uber公司的開源Rxjava取消訂閱。而RxLifeCycle做者也參與其中,因此一些設計方式也很像,AutoDipose主要是配合了Android的LifeCycle組件。
emmmmmm.......請多多指教。