從觀察者模式出發,聊聊RxJava

前言

RxJava 是什麼

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.java

以上是RxJava在Github上的介紹,大概意思是,針對於JVM(Java虛擬機)的響應式擴展實現,一個在Java VM上使用可觀察的序列來組合實現異步的、基於事件編程的庫。git

RxJava如今你們用的都應該已經很溜了,用法這裏就再也不多說了。咱們都知道RxJava是對觀察者模式的擴展,下面就從觀察者模式的實現機制出發,瞭解一下RxJava2的實現邏輯。只有真正瞭解了RxJava 的實現原理,咱們才能在遇到問題的時候,更快速更準確的定位的到問題。github

這次源碼分析基於 RxJava Release 2.1.7編程

觀察者模式

這裏簡單回顧一下觀察者模式的組成及使用方式,經過以前觀察者模式一文中的分析,咱們知道觀察者模式中有四個重要的角色:安全

  • 抽象主題:定義添加和刪除觀察者的功能,也就是註冊和解除註冊的功能
  • 抽象觀察者:定義觀察者收到主題通知後要作什麼事情
  • 具體主題:抽象主題的實現
  • 具體觀察者:抽象觀察者的實現

當咱們建立好了具體主題和觀察者類,就可使用觀察者模式了,下面是一個最簡單的測試demo。bash

public class TestObservePattern {

    public static void main(String[] args) {
		// 建立主題(被觀察者)
        ConcreteSubject concreteSubject = new ConcreteSubject();
		// 建立觀察者
        ObserverOne observerOne=new ObserverOne();
        // 爲主題添加觀察者
        concreteSubject.addObserver(observerOne);        
        //主題通知全部的觀察者
        concreteSubject.notifyAllObserver("wake up,wake up");
    }

}
複製代碼

以上就是觀察者模式的使用方式,很簡單是吧。如今就讓咱們帶着如下幾個問題,看看RxJava是如何使用觀察者模式的。markdown

用RxJava這麼久了,你能夠思考一下以下幾個問題:app

  1. RxJava 中上面提到的四個重要角色是如何定義的?
  2. RxJava 中具體的主題,具體的觀察者是如何實例化的?
  3. RxJava 中觀察者和主題是如何實現訂閱的?
  4. RxJava 中上游是怎麼發送事件的,下游又是怎樣接收到的?
  5. RxJava 中對常規的觀察者模式作了怎樣調整,帶來了什麼好處?

若是對以上幾個問題,你有明確的答案,恭喜你,如下內容你就不用再看了,O(∩_∩)O哈哈~。異步

不少開發者對RxJava的學習多是從上游下游的角度開始,這裏能夠認爲這樣的敘述更偏重RxJava 事件序列的特徵。本文從被觀察者(主題)觀察者的角度出發,能夠說是更偏向於RxJava 觀察者模式的特徵。這裏的主題就是上游,觀察者就是下游。不管從哪一個角度出發去理解,源碼就那麼一份,無所謂對錯,只是每一個人的認知角度不一樣而已,選擇一種本身更容易瞭解的方式便可。async

好了,若是你看到了這裏,說明你對以上幾個問題,還有些許疑問,那麼咱們就從這幾個問題出發,瞭解一下RxJava的源碼實現。

RxJava2 的觀察者模式實現

咱們就帶着上述幾個問題,依次來看看RxJava究竟是怎麼一回事兒。爲了方便敘述和記憶,咱們首先看一段RxJava2 最最基礎的使用方式。

private void basicRxjava2() {
        Observable mObservable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                e.onNext("1");
                e.onNext("2");
                e.onNext("3");
                e.onNext("4");
                e.onComplete();
            }
        });

        Observer mObserver = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: d=" + d);
                sb.append("\nonSubcribe: d=" + d);
            }

            @Override
            public void onNext(Object s) {
                Log.e(TAG, "onNext: " + s);
                sb.append("\nonNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e);
                sb.append("\nonError: " + e.toString());
                logContent.setText(sb.toString());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
                sb.append("\nonComplete: ");
                logContent.setText(sb.toString());
            }
        };

        mObservable.subscribe(mObserver);
    }
複製代碼

上面這段代碼,應該很容易理解了,輸出結果你們閉着眼睛也能想出來吧。咱們就以這段代碼爲基礎,結合上面提到的問題依次展開對RxJava的分析。

四個重要的角色

首先看看RxJava 中四個重要的角色是如何定義的。

  • 抽象主題

首先能夠看看這個Observable類。

public abstract class Observable<T> implements ObservableSource<T> {
……
}
複製代碼

他實現了ObservableSource接口,接着看ObservableSource

public interface ObservableSource<T> {

    /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */
    void subscribe(@NonNull Observer<? super T> observer);
}
複製代碼

這裏很明顯了,ObservableSource 就是抽象主題(被觀察者)的角色。按照以前觀察者模式中約定的職責,subscribe 方法就是用來實現訂閱觀察者(Observer)角色的功能。從這裏咱們也能夠看出,抽象觀察者的角色就是Observer了。

