在探索了RxJava的源碼以後,在這裏作一次總結。源碼大體的執行流程。bash
1 建立Observable,而後使用操做符轉換或者處理Observable,每個操做符,都會產生一種對應類型的Observable。咱們知道Observable是鏈式調用的,好比:框架
Flowable<String> observable = Flowable
// 建立FlowableJust對象
.just("hello", "world")
// 建立建立FlowableMap對象,而且在建立的時候將上一步建立的對象傳入FlowableMap中
.map(String::toUpperCase)
;
複製代碼
首先我建立了A對象,在A對象的基礎之上進行下一步操做產生B對象,在建立B對象的時候,能夠將本身做爲參數傳遞給B對象,那麼B對象的source就是A對象。ide
2 建立消費者,咱們寫的消費者本質上是RxJava框架封裝到最後很是方便使用的,就是一個單純的Observer類。this
3 調用subscribe方法,將上面的步驟進行串聯起來。好比說咱們建立了A,鏈式調用又建立了B,而後B又建立了C,這個時候C有B的引用,B有A的引用。spa
可是在最後真正調用subscribe方法的時候,爆出的對象是C對象,即用C去調用subscribe方法。code
這時:cdn
源頭Observer
,C建立本身的Observer,好比CObserver
,參數爲源頭Observer,而且將其設置爲downstrem。CObserver
,而後B對象接受到CObserver
建立本身的觀察者,包裝CObserver
爲本身的downstream先整理下,這個時候分別建立了:AObserver,BObserver,CObserver,源頭Observer(咱們本身寫的),而且server
AObserver:對象
BObserver:blog
CObserver:
而對於Observable來講,則與上面相反。
CObservable:
BObservable:
AObservable:
4 當Observer傳遞到AObserver,即最開始產生數據哪裏時,其調用自身Observer的onSubscribe方法,由於每一個自身的Observer都有downstream的Observer,再逐次調用downstream的onSubscribe方法。
在調用onSubscribe方法時,將自身傳遞進去,這時BObserver的upstream爲AObserver。依次類推
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}
複製代碼
5 咱們知道最終調用的是首先建立的AObserveable,AObservable的subscribeActual方法中,在調用onSubscribe以後,將生產的數據與Observer聯合起來,再運行。
AObserver運行完本身的觀察者邏輯後,交由本身的downstream繼續進行onNext方法,直到咱們本身的消費者調用onNext方法。
提及來有點繞,可是大致上就是在建立觀察者和消費者的時候,使用某種方式將其關聯起來。