詳解 RxJava 的消息訂閱和線程切換原理

本文由玉剛說寫做平臺提供寫做贊助java

原做者:四月葡萄git

版權聲明:本文版權歸微信公衆號 玉剛說 全部,未經許可,不得以任何形式轉載github

1.前言

本文主要是對RxJava的消息訂閱和線程切換進行源碼分析,相關的使用方式等不做詳細介紹。數據庫

本文源碼基於rxjava:2.1.14json

2. RxJava簡介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.緩存

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.安全

上面這段話來自於RxJava在github上面的官方介紹。翻譯成中文的大概意思就是:微信

RxJava是一個在Java虛擬機上的響應式擴展,經過使用可觀察的序列將異步和基於事件的程序組合起來的一個庫。markdown

它擴展了觀察者模式來支持數據/事件序列,而且添加了操做符,這些操做符容許你聲明性地組合序列,同時抽象出要關注的問題:好比低級線程、同步、線程安全和併發數據結構等。網絡

簡單點來講, RxJava就是一個使用了觀察者模式,可以異步的庫。

3. 觀察者模式

上面說到,RxJava擴展了觀察者模式,那麼什麼是觀察模式呢?咱們先來了解一下。

舉個例子,以微信公衆號爲例,一個微信公衆號會不斷產生新的內容,若是咱們讀者對這個微信公衆號的內容感興趣,就會訂閱這個公衆號,當公衆號有新內容時,就會推送給咱們。咱們收到新內容時,若是是咱們感興趣的,就會點進去看下;若是是廣告的話,就可能直接忽略掉。這就是咱們生活中遇到的典型的觀察者模式。

在上面的例子中,微信公衆號就是一個被觀察者(Observable),不斷的產生內容(事件),而咱們讀者就是一個觀察者(Observer) ,經過訂閱(subscribe)就可以接受到微信公衆號(被觀察者)推送的內容(事件),根據不一樣的內容(事件)作出不一樣的操做。

3.1 Rxjava角色說明

RxJava的擴展觀察者模式中就是存在這麼4種角色:

角色 角色功能
被觀察者(Observable 產生事件
觀察者(Observer 響應事件並作出處理
事件(Event 被觀察者和觀察者的消息載體
訂閱(Subscribe 鏈接被觀察者和觀察者

3.2 RxJava事件類型

RxJava中的事件分爲三種類型:Next事件、Complete事件和Error事件。具體以下:

事件類型 含義 說明
Next 常規事件 被觀察者能夠發送無數個Next事件,觀察者也能夠接受無數個Next事件
Complete 結束事件 被觀察者發送Complete事件後能夠繼續發送事件,觀察者收到Complete事件後將不會接受其餘任何事件
Error 異常事件 被觀察者發送Error事件後,其餘事件將被終止發送,觀察者收到Error事件後將不會接受其餘任何事件

4.RxJava的消息訂閱

在分析RxJava消息訂閱原理前,咱們仍是先來看下它的簡單使用步驟。這裏爲了方便講解,就不用鏈式代碼來舉例了,而是採用分步驟的方式來逐一說明(平時寫代碼的話仍是建議使用鏈式代碼來調用,由於更加簡潔)。其使用步驟以下:

  1. 建立被觀察者(Observable),定義要發送的事件。
  2. 建立觀察者(Observer),接受事件並作出響應操做。
  3. 觀察者經過訂閱(subscribe)被觀察者把它們鏈接到一塊兒。

4.1 RxJava的消息訂閱例子

這裏咱們就根據上面的步驟來實現這個例子,以下:

//步驟1. 建立被觀察者(Observable),定義要發送的事件。
        Observable observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });
       
        //步驟2. 建立觀察者(Observer),接受事件並作出響應操做。
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
       
        //步驟3. 觀察者經過訂閱(subscribe)被觀察者把它們鏈接到一塊兒。
        observable.subscribe(observer);
複製代碼

其輸出結果爲:

onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete
複製代碼

4.2 源碼分析

下面咱們對消息訂閱過程當中的源碼進行分析,分爲兩部分:建立被觀察者過程和訂閱過程。

4.2.1 建立被觀察者過程

首先來看下建立被觀察者(Observable)的過程,上面的例子中咱們是直接使用Observable.create()來建立Observable,咱們點進去這個方法看下。

4.2.1.1 Observable類的create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

能夠看到,create()方法中也沒作什麼,就是建立一個ObservableCreate對象出來,而後把咱們自定義的ObservableOnSubscribe做爲參數傳到ObservableCreate中去,最後就是調用 RxJavaPlugins.onAssembly()方法。

咱們先來看看ObservableCreate類:

4.2.1.2 ObservableCreate類
public final class ObservableCreate<T> extends Observable<T> {//繼承自Observable
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//把咱們建立的ObservableOnSubscribe對象賦值給source。
    }
}
複製代碼

能夠看到,ObservableCreate是繼承自Observable的,而且會把ObservableOnSubscribe對象給存起來。

再看下RxJavaPlugins.onAssembly()方法

4.2.1.3 RxJavaPlugins類的onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        //省略無關代碼
        return source;
    }
