rxjava2源碼解析(二)線程切換分析

引言

先貼上系列的連接:
rxjava2源碼解析(一)基本流程分析
rxjava2源碼解析(二)線程切換分析
rxjava2源碼解析(三)線程池原理分析java

上一篇介紹了rxjava2源碼解析(一)基本流程分析,這篇準備說說線程切換。bash

使用方法

仍是先從最基本的使用開始看:異步

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "s = " + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
複製代碼

相較於前面的實例,多了兩段代碼,subscribeOnobserveOn。咱們知道,subscribeOn是用來調整被觀察者(發射源)的線程,而observeOn是調整觀察者(處理器)的線程。ide

observeOn

咱們先從observeOn的源碼來看它是如何控制處理器的線程:post

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //判空代碼,和hock相關機制,咱們能夠忽略,直接看ObservableObserveOn
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    
複製代碼

首先咱們知道,observeOnsubscribeOn都是observable這個裝飾器的方法,他們的返回值也都是observable(前面講過這是裝飾器模式)。這裏仍是老樣子,直接看ObservableObserveOn這個對象就好了。ui

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        //將上游source存在本地
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //若是是當前線程,就不作處理
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
複製代碼

重點仍是subscribeActual這個方法。咱們看到,初始化ObservableObserveOn的時候傳入了咱們設置的scheduler。因此在subscribeActual裏,先判斷scheduler是不是TrampolineSchedulerTrampolineScheduler是什麼東西呢?咱們看官方註釋:this

/**
 * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
 * after the current unit of work is completed.
 *計劃在當前線程上工做,但不會當即執行。 將工做放入隊列並在當前工做單元完成後執行。
 */
複製代碼

OK,一目瞭然,若是是當前線程,不作任何處理,直接用綁定起來。不然,新建一個ObserveOnObserver對象,將上游裝飾器(這裏的上游是代碼流程上的上游,即調用observeOn的裝飾器)先於這個對象綁定。這個看起來是否是很眼熟?咱們回看上一篇裏面說的ObservableCreate裏的subscribeActual方法,對比一下有什麼不一樣。spa

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

能夠看到,二者都有在內部新建一個對象,並將上游裝飾器(或者是發射源)與之綁定。區別在於,ObservableCreate內部是新建一個發射器CreateEmitter對象,而ObservableObserveOn內部是新建一個處理器ObserveOnObserver對象。ObservableCreate是將以前存儲的上游發射源與發射器綁定,ObservableObserveOn是將上游裝飾器與處理器綁定。
根據這一點,咱們能夠將裝飾器分爲兩種樣式:線程

  • 一種是跟ObservableCreate相似,屬於每一條流水線的開端,自己是裝飾器,上游是發射源,內部生成一個發射器,處理最開始的發射流程。咱們稱之爲起始裝飾器
  • 一種是跟ObservableObserveOn相似,屬於流水線中間流程,自己是裝飾器,上游是裝飾器,內部新建一個處理器來處理上游事件,下游是處理器或者其餘裝飾器。咱們稱之爲流程裝飾器。

下面咱們能夠看看ObserveOnObserver的源碼。code

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            //同步異步相關,暫時無論
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //重點是這個
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver繼承了runnable接口,意味着能夠當作是線程任務來執行。這裏表明着在新線程中執行run方法。
                worker.schedule(this);
            }
        }
        //ObserveOnObserver繼承了runnable接口
        @Override
        public void run() {
        //同步異步相關,咱們直接看drainNormal()
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    ····//省略一些判斷的代碼
                    v = q.poll();
                    //這裏就能夠看到,將下游的onNext方法,切換到新線程執行。
                    a.onNext(v);
                }
                ···
            }
        }
        
    }
複製代碼

這裏咱們就不深刻細究了(會在下一篇中詳細來講),只須要知道,這是上游的處理器執行onNext,傳到這裏,使用以前設置的線程執行下游的onNext方法。因此這裏就完成了線程切換功能。而且這個切換,延續到下游全部處理的onNext。若是在下游再次調用ObserverOn,就會將後面的處理器切換到另一個線程。
因此,咱們能夠獲得結論,ObserverOn能夠屢次調用,每次調用會做用於下游的全部處理器,直到遇到新的ObserverOn

subscribeOn

