RxJava之Scheduler (三) ——線程調度

默認狀況下,不作任何線程處理,Observable和Observer處於同一線程中。若是想要切換線程,則能夠使用subscribeOn()和observeOn()。bash

1.subscribeOn

subscribeOn經過接受一個Scheduler參數,來指定對數據的處理運行在特定的線程調度器Scheduler上。 若屢次執行subscribeOn,則只有一次起做用app

單擊subscribeOn()的源碼能夠看到,每次調用subscribeOn()都會建立一個ObservableSubscribeOn對象。async

/**
     * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
     *
     * @param scheduler
     *            the {@link Scheduler} to perform subscription actions on
     * @return the source ObservableSource modified so that its subscriptions happen on the
     *         specified {@link Scheduler}
     * @see #observeOn
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

ObservableSubscribeOn真正發生訂閱的方法是subscribeActual(Observer<? super T> observer)。ide

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
複製代碼

其是,SubscribeOnObserver是下游Observer的OnSubscribe(Disposable disposable)方法ui

s.onSubscribe(parent);
複製代碼

而後,將子線程的操做加入Disposable管理中,加入Disposable後能夠方便上下游的統一管理。在這裏,已經調用了對應Scheduler的scheduleDirect()方法。scheduleDirect()傳入的是一個Runnable。this

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
複製代碼

此時,已經再對應的Scheduler線程中運行了:spa

source.subscribe(parent);
複製代碼

在RxJava的鏈式操做中,數據的處理是自下而上的,這點與數據發射正好相反。若是屢次調用subscribeOn,則最上面的線程切換最晚執行,因此就變成了只有第一次切換線程有效線程

2.observeOn

observeOn一樣接受一個Scheduler參數,用來指定下游操做運行在特定的線程調度器Scheduler上。code

若屢次執行observeOn,則每次都起做用,線程會一直切換。orm

單擊observeOn的源碼能夠看到,每次調用observeOn()都會建立一個ObservableObserveOn對象。

/**
     * Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
     * asynchronously with an unbounded buffer with {@link Flowable#bufferSize()} "island size".
     *
     * <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
     * asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
     *
     * @param scheduler
     *            the {@link Scheduler} to notify {@link Observer}s on
     * @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
     *         {@link Scheduler}
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

ObservableObserveOn真正發生訂閱的方法是subscribeActual(Observer<? super T> observer)。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
    
        //若是scheduler是TrampolineScheduler,則上游事件和下游事件會當即產生訂閱。
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //不然scheduler會建立本身的Worker,而後上游事件和下游事件產生訂閱,生成一個ObserveOnObserver對象,封裝了下游真正的Observer。
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
複製代碼

若是scheduler是TrampolineScheduler,則上游事件和下游事件會當即產生訂閱。

若是不是TrampolineScheduler,則scheduler會建立本身的Worker,而後上游事件和下游事件產生訂閱,生成一個ObserveOnObserver對象,封裝了下游真正的Observer。

ObserveOnObserverObservableObserveOn的內部類,實現了Observer、Runnable接口。與SubscribeOnObserver不一樣的是,SubscribeOnObserver實現了Observer、DIsposable接口

在ObserveOnObserver的onNext()中,schedule()執行了具體調度的方法。

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
複製代碼

先寫到這兒,後續補充

相關文章
相關標籤/搜索