複製代碼

很簡單,就是把上面建立的ObservableCreate給返回。

4.2.1.4 簡單總結

因此Observable.create()中就是把咱們自定義的ObservableOnSubscribe對象從新包裝成一個ObservableCreate對象,而後返回這個ObservableCreate對象。 注意,這種從新包裝新對象的用法在RxJava中會頻繁用到,後面的分析中咱們還會屢次遇到。 放個圖好理解,包起來哈~

被觀察者.png

4.2.1.5 時序圖

Observable.create()的時序圖以下所示:

Observable.create()時序圖.png

4.2.2 訂閱過程

接下來咱們就看下訂閱過程的代碼,一樣,點進去Observable.subscribe()

4.2.2.1 Observable類的subscribe()
public final void subscribe(Observer<? super T> observer) {
            //省略無關代碼
           
            observer = RxJavaPlugins.onSubscribe(this, observer);

            subscribeActual(observer);
           
            //省略無關代碼
    }
複製代碼

能夠看到,實際上其核心的代碼也就兩句,咱們分開來看下:

4.2.2.2 RxJavaPlugins類的onSubscribe()
public static <T> Observer<? super T> onSubscribe(
    @NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        //省略無關代碼
       
        return observer;
    }
複製代碼

跟以前代碼同樣,這裏一樣也是把原來的observer返回而已。 再來看下subscribeActual()方法。

4.2.2.3 Observable類的subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼

Observable類的subscribeActual()中的方法是一個抽象方法,那麼其具體實如今哪呢?還記得咱們前面建立被觀察者的過程嗎,最終會返回一個ObservableCreate對象,這個ObservableCreate就是Observable的子類,咱們點進去看下:

4.2.2.4 ObservableCreate類的subscribeActual()
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //觸發咱們自定義的Observer的onSubscribe(Disposable)方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

能夠看到,subscribeActual()方法中首先會建立一個CreateEmitter對象,而後把咱們自定義的觀察者observer做爲參數給傳進去。這裏一樣也是包裝起來,放個圖:

觀察者.png
這個 CreateEmitter實現了 ObservableEmitter接口和 Disposable接口,以下:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        //代碼省略
    }
複製代碼

而後就是調用了observer.onSubscribe(parent),實際上就是調用觀察者的onSubscribe()方法,即告訴觀察者已經成功訂閱到了被觀察者。

繼續往下看,subscribeActual()方法中會繼續調用source.subscribe(parent),這裏的source就是ObservableOnSubscribe對象,即這裏會調用ObservableOnSubscribesubscribe()方法。 咱們具體定義的subscribe()方法以下:

Observable observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });
複製代碼

ObservableEmitter,顧名思義,就是被觀察者發射器。 因此,subscribe()裏面的三個onNext()方法和一個onComplete()會逐一被調用。 這裏的ObservableEmitter接口其具體實現爲CreateEmitter,咱們看看CreateEmitte類的onNext()方法和onComplete()的實現:

