RxJava2 源碼解析(二)

轉載請標明出處:
juejin.im/post/58ce8c…
本文出自:【張旭童的稀土掘金】(gold.xitu.io/user/56de21…)javascript

概述

承接上一篇RxJava2 源碼解析(一)
本系列咱們的目的:java

  1. 知道源頭(Observable)是如何將數據發送出去的。
  2. 知道終點(Observer)是如何接收到數據的。
  3. 什麼時候將源頭和終點關聯起來的
  4. 知道線程調度是怎麼實現的
  5. 知道操做符是怎麼實現的

本篇計劃講解一下4,5.app

RxJava最強大的莫過於它的線程調度花式操做符異步

map操做符

map是一個高頻的操做符,咱們首先拿他開刀。
例子以下,源頭Observable發送的是String類型的數字,利用map轉換成int型,最終在終點Observer接受到的也是int類型數據。:ide

final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete()
            }
        });複製代碼
Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return Integer.parseInt(s);
                    }
                });
                map.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext() called with: value = [" + value + "]");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError() called with: e = [" + e + "]");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete() called");
                    }
                });複製代碼

咱們看一下map函數的源碼:函數

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //判空略過
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //RxJavaPlugins.onAssembly()是hook 上文提到過
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }複製代碼

RxJavaPlugins.onAssembly()是hook 上文提到過,因此咱們只要看ObservableMap,它就是返回到咱們手裏的Observable:post

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //將function變換函數類保存起來
    final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //super()將上游的Observable保存起來 ,用於subscribeActual()中用。
        super(source);
        this.function = function; } @Override public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function)); }複製代碼

它繼承自AbstractObservableWithUpstream,該類繼承自Observable,很簡單,就是將上游的ObservableSource保存起來,作一次wrapper,因此它也算是裝飾者模式的提現,以下:ui

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    //將上游的`ObservableSource`保存起來
    protected final ObservableSource<T> source;
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    @Override
    public final ObservableSource<T> source() {
        return source;
    }
}複製代碼

關於ObservableSource,表明了一個標準的無背壓的 源數據接口,能夠被Observer消費(訂閱),以下:this

public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}複製代碼

全部的Observable都已經實現了它,因此咱們能夠認爲ObservableObservableSource在本文中是相等的spa

public abstract class Observable<T> implements ObservableSource<T> {複製代碼

因此咱們獲得的ObservableMap對象也很簡單,就是將上游的Observable和變換函數類Function保存起來
Function的定義超級簡單,就是一個接口,給我一個T,還你一個R.

public interface Function<T, R> {
    R apply(T t) throws Exception;
}複製代碼

本例寫的是將String->int.

重頭戲subscribeActual()是訂閱真正發生的地方,ObservableMap以下編寫,就一句話,用MapObserver訂閱上游Observable。

@Override
    public void subscribeActual(Observer<? super U> t) {
    //用MapObserver訂閱上游Observable。
        source.subscribe(new MapObserver<T, U>(t, function)); }複製代碼

MapObserver也是裝飾者模式,對終點(下游)Observer修飾。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            //super()將actual保存起來
            super(actual);
            //保存Function變量
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            //done在onError 和 onComplete之後纔會是true,默認這裏是false,因此跳過
            if (done) {
                return;
            }
            //默認sourceMode是0,因此跳過
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            //下游Observer接受的值
            U v;
            //這一步執行變換,將上游傳過來的T,利用Function轉換成下游須要的U。
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //變換後傳遞給下游Observer
            actual.onNext(v);
        }複製代碼

到此咱們梳理一下流程:
訂閱的過程,是從下游到上游依次訂閱的。

  1. 終點 Observer 訂閱了 map 返回的ObservableMap
  2. 而後mapObservable(ObservableMap)在被訂閱時,會訂閱其內部保存上游Observable,用於訂閱上游的Observer是一個裝飾者(MapObserver),內部保存了下游(本例是終點)Observer以便上游發送數據過來時,能傳遞給下游
  3. 以此類推,直到源頭Observable被訂閱,根據上節課內容,它開始向Observer發送數據

數據傳遞的過程,固然是從上游push到下游的,

  1. 源頭Observable傳遞數據給下游Observer(本例就是MapObserver
  2. 而後MapObserver接收到數據,對其變換操做後(實際的function在這一步執行),再調用內部保存的下游ObserveronNext()發送數據給下游
  3. 以此類推,直到終點Observer

線程調度subscribeOn

簡化問題,代碼以下:

Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
                        e.onNext("1");
                        e.onComplete();
                    }
                    //只是在Observable和Observer之間增長了一句線程調度代碼
                }).subscribeOn(Schedulers.io())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
                            }
                            @Override
                            public void onNext(String value) {
                                Log.d(TAG, "onNext() called with: value = [" + value + "]");
                            }
                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "onError() called with: e = [" + e + "]");
                            }
                            @Override
                            public void onComplete() {
                                Log.d(TAG, "onComplete() called");
                            }
                        });複製代碼

