RxJava 線程切換原理

推薦

推薦幾篇在學習Rxjava中的閱讀的文章。尤爲是大神W_BinaryTree的文章,給學習過程當中帶來了很多啓發。html

  1. 什麼是函數響應式編程(Java&Android版本)

函數響應式編程介紹java

  1. Rxjava2.0 較全的Api介紹和使用,能夠看成開發手冊

Rxjava2.0 Api介紹和使用android

  1. Rxjava2.0 和1.0的主要區別

Rxjava2.0 和1.0的主要區別git

  1. Rxjava github官方地址

Github地址github

  1. RxJava背壓策略的原理

背壓策略編程

背壓的理解緩存

  1. Rxjava實戰系列

Android RxJava 實際應用講解:(無條件)網絡請求輪詢安全

Android RxJava 實際應用講解:(有條件)網絡請求輪詢bash

Android RxJava 實際應用講解:網絡請求嵌套回調網絡

Android RxJava 實際應用講解:合併數據源

Android RxJava 實際應用講解:從磁盤 / 內存緩存中 獲取緩存數據

Android RxJava 實際應用講解:聯合判斷

Android RxJava:細說 線程控制(切換 / 調度 )(含Retrofit實例講解)

Android RxJava 實際應用講解:網絡請求出錯重連(結合Retrofit)

  1. Rxjava 深度教程,對Rxjava講解的比較透徹

放棄RxBus,擁抱RxJava

Rxjava的推理過程

大神關於實際運用過程當中的一些見解

  1. Rxjava中設計到的Monad概念理解

函數式編程有一個重要概念,叫作Monad

理解 Monad,一份 monad 的解惑指南

什麼是 Monad (Functional Programming)?

Functors, Applicatives, And Monads In Pictures

基礎概念

觀察者模式

  1. 觀察者概念

觀察者模式(Observer Mode)是定義對象間的一對多的依賴關係,當被觀察者的狀態發生改變時,全部依賴於它的對象都獲得通知並自動刷新。

在觀察者模式中有如下四個主要角色:

抽象主題[抽象被觀察者](Subject):定義添加和刪除觀察者的方法,內部經過集合維護觀察者序列。

具體主題[具體被觀察者](Concrete Subject):抽象主題的實現對象,在具體主題內部狀態發生變化時,通知全部的觀察者更新狀態。

抽象觀察者(Observer):定義觀察者的統一接口和方法。

具體觀察者(Concrete Observer):抽象觀察者的具體實現類,實現抽象觀察者定義的統一接口,以便使自己的狀態與主題狀態協調。 經典的觀察者模式UML類圖:

在這裏插入圖片描述
2. 觀察者模式在Rxjava中的運用

爲了方便分析問題,下面給出Rxjava實現的最簡單的被觀察者(主題)發送數據觀察者打印數據的代碼。從代碼中分析Rxjava中是如何定義而且實現觀察者模式中不一樣的角色。爲了方便說明問題,把Rxjava中的鏈式(Chain)拆分紅最基本的3段。

(1)Observable對象建立,抽象類Observable是接口ObservableSource下的一個抽象實現,經過Observable建立一個可觀察對象發射事件流。

(2)Observer對象建立,建立一個觀察者Observer來接受並響應可觀察對象發射的事件。

(3)Observer訂閱Observable,經過subscribe方法,使Observer與Observable創建訂閱關係,Observer與Observable便成爲了一個總體,Observer即可對Observable中的行爲做出響應。

PS: 雖然從代碼上看上去像是Observable訂閱了Observer,可是其實仍是觀察者訂閱了被觀察者,Rxjava這麼設計是爲了保持鏈式調用(Chain)。 這裏問了說明問題,沒有采用極簡的代碼實現。 java實現:

private void rxjavaDemo() {
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("R");
                e.onNext("X");
                e.onComplete();
            }
        });

        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object s) {
                Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        observable.subscribe(observer);
    }
複製代碼

Kotlin實現:

private fun rxjavaDemo() {
        val mObservable = Observable.create<String>{
            it.onNext("R")
            it.onNext("X")
            it.onComplete()
        }

        val mObserver = object : Observer<String>{
            override fun onComplete() {

            }

            override fun onSubscribe(d: Disposable?) {

            }

            override fun onNext(value: String?) {
                Log.e(RxjavaDemoActivity::class.java.simpleName, "object : $value")
            }

            override fun onError(e: Throwable?) {

            }
        }

        mObservable.subscribe(mObserver)
    }
複製代碼
  1. 被觀察者(Observable)

在上面代碼中咱們調用了Observable的create方法來建立被觀察者。

(1)在Observable類內部提供了衆多的靜態方法來建立被觀察者。諸如:create、just、interval、from、zip、contact、merge等方法。

(2)requireNonNull方法,是Rxjava的判空實現,防止出現空指針異常。

(3)在create方法中會建立ObservableCreate的被觀察者。

@SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

被觀察者ObservableCreate類繼承自抽象類Observable,內部實現了父類的subscribeActual方法。

public final class ObservableCreate<T> extends Observable<T>{
..........
  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
   .................
}
複製代碼

被觀察者的抽象類Observable,Observable又是接口ObservableSource下的一個抽象實現。

(1)內部實現了ObservableSource接口定義的subscribe方法。subscribe方法內部主要是調用了subscribeActual方法,全部判定訂閱關係是在subscribeActual方法內部實現的。

(2)因此要實現訂閱關係,觀察者真正須要複寫的是subscribeActual方法。好比ObservableCreate類就複寫了該方法。

public abstract class Observable<T> implements ObservableSource<T> {
    ..........
     @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
..............
}
複製代碼

Observable實現了ObservableSource接口,在ObservableSource內部定義了subscribe方法用來實現訂閱觀察者(Observer)。

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(Observer<? super T> observer);
}
複製代碼

總結:

(1)ObservableSource就是扮演着抽象被觀察者的角色。

(2)在ObservableSource 接口中定義了subscribe方法用來用來實現訂閱觀察者(Observer)。

(3)Observable類實現了ObservableSource接口而且實現了其subscribe方法,可是它並無真正的去完成主題和觀察者之間的訂閱關係,而是內部調用了另外一個抽象方法subscribeActual。

(4)在Observable內部提供了一系列建立型操做符, 用來建立不一樣場景的Observable。

通過上面的介紹,咱們已經明白了在Rxjava中被觀察者(Observable)是如何建立的,以及是誰扮演者抽象觀察者的角色。可是咱們並無在ObservableCreate類中發現具體發送事件的實現。那麼這裏就有一個問題:

問題: ObservableCreate等內部是如何發送事件到觀察者(Observer)的?

  1. 觀察者(Observer)

經過上面的代碼和分析,咱們知道Observer扮演着抽象觀察者的角色。下面分別解釋一下Observer類內部定義的四個主要的方法:

(1)onSubscribe(Disposable d)裏面的Disposable對象,Disposable翻譯過來是可隨意使用的。至關於觀察者和被觀察者之間的訂閱關係,若是觀察者不想訂閱被觀察者了,能夠調用 mDisposable.dispose()取消訂閱關係。

(2)onCompleted(): 事件隊列完成。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。

(3)onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。

(4)onNext():接收數據。

(5)在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,而且是事件序列中的最後一個。並且onCompleted() 和 onError() 兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param value
     *          the item emitted by the Observable
     */
    void onNext(T value);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
複製代碼

既然Observer是個接口,那麼就應該是個抽象觀察者,具體的觀察者是咱們在實際運用的時候直接new一個實例對象。

通過上面的介紹,咱們已經明白了在Rxjava中觀察者是如何建立的以及各個方法的做用。那麼觀察者是如何接受被觀察者發送的事件的呢?

問題: Observer是如何接受數據到被觀察者發送的數據?

訂閱

