關於 RxJava 最友好的文章—— RxJava 2.0 全新來襲

前言

以前寫RxJava相關文章的時候,就有人想讓我談談RxJava2.0的新特性,說實話,一開始我是拒絕的。由於在我看來,RxJava2.0雖然是版本的重大升級,但總歸仍是RxJava,升級一個版本還能上天是咋的?瞭解一下它的更新文檔不就行了麼?真的有必要單出一篇文章來談這個麼?javascript

可是詳細的瞭解了RxJava2.0以及部分源碼以後,我以爲仍是有必要對RxJava2.0作一個說明,幫助你們對於RxJava有更好的認識。java


鋪墊

假如你還不是很熟悉RxJava,或者對於背壓這個概念(2.0更新中會涉及到背壓的概念)很模糊,但願你也能讀一讀下面兩篇鋪墊的文章:react

關於背壓的那篇文章原本是本文的一部分,由於篇幅過大,被剝離出去了,因此建議你們有時間也一併閱讀。android


正文

RxJava2.0有不少的更新,一些改動甚至衝擊了我以前的文章裏的內容,這也是我想寫這篇文章的緣由之一。不過想要寫這篇文章其實也是有難度的,由於相關的資料去實際上是不多的,還得本身硬着頭皮上....不過俗話說得好,有困難要上,沒有困難創造困難也要上。git

在這裏,我會按照咱們以前關於RxJava的文章的講述順序:觀察者模式,操做符,線程調度,這三個方面依次看有哪些更新。github


添加依賴

這個估計得放在最前面。app

Android端使用RxJava須要依賴新的包名:異步

//RxJava的依賴包(我使用的最新版本)
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依賴包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'複製代碼

觀察者模式

首先聲明,RxJava以觀察者模式爲骨架,在2.0中依然如此ide

不過這次更新中,出現了兩種觀察者模式:post

  • Observable(被觀察者)/Observer(觀察者)
  • Flowable(被觀察者)/Subscriber(觀察者)

RxJava2.X中,Observeable用於訂閱Observer,是不支持背壓的,而Flowable用於訂閱Subscriber,是支持背壓(Backpressure)的。

關於背壓這個概念以及它在1.0版本中的缺憾在上一篇文章中我已經介紹到了,若是你不是很清楚,我在這裏在作一個介紹:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略,在1.0中,關於背壓最大的遺憾,就是集中在Observable這個類中,致使有的Observable支持背壓,有的不支持。爲了解決這種缺憾,新版本把支持背壓和不支持背壓的Observable區分開來。

Observable/Observer

Observable正經常使用法:

Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });

Observer mObserver=new Observer<Integer>() {
            //這是新加入的方法,在訂閱後發送數據以前,
            //回首先調用這個方法,而Disposable可用於取消訂閱
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

mObservable.subscribe(mObserver);複製代碼

這種觀察者模型是不支持背壓的。

啥叫不支持背壓呢?

當被觀察者快速發送大量數據時,下游不會作其餘處理,即便數據大量堆積,調用鏈也不會報MissingBackpressureException,消耗內存過大隻會OOM

我在測試的時候,快速發送了100000個整形數據,下游延遲接收,結果被觀察者的數據所有發送出去了,內存確實明顯增長了,遺憾的是沒有OOM。

因此,當咱們使用Observable/Observer的時候,咱們須要考慮的是,數據量是否是很大(官方給出以1000個事件爲分界線,僅供各位參考)

Flowable/Subscriber

Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //當訂閱後,會首先調用這個方法,其實就至關於onStart(),
            //傳入的Subscription s參數能夠用於請求數據或者取消訂閱
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });複製代碼

輸出以下:

onsubscribe start
onNext--->0
onNext--->1
onNext--->2
...
onNext--->9
onComplete
onsubscribe end複製代碼

Flowable是支持背壓的,也就是說,通常而言,上游的被觀察者會響應下游觀察者的數據請求,下游調用request(n)來告訴上游發送多少個數據。這樣避免了大量數據堆積在調用鏈上,使內存一直處於較低水平。

