從源碼的角度分析 Rxjava2 的基本執行流程、線程切換原理

前言

因爲以前項目搭建的是 MVP 架構,由RxJava + Glide + OKHttp + Retrofit 等開源框架組合而成,以前也都是停留在使用層面上,沒有深刻的研究,最近打算把它們所有攻下,尚未關注的同窗能夠先關注一波,看完這個系列文章,(不論是面試仍是工做中處理問題)相信你都在知道原理的狀況下,處理問題更加駕輕就熟。java

Android 圖片加載框架 Glide 4.9.0 (一) 從源碼的角度分析 Glide 執行流程react

Android 圖片加載框架 Glide 4.9.0 (二) 從源碼的角度分析 Glide 緩存策略git

從源碼的角度分析 Rxjava2 的基本執行流程、線程切換原理github

從源碼的角度分析 OKHttp3 (一) 同步、異步執行流程面試

從源碼的角度分析 OKHttp3 (二) 攔截器的魅力數組

從源碼的角度分析 OKHttp3 (三) 緩存策略緩存

從源碼的角度分析 Retrofit 網絡請求,包含 RxJava + Retrofit + OKhttp 網絡請求執行流程markdown

介紹

RxJava 出來已經有幾年了,我相信你們多多少少都有使用過 RxJava (簡單來講:它就是一個實現異步操做的庫),它強大的操做變換符和線程切換等,使咱們的業務邏輯操做起來更加簡單明瞭。我使用 Rxjava 有 2 年左右了吧,當初仍是看扔物線給 Android 開發者的 RxJava 詳解 入的門,門如今入了,使用上也沒什麼障礙了,如今咱們就能夠來看下 Rxjava 底層是怎麼實現的。在瞭解原理以前,咱們先來看下基本使用。網絡

使用參考

這裏只是總結一下 Rxjava 操做符,不作示例講解。數據結構

被觀察者

觀察者 說明
Observable Observable 即被觀察者,決定何時觸發事件以及觸發怎樣的事件
Flowable Flowable 能夠當作是 Observable 的實現,只是它支持背壓
Single 只有 onSuccess 可 onError 事件,只能用 onSuccess 發射一個數據或一個錯誤通知,以後再發射數據也不會作任何處理,直接忽略
Completable 只有 onComplete 和 onError 事件,不發射數據,沒有 map,flatMap 操做符。經常結合 andThen 操做符使用
Maybe 沒有 onNext 方法,一樣須要 onSuccess 發射數據,且只能發射 0 或 1 個數據,多發也再也不處理

操做符

建立操做符

被觀察者的操做符 說明
create 建立一個被觀察者
just 建立一個被觀察者,併發送事件,發送的事件不能夠超過10個以上
fromArray 這個方法和 just() 相似,只不過 fromArray 能夠傳入多於10 個的變量,而且能夠傳入一個數組
fromCallable 這裏的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值,這個結果值就是發給觀察者的
fromFuture 參數中的 Future 是 java.util.concurrent 中的 Future,Future 的做用是增長了 cancel() 等方法操做 Callable,它能夠經過 get() 方法來獲取 Callable 返回的值
fromIterable 直接發送一個 List 集合數據給觀察者
defer 這個方法的做用就是直到被觀察者被訂閱後纔會建立被觀察者。
Timer 當到指定時間後就會發送一個 0L 的值給觀察者。
Interval 每隔一段時間就會發送一個事件,這個事件是從 0 開始,不斷增 1 的數字。
intervalRange 能夠指定發送事件的開始值和數量,其餘與 interval() 的功能同樣。
range 同時發送必定範圍的事件序列。
rangeLong 做用與 range() 同樣,只是數據類型爲 Long
empty 直接發送 onComplete() 事件
never 不發送任何事件
error 發送 onError() 事件

轉換操做符