前提:觀察者訂閱被觀察者後,被觀察者纔會開始發送事件。 PS:可是並非全部的被觀察者都須要被訂閱纔會發送數據,好比Observable.just的方法返回的ObservableJust被觀察者者。 示例:執行下面的代碼會輸出JustObservable,可是將just換成create方法就不會輸出,全部得出結論:並非全部的被觀察者都須要被訂閱纔會發送數據

Observable.just(new JustObservable());

class JustObservable{
    JustObservable(){
        Log.e("rx","JustObservable");
    }
}
複製代碼

下面咱們來分析上面遺留的兩個問題:即觀察者和被觀察者是如何發送和接受事件的

Observable.create生成的被觀察者須要被訂閱後纔會發送數據到觀察者。 根據上面的分析咱們知道:這裏的observable對象是ObservableCreate類的實例。

observable.subscribe(observer);
複製代碼

這裏的subscribe是父類Observable的方法,在裏面又會調用subscribeActual方法,Observable的子類ObservableCreate會複寫subscribeActual方法。

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
複製代碼

下面來分析一下ObservableCreate類。

(1)在ObservableCreate的構造方法中有個ObservableOnSubscribe類型的形參。

(2)而且正如咱們上面所說內部實現了subscribeActual方法。

(3)因此真正處理被觀察者和觀察者之間實現訂閱的邏輯在Observable的subscribeActual方法中。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
   .............
}
複製代碼

那麼在ObservableCreate的構造方法的形參的賦值確定是在ObservableCreate對象初始化的時候,然而ObservableCreate的初始化,是經過Observable的create方法,下面咱們回到Observable的create的方法。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

在調用create方法的時候須要傳遞ObservableOnSubscribe的對象做爲參數,而這個對象最終會傳入到ObservableCreate的構造方法中。 下面來看一下ObservableOnSubscribe的做用:

(1)內部聲明瞭一個subscribe方法,subscribe 接收到一個ObservableEmitter對象。

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}
複製代碼

下面再來看一下ObservableEmitter類的做用:

(1)ObservableEmitter以一種能夠安全取消的形式發送事件到觀察者,經過調用setDisposable方法。

(2)繼承了Emitter接口。

public interface ObservableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence.
     * @return true if the downstream disposed the sequence
     */
    boolean isDisposed();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized ObservableEmitter
     */
    ObservableEmitter<T> serialize();
}

複製代碼

下面咱們來看一下Emitter類的做用:

(1)Emitter翻譯過來就是發射器的意識,到這裏咱們能夠想到,該類內部應該定義了一些跟事件發送相關的方法。

(2)而且在實例化ObservableOnSubscribe的時候,咱們正好是調用了ObservableEmitter的onNext向觀察者發送數據的。

Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("R");
                e.onNext("X");
                e.onComplete();
            }
        });
複製代碼
public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}
複製代碼

由此咱們能夠知道Emitter內部聲明瞭三種事件類型,而ObservableEmitter 擴展了Emiiter的功能,添加了Disposable相關的方法,能夠用來安全取消事件的發送即取消觀察者和被觀察者之間的訂閱關係。 由上訴分析咱們已經知道了ObservableCreate的ObservableOnSubscribe變量的來歷和基本做用以及被觀察者的建立過程。 下面繼續回到ObservableCreate類的subscribeActual方法來看看事件是如何從被觀察者發送到觀察者的。

(1)在subscribeActual方法內部建立了CreateEmitter類對象,而且接受Observer做爲參數,CreateEmitter實現了ObservableEmitter接口。因此該類是負責事件發送,到這裏咱們已經明確了事件的發送類即ObservableEmitter。

(2)在subscribeActual方法內部,調用了ObservableOnSubscribe的subscribe方法而且傳遞ObservableEmitter對象實例做爲參數,而後就能夠調用ObservableEmitter的方法發送事件了。

(3)在subscribeActual方法內部,調用了Observer的onSubscribe方法而且傳遞CreateEmitter做爲參數,這樣觀察者就持有了發送事件(被觀察者)的直接引用,方便觀察者取消訂閱關係。

