RxJava1 升級到 RxJava2 所踩過的坑

RxJava2

RxJava2 發佈已經有一段時間了,是對 RxJava 的一次重大的升級,因爲個人一個庫cv4j使用了 RxJava2 來嚐鮮,可是 RxJava2 跟 RxJava1 是不能同時存在於一個項目中的,逼不得已我得把本身全部框架中使用 RxJava 的地方以及
App 中使用 RxJava 的地方都升級到最新版本。因此我整理並記錄了一些已經填好的坑。html

填坑記錄

1. RxJava1 跟 RxJava2 不能共存

若是,在同一個module中同時使用RxJava1和RxJava2,相似以下:java

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'複製代碼

那麼,很不幸你會遇到這樣的錯誤react

Rxjava1和Rxjava2沒法共存.jpeg

同理,在 App 中若是使用了 Rxjava2,可是某個第三方的 library 還在使用 Rxjava1 也會遇到一樣的錯誤。android

上面的錯誤是由於 RxAndroid 2.0.1 自己依賴了 RxJava 2.0.1。咱們嘗試去掉對 RxJava 的依賴,只留下 RxAndroid 。仍是會遇到問題。git

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
//compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'io.reactivex:rxandroid:1.2.0'
//compile 'io.reactivex:rxjava:1.1.5'複製代碼

去掉對Rxjava的依賴.jpeg

因此使用RxAndroid不能去掉對RxJava的依賴,我是這樣使用的。github

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.7'複製代碼

官方也是這樣解釋的app

Because RxAndroid releases are few and far between, it is recommended you also
explicitly depend on RxJava's latest version for bug fixes and new features.框架

最後,我建議要升級到 RxJava2 的時候必須全部使用的地方都要升級,而且用最新的版本。ide

2. 新增Flowable

RxJava1 中 Observable 不能很好地支持 backpressure ,會拋出MissingBackpressureException。因此在 RxJava2 中 Oberservable 再也不支持 backpressure ,而使用新增的 Flowable 來支持 backpressure 。關於backpressure 的理解,能夠看這篇文章this

Flowable的用法跟原先的Observable是同樣的。

3. ActionN 和 FuncN 更名

ActionN 和 FuncN 遵循Java 8的命名規則。
其中,Action0 更名成Action,Action1更名成Consumer,而Action2更名成了BiConsumer,而Action3 - Action9都再也不使用了,ActionN變成了Consumer

一樣,Func更名成Function,Func2更名成BiFunction,Func3 - Func9 更名成 Function3 - Function9,FuncN 由 Function 取代。

4. Observable.OnSubscribe 變成 ObservableOnSubscribe

原先RxJava1的寫法:

Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello");
            }

        }).subscribe(new Action1<String>() {

            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });複製代碼

如今的寫法:

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
            }

        }).subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        });複製代碼

5. ObservableOnSubscribe 中使用 ObservableEmitter 發送數據給 Observer

結合上一條,ObservableOnSubscribe 再也不使用 Subscriber 而是用 ObservableEmitter 替代。

ObservableEmitter 能夠理解爲發射器,是用來發出事件的,它能夠發出三種類型的事件,經過調用emitter的onNext(T value)、onComplete()和onError(Throwable error)能夠分別發出next事件、complete事件和error事件。 若是隻關心next事件的話,只需單獨使用onNext()便可。

須要特別注意,emitter的onComplete()調用後,Consumer再也不接收任何next事件。

6. Observable.Transformer 變成 ObservableTransformer

原先RxJava1的寫法:

