RxJava(十):Disposable 和 Transformer的使用

博客主頁java

1. Disposable

在 RxJav 1.x 中, Subscription 的接口能夠用來取消訂閱。react

public interface Subscription {

    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();

}

在 Rx.Java l.x 中, Observable.subscribe() 方法會返回一個 Subscription 的對象。也就是說,
咱們每次訂閱都會返回 Subscription。Subscription 只需調用 unsubscribe 就能夠取消訂閱。android

Subscription 對象是被觀察者和訂閱者之間的紐帶。 RxJava 使用 Subscription 處理取消訂閱時,會中止整個調用鏈。若是使用了一串很複雜的操做符,則調用 unsubscribe() 將會在它當前執行的地方終止,而不須要作任何額外的工做。git

在 Android 開發中,咱們會在 Activity/Fragment 的 onDestroy 裏作一些釋放資源的事情,若是使用了 RxJava l.x ,則能夠用 Subscription.isUnsubscribed() 檢查 Subscription 是不是 unsubscribed。 若是是 unsubscribed ,則調用 Subscription.unsubscribe(), RxJava 會取消訂閱井通知給 Subscriber,井容許垃圾回收機制釋放對象,防止任何 RxJava 形成內存泄露。github

在 RxJava 2.0 以後, Subscription 被更名爲 Disposable。 RxJava 2.x 中因爲己經存在名爲
org.reactivestreams.Subscription 這個類(遵循 Reactive Streams 標準),爲了不名字衝突,因此將原先的 rx.Subscription 更名爲 io.reactivex.disposables.Disposablesegmentfault

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();

    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

Disposable disposable = Observable.just("hello, rxjava")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

disposable.dispose();

1.1 CompositeDisposable

RxJava 1.x 中有一個複合訂閱( composite subscription )的概念。在 RxJava 2.x 中,RxJava
也內置了一個相似複合訂閱的容器 CompositeDisposable ,每當咱們獲得一個 Disposable 時,就
調用 CompositeDisposable.add(),將它添加到容器中,在退出的時候,調用
CompositeDisposable.clear()便可切斷全部的事件。在 Android 中,咱們常常會看到這樣的用法,
也就是會有一個針對 Activity 或者 Fragment 的 CompositeDisposable ,這樣就能夠在 onDestroy 或者其餘合適的地方取消訂閱。網絡

2. RxLifecycle 和 AutoDispose

在 Android 開發中,可使用 Disposable 來管理一個訂閱或者使用 CompositeDisposable 來管理多個訂閱,防止因爲沒有及時取消,致使 Activity/Fragment 沒法銷燬而引發內存泄露。然而,也有一些比較成熟的庫能夠作這些事情。app

2.1 RxLifecycle

GitHub 下載地址: https://github.com/trello/RxL...ide

RxLifecycle 是配合 Activity/Fragment 生命週期來管理訂閱的。因爲 RxJava Observable 訂閱後(調用了 subscribe 函數),通常會在後臺線程執行一些操做(好比訪問網絡請求數據),當後臺操做返回後,調用 Observer 的 onNext 等函數,而後再更新 UI 狀態。可是後臺線程請求是須要時間的,若是用戶單擊刷新按鈕請求新的微博信息,在刷新尚未完成時,若是用戶退出了當前界面,返回了前面的界面,而這個時候刷新的 Observable 又未取消訂閱,就會致使以前的 Activity 沒法被 JVM 回收,從而致使內存泄露。這就是在 Android 開發中值得注意的地方, RxLifecycle 就是專門用來作這件事的。函數

implementation 'com.trello.rxlifecycle2:rxlifecycle:2.2.2'

// If you want to bind to Android-specific lifecycles
implementation 'com.trello.rxlifecycle2:rxlifecycle-android:2.2.2'

// If you want pre-written Activities and Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle2:rxlifecycle-components:2.2.2'

// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle2:rxlifecycle-components-preference:2.2.2'

// If you want to use Navi for providers
implementation 'com.trello.rxlifecycle2:rxlifecycle-navi:2.2.2'

// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle2:rxlifecycle-android-lifecycle:2.2.2'

// If you want to use Kotlin syntax
implementation 'com.trello.rxlifecycle2:rxlifecycle-kotlin:2.2.2'

// If you want to use Kotlin syntax with Android Lifecycle
implementation 'com.trello.rxlifecycle2:rxlifecycle-android-lifecycle-kotlin:2.2.2'

