2.1. 建立Observable:

建立Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。這個方法的參數是ObservableOnSubscribe:async

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

ObservableOnSubscribe是一個函數式接口,有惟一的方法subscribe,參數是ObservableEmitter<T> e。ObservableEmitter是一個繼承了Emitter的接口,接口Emitter裏定義了onNext、onError、onComplete等方法,和Observer(觀察者)的方法相對應。ide

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

ObservableEmitter對接口Emitter進行擴展,增長了setDisposable、setCancellable等方法
下面看看create方法:函數

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

調用了RxJavaPlugins的onAssembly方法。又有一個新參數ObservableCreate<T>(source),咱們看看它是什麼:this

final class ObservableCreate<T> extends Observable<T> {

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

}

繼承了Observable,是被觀察對象,在構造函數中咱們看到咱們new ObservableOnSubscribe對象,被存在了ObservableCreate source引用裏spa

onAssembly方法:設計

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

 

一個Hook方法。onObservableAssembly是一個靜態變量,沒有設置默認爲空,因此直接返回source對象。也就是說,Observable的create方法其實就是把ObservableOnSubscribe對象存儲在ObservableCreate對象的source引用裏,而後返回該ObservableCreate對象。
ObservableCreate是繼承Observable的,因此建立了ObservableCreate對象,Observable也就建立完了。

2.2 訂閱事件(被觀察者)

訂閱操做是observable.subscribe(new Observer<String>())。這裏是被觀察者訂閱觀察者,主要是由於鏈式調用方便,由於subscribe方法裏的參數Observer纔是觀察者。咱們也會在Observer裏的各個被調方法裏接收到事件相關的返回值。
subscribe訂閱方法的源碼:

  public final void subscribe(Observer<? super T> observer) {
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            RxJavaPlugins.onError(e);
        }
    }

 

實際上調用了subscribeActual(observer);,這個方法是Observable裏的方法,而此時的Observable是一個ObservableCreate對象。ObservableCreate裏面的subscribeActual以下:

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

方法主要作了三件事:

1. 建立一個CreateEmitter對象parent;
2. 把parent傳給source的subscribe方法,也就是ObservableOnSubscribe對象的subscribe方法:

@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
     e.onNext("a");
}

因此在這個方法裏就能收到一個CreateEmmiter,經過CreateEmitter能夠回調相應的方法。CreateEmitter是實現ObservableEmitter接口,內部onNext源碼以下:

@Override
public void onNext(T t) {
    observer.onNext(t);
}

也就是說,當咱們在ObservableOnSubscribe的subscribe方法裏調用ObservableEmitter的onNext方法的時候,它裏面會調用observer的onNext。因而經過這樣的傳遞就能在observer的回調方法裏收到。

總結無背壓Rxjava(observable/observer):

一、使用Observbable.create方法,產生一個ObservableCreate對象,對象裏存着ObservableOnSubscribe對象source。

二、調用ObservableCreate.subscribe方法,實際調用的是subscribeActual方法,傳入一個Observer對象。

三、subscribeActual方法中建立一個CreateEmmiter對象,調用source.subscribe方法,傳入CreateEmmiter對象。

四、因而咱們在ObservableOnSubscribe中就接收到了一個CreateEmmiter,CreateEmmiter是ObservableEmmiter的子類。咱們能夠在這裏調用CreateEmmiter的方法進行事件回調。

五、調用CreateEmmiter方法,實際上會調用Observer的響應的方法。也就是CreateEmmiter把事件狀態傳遞給觀察者。