這裏,你也許會有疑問,這麼簡單?抽象主題(上游)不是須要發送事件嗎?onNext(),onComplete()以及onError()跑哪兒去了?彆着急,咱們後面慢慢看。

  • 具體主題

回過頭來繼續看Observable,他實現了ObservableSource接口,而且實現了其subscribe方法,可是它並無真正的去完成主題觀察者之間的訂閱關係,而是把這個功能,轉接給了另外一個抽象方法subscribeActual(具體細節後面分析)。

所以,Observable依舊是一個抽象類,咱們知道抽象類是不能被實例化的,所以從理論上來講,他好像不能做爲具體主題的角色。其實否則,Observable內部提供了create,defer,fromXXX,repeat,just等一系列建立型操做符, 用來建立各類Observable。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

在RxJava內有不少他的子類。

Observable子類

誠然,你能夠認爲,這些子類其實才是真正的具體主題。可是,換一個角度,從代理模式的角度出發,咱們能夠把Observable當作是一個代理類,客戶端你只管調用create 方法,想要什麼樣的 Observable告訴我一聲就能夠,不一樣Observable之間的差別你不用管,包在我身上,保證給你返回你想要的Observable實例。

同時,Observable另外一個巨大的貢獻,就是定義了不少的操做符,咱們平時經常使用的map,flatMap,distinct等,也是在這裏定義。而且這些方法都是final類型的,所以他的全部子類都會繼承同時也沒法改變這些操做符的實現。

所以,Observable 就是具體主題。

  • 抽象觀察者

在抽象主題裏已經提過了,Observer就是抽象觀察者的角色。

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

複製代碼

很是符合觀察者模式中抽象觀察者的職責描述,Observer 定義了觀察者(下游)收到主題(上游)通知後該作什麼事情。這裏須要注意的是onSubscribe 也是定義在這裏的。

  • 具體的觀察者

這個具體的觀察者,o(╯□╰)oo(╯□╰)o,就很少說了吧。你們平時使用應該都是直接用new一個Observer的實例。RxJava內部有不少Observer的子類,有興趣的同窗能夠具體瞭解一下。這裏其實能夠引伸出一個有意思的問題,一樣是抽象類,爲何接口能夠直接實例化,而用abstract修飾過的類就不能夠?

具體的觀察者是如何實例化的

咱們看一下這段代碼:

Observable mObservable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
            }
        });

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
	
	
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
		// 是否有別的其餘操做符運算,有的話,在此Observable上執行一遍
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

複製代碼

RxJava的代碼裏,不少時候會有ObjectHelper.requireNonNull這種空檢查的地方,一概都是爲了最大程度的防止NPE的出現,後面出現就再也不贅述了.

咱們使用create操做符建立Observable的過程當中,看似經歷了不少方法,在不考慮任何其餘操做符的前提下,整個過程簡化一下的話就這麼一句代碼

Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
複製代碼

從以前的分析,咱們也看到了ObservableCreate 就是Observeable抽象類的一個子類。咱們簡單看一下他的實現。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ……
    }
}

複製代碼

能夠看到,他惟一的構造函數須要一個ObservableOnSubscribe實例,同時他實現subscribeActual方法,說明他真正處理主題觀察者之間實現訂閱的邏輯。

看了半天,你可能一直很好奇,這個ObservableOnSubscribe是個什麼東西呢?他其實很簡單。

/** * A functional interface that has a {@code subscribe()} method that receives * an instance of an {@link ObservableEmitter} instance that allows pushing * events in a cancellation-safe manner. * * @param <T> the value type pushed */
public interface ObservableOnSubscribe<T> {