到這裏咱們已經肯定到了:觀察者是如何和被觀察者訂閱的。以及事件是如何發送到被觀察者的。而且確認了只有發生了訂閱關係,事件才能夠發送。

下面在來看一下CreateEmitter類的實現邏輯。

(1)CreateEmitter的構造函數,傳遞一個Observer的對象做爲形參。這樣就能夠將事件發送到對應的觀察者了。

(2)CreateEmitter實現了ObservableEmitter接口,做爲事件發送器。

(3)onNext事件中,不能夠發送參數爲null的類型,在事件序列沒有中斷的狀況下把事件從被觀察者傳遞給觀察者。

(4)onComplete事件,用於通知觀察者事件隊列已經沒有事件發送了。

(5)onError 事件,事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。

(6)setDisposable、setCancellable方法,觀察者根據獲取到的Emitter的實例對象,能夠取消被觀察者和觀察者之間的訂閱關係。

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

複製代碼

小結:

(1)在RxJava中Observer經過onSubscribe方法獲取了發送事件中的Disposable對象,這樣他就能夠控制觀察者和被觀察者之間的訂閱關係。

(2)被觀察者並無直接控制事件的發送,而是將事件的發送給Disposable對象的發送。

(3)訂閱關係並無發生在subscribe方法中,而是在subscribeActual方法中實現了訂閱關係。

簡單的線程切換

下面的這段代碼實現了最簡單的Rxjava線程切換。發送事件就能夠在非UI線程(RxNewThreadScheduler 的線程執行,將耗時操做放在子線程中,避免阻塞UI線程。接受事件又切換回了Android UI線程,Android禁止在非UI線程操做UI。這樣就簡單的實現了在自線程處理耗時操做而後在UI線程刷新UI的邏輯。

爲了說明問題,把代碼拆分紅五段,能夠看出,其實每次的鏈式調用都會生成不一樣的Observable對象,因此咱們在平時開發的時候,應該儘量避免長鏈式的調用,規避掉不須要的中間操做。

private void rxjavaDemo() {
        Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                Log.e("rxjava","ObservableEmitter current thread :"+ Thread.currentThread().getName());
                e.onNext("R");
                e.onNext("X");
                e.onComplete();
            }
        });

        Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());

        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("rxjava","onSubscribe current thread :"+ Thread.currentThread().getName());
            }

            @Override
            public void onNext(Object s) {
                Log.e("rxjava","onNext current thread :"+ Thread.currentThread().getName());
                Log.e(RxjavaDemoActivity.class.getSimpleName(), "object : " + s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

        Observable observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
        
        observableObserveOn.subscribe(observer);
    }
複製代碼

subscribeOn

下面咱們將結合上面的分析和代碼,分析一下在observable.subscribeOn內部都作了那些操做。

(1)實例化了ObservableSubscribeOn對象。而且傳入的兩個參數分別是ObservableCreate對象和Scheduler對象。

(2)ObservableCreate對象是咱們上面分析的Observable.create生成的一個被觀察者。

(3)Scheduler對象,如今猜想應該是和線程調度有關的類。接下來會分析到。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

下面在來分析ObservableSubscribeOn類。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
    ..........
}
複製代碼

(1)繼承了AbstractObservableWithUpstream類。AbstractObservableWithUpstream類內部存儲了ObservableSource類對象,根據上下文,這裏的source就是ObservableCreate類對象。

(2)實現了subscribeActual方法,而且在在subscribeActual方法內部建立了SubscribeOnObserver對象,這裏的SubscribeOnObserver其實也是個觀察者,能夠理解成事件通過SubscribeOnObserver觀察者中轉了才最終到達咱們建立的觀察者。SubscribeOnObserver 是AtomicReference的子類(保證原子性),實現了 Observer接口 和 Disposable 接口。

(3)內部調用onSubscribe方法將SubscribeOnObserver傳遞給觀察者,這樣觀察者就能夠控制事件的接受了,即獲取了事件發送(被觀察者發送數據)的控制權。

(3)ObservableSubscribeOn和ObservableCreate同樣,也是Observable的一個子類。而且在ObservableSubscribeOn內部持有它上一步的被觀察者Observable的引用(這裏就是ObservableCreate)。