只是在ObservableObserver之間增長了一句線程調度代碼:.subscribeOn(Schedulers.io()).
查看subscribeOn()源碼:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    //判空略過
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //拋開Hook,重點仍是ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }複製代碼

等等,怎麼有種似曾相識的感受,你們能夠把文章向上翻,看看map()的源碼。
subscribeOn()的套路一模一樣,那麼咱們根據上面的結論,
先猜想ObservableSubscribeOn類也是一個包裝類(裝飾者),點進去查看:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    //保存線程調度器
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        //map的源碼中咱們分析過,super()只是簡單的保存ObservableSource
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //1 建立一個包裝Observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //2 手動調用 下游(終點)Observer.onSubscribe()方法,因此onSubscribe()方法執行在 訂閱處所在的線程
        s.onSubscribe(parent);
        //3 setDisposable()是爲了將子線程的操做加入Disposable管理中
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
            //4 此時已經運行在相應的Scheduler 的線程中
                source.subscribe(parent);
            }
        }));
    }複製代碼

和map套路大致一致,ObservableSubscribeOn自身一樣是個包裝類,同樣繼承AbstractObservableWithUpstream
建立了一個SubscribeOnObserver類,該類按照套路,應該也是實現了ObserverDisposable接口的包裝類,讓咱們看一下:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        //真正的下游(終點)觀察者
        final Observer<? super T> actual;
        //用於保存上游的Disposable,以便在自身dispose時,連同上游一塊兒dispose
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            //onSubscribe()方法由上游調用,傳入Disposable。在本類中賦值給this.s,加入管理。
            DisposableHelper.setOnce(this.s, s);
        }

        //直接調用下游觀察者的對應方法
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
        @Override
        public void onComplete() {
            actual.onComplete();
        }

        //取消訂閱時,連同上游Disposable一塊兒取消
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        //這個方法在subscribeActual()中被手動調用,爲了將Schedulers返回的Worker加入管理
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }複製代碼

這兩個類根據上一節的鋪墊加上註釋,其餘都好理解,稍微很差理解的應該是下面兩句代碼:

//ObservableSubscribeOn類
        //3 setDisposable()是爲了將子線程的操做加入Disposable管理中
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
            //4 此時已經運行在相應的Scheduler 的線程中
                source.subscribe(parent);
            }
        }));

        //SubscribeOnObserver類
        //這個方法在subscribeActual()中被手動調用,爲了將Schedulers返回的Worker加入管理
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }複製代碼

其中scheduler.scheduleDirect(new Runnable()..)方法源碼以下:

/** * Schedules the given task on this scheduler non-delayed execution. * ..... */
    public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }複製代碼

從註釋和方法名咱們能夠看出,這個傳入的Runnable會馬上執行
再繼續往裏面看:

public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        //class Worker implements Disposable ,Worker自己是實現了Disposable 
        final Worker w = createWorker();
        //hook略過
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //開始在Worker的線程執行任務,
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                //調用的是 run()不是 start()方法執行的線程的方法。
                    decoratedRun.run();
                } finally {
                //執行完畢會 dispose()
                    w.dispose();
                }
            }
        }, delay, unit);
        //返回Worker對象
        return w;
    }複製代碼

createWorker()是一個抽象方法,由具體的Scheduler類實現,例如IoScheduler對應的Schedulers.io().

public abstract Worker createWorker();複製代碼

初看源碼,爲了瞭解大體流程,不宜過入深刻,先點到爲止。
OK,如今咱們總結一下scheduler.scheduleDirect(new Runnable()..)的重點:

  1. 傳入的Runnable馬上執行的。
  2. 返回的Worker對象就是一個Disposable對象
  3. Runnable執行時,是直接手動調用的 run(),而不是 start()方法.
  4. 上一點應該是爲了,能控制在run()結束後(包括異常終止),都會自動執行Worker.dispose().

而返回的Worker對象也會被parent.setDisposable(...)加入管理中,以便在手動dispose()時能取消線程裏的工做。