4.2.2.5 CreateEmitter類的onNext()和onComplete()等
//省略其餘代碼
       
        @Override
        public void onNext(T t) {
            //省略無關代碼
            if (!isDisposed()) {
                //調用觀察者的onNext()
                observer.onNext(t);
            }
        }
       
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //調用觀察者的onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
複製代碼

能夠看到,最終就是會調用到觀察者的onNext()onComplete()方法。至此,一個完整的消息訂閱流程就完成了。 另外,能夠看到,上面有個isDisposed()方法能控制消息的走向,即可以切斷消息的傳遞,這個後面再來講。

4.2.2.6 簡單總結

Observable(被觀察者)和Observer(觀察者)創建鏈接(訂閱)以後,會建立出一個發射器CreateEmitter,發射器會把被觀察者中產生的事件發送到觀察者中去,觀察者對發射器中發出的事件作出響應處理。能夠看到,是訂閱以後,Observable(被觀察者)纔會開始發送事件。

放張事件流的傳遞圖:

訂閱過程.png

4.2.2.7 時序流程圖

再來看下訂閱過程的時序流程圖:

訂閱過程時序圖.png

4.3 切斷消息

以前有提到過切斷消息的傳遞,咱們先來看下如何使用:

4.3.1 切斷消息

Observable observable = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });
       
        Observer<String> observer = new Observer<String>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe : " + d);
                mDisposable=d;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
                mDisposable.dispose();
                Log.d(TAG, "切斷觀察者與被觀察者的鏈接");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
       
        observable.subscribe(observer);
複製代碼

輸出結果爲:

onSubscribe : null
onNext : 文章1
切斷觀察者與被觀察者的鏈接
複製代碼

能夠看到,要切斷消息的傳遞很簡單,調用下Disposabledispose()方法便可。調用dispose()以後,被觀察者雖然能繼續發送消息,可是觀察者卻收不到消息了。 另外有一點須要注意,上面onSubscribe輸出的Disposable值是"null",並非空引用null

4.3.2 切斷消息源碼分析

咱們這裏來看看下dispose()的實現。Disposable是一個接口,能夠理解Disposable爲一個鏈接器,調用dispose()後,這個鏈接器將會中斷。其具體實如今CreateEmitter類,以前也有提到過。咱們來看下CreateEmitterdispose()方法:

4.3.2.1 CreateEmitter的dispose()
@Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
複製代碼

就是調用DisposableHelper.dispose(this)而已。

4.3.2.2 DisposableHelper類
public enum DisposableHelper implements Disposable {

    DISPOSED
    ;
   
    //其餘代碼省略

    public static boolean isDisposed(Disposable d) {
        //判斷Disposable類型的變量的引用是否等於DISPOSED
        //即判斷該鏈接器是否被中斷
        return d == DISPOSED;
    }
   
    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            //這裏會把field給設爲DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
}
複製代碼

能夠看到DisposableHelper是一個枚舉類,而且只有一個值:DISPOSEDdispose()方法中會把一個原子引用field設爲DISPOSED,即標記爲中斷狀態。所以後面經過isDisposed()方法便可以判斷鏈接器是否被中斷。

4.3.2.3 CreateEmitter類中的方法

再回頭看看CreateEmitter類中的方法:

@Override
        public void onNext(T t) {
            //省略無關代碼
           
            if (!isDisposed()) {
                //若是沒有dispose(),纔會調用onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                //若是dispose()了,會調用到這裏,即最終會崩潰
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //省略無關代碼
            if (!isDisposed()) {
                try {
                    //若是沒有dispose(),纔會調用onError()
                    observer.onError(t);
                } finally {
                    //onError()以後會dispose()
                    dispose();
                }
                //若是沒有dispose(),返回true
                return true;
            }
            //若是dispose()了,返回false
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //若是沒有dispose(),纔會調用onComplete()
                    observer.onComplete();
                } finally {
                    //onComplete()以後會dispose()
                    dispose();
                }
            }
        }
複製代碼