/** * 跟compose()配合使用,好比ObservableUtils.wrap(obj).compose(toMain()) * @param <T> * @return */
    public static <T> Observable.Transformer<T, T> toMain() {
        return new Observable.Transformer<T, T>() {
           @Override
            public Observable<T> call(Observable<T> tObservable) {
                return tObservable
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }複製代碼

如今的寫法:

/** * 跟compose()配合使用,好比ObservableUtils.wrap(obj).compose(toMain()) * @param <T> * @return */
    public static <T> ObservableTransformer<T, T> toMain() {

        return new ObservableTransformer<T, T>() {

            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }複製代碼

因爲新增了Flowable,同理也增長了FlowableTransformer

public static <T> FlowableTransformer<T, T> toMain() {

        return new FlowableTransformer<T, T>() {

            @Override
            public Publisher<T> apply(Flowable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }複製代碼

7. Subscription 更名爲 Disposable

在 RxJava2 中,因爲已經存在了 org.reactivestreams.subscription 這個類,爲了不名字衝突將原先的 rx.Subscription 更名爲 io.reactivex.disposables.Disposable。

剛開始不知道,在升級 RxJava2 時發現 org.reactivestreams.subscription 這個類徹底無法作原先 rx.Subscription 的事情:(

順便說下,Disposable必須單次使用,用完就要銷燬。

8. first() 用法改變

官方文檔是這麼描述的first()的用法

1.x 2.x
first() RC3 renamed to firstElement and returns Maybe
first(Func1) dropped, use filter(predicate).first()
firstOrDefault(T) renamed to first(T) and RC3 returns Single
firstOrDefault(Func1, T) renamed to first(T) and RC3 returns Single

以first(Func1)爲例,first(Func1)後面還使用了push(),原先 Rxjava1會這樣寫

ConnectableObservable<Data> connectableObservable = Observable.concat(Observable.from(list)).first(new Func1<Data, Boolean>() {
                @Override
                public Boolean call(Data data) {
                    return DataUtils.isAvailable(data);
                }
            }).publish();複製代碼

RxJava2 改爲這樣

ConnectableObservable<Data> connectableObservable = Observable.concat(Observable.fromIterable(list)).filter(new Predicate<Data>() {

                @Override
                public boolean test(@NonNull Data data) throws Exception {
                    return DataUtils.isAvailable(data);
                }
            }).firstElement().toObservable().publish();複製代碼

9. toBlocking().y 被 blockingY() 取代

在個人框架中存在着一個Optional類,它跟Java 8的Optional做用差很少,原先是使用RxJava1來編寫的。

import rx.Observable;

/** * 使用方法: * String s = null; * Optional.ofNullable(s).orElse("default")); // 若是s爲null,則顯示default,不然顯示s的值 * @author Tony Shen * */
public class Optional<T> {

    Observable<T> obs;

    public Optional(Observable<T> obs) {
        this.obs = obs;
    }

    public static <T> Optional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException();
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }

    public static <T> Optional<T> ofNullable(T value) {
        if (value == null) {
            return new Optional<T>(Observable.<T>empty());
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }

    public T get() {
        return obs.toBlocking().single();
    }

    public T orElse(T defaultValue) {
        return obs.defaultIfEmpty(defaultValue).toBlocking().single();
    }
}複製代碼

升級到RxJava2以後,get() 和 orElse() 方法都會報錯,修改以後是這樣的。

import io.reactivex.Observable;

/** * 使用方法: * String s = null; * Optional.ofNullable(s).orElse("default"); // 若是s爲null,則顯示default,不然顯示s的值 * @author Tony Shen * */
public class Optional<T> {

    Observable<T> obs;

    public Optional(Observable<T> obs) {
        this.obs = obs;
    }

    public static <T> Optional<T> of(T value) {
        if (value == null) {
            throw new NullPointerException();
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }

    public static <T> Optional<T> ofNullable(T value) {
        if (value == null) {
            return new Optional<T>(Observable.<T>empty());
        } else {
            return new Optional<T>(Observable.just(value));
        }
    }

    public T get() {

        return obs.blockingSingle();
    }

    public T orElse(T defaultValue) {

        return obs.defaultIfEmpty(defaultValue).blockingSingle();
    }
}複製代碼

10. PublishSubject

包括 PublishSubject 以及各類 Subject(ReplaySubject、BehaviorSubject、AsyncSubject) 都再也不支持backpressure。

總結

RxJava2 所帶來的變化遠遠不止這些,之後遇到的話還會繼續整理和總結,畢竟我使用的 RxJava2 仍是不多的一部份內容。

RxJava2 最好到文檔依然是官方文檔。若是是新項目到話,能夠堅決果斷地使用RxJava2,若是是在線上已經成熟穩定的項目,能夠再等等。對於新手的話,能夠直接從 RxJava2 學起,RxJava1 就直接略過吧。對於老手,RxJava2 仍是使用原來的思想,區別不大,從 RxJava1 遷移到 Rxjava2 也花不了多少工夫。

參考資料:

  1. github.com/ReactiveX/R…
  2. github.com/ReactiveX/R…
  3. www.dundunwen.com/article/275…
相關文章
相關標籤/搜索