RxJava2:線程調度

subscribeOn和observeOn負責線程切換,同時某些操做符也默認指定了線程.java

咱們這裏不分析在線程中怎麼執行的.只看如何切換到某個指定線程.react

subscribeOn

Observable.subscribeOn()在方法內部生成了一個ObservableSubscribeOn對象.
主要看一下ObservableSubscribeOn的subscribeActual方法.ide

@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //調用下游的Observer的onSubscribe方法
        observer.onSubscribe(parent);
        //經過SubscribeTask執行了上游Observable的subscribeActual方法
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

scheduler.scheduleDirect(Runnable)用於執行SubscribeTask這個任務.SubscribeTask自己是Runnable的實現類.看一下其run方法.this

@Override
        public void run() {
            //上游的Observable.subscribe方法被切換到了新的線程
            source.subscribe(parent);
        }

首先能夠得出結論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.線程

若是屢次調用subscribeOn切換線程,會有什麼效果?code

由下往上,每次調用subscribeOn,都會致使上游的Observable的subscribeActual切換到指定的線程.那麼最後一次調用的切換最上游的建立型操做符的subscribeActual的執行線程.若是操做符有默認執行線程怎麼辦?orm

操做符默認線程

若是是建立型操做符,處於最上游,那麼subscribeOn的線程切換對它不起做用.天高皇帝遠,縣官不如現管.就是這個道理.
若是是其它操做符,會是怎樣的?
以操做符timeout爲例:它對應ObservableTimeoutTimed和TimeoutObserverserver

@Override
        public void onNext(T t) {
            downstream.onNext(t);
            //超時計時
            startTimeout(idx + 1);
        }

        void startTimeout(long nextIndex) {
            //交給操做符默認的線程執行
            task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
        }

        @Override
        public void onError(Throwable t) {
                downstream.onError(t); 
        }

        @Override
        public void onComplete() {
                downstream.onComplete();
            }
        }

        @Override
        public void onTimeout(long idx) {
                downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
        }
//TimeoutTask.java
static final class TimeoutTask implements Runnable {

        @Override
        public void run() {
            parent.onTimeout(idx);
        }
    }

能夠看到操做符默認的執行線程只用來作超時計時任務,若是超時了,會在操做符的默認線程執行onError方法..操做符默認線程對下游的observer形成什麼影響要作具體對待.對象

observeOn

observeOn對應ObservableObserveOnObserveOnObserver.接口

//ObservableObserveOn.java
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
//ObserveOnObserver.java
  
   @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                if (d instanceof QueueDisposable) {
                    if (m == QueueDisposable.SYNC) {
                    //執行下游Observer的onSubscribe方法
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                     //執行下游Observer的onSubscribe方法
                        downstream.onSubscribe(this);
                        return;
                    }
                }
                 //執行下游Observer的onSubscribe方法
                downstream.onSubscribe(this);
            }
        }
        @Override
        public void onNext(T t) {
          //省略
            schedule();
        }
        @Override
        public void onError(Throwable t) {
         //省略
            schedule();
        }
         void schedule() {
            if (getAndIncrement() == 0) {
            /*
            ObserveOnObserver是Runnable的實現類.交給線程池執行
            */
                worker.schedule(this);
            }
        }
        
        
        void drainNormal() {
            final Observer<? super T> a = downstream;
            for (;;) {
                for (;;) {
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        a.onError(ex);
                        return;
                    }
                    //執行下游Observer的onNext方法
                    a.onNext(v);
                }
            }
        }

        void drainFused() {
            for (;;) {
                if (!delayError && d && ex != null) {
                   //執行下游Observer的onError方法
                    downstream.onError(error);
                    return;
                }
                downstream.onNext(null);
                if (d) {
                    ex = error;
                    if (ex != null) {
                       //執行下游Observer的onError方法
                        downstream.onError(ex);
                    } else {
                       //執行下游Observer的onComplete方法
                        downstream.onComplete();
                    }
                    return;
                }
            }
        }
        //執行線程任務
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

從上面能夠看出ObservableObserveOn在其subscribeActual方法中並無切換上游Observable的subscribe方法的執行線程.可是ObserveOnObserver在其onNext,onError和onComplete中經過schedule()方法將下游Observer的各個方法切換到了新的線程.
得出結論: observeOn負責切換的是下游Observer的各個方法的執行線程

若是下游屢次經過observeOn切換線程,會有什麼效果?
每次切換都會對其下游形成影響,直到遇到下一個observeOn爲止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不一樣.
遇到線程切換的時候,會首先在對應的Observable的subscribeActual方法內,先調用observer.onSubscribe方法.而observer.onSubscribe會逐級向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內調用,這是在主線程執行的.因此onSubscribe方法不管如何都是在主線程執行.

doOnSubscribe

.doOnSubscribe(new Consumer<Disposable>() {
                   @Override
                   public void accept(Disposable disposable) throws Exception {
                      
                   }
               })

咱們要看的是方法accept的執行線程.
經過源碼找到對應的DisposableLambdaObserver.

@Override
   public void onSubscribe(Disposable d) {
   //在這裏調用了accept方法.
           onSubscribe.accept(d);
   }

這就要看上游在哪一個線程執行了Observer.onSubscribe(disposable)方法.

在建立型操做符的subscribeActual方法和subscribeOn對應的Observable的subscribeActual方法內調用了Observer.onSubscribe(disposable)方法.那麼這兩處的執行線程就決定了onSubscribe.accept(d);的執行線程.

doFinally

對應ObservableDoFinallyDoFinallyObserver

//DoFinallyObserver.java
  @Override
        public void onError(Throwable t) {
            runFinally();
        }

        @Override
        public void onComplete() {
            runFinally();
        }

        @Override
        public void dispose() {
            runFinally();
        }
        
         void runFinally() {
             onFinally.run();
        }

能夠看到與它所對應的DoFinallyObserver的onError,onComplete,dispose方法的執行線程有關,這三個方法的執行線程又受到上游的observeOn的影響.若是沒有observeOn,則會受到最上游的observable.subscribeActual方法影響.

doOnError

對應ObservableDoOnEachDoOnEachObserver

//DoOnEachObserver.java
  @Override
        public void onError(Throwable t) {
                onError.accept(t);
        }

和自身對應的observer.onError所在線程保持一致.

doOnNext

對應ObservableDoOnEachDoOnEachObserver

//DoOnEachObserver.java
  @Override
        public void onNext(T t) {
                onNext.accept(t);
        }

和自身對應的observer.onNext所在線程保持一致.

操做符對應方法參數的執行線程

io.reactivex.functions下的接口類通常用於處理上游數據而後往下傳遞.這些接口類的方法通常在對應的observer.onNext中調用.因此他們的線程保持一致.

總結:

subscribeOn由下往上逐級切換Observable.subscribe的執行線程,不受observeOn影響,也不受具備默認指定線程的非建立型操做符影響,可是會被更上游的subscribeOn奪取線程切換的權利,直到最上游.若是最上游的建立型操做符也有默認執行線程,那麼任何一個subscribeOn的線程切換不起做用.subscribeOn由下向上到達最上游後,而後由上往下影響下游的observer的執行線程.遇到observeOn會被奪取線程切換的權利.observeOn影響的是下游的observer的執行線程,由上往下,遇到另外一個observeOn會移交線程控制權力,遇到指定默認線程非建立型的操做符,要視具體狀況對待.

相關文章
相關標籤/搜索