Android RxLife 一款輕量級別的RxJava生命週期管理庫(一)

簡介

RxLife是一款輕量級別的RxJava生命週期管理庫,代碼侵入性極低,隨用隨取,不須要作任何準備工做,支持在Activity/Fragment 的任意生命週期方法斷開管道。java

原理

RxLife經過Jetpack 下的 Lifecycle 獲取 Activity/Fragment 的生命週期變化,並經過Observable.lift(ObservableOperator) 操做符,注入本身實現的Observer對象(該對象能感知 Activity/Fragment的生命週期變化),從而在onSubscribe(Disposable d)方法中拿到Disposable對象,隨後在相應的生命週期回調裏執行Disposable.dispose()方法斷開管道,這樣就能將lift操做符上面的全部Disposable對象所有斷開。git

爲何要重複造輪子

熟悉RxJava的同窗應該都知道trello/RxLifecycle 項目,它在目前的3.0.0版本中經過Lifecycle感知Activity/Fragment 的生命週期變化,並經過BehaviorSubject類及composetakeUntil操做符來實現管道的中斷,這種實現原理有一點不足的是,它在管道斷開後,始終會往下游發送一個onComplete事件,這對於在onComplete事件中有業務邏輯的同窗來講,無疑是致命的。那爲何會這樣呢?由於takeUntil操做符內部實現機制就是這樣的,有興趣的同窗能夠去閱讀takeUntil操做符的源碼,這裏不展開。而RxLife就不會有這樣問題,由於在原理上RxLife就與trello/RxLifecycle不一樣,而且RxLife還在lift操做都的基礎上提供了一些額外的api,能有效的避免因RxJava內部類持有Activity/Fragment的引用,而形成的內存泄漏問題,下面開始講解。github

gradle依賴segmentfault

implementation 'com.rxjava.rxlife:rxlife:1.0.4'

源碼下載api

用法

Observable.timer(10, TimeUnit.SECONDS)
        //默認在onDestroy時中斷管道
        .lift(RxLife.lift(this))
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });
//或者
Observable.timer(10, TimeUnit.SECONDS)
        //指定在onStop時中斷管道
        .lift(RxLife.lift(this,Event.ON_STOP))
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

在Activity/Fragment 中,使用Observable的lift()操做符,方法中傳入RxLife.lift(this),若是須要指定生命週期方法,額外再傳一個Event對象便可。怎麼樣??是否是極其簡單,根本不須要作任何準備工做,代碼侵入性極低。安全

處理內存泄漏

咱們來看一個案例app

public void leakcanary(View view) {
    Observable.timer(100, TimeUnit.MILLISECONDS)
            .map(new MyFunction<>()) //阻塞操做
            .lift(RxLife.lift(this))
            .subscribe(new Consumer<Long>() { //這裏使用匿名內部類,持有Activity的引用
                //注意這裏不能使用Lambda表達式,不然leakcanary檢測不到內存泄漏
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.e("LJX", "accept =" + aLong);
                }
            });
}

//這裏使用靜態內部類,不會持有外部類的引用
static class MyFunction<T> implements Function<T, T> {

    @Override
    public T apply(T t) throws Exception {
        //當dispose時,第一次睡眠會被吵醒,接着便會進入第二次睡眠
        try {
            Thread.sleep(3000);
        } catch (Exception e) {

        }

        try {
            Thread.sleep(30000);
        } catch (Exception e) {

        }
        return t;
    }
}

上面的代碼會形成Activity沒法回收,致使內存泄漏,咱們用Leakcannry工具來檢測一下,發現確實形成來內存泄漏,以下
在這裏插入圖片描述
咱們已經使用RxLife庫,會自動中斷管道,那爲何還會形成內存泄漏呢?其實緣由很簡單,咱們只是中斷了管道,而沒有中斷上游對下游引用。看上面的截圖就能知道,上游始終持有下游的引用,而最下游的匿名內部類Consumer又持有了Activity的引用,因此就致使了Activity沒法回收。框架

那爲何中斷管道時,不會中斷上下游的引用呢?ide

首先有一點咱們須要明確,調用Disposable.dispose()方法來斷開管道,並非真正意義上的將上游與下游斷開,它只是改變了管道上各個Observer對象的一個標誌位的值,咱們來看一下LambdaObserver類的源碼就會知道工具

@Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

呃呃,只有一行代碼,咱們繼續

public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get(); //此處獲得上游的Disposable對象
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d); //更改本身的標誌位爲DISPOSED
            if (current != d) {
                if (current != null) {
                    current.dispose();//關閉上游的Disposable對象
                }
                return true;
            }
        }
        return false;
    }

能夠看到,這裏只作了兩件事,一是更改本身的標誌位,二是調用上游的dispose()方法,其實你只要多看看,你就發現,RxJava內部大多數Observer在dispose()方法都會幹這兩件事。

到這,咱們該如何解決這個內存泄漏問題呢?其實,RxJava早就想到了這一點,它給咱們提供了一個onTerminateDetach()操做符,這個操做符會在onError(Throwable t)onComplete()dispose()這個3個時刻,斷開上游對下游的引用,咱們來看看源碼,源碼在ObservableDetach類中

