RxJava2:Observable和Observer如何傳遞

以Observable爲例,先上代碼:java

//①
ObservableJust<String> observable = (ObservableJust<String>) Observable.just("hello rxjava2");
//②
        ObservableSubscribeOn<String> subscribe = (ObservableSubscribeOn<String>) observable.subscribeOn(Schedulers.io());
//③
        ObservableObserveOn<String> observerOn = (ObservableObserveOn<String>) subscribe.observeOn(AndroidSchedulers.mainThread());
//④
        ObservableDoFinally<String> doFinally = (ObservableDoFinally<String>) observerOn.doFinally(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("doFinally");
            }
        });
//⑤
        ObservableDoOnLifecycle<String> doOnSubscribe = (ObservableDoOnLifecycle<String>) doFinally.doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                System.out.println("doOnSubscribe: " + disposable.hashCode());
            }
        });
//⑥
        doOnSubscribe.subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                       System.out.println("onSubscribe: "+d.hashCode());
                      /*  if (!d.isDisposed()){
                           System.out.println("onSubscribe: dispose");
                           d.dispose();
                       }*/
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext: "+s);
                        Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError: "+e.getMessage());
                        Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show();

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                        Toast.makeText(MainActivity.this, "onComplete", Toast.LENGTH_SHORT).show();
                    }
                });

Observable傳遞

這裏每次調用一個操做符,返回的都是Observable的直接子類或者間接之類.以just爲例:網絡

public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

這裏從新new了一個Observable的子類對象ObservableJust.ide

結論以下:函數

  1. 每一個操做符都會對應返回一個Observable的子類對象,類名格式ObservableXXX而後去調用下一個操做符.好比interval操做符,返回的是ObservableInterval的實例對象.
  2. 對於Observable的建立型操做符,返回的是其直接子類,而其餘操做符,返回的是AbstractObservableWithUpstream的子類對象.AbstractObservableWithUpstream的構造函數中,第一個參數就是Observable對象,這一點很是重要,這個參數是上一個操做符返回的Observable對象.這保證了整個調用流程的起始處的Observable對象能在整個流程中傳遞.

圖片描述

最後一步訂閱subscribe(Observer).若是沒有最下游的觀察者對數據作接收,整個調用流程是不會執行的.
先從⑥開始看ObservableDoOnLifecycle的subscribe方法作了什麼.ui

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose));
    }

source就是上游操做符返回的Observable的子類對象,經過AbstractObservableWithUpstream的構造函數傳遞給下游的.這裏去調用了上一個Observable對象的subscribe方法.這個調用由下至上,直到整個流程的起始處.this

Observable對象先從上游逐步經過下游的Observable對象的構造函數傳遞給下游,再經過下游的subscribe方法,逐步去調用上游的subscribe方法.

圖片描述

Observer傳遞

訂閱發生在最後一步調用subscribe(Observer).從第⑤步ObservableDoOnLifecycle的subscribe方法開始看.spa

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new DoFinallyObserver<T>(observer, onFinally));
    }

從新建立一個DoFinallyObserver對象,並把第⑥步的Observer參數傳入後,交給上游的Observable.這個調用流程會逐步傳遞到最上游的ObservableJust的subscribe方法.線程

//ObservableJust.java
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
    //參數observer是下游傳上來的
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

首先調用了 observer.onSubscribe(sd);能夠獲得結論:code

Observer的onSubscribe在主線程執行,不管上下游怎麼切換線程.在請求網絡時,能夠在這個地方彈出進度提示或者作一些初始化操做.

ScalarDisposable.run()方法調用了下游的Observer傳遞數據,這個調用會逐步往下傳遞,直到最下游的Observer,若是沒遇到錯誤或者異常狀況.server

Observer對象先從最下游的訂閱處開始往上傳遞到最上游,再攜帶數據逐步往下游傳遞.

數據傳遞

從上面能夠知道,數據是被Observer攜帶,逐步往下游傳遞

Observable.subscribe(Consumer,Consumer,Action)

有多個重載的方法

//方法一
 @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
     //方法二
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
     //方法三
     @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
     //方法四
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete) {
        return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
    }
     //方法五
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
       //建立LambdaObserver對象
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }
     //方法六
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
       
            observer = RxJavaPlugins.onSubscribe(this, observer);
// 省略
            subscribeActual(observer);
        //省略
    }

前五個方法最終在第五個方法內部從新建立了一個Observer類型對象LambdaObserver,而後調用了第六個方法.

相關文章
相關標籤/搜索