咱們總結一下subscribeOn(Schedulers.xxx())過程

  1. 返回一個ObservableSubscribeOn包裝類對象
  2. 上一步返回的對象被訂閱時,回調該類中的subscribeActual()方法,在其中會馬上將線程切換到對應的Schedulers.xxx()線程。
  3. 在切換後的線程中,執行source.subscribe(parent);對上游(終點)Observable訂閱
  4. 上游(終點)Observable開始發送數據,根據RxJava2 源碼解析(一),上游發送數據僅僅是調用下游觀察者對應的onXXX()方法而已,因此此時操做是在切換後的線程中進行

一點擴展,
你們可能看過一個結論:
subscribeOn(Schedulers.xxx())切換線程N次,老是以第一次爲準,或者說離源Observable最近的那次爲準,而且對其上面的代碼生效(這一點對比的ObserveOn())。

爲何?

  • 由於根據RxJava2 源碼解析(一)中提到,訂閱流程從下游往上游傳遞
  • subscribeActual()裏開啓了Scheduler的工做,source.subscribe(parent);,從這一句開始切換了線程,因此在這之上的代碼都是在切換後的線程裏的了。
  • 但若是連續切換最上面的切換最晚執行,此時線程變成了最上面的subscribeOn(xxxx)指定的線程,
  • 數據push時,是從上游到下游的,因此會在離源頭最近的那次subscribeOn(xxxx)的線程裏push數據(onXXX())給下游。

可寫以下代碼驗證:

Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
                        e.onNext("1");
                        e.onComplete();
                    }
                }).subscribeOn(Schedulers.io())
                        .map(new Function<String, String>() {
                            @Override
                            public String apply(String s) throws Exception {
                                //依然是io線程
                                Log.d(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread());
                                return s;
                            }
                        })
                        .subscribeOn(Schedulers.computation())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
                            }
                            @Override
                            public void onNext(String value) {
                                Log.d(TAG, "onNext() called with: value = [" + value + "]");
                            }
                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "onError() called with: e = [" + e + "]");
                            }
                            @Override
                            public void onComplete() {
                                Log.d(TAG, "onComplete() called");
                            }
                        });複製代碼

線程調度observeOn

在上一節的基礎上,增長一個observeOn(AndroidSchedulers.mainThread()),就完成了觀察者線程的切換。

.subscribeOn(Schedulers.computation())
                        //在上一節的基礎上,增長一個ObserveOn
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<String>() {複製代碼

繼續看源碼吧,我已經能猜出來了,hook+new XXXObservable();

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ....
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }複製代碼

果真,查看ObservableObserveOn,:
高能預警,這部分的代碼 有些略多,建議讀者打開源碼邊看邊讀。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    //本例是 AndroidSchedulers.mainThread()
    final Scheduler scheduler;
    //默認false
    final boolean delayError;
    //默認128
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // false
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //1 建立出一個 主線程的Worker
            Scheduler.Worker w = scheduler.createWorker();
            //2 訂閱上游數據源, 
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }複製代碼