@Override
public void dispose() {
    Disposable d = this.upstream;
    this.upstream = EmptyComponent.INSTANCE;//上游從新賦值
    this.downstream = EmptyComponent.asObserver();//下游從新賦值
    d.dispose();//調用上游的dispose()方法
}

@Override
public void onError(Throwable t) {
    Observer<? super T> a = downstream;
    this.upstream = EmptyComponent.INSTANCE;//上游從新賦值
    this.downstream = EmptyComponent.asObserver();//下游從新賦值
    a.onError(t); //調用下游的onError方法
}

@Override
public void onComplete() {
    Observer<? super T> a = downstream;
    this.upstream = EmptyComponent.INSTANCE;//上游從新賦值
    this.downstream = EmptyComponent.asObserver();//下游從新賦值
    a.onComplete();//調用下游的onComplete方法
}

到這,咱們就知道該怎麼作了,下面這樣寫就安全了

Observable.timer(100, TimeUnit.MILLISECONDS)
        .map(new MyFunction<>())//阻塞操做
        .onTerminateDetach() //管道斷開時,中斷上游對下游的引用
        .lift(RxLife.lift(this)) //默認在onDestroy時斷開管道
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

但是,每次都要這樣寫嗎?有沒有更簡單的,有,RxLife提供了RxLife.compose(LifecycleOwner)方法,內部就是將onTerminateDetachlift這兩個操做符整合在了一塊兒,接下來,看看如何使用

Observable.timer(100, TimeUnit.MILLISECONDS)
        .map(new MyFunction<>())//阻塞操做
         //注意這裏使用compose操做符
        .compose(RxLife.compose(this))//默認在onDestroy時中斷管道,並中斷下下游之間的引用
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

若是須要指定生命週期的方法,也能夠

Observable.timer(100, TimeUnit.MILLISECONDS)
        .map(new MyFunction<>())//阻塞操做
         //注意這裏使用compose操做符
        .compose(RxLife.compose(this, Event.ON_STOP))//指定在onStop時斷開管道
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });
    }

大多數狀況下,咱們但願觀察者能主線程進行回調,也許你會這樣寫

Observable.timer(100, TimeUnit.MILLISECONDS)
        .map(new MyFunction<>())//阻塞操做
        .observeOn(AndroidSchedulers.mainThread()) //在主線程回調
        .compose(RxLife.compose(this, Event.ON_STOP))//指定在onStop回調時中斷管道,並中斷上下游引用
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

若是你是用RxLife的話,就能夠這樣寫,使用RxLife.composeOnMain方法

Observable.timer(100, TimeUnit.MILLISECONDS)
        .map(new MyFunction<>())//阻塞操做
        //在主線程進程回調,在onStop回調時中斷管道,並中斷上下游引用
        .compose(RxLife.composeOnMain(this, Event.ON_STOP))
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

RxLife類就只有6個靜態方法,以下
shshs

注意,前方高能預警!!!!!!!

結合RxLife使用Observable的liftcompose操做符時,下游除了subscribe操做符外最好不要有其它的操做符,前面講過,當調用Disposable.dispose()時,它會往上一層一層的調用上游的dispose()方法,若是下游有Disposable對象,是調用不到的,若是此時下游有本身的事件須要發送,那麼就沒法攔截了。
如:

Observable.just(1)
        .compose(RxLife.compose(this))
        .flatMap((Function<Integer, ObservableSource<Long>>) integer -> {
            //每隔一秒發送一個數據,共10個
            return Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS);
        })
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

這樣,即便Activity關閉了,觀察者每隔一秒後,依然能收到來自上游的事件,由於compose沒法切斷下游的管道,咱們改一下上面的代碼

Observable.just(1)
        .flatMap((Function<Integer, ObservableSource<Long>>) integer -> {
            //每隔一秒發送一個數據,共10個
            return Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS);
        })
        .compose(RxLife.compose(this))
        .subscribe(aLong -> {
            Log.e("LJX", "accept =" + aLong);
        });

這樣ok了,其實這不是RxLife的問題,使用鼎鼎大名的trello/RxLifecycle庫也是同樣的,由於RxJava的設計就是如此,上游拿不到下游的Disposable對象,因此,咱們在使用RxLife時,必定要注意在lift或者compose操做符的下游,除了subscribe操做符外最好不要有其它的操做符,這一點必定須要注意。

RxLife最新版本已經使用as操做符規避這個問題,詳情查看Android RxLife 一款輕量級別的RxJava生命週期管理庫(二)

小彩蛋

RxLife類裏面的life、compose系列方法,皆適用於Flowable、Observable、Single、Maybe、Completable這5個被觀察者對象,道理都同樣,這裏不在一一講解。

結尾

Ok,RxLife的使用基本就介紹完了,到這咱們會發現,使用RxLife庫,咱們只須要關注一個類便可,那便是RxLife類,api簡單功能卻強大。敢興趣的同窗,能夠去閱讀RxLife源碼,有疑問,請留言,我會在第一時間做答。

擴展

RxLife結合HttpSender發送請求,簡直不要太爽。

HttpSender詳情請點擊HttpSender OkHttp+RxJava超好用、功能超級強大的Http請求框架

相關文章
相關標籤/搜索