RxJava中觀察者模式的實現

1、RxJava簡介react

  • RxJava官方的解釋以下:

要想了解RxJava首先須要瞭解Rx,Rx的全稱是reactive extension,翻譯一下就是響應式擴展,Rx是基於觀察者模式的一種編程模型,目標是提供一致的編程接口,幫助開發者更方便地處理異步數據流,Rx滲透到了各類語言中,RxJava就是針對Java語言的一個異步的響應式編程庫,它是對觀察者模式的擴展。不少Android開發者都很是喜歡這個簡潔的編程庫。git

 

  • RxJava源碼:

    https://github.com/ReactiveX/RxJava/tree/2.x/src
github

 

2、觀察者模式編程

  • 觀察者模式概念

      觀察者模式是將觀察者與被觀察者分離開,實現了對象間一種一對多的組合關係,當被觀察者的狀態發生變化時,全部依賴於它的觀察者就會檢測到變化而且刷新本身的狀態。異步

 

  • 觀察者模式中的四個重要角色

抽象主題:定義添加和刪除觀察者的功能;ide

抽象觀察者:定義觀察者收到主題通知後要作什麼事情;ui

具體主題:抽象主題的具體實現;this

具體觀察者:抽象觀察者的具體實現。spa

       大概的步驟就是一、建立被觀察者;二、建立觀察者;三、爲被觀察者添加觀察者;四、被觀察者通知全部的觀察者發生變化。瞭解了觀察者模式的大體實現步驟幫助咱們更好的理解RxJava中的觀察者模式的實現。翻譯

 

3、從RxJava源碼中看觀察者模式

 依然從上文中提到的四個角色結合實現的四個步驟來看RxJava源碼中的實現:

  • 抽象主題:
public abstract class Observable<T> implements ObservableSource<T> {
……
}

  這是一個Observable類,實現ObservableSource的接口;

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

  ObservableSource接口中subscribe用來實現訂閱觀察者角色的功能,這裏的ObservableSource就是抽象主題的角色。

  • 抽象觀察者
public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

  Observable就是觀察者的角色,上面就是Observer接口,定義了觀察者收到被觀察者的通知後要作的事情。

  • 具體主題:

有了抽象主題,下一步就是實現觀察者模式的第一步,建立具體主題也就是被觀察者;

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

  RxJava提供了不少的建立的方法,這是其中一種create;

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

  相似於以上的方式,建立了Observable而且定義了一些事件觸發的規則,create時,傳入了一個OnSubscribe參數,它至關於一個計劃表,當該Observable被訂閱時,它就會調用call()方法,觸發定義好的事件。

  • 具體觀察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

  RxJava中的observer接口的實現方式如上。

  • 訂閱

在觀察者和被觀察者都被建立以後,最重要的就是將二者結合起來,也就是實現訂閱,訂閱部分的代碼很是簡單,並且在我初次看到它的實現方式的時候以爲十分奇怪,從上文的建立Observable類中也能夠看到,subscribe()是被定義在Observable中的,這有點不符合咱們日常的思考方式,聽說這樣更加符合流式API的設計。

observable.subscribe(observer);

  還有一種寫法是DefaultObserver,這裏的DefaultObserver和observer的使用是相同的,subscriber是observer的抽象類。

  下面從DefaultObserver的部分源碼中看訂閱的實現機制:

public abstract class DefaultSubscriber<T> implements FlowableSubscriber<T> {

    Subscription upstream;

    @Override
    public final void onSubscribe(Subscription s) {
        if (EndConsumerHelper.validate(this.upstream, s, getClass())) {
            this.upstream = s;
            onStart();
        }
    }

    /**
     * Requests from the upstream Subscription.
     * @param n the request amount, positive
     */
    protected final void request(long n) {
        Subscription s = this.upstream;
        if (s != null) {
            s.request(n);
        }
    }

    /**
     * Cancels the upstream's Subscription.
     */
    protected final void cancel() {
        Subscription s = this.upstream;
        this.upstream = SubscriptionHelper.CANCELLED;
        s.cancel();
    }
    /**
     * Called once the subscription has been set on this observer; override this
     * to perform initialization or issue an initial request.
     * <p>
     * The default implementation requests {@link Long#MAX_VALUE}.
     */
    protected void onStart() {
        request(Long.MAX_VALUE);
    }

}

  

 

  能夠看出DefaultObserver有四個方法:一、onStart(),這就是一個準備的方法;二、onSubscribe.call(subscriber),這裏事件發送的邏輯開始實現,所以能夠看到Observable發送事件不是在建立的時候開始的,而是在創建了訂閱這個鏈接的時候實現的;三、cancle(),取消一個訂閱鏈接;四、request(),來自上游訂閱的請求。(RxJava的源碼註釋中用上游表示主題,用下游表示觀察者)

4、觀察者模式的優勢

     觀察者模式最主要的優勢是它是低耦合的,也就是觀察者和被觀察者都只關注他們之間的接口,而不須要關心對方具體是哪一個具體類的實現,當增長觀察者和被觀察者的時候,都對對方的具體實現沒有影響。

     觀察者模式實現的是廣播,被觀察者能夠向全部訂閱了它的觀察者發送事件。

     一開始就說RxJava是對觀察者模式的擴展,它對普通的觀察者模式作出了一些調整,主要有:一、觀察者經過onSubscribe()獲取了發送事件的Disposable對象,這樣它就能夠獲取訂閱鏈接中二者的狀態,甚至能夠主動的中斷事件的發送,在普通的觀察者模式中只有被觀察者擁有訂閱的集合而且控制它的訂閱者;二、抽象主題並無直接控制onNext(),onError()這些事件,而是關注Emitter實例的發送,而具體的事件發送是在ObservableOnsubscribe接口監聽到ObservableEmitter對象而且接受它以後才實現的。

相關文章
相關標籤/搜索