名稱 說明
map() map 能夠將被觀察者發送的數據類型轉變成其餘的類型
flatMap() 這個方法能夠將事件序列中的元素進行整合加工,返回一個新的被觀察者。flatMap() 其實與 map() 相似,可是 flatMap() 返回的是一個 Observerable
concatMap() concatMap() 和 flatMap() 基本上是同樣的,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的
buffer() 從須要發送的事件當中獲取必定數量的事件,並將這些事件放到緩衝區當中一併發出
groupBy() 將發送的數據進行分組,每一個分組都會返回一個被觀察者
scan() 將數據以必定的邏輯聚合起來
window() 發送指定數量的事件時,就將這些事件分爲一組。window 中的 count 的參數就是表明指定的數量,例如將 count 指定爲 2,那麼每發 2 個數據就會將這 2 個數據分紅一組。

組合操做符

操做符 說明
concat() 能夠將多個觀察者組合在一塊兒,而後按照以前發送順序發送事件。須要注意的是,concat() 最多隻能夠發送4個事件。
concatArray() 與 concat() 做用同樣,不過 concatArray() 能夠發送多於 4 個被觀察者。
merge() 這個方法月 concat() 做用基本同樣,知識 concat() 是串行發送事件,而 merge() 並行發送事件。
concatArrayDelayError() & mergeArrayDelayError() 在 concatArray() 和 mergeArray() 兩個方法當中,若是其中有一個被觀察者發送了一個 Error 事件,那麼就會中止發送事件,若是你想 onError() 事件延遲到全部被觀察者都發送完事件後再執行的話,就可使用 concatArrayDelayError() 和 mergeArrayDelayError()
zip() 會將多個被觀察者合併,根據各個被觀察者發送事件的順序一個個結合起來,最終發送的事件數量會與源 Observable 中最少事件的數量同樣。
combineLatest() & combineLatestDelayError() combineLatest() 的做用與 zip() 相似,可是 combineLatest() 發送事件的序列是與發送的時間線有關的,當 combineLatest() 中全部的 Observable 都發送了事件,只要其中有一個 Observable 發送事件,這個事件就會和其餘 Observable 最近發送的事件結合起來發送
reduce() 與 scan() 操做符的做用也是將發送數據以必定邏輯聚合起來,這兩個的區別在於 scan() 每處理一次數據就會將事件發送給觀察者,而 reduce() 會將全部數據聚合在一塊兒纔會發送事件給觀察者。
collect() 將數據收集到數據結構當中
startWith() & startWithArray() 在發送事件以前追加事件,startWith() 追加一個事件,startWithArray() 能夠追加多個事件。追加的事件會先發出。
count() 返回被觀察者發送事件的數量。

功能操做符

操做符 說明
delay() 延遲一段時間發送事件。
doOnEach() Observable 每發送一件事件以前都會先回調這個方法。
doOnNext() Observable 每發送 onNext() 以前都會先回調這個方法。
doAfterNext() Observable 每發送 onNext() 以後都會回調這個方法。
doOnComplete() Observable 每發送 onComplete() 以前都會回調這個方法。
doOnError() Observable 每發送 onError() 以前都會回調這個方法。
doOnSubscribe() Observable 每發送 onSubscribe() 以前都會回調這個方法。
doOnDispose() 當調用 Disposable 的 dispose() 以後回調該方法
doOnLifecycle() 在回調 onSubscribe 以前回調該方法的第一個參數的回調方法,可使用該回調方法決定是否取消訂閱
doOnTerminate() & doAfterTerminate() doOnTerminate 是在 onError 或者 onComplete 發送以前回調,而 doAfterTerminate 則是 onError 或者 onComplete 發送以後回調
doFinally() 在全部事件發送完畢以後回調該方法。
onErrorReturn() 當接受到一個 onError() 事件以後回調,返回的值會回調 onNext() 方法,並正常結束該事件序列
onErrorResumeNext() 當接收到 onError() 事件時,返回一個新的 Observable,並正常結束事件序列
onExceptionResumeNext() 與 onErrorResumeNext() 做用基本一致,可是這個方法只能捕捉 Exception。
retry() 若是出現錯誤事件,則會從新發送全部事件序列。times 是表明從新發的次數
retryUntil() 出現錯誤事件以後,能夠經過此方法判斷是否繼續發送事件。
retryWhen() 當被觀察者接收到異常或者錯誤事件時會回調該方法,這個方法會返回一個新的被觀察者。若是返回的被觀察者發送 Error 事件則以前的被觀察者不會繼續發送事件,若是發送正常事件則以前的被觀察者會繼續不斷重試發送事件
repeat() 重複發送被觀察者的事件,times 爲發送次數
repeatWhen() 這個方法能夠會返回一個新的被觀察者設定必定邏輯來決定是否重複發送事件。
subscribeOn() 指定被觀察者的線程,要注意的時,若是屢次調用此方法,只有第一次有效。
observeOn() 指定觀察者的線程,每指定一次就會生效一次。