    /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
複製代碼

ε=(´ο`*)))唉,怎麼又一個subscribe,這又是啥?不要慌,看註釋。意思是說,這裏的subscribe 接收到一個ObservableEmitter實例後,就會容許他以一種能夠安全取消(也就是必定能取消)的形式發送事件。

就是說會有某個對象,給他一個ObservableEmitte的實例,沒給他以前他是不會主動發送事件的,會一直憋着。,到這裏,你是否是想到了什麼,咱們知道在RxJava 中只有觀察者(下游)訂閱(subscribe)了主題(上游),主題纔會發送事件。這就是和普通的觀察者模式有區別的地方之一。

好了,最後再來看看這個神祕的ObservableEmitter是個什麼鬼?

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);


    void setCancellable(@Nullable Cancellable c);


    boolean isDisposed();

    ObservableEmitter<T> serialize();

      /** * Attempts to emit the specified {@code Throwable} error if the downstream * hasn't cancelled the sequence or is otherwise terminated, returning false * if the emission is not allowed to happen due to lifecycle restrictions. * <p> * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called * if the error could not be delivered. * @param t the throwable error to signal if possible * @return true if successful, false if the downstream is not able to accept further * events * @since 2.1.1 - experimental */
    boolean tryOnError(@NonNull Throwable t);
}
複製代碼

這裏能夠關注一下tryOnError這個方法,能夠看到他會把某些類型的error傳遞到下游。

o(╥﹏╥)o,又是一個接口,並且還繼承了另外一個接口,什麼狀況?繼續看

public interface Emitter<T> {

    void onNext(@NonNull T value);

  
    void onError(@NonNull Throwable error);

   
    void onComplete();
}
複製代碼

驚不驚喜,意不意外? 哈哈,終於找到你了,熟悉的onNext,onError,onComplete.原來在這裏。

這裏有個問題能夠思考一下,在抽象觀察者中,定義了四個處理事件的方法,這裏只有三個,按照對應關係來講彷佛缺了一個onSubscribe,這又是怎麼回事呢?後面會有分析,能夠本身先想一想

這兩個接口的含義很明顯了,總結一下:

  • Emitter 定義了能夠發送的事件的三種機制
  • ObservableEmitter 在Emitter 作了擴展,添加了Disposable相關的方法,能夠用來取消事件的發送。

好了,繞了一大圈,就爲了一行代碼:

Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
複製代碼

總結一下具體主題(上游)的到底幹了啥:

  • 建立了一個ObservableCreate 的實例對象
  • ObservableCreate 內持有ObservableOnSubscribe 對象的引用
  • ObservableOnSubscribe 是一個接口,內部有一個subscribe方法,調用他以後,會用其ObservableEmitter實例開始發送事件。
  • ObservableEmitter 繼承自Emitte。

如何實現訂閱、發送事件和接收事件

爲了方便敘述,把問題3和4連在一塊兒說了。

經過上面的敘述,如今具體主題和具體的觀察者都建立好了,接下來就是實現兩者的訂閱關係。

mObservable.subscribe(mObserver);
複製代碼

這裏須要明確的一點是,是觀察者(下游)訂閱了主題(上游),雖然從代碼上看好像了前者訂閱了後者,不要搞混了。

咱們看Observable的subscribe() 方法:

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
         ……
        }
    }
複製代碼

這個前面已經提到過了,Observable並無真正的去實現subscribe,而是把他轉接給了subscribeActual()方法。

前面已經說過,Observable的實例是一個ObservableCreate對象,那麼咱們就到這個類裏去看看subscribeActual()的實現。

// 爲了方便,順便再看一眼構造函數
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

CreateEmitter 實現了以前提到的ObservableEmitter接口。這裏有一句關鍵的代碼:

observer.onSubscribe(parent);
複製代碼

以前在看到Emitter的定義時,咱們說缺乏了onSubscribe方法,到這裏就明白了。onSubscribe並非由主題(上游)主動發送的事件,而是有觀察者(下游)本身調用的一個事件,只是爲了方便獲取Emitter的實例對象,準確的說應該是Disposable的實例對象,這樣下游就能夠控制上游了。

接下來就更簡單了,source 是ObservableOnSubscribe,按照以前的邏輯,調用其subscribe方法,給他一個ObservableEmitter對象實例,ObservableEmitter就會開始發送事件序列。這樣,一旦開始訂閱了,主題(上游)就開始發送事件了。也就是咱們熟悉的onNext,onComplete,onError 方法真正的開始執行了。

接着看看CreateEmitter的實現。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        ……
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}
複製代碼
  • 他的構造函數,須要一個觀察者的實例;
  • 他實現了ObservableEmitter接口,並依次實現他的三個方法;
    • 在每一次的onNext事件中,他再也不接受參數爲null的類型,在事件序列沒有中斷的狀況下會把主題(上游)發送的事件T原封不動的傳遞給觀察者(下游)。
    • onComplete事件發生時,他也會通知下游,若是發生異常,則中斷事件序列
    • onError 事件發生時,並無直接傳遞到下游,而是在其內部處理
    • tryOnError 事件發生時,纔會把某些特定類型的錯誤傳遞到下游。
  • 他實現了Disposable接口,下游根據獲取到的Emitter的實例對象,能夠方便的獲取事件序列的信息,甚至是能夠主動關閉事件序列,及斷開觀察者模式中主題和觀察者間的訂閱關係。

RxJava 中對常規的觀察者模式作了怎樣調整,帶來了什麼好處?

最後再來簡單說一下,RxJava中對常規的觀察者模式作了怎樣的調整,有什麼值得借鑑的地方。大部分優勢在上面已經說起了,這裏就來總結一下。

  • 觀察者訂閱主題後,主題纔會開始發送事件
  • RxJava中Observer經過onSubscribe獲取了發送事件中的Disposable對象,這樣他就能夠主動的獲取訂閱關係中兩者的狀態,甚至是控制或者是中斷事件序列的發送。在常規的觀察者模式中,主題有權利添加訂閱者,但也能是由他移除特定的訂閱者,由於只有他持有全部訂閱者的集合
  • 抽象主題(上游)並無直接控制onNext,onComplete,onError事件的發送,而是隻關注Emitter 實例的發送,ObservableOnSubscribe接口監聽ObservableEmitter對象的發送,一旦接受到此對象就會經過他開始發送具體的事件,這裏能夠有點觀察者模式嵌套的意味。

好了,以上就是從觀察者模式的角度出發,對RxJava的一次解讀,有什麼疏漏或理解錯誤的地方,歡迎讀者指出,共同進步!

相關文章
相關標籤/搜索