以Observable爲例,先上代碼:java
//① ObservableJust<String> observable = (ObservableJust<String>) Observable.just("hello rxjava2"); //② ObservableSubscribeOn<String> subscribe = (ObservableSubscribeOn<String>) observable.subscribeOn(Schedulers.io()); //③ ObservableObserveOn<String> observerOn = (ObservableObserveOn<String>) subscribe.observeOn(AndroidSchedulers.mainThread()); //④ ObservableDoFinally<String> doFinally = (ObservableDoFinally<String>) observerOn.doFinally(new Action() { @Override public void run() throws Exception { System.out.println("doFinally"); } }); //⑤ ObservableDoOnLifecycle<String> doOnSubscribe = (ObservableDoOnLifecycle<String>) doFinally.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("doOnSubscribe: " + disposable.hashCode()); } }); //⑥ doOnSubscribe.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe: "+d.hashCode()); /* if (!d.isDisposed()){ System.out.println("onSubscribe: dispose"); d.dispose(); }*/ } @Override public void onNext(String s) { System.out.println("onNext: "+s); Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show(); } @Override public void onError(Throwable e) { System.out.println("onError: "+e.getMessage()); Toast.makeText(MainActivity.this, e.getMessage(), Toast.LENGTH_SHORT).show(); } @Override public void onComplete() { System.out.println("onComplete"); Toast.makeText(MainActivity.this, "onComplete", Toast.LENGTH_SHORT).show(); } });
這裏每次調用一個操做符,返回的都是Observable的直接子類或者間接之類.以just爲例:網絡
public static <T> Observable<T> just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); return RxJavaPlugins.onAssembly(new ObservableJust<T>(item)); }
這裏從新new了一個Observable的子類對象ObservableJust
.ide
結論以下:函數
- 每一個操做符都會對應返回一個Observable的子類對象,類名格式ObservableXXX而後去調用下一個操做符.好比interval操做符,返回的是ObservableInterval的實例對象.
- 對於Observable的建立型操做符,返回的是其直接子類,而其餘操做符,返回的是AbstractObservableWithUpstream的子類對象.AbstractObservableWithUpstream的構造函數中,第一個參數就是Observable對象,這一點很是重要,這個參數是上一個操做符返回的Observable對象.這保證了整個調用流程的起始處的Observable對象能在整個流程中傳遞.
最後一步訂閱subscribe(Observer).若是沒有最下游的觀察者對數據作接收,整個調用流程是不會執行的.
先從⑥開始看ObservableDoOnLifecycle
的subscribe方法作了什麼.ui
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new DisposableLambdaObserver<T>(observer, onSubscribe, onDispose)); }
source
就是上游操做符返回的Observable
的子類對象,經過AbstractObservableWithUpstream
的構造函數傳遞給下游的.這裏去調用了上一個Observable
對象的subscribe
方法.這個調用由下至上,直到整個流程的起始處.this
Observable對象先從上游逐步經過下游的Observable對象的構造函數傳遞給下游,再經過下游的subscribe方法,逐步去調用上游的subscribe方法.
訂閱發生在最後一步調用subscribe(Observer).從第⑤步ObservableDoOnLifecycle的subscribe方法開始看.spa
@Override protected void subscribeActual(Observer<? super T> observer) { source.subscribe(new DoFinallyObserver<T>(observer, onFinally)); }
從新建立一個DoFinallyObserver對象,並把第⑥步的Observer參數傳入後,交給上游的Observable.這個調用流程會逐步傳遞到最上游的ObservableJust的subscribe方法.線程
//ObservableJust.java @Override protected void subscribeActual(Observer<? super T> observer) { //參數observer是下游傳上來的 ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); }
首先調用了 observer.onSubscribe(sd);能夠獲得結論:code
Observer的onSubscribe在主線程執行,不管上下游怎麼切換線程.在請求網絡時,能夠在這個地方彈出進度提示或者作一些初始化操做.
ScalarDisposable.run()方法調用了下游的Observer傳遞數據,這個調用會逐步往下傳遞,直到最下游的Observer,若是沒遇到錯誤或者異常狀況.server
Observer對象先從最下游的訂閱處開始往上傳遞到最上游,再攜帶數據逐步往下游傳遞.
從上面能夠知道,數據是被Observer攜帶,逐步往下游傳遞
有多個重載的方法
//方法一 @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } //方法二 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } //方法三 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) { return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } //方法四 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) { return subscribe(onNext, onError, onComplete, Functions.emptyConsumer()); } //方法五 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //建立LambdaObserver對象 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } //方法六 @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); // 省略 subscribeActual(observer); //省略 }
前五個方法最終在第五個方法內部從新建立了一個Observer類型對象LambdaObserver,而後調用了第六個方法.