從上面的代碼能夠看到:

  1. 若是沒有disposeobserver.onNext()纔會被調用到。
  2. onError()onComplete()互斥,只能其中一個被調用到,由於調用了他們的任意一個以後都會調用dispose()
  3. onError()onComplete()onComplete()不會被調用到。反過來,則會崩潰,由於onError()中拋出了異常:RxJavaPlugins.onError(t)。其實是dispose後繼續調用onError()都會炸。

5.RxJava的線程切換

上面的例子和分析都是在同一個線程中進行,這中間也沒涉及到線程切換的相關問題。可是在實際開發中,咱們一般須要在一個子線程中去進行一些數據獲取操做,而後要在主線程中去更新UI,這就涉及到線程切換的問題了,經過RxJava咱們也能夠把線程切換寫得還簡潔。

5.1 線程切換例子

關於RxJava如何使用線程切換,這裏就不詳細講了。 咱們直接來看一個例子,並分別打印RxJava在運行過程當中各個角色所在的線程。

new Thread() {
            @Override
            public void run() {
                Log.d(TAG, "Thread run() 所在線程爲 :" + Thread.currentThread().getName());
                Observable
                        .create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                Log.d(TAG, "Observable subscribe() 所在線程爲 :" + Thread.currentThread().getName());
                                emitter.onNext("文章1");
                                emitter.onNext("文章2");
                                emitter.onComplete();
                            }
                        })
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d(TAG, "Observer onSubscribe() 所在線程爲 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "Observer onNext() 所在線程爲 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onError(Throwable e) {
                                Log.d(TAG, "Observer onError() 所在線程爲 :" + Thread.currentThread().getName());
                            }

                            @Override
                            public void onComplete() {
                                Log.d(TAG, "Observer onComplete() 所在線程爲 :" + Thread.currentThread().getName());
                            }
                        });
            }
        }.start();
複製代碼

輸出結果爲:

Thread run() 所在線程爲 :Thread-2 Observer onSubscribe() 所在線程爲 :Thread-2 Observable subscribe() 所在線程爲 :RxCachedThreadScheduler-1 Observer onNext() 所在線程爲 :main Observer onNext() 所在線程爲 :main Observer onComplete() 所在線程爲 :main 複製代碼

從上面的例子能夠看到:

  1. Observer(觀察者)的onSubscribe()方法運行在當前線程中。
  2. Observable(被觀察者)中的subscribe()運行在subscribeOn()指定的線程中。
  3. Observer(觀察者)的onNext()onComplete()等方法運行在observeOn()指定的線程中。

5.2 源碼分析

下面咱們對線程切換的源碼進行一下分析,分爲兩部分:subscribeOn()observeOn()

5.2.1 subscribeOn()源碼分析

首先來看下subscribeOn(),咱們的例子中是這麼個使用的:

.subscribeOn(Schedulers.io())
複製代碼

subscribeOn()方法要傳入一個Scheduler類對象做爲參數,Scheduler是一個調度類,可以延時或週期性地去執行一個任務。

5.2.1.1 Scheduler類型

經過Schedulers類咱們能夠獲取到各類Scheduler的子類。RxJava提供瞭如下這些線程調度類供咱們使用:

Scheduler類型 使用方式 含義 使用場景
IoScheduler Schedulers.io() io操做線程 讀寫SD卡文件,查詢數據庫,訪問網絡等IO密集型操做
NewThreadScheduler Schedulers.newThread() 建立新線程 耗時操做等
SingleScheduler Schedulers.single() 單例線程 只需一個單例線程時
ComputationScheduler Schedulers.computation() CPU計算操做線程 圖片壓縮取樣、xml,json解析等CPU密集型計算
TrampolineScheduler Schedulers.trampoline() 當前線程 須要在當前線程當即執行任務時
HandlerScheduler AndroidSchedulers.mainThread() Android主線程 更新UI等
5.2.1.2 Schedulers類的io()

下面咱們來看下Schedulers.io()的代碼,其餘的Scheduler子類都差很少,就不逐以分析了,有興趣的請自行查看哈~