(4) source.subscribe(parent)實現了觀察者和被觀察者之間的訂閱關係,並將經過SubscribeOnObserver類對象傳遞給ObservableOnSubscribe的subscribe方法。SubscribeOnObserver類對象就能夠實現事件的發送了。

下面在來看一下SubscribeOnObserver類:做用和代碼基本同於CreateEmitter類,看上面的CreateEmitter類分析便可。

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
複製代碼

通過上面的分析,咱們明確了observable.subscribeOn(Schedulers.newThread())建立的被觀察者(Observable)和觀察者(Observer)之間的訂閱關係。下面來分析一下subscribeOn是如何實現線程切換的。首先咱們思考一下,若是要實現線程切換,確定要建立子線程。 問題: 子線程是如何建立 在調用subscribeOn的時候,傳入了Scheduler參數,Scheduler翻譯過來就是調度者的意識。經過調用Schedulers的newThread方法,建立子線程(RxNewThreadScheduler)。下面咱們以RxNewThreadScheduler線程爲例。看看線程是如何被建立的。

Observable observableSubscribeOn = observable.subscribeOn(Schedulers.newThread());
複製代碼

在Schedulers類內部定義了newThread靜態方法用於生成Scheduler對象。 (1)其中NEW_THREAD爲默認的生成的一個Scheduler對象。

static final Scheduler NEW_THREAD;

  public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
static {
       ..........
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        });
    }
複製代碼

接下來咱們就來看看,initNewThreadScheduler() 是如何生成一個Scheduler實例的。

(1)在initNewThreadScheduler方法中通過一系列的條件判斷,最終會執行到call方法(延遲初始化)。

(2)NewThreadHolder.DEFAULT會返回一個NewThreadScheduler對象(單例模式)

static final Scheduler DEFAULT = NewThreadScheduler.instance();
    }
複製代碼

下面再來看看單例模式(餓漢式)的NewThreadScheduler類,看名字就能夠猜想到是線程調度者。

(1)NewThreadScheduler 繼承自Scheduler抽象類。

(2)經過靜態代碼塊中建立了RxThreadFactory線程工廠對象,該類實現了ThreadFactory接口,而且在RxThreadFactory類的newThread方法中建立了優先級爲5的線程Thread。

(3)在NewThreadScheduler的createWorker()方法中,建立了NewThreadWorker 對象。

public final class NewThreadScheduler extends Scheduler {

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public static NewThreadScheduler instance() {
        return INSTANCE;
    }

    private NewThreadScheduler() {

    }

    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }
}
複製代碼

接下來咱們就來看看NewThreadWorker 都作了寫什麼。

(1)在NewThreadWorker的構造函數中,經過調用SchedulerPoolFactory.create的方法而且傳入NewThreadScheduler中提供的線程工廠RxThreadFactory建立了一個ScheduledExecutorService對象。

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
.........
}
複製代碼

在來看一下SchedulerPoolFactory類

(1)經過create方法建立了核心線程數量爲1的線程池。

/**
     * Creates a ScheduledExecutorService with the given factory.
     * @param factory the thread factory
     * @return the ScheduledExecutorService
     */
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }
複製代碼

分析到這裏咱們明確了,經過Schedulers.newThread()會建立一個核心線程數量爲1的線程池。 建立完線程,下面就是啓動和運行線程了,而且將事件的發送,放在子線程中進行處理。而且咱們都知道調用屢次 subscribeOn 指定子線程只有第一次會生效 ,下面咱們將帶着這兩個疑問,來分析一下。 在ObservableSubscribeOn的subscribeActual方法中,經過source.subscribe(parent)調用實現了觀察者和被觀察者之間的訂閱關係。咱們能夠看到該方法的執行是放在了Runnable裏面執行的。因此線程的切換,應該就發生子此處。

(1)通過上面的分析,這裏的scheduler對象是NewThreadScheduler類。而且調用了Schedule的scheduleDirect方法。

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
複製代碼