固然,Flowable也能夠經過creat()來建立:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }
        //須要指定背壓策略
        , BackpressureStrategy.BUFFER);複製代碼

Flowable雖然能夠經過create()來建立,可是你必須指定背壓的策略,以保證你建立的Flowable是支持背壓的(這個在1.0的時候就很難保證,能夠說RxJava2.0收緊了create()的權限)。

根據上面的代碼的結果輸出中能夠看到,當咱們調用subscription.request(n)方法的時候,不等onSubscribe()中後面的代碼執行,就會馬上執行到onNext方法,所以,若是你在onNext方法中使用到須要初始化的類時,應當儘可能在subscription.request(n)這個方法調用以前作好初始化的工做;

固然,這也不是絕對的,我在測試的時候發現,經過create()自定義Flowable的時候,即便調用了subscription.request(n)方法,也會等onSubscribe()方法中後面的代碼都執行完以後,纔開始調用onNext。

TIPS: 儘量確保在request()以前已經完成了全部的初始化工做,不然就有空指針的風險。

其餘觀察者模式

固然,除了上面這兩種觀察者,還有一類觀察者

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

其實這三者都差很少,Maybe/MaybeObserver能夠說是前二者的複合體,所以以Maybe/MaybeObserver爲例簡單介紹一下這種觀察者模式的用法

//判斷是否登錄
Maybe.just(isLogin())
    //可能涉及到IO操做,放在子線程
    .subscribeOn(Schedulers.newThread())
    //取回結果傳到主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });複製代碼

上面就是Maybe/MaybeObserver的普通用法,你能夠看到,實際上,這種觀察者模式並不用於發送大量數據,而是發送單個數據,也就是說,當你只想要某個事件的結果(true or false)的時候,你能夠用這種觀察者模式


這是上面那些被觀察者的上層接口:

//Observable接口
interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
//Single接口
interface SingleSource<T> {
    void subscribe(SingleObserver<? super T> observer);
}
//Completable接口
interface CompletableSource {
    void subscribe(CompletableObserver observer);
}
//Maybe接口
interface MaybeSource<T> {
    void subscribe(MaybeObserver<? super T> observer);
}
//Flowable接口
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}複製代碼

其實咱們能夠看到,每一種觀察者都繼承自各自的接口,這也就把他們能徹底的區分開,各自獨立(特別是Observable和Flowable),保證了他們各自的拓展或者配套的操做符不會相互影響。

例如flatMap操做符實現:

//Flowable中flatMap的定義
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

//Observable中flatMap的定義
Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);複製代碼

假如你想爲Flowable寫一個自定義的操做符,那麼只要保證Function< Publisher >中的類型實現了Publisher接口便可。這麼說可能很抽象,你們不理解其實也不要緊,由於並不推薦你們自定義操做符,RxJava中的操縱符的組合已經能夠知足你們的需求了。

固然,你也會注意到上面那些接口中的subscribe()方法的返回類型爲void了,在1.X中,這個方法通常會返回一個Subscription對象,用於取消訂閱。如今,這個功能的對象已經被放到觀察者Observer或者subscriber的內部實現方法中了,

Flowable/Subscriber

public interface Subscriber<T> {  
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}複製代碼

上面的實例中,onSubscribe(Subscription s)傳入的參數s就肩負着取消訂閱的功能,固然,他也能夠用於請求上游的數據。

在Observable/observer中,傳入的參數是另外一個對象

Observable/Observer