過濾操做符

操做符 說明
filter() 經過必定邏輯來過濾被觀察者發送的事件,若是返回 true 則會發送事件,不然不會發送
ofType() 能夠過濾不符合該類型事件
skip() 跳過正序某些事件,count 表明跳過事件的數量
distinct() 過濾事件序列中的重複事件。
distinctUntilChanged() 過濾掉連續重複的事件。
take() 控制觀察者接收的事件的數量。
debounce() 若是兩件事件發送的時間間隔小於設定的時間間隔則前一件事件就不會發送給觀察者。
firstElement() && lastElement() firstElement() 取事件序列的第一個元素,lastElement() 取事件序列的最後一個元素。
elementAt() & elementAtOrError() elementAt() 能夠指定取出事件序列中事件,可是輸入的 index 超出事件序列的總數的話就不會出現任何結果。這種狀況下,你想發出異常信息的話就用 elementAtOrError() 。

條件操做符

操做符 說明
all() 判斷事件序列是否所有知足某個事件,若是都知足則返回 true,反之則返回 false。
takeWhile() 能夠設置條件,當某個數據知足條件時就會發送該數據,反之則不發送
skipWhile() 能夠設置條件,當某個數據知足條件時不發送該數據,反之則發送。
takeUntil() 能夠設置條件,當事件知足此條件時,下一次的事件就不會被髮送了。
skipUntil() 當 skipUntil() 中的 Observable 發送事件了,原來的 Observable 纔會發送事件給觀察者。
sequenceEqual() 判斷兩個 Observable 發送的事件是否相同。
contains() 判斷事件序列中是否含有某個元素,若是有則返回 true,若是沒有則返回 false。
isEmpty() 判斷事件序列是否爲空。
amb() amb() 要傳入一個 Observable 集合,可是隻會發送最早發送事件的 Observable 中的事件,其他 Observable 將會被丟棄
defaultIfEmpty() 若是觀察者只發送一個 onComplete() 事件,則能夠利用這個方法發送一個值

基本執行流程

在講基本執行流程以前咱們先來看一段代碼

@Test
public void test(){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("Thread = [" + Thread.currentThread().getName() + "]");

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();

            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("Observer = [ onSubscribe ]");
            }

            @Override
            public void onNext(String s) {
                System.out.println("Observer = [ onNext ]" + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Observer = [ onError ] "+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Observer = [ onComplete ]");
            }
        });

    }
複製代碼

Output:

Observer = [ onSubscribe ]
Observer = [ onNext ]1
Observer = [ onNext ]2
Observer = [ onNext ]3
Observer = [ onComplete ]
複製代碼

上面代碼就是一個簡單的被觀察者的一個訂閱關係,不涉及線程切換,經過 create 操做符建立一個新的觀察者,而後發射了幾條 String 的數據,下游接收到數據並打印出來,下面咱們就看下它的具體執行流程。

先來看一個流程圖

下面咱們就跟着流程圖看下代碼

//調用層調用 create 函數,返回一個被觀察者對象 
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //1. 判斷空處理
        ObjectHelper.requireNonNull(source, "source is null");
        //2 hook
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

create 主要作了 2 件事兒,首先對 source 判斷空,而後在鉤子函數中建立了一個 ObservableCreate 對象,而後而後這個 ObservableCreate 對象,咱們看一下內部

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //賦值給成員變量
        this.source = source;
    }
  ....
}

複製代碼

這裏咱們看到只是把 source 賦值給了 ObservableCreate 的成員變量。

接着咱們在看 Observable#subscribe

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        //1. 判空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //2. hook ,返回一個觀察者
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //3. 判空
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //4. 內部調用 subscribeActual 函數
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) {
            ....
            throw npe;
        }
    }
複製代碼