@NonNull
    static final Scheduler IO;
   
    @NonNull
    public static Scheduler io() {
        //1.直接返回一個名爲IO的Scheduler對象
        return RxJavaPlugins.onIoScheduler(IO);
    }
   
    static {
        //省略無關代碼
       
        //2.IO對象是在靜態代碼塊中實例化的,這裏會建立按一個IOTask()
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    }
   
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            //3.IOTask中會返回一個IoHolder對象
            return IoHolder.DEFAULT;
        }
    }
   
    static final class IoHolder {
        //4.IoHolder中會就是new一個IoScheduler對象出來
        static final Scheduler DEFAULT = new IoScheduler();
    }
複製代碼

能夠看到,Schedulers.io()中使用了靜態內部類的方式來建立出了一個單例IoScheduler對象出來,這個IoScheduler是繼承自Scheduler的。這裏mark一發,後面會用到這個IoScheduler的。

5.2.1.3 Observable類的subscribeOn()

而後,咱們就來看下subscribeOn()的代碼:

public final Observable<T> subscribeOn(Scheduler scheduler) {
        //省略無關代碼
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

能夠看到,首先會將當前的Observable(其具體實現爲ObservableCreate)包裝成一個新的ObservableSubscribeOn對象。 放個圖:

ObservableSubscribeOn.png

跟前面同樣,RxJavaPlugins.onAssembly()也是將ObservableSubscribeOn對象原樣返回而已,這裏就不看了。 能夠看下ObservableSubscribeOn的構造方法:

5.2.1.4 ObservableSubscribeOn類的構造方法
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
複製代碼

也就是把sourcescheduler這兩個保存一下,後面會用到。

而後subscribeOn()方法就完了。好像也沒作什麼,就是從新包裝一下對象而已,而後將新對象返回。即將一箇舊的被觀察者包裝成一個新的被觀察者。

5.2.1.5 ObservableSubscribeOn類的subscribeActual()

接下來咱們回到訂閱過程,爲何要回到訂閱過程呢?由於事件的發送是從訂閱過程開始的啊。 雖然咱們這裏用到了線程切換,可是呢,其訂閱過程前面的內容跟上一節分析的是同樣的,咱們這裏就不重複了,直接從不同的地方開始。還記得訂閱過程當中Observable類的subscribeActual()是個抽象方法嗎?所以要看其子類的具體實現。在上一節訂閱過程當中,其具體實現是在ObservableCreate類。可是因爲咱們調用subscribeOn()以後,ObservableCreate對象被包裝成了一個新的ObservableSubscribeOn對象了。所以咱們就來看看ObservableSubscribeOn類中的subscribeActual()方法:

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

subscribeActual()中一樣也將咱們自定義的Observer給包裝成了一個新的SubscribeOnObserver對象。一樣,放張圖:

SubscribeOnObserver.png
而後就是調用 ObserveronSubscribe()方法,能夠看到,到目前爲止,還沒出現過任何線程相關的東西,因此 ObserveronSubscribe()方法就是運行在當前線程中。 而後咱們重點看下最後一行代碼,首先建立一個 SubscribeTask對象,而後就是調用 scheduler.scheduleDirect().。 咱們先來看下 SubscribeTask類:

5.2.1.6 SubscribeTask類
//SubscribeTask是ObservableSubscribeOn的內部類
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //這裏的source就是咱們自定義的Observable對象,即ObservableCreate
            source.subscribe(parent);
        }
    }
複製代碼

很簡單的一個類,就是實現了Runnable接口,而後run()中調用Observer.subscribe()

5.2.1.7 Scheduler類的scheduleDirect()

再來看下scheduler.scheduleDirect()方法

public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
複製代碼

往下看:

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

        //createWorker()在Scheduler類中是個抽象方法,因此其具體實如今其子類中
        //所以這裏的createWorker()應當是在IoScheduler中實現的。
        //Worker中能夠執行Runnable
        final Worker w = createWorker();
       
        //實際上decoratedRun仍是這個run對象,即SubscribeTask
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
       
        //將Runnable和Worker包裝成一個DisposeTask
        DisposeTask task = new DisposeTask(decoratedRun, w);
       
        //Worker執行這個task
        w.schedule(task, delay, unit);

        return task;
    }
