rxjava筆記:關於lift

正在看rxjava,看到lift,在閱讀了源碼和網上的一些文章,整理了下思路。下文着重不是直接分析源代碼,而是從lift解決什麼問題和如何解決角度分析lift應該是作什麼/怎麼作的問題。具體源碼實現請參考rxjava,網上不少文章分析的很詳細。java

 

Observable的本質上就是異步獲取/加工數據(OnSubscribe的call方法),而後通知observer(Observer的幾個方法)的一個框架。每一個Observable都有一個OnSubscribe(繼承Action1接口)對象。在調用Observable的subscribe方法建立,一旦subscribe後,Observable就開始工做。程序員

舉例來講,對於一個Observalbe<JSONObject>的對象,能夠看做它最老是發射JSONObject數據,要求下游提供一個Subscriber<JSONObject>(Subscriber實現了Observer)來接收數據,而Subscriber<JSONObject>則放在OnSubscribe的參數。網絡

 

rxjava中的lift是各類操做符的核心所在,具體操做符提供不一樣的如map,filter等效果。lift的代碼設計比較精細,其實只要理解了上面Observable的本質,lift的實現也就迎刃而解了。app

剛纔講Observable要獲取/加工數據,那麼它是怎麼獲取/加工數據呢,方式不少,如最基本的例子框架

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext(new String());
            }
        });

這個是最簡單的,可是new String()可說是一個「獲取數據的例子」,固然這樣寫毫無心義。而可能的一種實現是網絡獲取如異步

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> subscriber) {
        RemoteApi.getInstance().getCurrentUserName(new Callback<String>(){
            public void onSuccess(String username){
                subscriber.onNext(username);
                subscriber.onCompleted();
            }

            public void onFail(int code, String detail){
                subscriber.onError(new Exception(detail));
            }
        });
    }
});

這都很好理解。ide

而lift本質是一個Observable數據是從另外一個Observable獲取應該怎麼處理呢?一言不合直接先上代碼post

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

咱們來分析下。this

Observable1(O1)調用lift返回Observable2(這是個新的對象O2),此時O2要從O1獲取數據,O2是消費者,O1是生產者spa

O2調用另一個O1獲取數據實際上要作3件事

1. 讓O1開始獲取數據

2. 獲取數據後,發射給O2。

3. O2獲得數據後,要發射給O2的消費者

先看第1,如何讓O1開始獲取數據?記得開始咱們所講的Observable,它有個OnSubscribe對象,它的call方法是獲取數據的地方。所以,很簡單,調用該方法。

不過,「調用該方法」這麼簡單一句話通常是產品經理的說得,做爲一個程序員,當你腦子想到「調用該方法」時候,就須要落實到實現:要用到的對象從哪來,方法參數是什麼,方法參數從哪來,是否有返回值,返回值怎麼處理等。

在這裏,O2就是經過調用O1的OnSubscribe對象的call方法讓O1開始工做的。O1的OnSubscribe對象是在建立O2是傳入的,代碼清晰可見。

OnSubscribe的call對象要接收一個O2的Subscriber對象,這個就是咱們關注的第二件事:「獲取數據後,通知Observable2」。

而這裏O2傳給O1 OnSubscribe對象的Subscriber對象從哪來的?這就是lift的參數Operator的做用了。Operator就是負責提供給生產者(O1)監聽回調Subscriber的做用,它實際是泛型爲Subscriber的Func1的接口。不一樣的操做符實質是不一樣的Operator。好比map方法是OperatorMap,filter的是OperatorFilter,observeOn的是OperatorObserveOn(一樣形式實現了線程切換,NB吧)

 

所以一個調用流程是

1. 第一步:O2的OnSubscribe的call -> 第二步:O2用operator構造Subscriber -> 第三步:O2用該Subscriber調用O1的OnSubscribe的call

    按照這三步依次上溯,直到最後一個沒有parent的Observable

2. 頂級Observable獲取數據,調用下游Observable傳來的Subscriber發射數據

3. 若是該Subscriber是你寫的(經過subscribe方法),這個就結束了;若是是級聯Observer,則

4. 上一步的Subscriber是O2的Operator構造出來,這個Subscriber一個任務,就是對收到的數據進行處理,而後在通知O2的下游消費者(由於下游消費者的Subscriber對象會保存在operator返回的Subscriber中)

5. 如此,2,4,反覆調用,直至到第三步,一切game over

 借用扔物線文章的圖片以下

相關文章
相關標籤/搜索