被觀察者訂閱觀察者 subscribe 這裏主要作了 4 步

  1. 判斷觀察者是否爲空
  2. hook 觀察者,返回一個觀察者
  3. 對觀察者判斷
  4. 執行 subscribeActual

繼續看 subscribeActual 函數

//這裏是一個抽象類 看它的上一個流
    protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼

這裏是一個抽象類,根據上面流程圖得知,ObservableCreate 實現了 這個函數,咱們繼續在跟

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //賦值給成員變量
        this.source = source;
    }
  
  ....
    
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //1. 建立一個發射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //2.表示訂閱成功
        observer.onSubscribe(parent);
        try {
            //3.建立發射器,主要發射數據
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}
複製代碼

相信你們看到註釋 2 已經看到訂閱成功的回調消息,註釋 3 也回調到了 create 中,能夠調用 parent 發送數據了,這裏咱們看下 CreateEmitter 具體實現

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
				//1. 調用層調用
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //是否取消
            if (!isDisposed()) {
            //回調
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
        	
            if (!isDisposed()) {
                try {
                		//完成
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
		
		....
		
    }
複製代碼

根據上面代碼能夠看到,observer .onNext()/onComplete() 是在 CreateEmitter 內部中調用的,到這裏基本執行流程已經完成了,下面咱們看下線程切換原理。

線程切換原理

講解線程切換的原理以前,先請看下面一段代碼。

public void RxTest(){
       Observable.create(new io.reactivex.ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(io.reactivex.ObservableEmitter<String> emitter){
                System.out.println("Thread = [ 準備發射數據 " + Thread.currentThread().getName() + "]");

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();

            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new io.reactivex.Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("Thread = [" + Thread.currentThread().getName() + " onSubscribe ]");
                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("Thread = [" + Thread.currentThread().getName() + " onNext ]" + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("Thread = [" + Thread.currentThread().getName() + " onError ] "+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Thread = [" + Thread.currentThread().getName() + " onComplete ]");
                    }
                });
    }
複製代碼

output:

Thread = [main onSubscribe ] //訂閱成功
Thread = [ 準備發射數據 RxCachedThreadScheduler-1] //準備發送數據 ----》在 Rxjava 子線程
Thread = [main  onNext ]1 //在主線程中接收數據
Thread = [main  onNext ]2
Thread = [main  onNext ]3
Thread = [main onComplete ]
複製代碼

根據上面示例代碼先來看一個執行的時序圖

ucLrh4.png

被觀察者 ObservableSubscribeOn Schedulers.io() 子線程初始化

ugAZ9I.png

主線程初始化

ugAe3t.png

如今根據線程切換的初始化流程圖,來看下從具體發送數據到線程切換流程

ugnNvR.png

到這裏咱們對線程切換的執行流程已經有了必定的瞭解,知道了子線程從哪裏運行,主線程在哪裏運行。下面咱們根據實際的代碼來說解線程切換的原理:

  1. create

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    	//1. 判斷空處理
    	ObjectHelper.requireNonNull(source, "source is null");
    	//2 hook,返回一個 ObservableCreate 
    	return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    複製代碼
  2. ObservableCreate

    public final class ObservableCreate<T> extends Observable<T> {
      	//這裏的 source 表明的是 Observable.create(new ObservableOnSubscribe<String>() 中的 ObservableOnSubscribe
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            //賦值給成員變量
            this.source = source;
        }
    }
    複製代碼

    根據第一步和第二步,咱們知道在調用層建立一個被觀察者 ObservableCreate

  3. ObservableCreate 中指定子線程執行

    public abstract class Observable<T> implements ObservableSource<T> {
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
          	//返回一個 ObservableSubscribeOn 被觀察者對象
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    }
    複製代碼

    根據流程可知,這裏的 Observable 實際上是在 ObservableCreate 內部中調用的,下面看 ObservableSubscribeOn 初始化

  4. ObservableSubscribeOn 初始化

    //線程切換的主要實現
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
          	//1. 將上一層的被觀察者傳遞給成員變量
            super(source);
          	//2. 將抽象 scheduler賦值給當前成員變量
            this.scheduler = scheduler;
        }
    }
    複製代碼

    根據上面時序圖可知,上面代碼中註釋一 其實就是 ObservableCreate, 而 scheduler 就是 IOscheduler

  5. 接着又在 ObservableSubscribeOn 中調用 Observable 的 observeOn 操做符

    public abstract class Observable<T> implements ObservableSource<T> {
    @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
      
    }
    複製代碼

    這裏返回一個 ObservableObserveOn 被觀察者,咱們看下初始化代碼

  6. ObservableObserveOn 初始化

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
      	
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
      
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
          	//1. source 表明上一個被觀察者
            super(source);
          	//2. 這裏指 HandlerScheduler
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    }
    複製代碼

    根據代碼註釋一可知 source 是上一個被觀察者,也就是 ObservableSubscribeOn,註釋二根據流程圖能夠知道它是 HandlerScheduler;

  7. 接着看被觀察者訂閱函數

    public abstract class Observable<T> implements ObservableSource<T> {
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            //1. 判空
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                //2. hook ,返回一個觀察者
                observer = RxJavaPlugins.onSubscribe(this, observer);
                //3. 判空
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
                //4. 內部調用 subscribeActual 函數
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
              ...
            }
        }
    }
    複製代碼

    根據流程可知當前的 Observable 是在 ObservableObserveOn 中調用的,因此根據註釋 4 可知,實現類是在 ObservableObserveOn 中;

  8. ObservableObserveOn 的 subscribeActual 函數實現

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
      
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                //1.返回 HandlerWorker
                Scheduler.Worker w = scheduler.createWorker();
                //2. 這裏表明回調到上一個被觀察者
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    }
    複製代碼

    根據註釋可知先拿到 HandlerWorker 而後執行上一層的 被觀察者,根據流程能夠這裏的 source 是 ObservableSubscribeOn;

  9. 將 ObserveOnObserver 對象傳遞給 ObservableSubscribeOn 的 subscribeActual 函數

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
    ...
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            //1. 包裝觀察者對象
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
            //2. 這裏的 observer 表明的是 ObserveOnObserver
            observer.onSubscribe(parent);
            //3. 設置被觀察者的運行線程 scheduler 表明的是 IOScheduler ,SubscribeTask 是一個 Runnable 運行在子線程中的
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    }
    複製代碼

    上面代碼註釋一是包裝 observer -> ObserveOnObserver , 註釋二這一步是在 ObserveOnObserver 中實現的,最後是訂閱成功的功能;註釋三這一步纔是線程切換的核心代碼,首先看 scheduler.scheduleDirect(new SubscribeTask(parent))

  10. scheduler.scheduleDirect(new SubscribeTask(parent)) 實現

    public abstract class Scheduler {   
    @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) {
          	//1. 調用內部的重載函數
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
          @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            //2. 建立 IoSchedule
            final Worker w = createWorker();
            //3 run 表明的是 SubscribeTask
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //4. 對 SubscribeTask ,IoSchedule 進行包裝
            DisposeTask task = new DisposeTask(decoratedRun, w);
            //5 執行 IoSchedule 中的 schedule 函數
            w.schedule(task, delay, unit);
            return task;
        }
    }
    複製代碼

    根據註釋 1-4 前面是初始化和包裝線程,重要的是註釋5 接着往下看。

  11. 執行 IoSchedule 中的 schedule 函數

    public final class IoScheduler extends Scheduler {        
    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
      if (tasks.isDisposed()) {
    	// don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    	}
    	//1. action 表明的是 DisposeTask
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    	}
    }
    複製代碼

    接着跟 threadWorker.scheduleActual

  12. threadWorker.scheduleActual 函數實現

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {   
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            //1. run 表明的是 DisposeTask
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //2. decoratedRun 表明的是 DisposeTask
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                  //3. 線程池執行子線程任務
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
            ...
            }
    
            return sr;
        }
    
    }
    複製代碼

    到這裏咱們已經找到了執行層中具體執行邏輯了。線程池執行那麼就會回調 SubscribeTask run 函數,咱們跟着看下 run 具體實現。

  13. SubscribeTask run 具體實現

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
    ...
      
    final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //1. 這裏表明的是上一個被觀察者對象,這裏的也就是 create 操做符
                source.subscribe(parent);
            }
        }
    }
    複製代碼

    能夠看到 SubscribeTask 是 ObservableSubscribeOn 的非靜態內部類,那麼根據上面流程可知 source 其實就是 ObservableCreate 被觀察者

  14. 被觀察者 ObservableCreate 的 subscribeActual 實現

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            //賦值給成員變量
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //1. 建立一個發射器
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //2. 子線程 SubscribeOnObserver
            observer.onSubscribe(parent);
            try {
                //3. 發射器回調
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    }
    複製代碼

    根據上面代碼註釋可知先建立一個發射器,而後調用下游的 ObservableSubscribeOn#SubscribeOnObserver 的 onSubscribe 函數,最後回調發射器 subscribe ,如今能夠進行數據傳遞了。其實到這裏emitter.onNext("1"); 已是在子線程中了,那麼咱們在跟一下發射數據到接收數據之間線程的切換。

  15. emitter.onNext("1") 執行

    public final class ObservableCreate<T> extends Observable<T> {        
    //1. 調用層 調用
    @Override
    public void onNext(T t) {
    	if (t == null) {
    		onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
    		return;
    	}
    	if (!isDisposed()) {
    	//1.1 回調到下游
    	observer.onNext(t);
    	}
    }
    複製代碼

    根據執行流程時序圖可知,這裏的 observer 是 ObservableCreate 的下游 ObservableSubscribeOn#SubscribeOnObserver.onNext(T t) 繼續跟

  16. ObservableSubscribeOn#SubscribeOnObserver.onNext(T t) 實現

    //線程切換的主要實現
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
       ...
    
    
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {        
    @Override
            public void onNext(T t) {
                downstream.onNext(t);
            } 
        }
    複製代碼

    代碼中的 downstream 也表明的是下游 Observer, 繼續跟

  17. ObservableObserveOn#ObserveOnObserver.next()

    @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                  	//任務放入隊列中
                    queue.offer(t);
                }
              	//調用內部重載
                schedule();
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    複製代碼

    這裏的 worker 是 HandlerScheduler#HandlerWorker

  18. HandlerWorker.schedule 函數具體實現

    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    	if (run == null) throw new NullPointerException("run == null");
    	if (unit == null) throw new NullPointerException("unit == null");
    	if (disposed) {
    		return Disposables.disposed();
    	}
    
    	run = RxJavaPlugins.onSchedule(run);
    	//1. 對 Runnable 進行包裝
    	ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    	//2. 對象池中拿到 message
    	Message message = Message.obtain(handler, scheduled);
    	message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
      if (async) {
    		message.setAsynchronous(true);
    	}
    	//3. 發送數據
    	handler.sendMessageDelayed(message, unit.toMillis(delay));
    	//4. 若是中止發送數據,就刪除主線程回調
    	if (disposed) {
    		handler.removeCallbacks(scheduled);
    	return Disposables.disposed();
    	}
    	return scheduled;
    }
    複製代碼

    上面代碼就是子線程切換到主線程的主要代碼,沒錯就是 handler, 當 handler sendMessageDelayed 會回調 Runnable 的 run 函數,不知道有沒有人發現這裏的 runnable 接收的數據在哪裏,沒錯根據 worker.schedule(this); 的 this 可知,它就是 ObserveOnObserver 觀察者,實現了 runnable 對象,接下來看 ObserveOnObserver run 函數

  19. ObserveOnObserver run 函樹實現

    @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    
    
            void drainNormal() {
                int missed = 1;
    
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = downstream;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
         
                      	...
    										//回調到調用層
                        a.onNext(v);
                    }
    								...
                }
            }
    複製代碼

    最後 a.onNext(v); 回調到了調用層。

    到這裏整個線程切換已經講完了,相信你們對 Rxjava 的線程切換有了必定的認識了。

總結

不知道你們在看時序圖的時候有沒有發現,其實初始化的時候跟流水線同樣,將一個原始的產品通過重重包裝,最後包裝完而後利用抽象 subscribeActual 函數,一步一步又回滾上去,最後發射數據又通過重重傳遞最後回調到調用層,這纔算是一個完整過程。

其實我對 Rxjava 的理解,有點像接口回調同樣,基本上靠回調傳遞數據。

你們若是還有不理解的能夠根據我繪製的時序圖跟着源代碼走一遍。

相關文章
相關標籤/搜索