在想使用該庫的 Activity 或者 Fragment 中,能夠繼承 RxActivity、RxAppCompatActivity 或者
RxFragment,井添加相應的 import 語句

import com.trello.rxlifecycle2.components.support.RxAppCompatActivity;

public class BackPressureAct extends RxAppCompatActivity { }

當涉及綁定 Observable 到 Activity 或者 Fragment 生命週期時,要麼指定 Observable 應終止的生命週期事件,要麼讓 RxLifecycle 庫決定什麼時候終止 Observable 序列。

默認狀況下,RxLifecycle 將在輔助生命週期事件中終止 Observable, 因此若是在 Activity 的 onCreate() 方法期間訂閱了 Observable ,則 RxLifecycle 將在該 Activity 的 onDestroy() 方法中終止 Observable 序列。若是在 Fragment 的 onAttach() 方法中訂閱,RxLifecycle 將在 onDetach() 方法中終止該序列。

2.2 AutoDispose

GitHub 下載地址: https://github.com/uber/AutoD...

AutoDispose 是 Uber 開源的庫。 它與 RxLifecycle的區別是,不只能夠在 Android 平臺上使用,還能夠在 Java(的企業級)平臺上使用,適用的範圍更加寬廣。

如今除 CompositeDisposable 和 RxLifecycle 外,還多了一個選擇 AutoDispose。
AutoDispose 支持 Kotlin、Android Architecture Components,井且 AutoDispose 能夠與 RxLifecycle 進行相互操做。

3. Transformer 在 RxJava 中的使用

3.1 Transformer 的用途

Transformer 是轉換器的意思。早在 RxJava 1.x 版本就己有了 Observable.Transformer、Single .Transformer 和 Completable.Transformer。在 RxJava 2.x 版本中有 ObservableTransformer、SingleTransformer、 CompletableTransformer、FlowableTransformer、 MaybeTransformer 。其中,FlowableTransformer 和 MaybeTransformer 是新增 。因爲 RxJava 2.x 將 Observable 拆分紅了Observable 和 Flowable ,因此有了FlowableTransformer 。一樣, Maybe 也是 RxJava 2.x 新增的一個類型,因此有 MaybeTransformer

Transformer 可以將一個 Observable/Flowable/Single/Completable/Maybe 對象轉換成另外一個 Observable/Flowable/Single/Completable/Maybe 對象,與調用一系列的內聯操做符如出一轍。

舉個簡單的例子, 寫一個 transformer() 方法將一個發射整數 Observable 轉換爲發射字符串的 Observable

private static ObservableTransformer<Integer, String> transformer() {

    return new ObservableTransformer<Integer, String>() {
        @Override
        public ObservableSource<String> apply(Observable<Integer> upstream) {
            return upstream.map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return String.valueOf(integer);
                }
            });
        }
    };
}

在使用 transformer() 方法,經過標準 RxJava 操做。

Observable.just(123, 456)
        .compose(transformer())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: 123
 Next: 456

3.2 與 compose 操做符結合使用

compose 操做符可以從數據流中獲得原始的被觀察者。當建立被觀察者時,compose 操做符會當即執行,而不像其餘的操做符須要在 onNext() 調用後才能執行

關於 compose 操做符,國內也有相應的翻譯: https://www.jianshu.com/p/e9e...

經常使用的場景

  1. 切換到主線程

對於網絡請求,咱們常常會作以下操做來切換線程:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

作一個簡單的封裝:

object RxJavaUtils {

    @JvmStatic
    fun <T> observableToMain(): ObservableTransformer<T, T> {
        return ObservableTransformer { upstream ->
            upstream
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
        }
    }

    @JvmStatic
    fun <T> flowableToMain(): FlowableTransformer<T, T> {
        return FlowableTransformer { upstream ->
            upstream
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
        }
    }
}

這段代碼是用 Kotlin 寫的, 把一些工具類用 Kotlin 來編寫,並且 Kotlin 的 Lambda 表達式也更爲直觀。

對於 Flowable 切換到主線程的操做,能夠這樣使用

.compose(RxJavaUtils.<String>flowableToMain())

  1. RxLifecycle 中的 LifecycleTransformer

RxLifecycle 可以配合 Android 的生命週期,防止 App 內存泄漏,其中就使用了 LifecycleTransformer。知乎也作了一個相似的 RxLifecycle ,可以作一樣的事情。


  1. 追蹤 RxJava 的使用

https://github.com/fengzhizi7...

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)

相關文章
相關標籤/搜索