下面來看看Scheduler類的scheduleDirect方法。

(1)內部調用了重載的scheduleDirect方法。

(2)createWorker返回的是NewThreadWorker類對象。而且調用了NewThreadWorker類的schedule方法。

public Disposable scheduleDirect(Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
   public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }
複製代碼

下面來看看NewThreadWorker類的schedule方法。

(1)在schedulerActual方法中,經過ScheduledExecutorService執行submit或schedule執行一個Runnable任務,即開啓了線程池裏面的線程任務。

@Override
    public Disposable schedule(final Runnable run) {
        return schedule(run, 0, null);
    }

    @Override
    public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

 public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
複製代碼

分析到這裏咱們知道了線程的開啓是在NewThreadWorker類中進行的。

那麼還有個疑問:調用屢次 subscribeOn 指定子線程只有第一次會生效

(1)這裏的生效並非指其餘的subscribeOn方法建立的線程沒有生效,而是會被第一次的subscribeOn建立的線程「掩蓋掉」

(2)屢次調用subscribeOn,會生成若干個Observable對象,每一個新生成的對象都有切換線程的能力,可是隻有第一次的subscribeOn才生效,由於後續的線程切換被第一個「掩蓋掉」了。 這麼說可能有點抽象,下面以一張圖來講明:

(1)每次調用subscribeOn方法,都會生成一個Observable,而且回持有上游的Observable對象。

(2)事件的發送是在第一次的subscribeOn建立的子線程中發送的,中間不會切換線程。

在這裏插入圖片描述

observeOn

經過上面subscribeOn的流程梳理,咱們知道了上游事件是被如何切換都子線程的。下面咱們將分析事件是如何被切換到下游線程的,大部分狀況下就是咱們的Android UI線程。 下面咱們將結合上面的分析和代碼,分析一下在observable.observeOn內部都作了那些操做。

observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())
複製代碼

下面來看一下Observable的observeOn方法: (1) observeOn 方法返回了一個 ObservableObserveOn類對象。

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

接下來看看 ObservableObserveOn類。

(1)該類ObservableSubscribeOn基本一致,繼承了 AbstractObservableWithUpstream ,擁有ObservableSource類型對象,這裏是ObservableSubscribeOn實例對象。

(2)在subscribeActual 方法內部,scheduler是HandlerScheduler類型對象,這裏的scheduler就是展開分析了,基本上就是利用Android的Handler機制實現線程切換的。

(3)經過 scheduler.createWorker() 建立了 HandlerWorker的Worker對象。

(4)建立了一個ObserveOnObserver對象,該 類實現了Observer 接口,全部它是個Observer,同時實現了一個Runnable接口,這樣經過Handler就能夠執行到ObserveOnObserver的run方法。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...........
}

複製代碼

下面就來看看這個 ObserveOnObserver,經過上面咱們知道,線程切換是經過Handler實現的。 (1)actual 參數是咱們建立的Observer對象。

(2)Worker參數是HandlerWorker對象。經過AndroidSchedulers.mainThread()的調用是建立的。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        }
複製代碼

下面來看一下run方法裏面的操做: outputFused參數默認是false,因此接下來看看drainNormal方法。

@Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

複製代碼

(1)queue參數是在onSubscribe方法裏面建立的。而onSubscribe方法的調用,則是在上游的subscribeActual方法中調用的。

(2)內部經過輪訓隊列裏面的事件,將事件最終發送到Observer。

void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
複製代碼

下面以一張圖,總結一下:observeOn和subscribeOn的流程。

在這裏插入圖片描述

總結

(1)subscribeOn 控制上游線程切換,subscribeOn屢次調用只有第一次的subscribeOn會起做用。

(2)observeOn控制下游線程切換。observeOn可使用屢次。而且observeOn 後面的全部操做都會在observeOn指定的線程中執行。

(3)subscribeOn和observeOn之間的操做,會在subscribeOn 指定的線程中執行,直到執行了observeOn操做。

相關文章
相關標籤/搜索