RxJava系列(一):RxJava 觀察者模式

什麼是RxJava

RxJava是ReactiveX在JVM上的一個實現,使用可觀察序列來編寫異步和基於事件的程序的庫。它擴展了觀察者模式以支持數據/事件序列,並添加了容許您以聲明方式組合序列的運算符,同時抽象出對低級線程,同步,線程安全和併發數據結構等問題的關注。java

什麼是觀察者模式

觀察者模式也被稱爲發佈-訂閱(Publish/Subscribe)模式,它屬於行爲型模式的一種。觀察者模式定義了一種一對多的依賴關係,一個主題對象可被多個觀察者對象同時監聽。當這個主題對象狀態變化時,會通知全部觀察者對象並做出相應處理邏輯。react

觀察者模式角色

  • 抽象被觀察者(Subject):即抽象主題,它把全部對觀察者對象的引用保存在一個集合中,能夠有任意數量的觀察者,抽象主題提供一個接口,能夠增長、刪除、通知觀察者對象。
  • 抽象觀察者(Observer):抽象觀察者,是觀察者者的抽象類,它定義了一個更新接口,使得在獲得主題更改通知時更新本身。
  • 具體被觀察者(Concrete Subject):將有關狀態存入具體觀察者對象,在具體主題的內部狀態發生改變時,給全部註冊過的觀察者發送通知。
  • 具體觀察者(Concrere Observer):實現抽象觀察者定義的更新接口,以便在獲得主題更改通知時更新自身的狀態。

觀察者模式實現(以微博粉絲關注明星爲例子)

1.建立抽象被觀察者(Subject):git

public interface Star {

    /**
     * 添加粉絲
     */
    void addFan(Fan fan);
    
    /**
     * 取消粉絲
     */
    void removeFan(Fan fan);
    
    /**
     * 分享動態
     */
    void notifyFan(String message);
    
}
複製代碼

2.建立抽象觀察者(Observer)github

public interface Fan {

    /**
     * 更新動態
     */
    void update(String message);
    
}
複製代碼

3.建立具體被觀察者(Concrete Subject 具體明星)安全

public class AStar implements Star{

    private List<Fan> fanList = null;
    
    public AStar(){
        fanList = new ArrayList<Fan>();
    }
    
    @Override
    public void addFan(Fan fan){
        fanList.add(fan);
    }
    
    @Override
    public void removeFan(Fan fan){
        fanList.remove(fan);
    }
    
    @Override
    public void notifyFan(String message){
        for(Fan fan : fanList){
            fan.update("AStar 發佈了 ** 信息");
        }
    }
}
複製代碼

4.建立具體觀察者(Concrere Observer 具體粉絲)bash

public class AFan implements Fan{
    
    private String fanName;
    
    public AFan(String fanName){
        this.fanName = fanName;
    }
    
    @Override
    public void update(String message){
        Log.d("AFan 收到了 AStar 發佈的消息");
    }
    
}
複製代碼

RxJava觀察者模式

RxJava三個基本元素

Observable(被觀察者),Observer(觀察者),subscribe(訂閱)。數據結構

Rxjava中的抽象被觀察者(抽象主題Subject)

Observable 是一個抽象類,實現了ObservableSource抽象接口。併發

public abstract class Observable<T> implements ObservableSource<T> {
......
}
複製代碼

ObservableSource中subscribe()用來訂閱觀察者,因此ObservableSource至關於抽象被觀察者。異步

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}
複製代碼

RxJava中的抽象觀察者(Observer)

經過ObservableSource的subscribe()方法可知抽象觀察者爲裏面的參數對象Observer。async

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}
複製代碼

Rxjava中的具體被觀察者(Concrete Subject)

/**
 * 建立Observable
 */
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
    }
});

/**
 * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
 * @param <T> the element type
 * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
 * @return the new Observable instance
 * @see ObservableOnSubscribe
 * @see ObservableEmitter
 * @see Cancellable
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼

經過create方法源碼可知ObservableCreate爲具體被觀察者。

RxJava中的具體觀察者(Concrere Observer)

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
};
複製代碼

上述實現observer接口的observer爲具體觀察者。

Rxjava訂閱實現

observable.subscribe(observer);
複製代碼
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
複製代碼
相關文章
相關標籤/搜索