複製代碼

咱們來看下建立WorkerWorker執行任務的過程。

5.2.1.8 IoScheduler的createWorker()和schedule()
final AtomicReference<CachedWorkerPool> pool;
   
    public Worker createWorker() {
        //就是new一個EventLoopWorker,而且傳一個Worker緩存池進去
        return new EventLoopWorker(pool.get());
    }
   
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();
       
        //構造方法
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            //從緩存Worker池中取一個Worker出來
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            //省略無關代碼
           
            //Runnable交給threadWorker去執行
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
複製代碼

注意,不一樣的Scheduler類會有不一樣的Worker實現,由於Scheduler類最終是交到Worker中去執行調度的。

咱們來看下Worker緩存池的操做:

5.2.1.9 CachedWorkerPool的get()
static final class CachedWorkerPool implements Runnable {
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                //若是緩衝池不爲空,就從緩存池中取threadWorker
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            //若是緩衝池中爲空,就建立一個並返回。
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
    }
複製代碼
5.2.1.10 NewThreadWorker的scheduleActual()

咱們再來看下threadWorker.scheduleActual()ThreadWorker類沒有實現scheduleActual()方法,其父類NewThreadWorker實現了該方法,咱們點進去看下:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        //構造方法中建立一個ScheduledExecutorService對象,能夠經過ScheduledExecutorService來使用線程池
        executor = SchedulerPoolFactory.create(threadFactory);
    }
   
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //這裏的decoratedRun實際仍是run對象
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //將decoratedRun包裝成一個新對象ScheduledRunnable
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        //省略無關代碼
       
        if (delayTime <= 0) {
            //線程池中當即執行ScheduledRunnable
            f = executor.submit((Callable<Object>)sr);
        } else {
            //線程池中延遲執行ScheduledRunnable
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
           
        //省略無關代碼

        return sr;
    }
}
複製代碼

這裏的executor就是使用線程池去執行任務,最終SubscribeTaskrun()方法會在線程池中被執行,即Observablesubscribe()方法會在IO線程中被調用。這與上面例子中的輸出結果符合:

Observable subscribe() 所在線程爲 :RxCachedThreadScheduler-1 複製代碼
5.2.1.11 簡單總結
  1. Observer(觀察者)的onSubscribe()方法運行在當前線程中,由於在這以前都沒涉及到線程切換。
  2. 若是設置了subscribeOn(指定線程),那麼Observable(被觀察者)中subscribe()方法將會運行在這個指定線程中去。
5.2.1.12 時序圖

來張總的subscribeOn()切換線程時序圖

subscribeOn()切換線程時序圖.png

5.2.1.13 屢次設置subscribeOn()的問題

若是咱們屢次設置subscribeOn(),那麼其執行線程是在哪個呢?先來看下例子

//省略先後代碼,看重點部分
        .subscribeOn(Schedulers.io())//第一次
        .subscribeOn(Schedulers.newThread())//第二次
        .subscribeOn(AndroidSchedulers.mainThread())//第三次
複製代碼

其輸出結果爲:

Observable subscribe() 所在線程爲 :RxCachedThreadScheduler-1 複製代碼

即只有第一次的subscribeOn()起做用了。這是爲何呢? 咱們知道,每調用一次subscribeOn()就會把舊的被觀察者包裝成一個新的被觀察者,通過了三次調用以後,就變成了下面這個樣子:

屢次設置subscribeOn().png
同時,咱們知道,被觀察者被訂閱時是從最外面的一層通知到裏面的一層,那麼當傳到上圖第三層時,也就是 ObservableSubscribeOn(第一次)那一層時,管你以前是在哪一個線程, subscribeOn(Schedulers.io())都會把線程切到IO線程中去執行,因此屢次設置 subscribeOn()時,只有第一次生效。

5.2.2 observeOn()

咱們再來看下observeOn(),仍是先來回顧一下咱們例子中的設置:

//指定在Android主線程中執行
    .observeOn(AndroidSchedulers.mainThread())
複製代碼
5.2.2.1 Observable類的observeOn()
public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //省略無關代碼
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