public interface Observer<T> {
   void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

public interface Disposable {
    /** * Dispose the resource, the operation should be idempotent. */
    void dispose();
    /** * Returns true if this resource has been disposed. * @return true if this resource has been disposed */
    boolean isDisposed();
}複製代碼

在Observer接口中,onSubscribe(Disposable d)方法傳入的Disposable也是用於取消訂閱,基本功能是差很少的,只不過命名不一致,你們知道就好。

其實這種設計能夠說仍是符合邏輯的,由於取消訂閱這個動做就只有觀察者(Observer等)才能作的,如今把它併入到觀察者內部,也算瓜熟蒂落吧。

最後再提一點更新,就是被觀察者再也不接收null做爲數據源了。


操做符相關

這一塊其實能夠說沒什麼改動,大部分以前你用過的操做符都沒變,即便有所變更,也只是包名或類名的改動。你們可能常常用到的就是Action和Function。

Action相關

以前我在文章裏介紹過關於Action這類接口,在1.0中,這類接口是從Action0,Action1...日後排(數字表明可接受的參數),如今作出了改動

Rx1.0-----------Rx2.0

Action0--------Action
Action1--------Consumer
Action2--------BiConsumer
後面的Action都去掉了,只保留了ActionN

Function相關

同上,也是命名方式的改變

上面那兩個類,和RxJava1.0相比,他們都增長了throws Exception,也就是說,在這些方法作某些操做就不須要try-catch

例如:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);複製代碼

Files.readLines(name)這類io方法原本是須要try-catch的,如今直接拋出異常,就能夠放心的使用lambda ,保證代碼的簡潔優美。

doOnCancel/doOnDispose/unsubscribeOn

以doOnCancel爲例,大概就是當取消訂閱時,會調用這個方法,例如:

Flowable.just(1, 2, 3)
.doOnCancel(() -> System.out.println("Cancelled!"))
.take(2)
.subscribe(System.out::println);複製代碼

take新操符會取消後面那些還未被髮送的事件,於是會觸發doOnCancel

其餘的一些操做符基本沒變,或者只是改變了名字,在這裏就不一一介紹了,須要提一下的是,不少操做符都有兩套,一套用於Observable,一套用於Flowable。


線程調度

能夠說這一起基本也沒有改動,若是必定要說的話。

  • 那就是去掉了Schedulers.immediate()這個線程環境
  • 移除的還有Schedulers.test()(我好像歷來沒用過這個方法)
  • io.reactivex.Scheduler這個抽象類支持直接調度自定義線程任務(這個我也沒怎麼用)

補充

若是你想把本身的RxJava1.0的遷移到2.0的版本,可使用這個庫RxJava2Interop,它能夠在Rxjava1.0和2.0之間相互轉換,也就是說,不只能夠把1.0的代碼遷移到2.0,你還能夠把2.0的代碼遷移到1.0,哈哈。

補充2

在RxJava1.0中,有的人會使用CompositeSubscription來收集Subscription,來統一取消訂閱,如今在RxJava2.0中,因爲subscribe()方法如今返回void,那怎麼辦呢?

其實在RxJava2.0中,Flowable提供了subscribeWith這個方法能夠返回當前訂閱的觀察者,而且經過ResourceSubscriber DisposableSubscriber等觀察者來提供 Disposable的接口

因此,若是想要達成RxJava1.0的效果,如今應該是這樣作:

CompositeDisposable composite = new CompositeDisposable();

composite.add(Flowable.range(1, 8).subscribeWith(subscriber));

這個subscriber 應該是 ResourceSubscriber 或者 DisposableSubscriber 的實例。


結尾

其實從整篇文章的分析來看,改動最大的仍是觀察者模式的實現,被拆分和細化了,主要分紅了Observable和Flowable兩大類,固然還有與之相關聯的其餘變更,整體來看這一版本能夠說是對於觀察者和被觀察者的重構

RxJava2.0的範例代碼我沒精力去寫了,也正巧有位外國朋友已經寫了RxJava2.0的demo,下面是他的項目地址:

RxJava2-Android-Samples

固然,學習2.0 的過程當中有什麼問題也能夠在這裏留言討論。


後記

這篇文章半個月前就開始寫了,可是一直不太滿意,因此在草稿箱裏躺了好久,可是本着不放棄任何一篇落後文章的信念,仍是振做起來,完成關於RxJava2.0的介紹。你可能不信,寫完頓時有了一種解(xu)脫的感受。

身體被掏空...


附錄

下面我截圖展現一下2.0相對於1.0的一些改動的細節,僅作參考。





其實這些都是官方給出的列表,截圖在這裏只是方便你們觀摩。

相關文章
相關標籤/搜索