正在看rxjava,看到lift,在閱讀了源碼和網上的一些文章,整理了下思路。下文着重不是直接分析源代碼,而是從lift解決什麼問題和如何解決角度分析lift應該是作什麼/怎麼作的問題。具體源碼實現請參考rxjava,網上不少文章分析的很詳細。java
Observable的本質上就是異步獲取/加工數據(OnSubscribe的call方法),而後通知observer(Observer的幾個方法)的一個框架。每一個Observable都有一個OnSubscribe(繼承Action1接口)對象。在調用Observable的subscribe方法建立,一旦subscribe後,Observable就開始工做。程序員
舉例來講,對於一個Observalbe<JSONObject>的對象,能夠看做它最老是發射JSONObject數據,要求下游提供一個Subscriber<JSONObject>(Subscriber實現了Observer)來接收數據,而Subscriber<JSONObject>則放在OnSubscribe的參數。網絡
rxjava中的lift是各類操做符的核心所在,具體操做符提供不一樣的如map,filter等效果。lift的代碼設計比較精細,其實只要理解了上面Observable的本質,lift的實現也就迎刃而解了。app
剛纔講Observable要獲取/加工數據,那麼它是怎麼獲取/加工數據呢,方式不少,如最基本的例子框架
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(new String()); } });
這個是最簡單的,可是new String()可說是一個「獲取數據的例子」,固然這樣寫毫無心義。而可能的一種實現是網絡獲取如異步
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { RemoteApi.getInstance().getCurrentUserName(new Callback<String>(){ public void onSuccess(String username){ subscriber.onNext(username); subscriber.onCompleted(); } public void onFail(int code, String detail){ subscriber.onError(new Exception(detail)); } }); } });
這都很好理解。ide
而lift本質是一個Observable數據是從另外一個Observable獲取應該怎麼處理呢?一言不合直接先上代碼post
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); parent.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }
咱們來分析下。this
Observable1(O1)調用lift返回Observable2(這是個新的對象O2),此時O2要從O1獲取數據,O2是消費者,O1是生產者spa
O2調用另一個O1獲取數據實際上要作3件事
1. 讓O1開始獲取數據
2. 獲取數據後,發射給O2。
3. O2獲得數據後,要發射給O2的消費者
先看第1,如何讓O1開始獲取數據?記得開始咱們所講的Observable,它有個OnSubscribe對象,它的call方法是獲取數據的地方。所以,很簡單,調用該方法。
不過,「調用該方法」這麼簡單一句話通常是產品經理的說得,做爲一個程序員,當你腦子想到「調用該方法」時候,就須要落實到實現:要用到的對象從哪來,方法參數是什麼,方法參數從哪來,是否有返回值,返回值怎麼處理等。
在這裏,O2就是經過調用O1的OnSubscribe對象的call方法讓O1開始工做的。O1的OnSubscribe對象是在建立O2是傳入的,代碼清晰可見。
OnSubscribe的call對象要接收一個O2的Subscriber對象,這個就是咱們關注的第二件事:「獲取數據後,通知Observable2」。
而這裏O2傳給O1 OnSubscribe對象的Subscriber對象從哪來的?這就是lift的參數Operator的做用了。Operator就是負責提供給生產者(O1)監聽回調Subscriber的做用,它實際是泛型爲Subscriber的Func1的接口。不一樣的操做符實質是不一樣的Operator。好比map方法是OperatorMap,filter的是OperatorFilter,observeOn的是OperatorObserveOn(一樣形式實現了線程切換,NB吧)
所以一個調用流程是
1. 第一步:O2的OnSubscribe的call -> 第二步:O2用operator構造Subscriber -> 第三步:O2用該Subscriber調用O1的OnSubscribe的call
按照這三步依次上溯,直到最後一個沒有parent的Observable
2. 頂級Observable獲取數據,調用下游Observable傳來的Subscriber發射數據
3. 若是該Subscriber是你寫的(經過subscribe方法),這個就結束了;若是是級聯Observer,則
4. 上一步的Subscriber是O2的Operator構造出來,這個Subscriber一個任務,就是對收到的數據進行處理,而後在通知O2的下游消費者(由於下游消費者的Subscriber對象會保存在operator返回的Subscriber中)
5. 如此,2,4,反覆調用,直至到第三步,一切game over
借用扔物線文章的圖片以下