一樣,這裏也是新包裝一個ObservableObserveOn對象,注意,這裏包裝的舊被觀察者是ObservableSubscribeOn對象了,由於以前調用過subscribeOn()包裝了一層了,因此如今是以下圖所示:

ObservableObserveOn.png

RxJavaPlugins.onAssembly()也是原樣返回。

咱們看看ObservableObserveOn的構造方法。

5.2.2.2 ObservableObserveOn類的構造方法
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
複製代碼

裏面就是一些變量賦值而已。

5.2.2.3 ObservableObserveOn的subscribeActual()

subscribeOn()差很少,咱們就直接來看ObservableObserveOnsubscribeActual()方法了。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //判斷是否當前線程
        if (scheduler instanceof TrampolineScheduler) {
            //是當前線程的話,直接調用裏面一層的subscribe()方法
            //即調用ObservableSubscribeOn的subscribe()方法
            source.subscribe(observer);
        } else {
            //建立Worker
            //本例子中的scheduler爲AndroidSchedulers.mainThread()
            Scheduler.Worker w = scheduler.createWorker();
            //這裏會將Worker包裝到ObserveOnObserver對象中去
            //注意:source.subscribe沒有涉及到Worker,因此仍是在以前設置的線程中去執行
            //本例子中source.subscribe就是在IO線程中執行。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
複製代碼

一樣,這裏也將observer給包裝了一層,以下圖所示:

ObserveOnObserver.png

source.subscribe()中將會把事件逐一發送出去,咱們這裏只看下ObserveOnObserver中的onNext()方法的處理,onComplete()等就不看了,實際上都差很少。

5.2.2.4 ObserveOnObserver的onNext()
@Override
        public void onNext(T t) {
            //省略無關代碼
            if (sourceMode != QueueDisposable.ASYNC) {
                //將信息存入隊列中
                queue.offer(t);
            }
            schedule();
        }
複製代碼

就是調用schedule()而已。

5.2.2.5 ObserveOnObserver的schedule()
void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver一樣實現了Runnable接口,因此就把它本身交給worker去調度了
                worker.schedule(this);
            }
        }
複製代碼

Android主線程調度器裏面的代碼就不分析了,裏面其實是用handler來發送Message去實現的,感興趣的能夠看下。 既然ObserveOnObserver實現了Runnable接口,那麼就是其run()方法會在主線程中被調用。 咱們來看下ObserveOnObserverrun()方法:

5.2.2.6 ObserveOnObserver的run()
@Override
        public void run() {
            //outputFused默認是false
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
複製代碼

這裏會走到drainNormal()方法。

5.2.2.7 ObserveOnObserver的drainNormal()
void drainNormal() {
            int missed = 1;
            //存儲消息的隊列
            final SimpleQueue<T> q = queue;
            //這裏的actual其實是SubscribeOnObserver
            final Observer<? super T> a = actual;

            //省略無關代碼
           
            //從隊列中取出消息
            v = q.poll();
           
            //...
           
            //這裏調用的是裏面一層的onNext()方法
            //在本例子中,就是調用SubscribeOnObserver.onNext()
            a.onNext(v);
           
            //...
        }
複製代碼

至於SubscribeOnObserver.onNext(),裏面也沒切換線程的邏輯,就是調用裏面一層的onNext(),因此最終會調用到咱們自定義的Observer中的onNext()方法。所以,ObserveronNext()方法就在observeOn()中指定的線程中給調用了,在本例中,就是在Android主線程中給調用。

5.2.2.8 簡單總結
  1. 若是設置了observeOn(指定線程),那麼Observer(觀察者)中的onNext()onComplete()等方法將會運行在這個指定線程中去。
  2. subscribeOn()設置的線程不會影響到observeOn()
5.2.2.9 時序圖

最後,來張observeOn()時序圖:

observeOn()時序圖.png

6.其餘

因本人水平有限,若有錯誤,歡迎指出並交流~四月葡萄的博客

歡迎關注微信公衆號,接收第一手技術乾貨
相關文章
相關標籤/搜索