本例中,就是兩步:

  1. 建立一個AndroidSchedulers.mainThread()對應的Worker
  2. ObserveOnObserver訂閱上游數據源。這樣當數據從上游push下來,會由ObserveOnObserver對應的onXXX()處理。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        //下游的觀察者
        final Observer<? super T> actual;
        //對應Scheduler裏的Worker
        final Scheduler.Worker worker;
        //上游被觀察者 push 過來的數據都存在這裏
        SimpleQueue<T> queue;
        Disposable s;
        //若是onError了,保存對應的異常
        Throwable error;
        //是否完成
        volatile boolean done;
        //是否取消
        volatile boolean cancelled;
        // 表明同步發送 異步發送 
        int sourceMode;
        ....
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                //省略大量無關代碼
                //建立一個queue 用於保存上游 onNext() push的數據
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //回調下游觀察者onSubscribe方法
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            //1 執行過error / complete 會是true
            if (done) {
                return;
            }
            //2 若是數據源類型不是異步的, 默認不是
            if (sourceMode != QueueDisposable.ASYNC) {
                //3 將上游push過來的數據 加入 queue裏
                queue.offer(t);
            }
            //4 開始進入對應Workder線程,在線程裏 將queue裏的t 取出 發送給下游Observer
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            //已經done 會 拋異常 和 上一篇文章裏提到的同樣
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            //給error存個值 
            error = t;
            done = true;
            //開始調度
            schedule();
        }

        @Override
        public void onComplete() {
        //已經done 會 返回 不會crash 和上一篇文章裏提到的同樣
            if (done) {
                return;
            }
            done = true;
            //開始調度
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                //該方法須要傳入一個線程, 注意看本類實現了Runnable的接口,因此查看對應的run()方法
                worker.schedule(this);
            }
        }
        //從這裏開始,這個方法已是在Workder對應的線程裏執行的了
        @Override
        public void run() {
            //默認是false
            if (outputFused) {
                drainFused();
            } else {
                //取出queue裏的數據 發送
                drainNormal();
            }
        }


        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                // 1 若是已經 終止 或者queue空,則跳出函數,
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        //2 從queue裏取出一個值
                        v = q.poll();
                    } catch (Throwable ex) {
                        //3 異常處理 並跳出函數
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;
                    //4 再次檢查 是否 終止 若是知足條件 跳出函數
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
                    //5 上游還沒結束數據發送,可是這邊處理的隊列已是空的,不會push給下游 Observer
                    if (empty) {
                        //僅僅是結束此次循環,不發送這個數據而已,並不會跳出函數
                        break;
                    }
                    //6 發送給下游了
                    a.onNext(v);
                }

                //7 對不起這裏我也不是很明白,大體猜想是用於 同步原子操做 若有人知道 煩請告知 
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        //檢查 是否 已經 結束(error complete), 是否沒數據要發送了(empty 空), 
        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            //若是已經disposed 
            if (cancelled) {
                queue.clear();
                return true;
            }
            // 若是已經結束
            if (d) {
                Throwable e = error;
                //若是是延遲發送錯誤
                if (delayError) {
                    //若是空
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        //中止worker(線程)
                        worker.dispose();
                        return true;
                    }
                } else {
                    //發送錯誤
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    //發送complete
                    if (empty) {
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }
    }複製代碼

核心處都加了註釋,總結起來就是,

  1. ObserveOnObserver實現了ObserverRunnable接口。
  2. onNext()裏,先不切換線程,將數據加入隊列queue而後開始切換線程,在另外一線程中,queue裏取出數據,push給下游Observer
  3. onError() onComplete()除了和RxJava2 源碼解析(一)提到的同樣特性以外,也是將錯誤/完成信息先保存,切換線程後再發送。
  4. 因此observeOn()影響的是其下游的代碼,且屢次調用仍然生效。
  5. 由於其切換線程代碼是在ObserveronXXX()作的,這是一個主動的push行爲(影響下游)。
  6. 關於屢次調用生效問題。對比subscribeOn()切換線程是在subscribeActual()裏作的,只是主動切換了上游的訂閱線程,從而影響其發射數據時所在的線程。而直到真正發射數據以前,任何改變線程的行爲,都會生效(影響發射數據的線程)。因此subscribeOn()只生效一次。observeOn()是一個主動的行爲,而且切換線程後會馬上發送數據,因此會生效屢次.

轉載請標明出處:
juejin.im/post/58ce8c…
本文出自:【張旭童的稀土掘金】(gold.xitu.io/user/56de21…)

總結

本文帶你們走讀分析了三個東西:

map操做符原理:

  • 內部對上游Observable進行訂閱
  • 內部訂閱者接收到數據後,將數據轉換發送給下游Observer.
  • 操做符返回的Observable其內部訂閱者、是裝飾者模式的體現。
  • 操做符數據變換的操做,也是發生在訂閱後

線程調度subscribeOn()

  • 內部先切換線程,在切換後的線程中對上游Observable進行訂閱,這樣上游發送數據時就是處於被切換後的線程裏了。
  • 也所以屢次切換線程最後一次切換(離源數據最近)的生效
  • 內部訂閱者接收到數據後,直接發送給下游Observer.
  • 引入內部訂閱者是爲了控制線程(dispose)
  • 線程切換發生在Observable中。

線程調度observeOn():

  • 使用裝飾的Observer對上游Observable進行訂閱
  • ObserveronXXX()方法裏,將待發送數據存入隊列,同時請求切換線程處理真正push數據給下游。
  • 屢次切換線程,都會對下游生效

源碼裏那些實現了Runnable的類或者匿名內部類,最終並無像往常那樣被丟給Thread類執行。
而是先切換線程,再直接執行Runnablerun()方法。
這也加深了我對面向對象,對抽象、Runnable的理解,它就是一個簡簡單單的接口,裏面就一個簡簡單單的run()
我認爲,之因此有Runnable,只是抽象出 一個可運行的任務的概念。也許這句話很平淡,書上也會提到,各位大佬早就知道,可是現在我順着RxJava2的源碼這麼走讀了一遍,確真真切切的感覺到了這些設計思想的美妙。

相關文章
相關標籤/搜索