謎之RxJava (二) —— Magic Lift

回顧

上一篇文章 講了ObservableOnSubscribeSubscriber之間的關係。 咱們知道,Observable的具體工做都是在OnSubscribe中完成的。從這個類名咱們也知道,若是生成了一個Observable對象,而不進行subscribe,那麼什麼都不會發生!javascript

OK,RxJava最讓人興奮的就是它有各類各樣的操做符,什麼map呀,flatMap呀各類,咱們今天要知其然知其因此然,那麼他們是如何實現功能的呢?java

例子

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
})
.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "word";
    }
})
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("rx", s);
    }
});

lift

咱們先看下進行鏈式調用map以後,發生了什麼。git

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

對,就是調用了lift函數!,而後把咱們的轉換器(Transfomer,我好想翻譯成變形金剛)傳入進去,看下它作了什麼事。github

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators 
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    if (e instanceof OnErrorNotImplementedException) {
                        throw (OnErrorNotImplementedException) e;
                    }
                    st.onError(e);
                }
            } catch (Throwable e) {
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    });
}

來,我來簡化一下segmentfault

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(...);
}

返回了一個新的Observable對象,這纔是重點! 這種鏈式調用看起來特別熟悉?有沒有像javascript中的Promise/A,在then中返回一個Promise對象進行鏈式調用?app

OK,那麼咱們要看下它是如何工做的啦。ide

map()調用以後,咱們操做的就是新的Observable對象,咱們能夠把它取名爲Observable$2,OK,咱們這裏調用subscribe,完整的就是Observable$2.subscribe,繼續看到subscribe裏,重要的幾個調用:函數

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

注意注意 ! 這裏的observableObservable$2!!也就是說,這裏的onSubscribe是,lift中定義的!!spa

OK,咱們追蹤下去,回到lift的定義中。線程

return new Observable<R>(new OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                onSubscribe.call(st); //請注意我!! 這個onSubscribe是原始的OnSubScribe對象!!
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                st.onError(e);
            }
        } catch (Throwable e) {
            if (e instanceof OnErrorNotImplementedException) {
                throw (OnErrorNotImplementedException) e;
            }
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
});

必定必定要注意這段函數執行的上下文!,這段函數中的onSubscribe對象指向的是外部類,也就是第一個ObservableonSubScribe!而不是Observable$2中的onSubscribe,OK,謹記這一點以後,看看

Subscriber<? super T> st = hook.onLift(operator).call(o);

這行代碼,就是定義operator,生成一個通過operator操做過的Subscriber,看下OperatorMap這個類中的call方法

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {

        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            try {
                o.onNext(transformer.call(t));
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(OnErrorThrowable.addValueAsLastCause(e, t));
            }
        }

    };
}

沒錯,對傳入的Subscriber作了一個代理,把轉換後的值傳入。
這樣就生成了一個代理的Subscriber

最後咱們最外層的OnSubscribe對象對咱們代理的Subscriber進行了調用。。
也就是

@Override
public void call(Subscriber<? super String> subscriber) {
    //此處的subscriber就是被map包裹(wrapper)後的對象。
    subscriber.onNext("hello");
}

而後這個subscriber傳入到內部,鏈式的通知,最後通知到咱們在subscribe函數中定義的對象。

這時候要盜下扔物線大大文章的圖
rxjavarxjava_8_d.gif

還不明白的各位,能夠本身寫一個Demo試一下。

下一章講下RxJava中很重要的線程切換。

【迷之RxJava(三)—— 線程切換】

歡迎關注我Github 以及 weibo@Gemini

相關文章
相關標籤/搜索