接下來能夠看看,SubscribeOn的源碼內容。

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //判空和hock機制代碼,忽略。直接看ObservableSubscribeOn
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

同樣的配方,同樣的味道,咱們能夠直接看ObservableSubscribeOn類的subscribeActual方法。

@Override
    public void subscribeActual(final Observer<? super T> observer) {
        //建立SubscribeOnObserver內部類對象
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //這裏調用了下游的onSubscribe
        observer.onSubscribe(parent);
        //scheduler.scheduleDirect方法返回一個disposable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

這裏能夠看到,ObservableSubscribeOn一樣是一個流程裝飾器,在調用subscribe的時候,內部新建一個處理器,這個處理器與下游處理器相互持有。這裏與ObservableObserveOn有所不一樣,並無當即執行subscribe將上游裝飾器與內部處理器鏈接起來,而是執行了:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼

咱們看看SubscribeTask是啥:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //這裏的source是上游裝飾器,parent是內部處理器
            source.subscribe(parent);
        }
    }
複製代碼

很簡單,SubscribeTask繼承了Runnable,其中run方法是執行上游裝飾器的subscribe方法。咱們再看scheduleDirect方法。

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
複製代碼

這裏createWorker方法,是每一個不一樣的scheduler自行重寫的。咱們不深究這塊的代碼,只須要理解爲,在subscribeActual中,根據前面設置的scheduler,新建一個線程,在線程中當即執行上游sourcesubscribe方法,與內部處理器SubscribeOnObserver綁定。SubscribeOnObserver處理器與通常處理器沒什麼區別,就不貼代碼了。
那麼上面就實現了,在另一個線程執行subscribe方法。看過前面基本流程的都知道,這裏基本上就肯定了,上游全部subscribe方法執行的線程。因此咱們知道了,subscribeOn()方法會切換上游全部的subscribe方法,至於發射源所在的線程,只跟離它最近的subscribeOn()方法中所切換的線程有關。因此說,subscribeOn()方法只須要執行一次,且只有第一次是生效的。

onSubscribe

咱們看到,終端Observer這裏有一個onSubscribe方法,咱們通常在這裏進行一些初始化的操做,而前面的源碼中也有不少地方有onSubscribe方法。那這個方法究竟是執行在哪一個線程呢?咱們從源碼中尋找答案。
想要知道onSubscribe方法在哪一個線程,只須要看,在離終端處理器最近的上游裝飾器中,是在哪一個線程調用onSubscribe()的。咱們來看看subscribeOnobserveOn有沒有更改它所運行的線程。 先看ObserveOn,畢竟這是做用於下游處理器的線程切換方法。

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
複製代碼

從上面的源碼能夠看到,ObservableObserveOn類中的subscribeActual並無onSubscribe相關的內容。咱們看看ObserveOnObserver的源碼:

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                ·····//不相干代碼
                //這裏的downstream是指下游處理器
                downstream.onSubscribe(this);
            }
        }
複製代碼

咱們看到,在ObserveOnObserver被調用onSubscribe的時候,會調用下游的onSubscribe,參數是自己。也就是說,ObserveOn並無切換onSubscribe方法的線程。
再看subscribeOn方法,很明顯是直接在subscribeActual中執行下游處理器的onSubscribe方法。

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

這兩個線程切換的方法,都沒有更改onSubscribe()方法的線程。因此咱們能肯定,在終端的處理器Observer裏面的onSubscribe()方法,是跟外部在同一個線程上。

總結一下

  • observeOn做用於下游的全部處理器,能夠屢次調用。每個處理器所運行的線程,決定於它最近的上游observeOn方法中指定的線程。
  • subscribeOn做用於上游的發射源,主要是用來指定subscribe方法所在的線程。針對於發射源,只有離它最近的下游subscribeOn方法中所指定的線程才生效。因此subscribeOn方法屢次調用並無效果。
  • onSubscribe()方法並不會跟隨內部線程切換而切換線程。運行在哪一個線程,只跟外部建立這一整套觀察者模式的線程一致。

OK,上面就是本篇的所有內容了,下一篇咱們深刻看一看,rxjava2 裏面的線程池和線程切換究竟是怎麼實現的!

每週更新,敬請期待~

相關文章
相關標籤/搜索