一 前言bash
上一篇講解的被觀察者的分類,以及特殊的subject/Processor。起初腦海中構建這篇文章大概是講解subject,可是被《一篇不太同樣的RxJava介紹》深深的吸引,這篇文章闡述了rx真實的推導過程,原來Observable是一個異步集合——從主動問是否有數據,到被動等通知數據到來,其實被動的等通知就是統一同步和異步世界的鑰匙,而且順帶統一的回調。以後一發不可收拾的又看了《Observable究竟如何封裝數據》簡單有效的源碼分析。仍是不夠過癮,因而就有了這篇文章------談不上源碼分析,只是簡單點進去看看裏邊的大體調用原理。app
二 僅僅看看你異步
先來看這樣一段代碼:ide
代碼一:
Observable<Integer> Observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
});複製代碼
點進去靜態方法create內部代碼以下:源碼分析
源碼一:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}複製代碼
source指的是外部傳入的ObservableOnSubscribe對象。source又傳入ObservableCreate對象中,而且靜態方法返回這個ObservableCreate對象。
post
繼續跟進ObservableCreate對象,這個類代碼以下:ui
源碼二:
//1 它是一個Observable
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//2這裏的 source 就是外部傳入的ObservableOnSubscribe
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override //注意這個方法,下邊進行闡述。
protected void subscribeActual(Observer<? super T> observer) {
//建立發射對象,傳入被觀察者
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//3.調用外部邏輯,對外暴漏發射對象
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}複製代碼
源碼二有兩點須要闡述:this
1.CreateEmitter經過回調對外暴漏對象,同時CreateEmitter持有observer對象,理所固然能夠猜到CreateEmitter內部是onNext方法中去調用observer的onNext方法。點進去猜對了。spa
2. subscribeActual方法何時調用?當訂閱的時候纔會調用這個方法,不信你能夠查看Observable$subscribe方法,裏邊對subscribeActual方法進行了調用。線程
很顯然這個Observable1沒有被訂閱,因此目前來subscribeActual方法不會被調用。
咱們使用map去轉換Observable1而後返回Observable2:
代碼二:
Observable<Integer> Observable2 = Observable1.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer + 10;
}
});複製代碼
點進去map,能夠看到
源碼三:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
這個this就是Observable1實例,畢竟是調用他的map方法
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
} 複製代碼
繼續跟蹤源碼三中的ObservableMap以下:
源碼四
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
//注意這個source是上邊傳的this,也是Observable1實例
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override//這個方法是訂閱的時候才調用
public void subscribeActual(Observer<? super U> t) {
//注意這個source是上邊傳的this,也是Observable1實例
source.subscribe(new MapObserver<T, U>(t, function));
} }複製代碼
源碼3、四有幾點須要闡述:
源碼四中的source就是上流的Observable1事例。
由於Observable2 沒有訂閱,因此subscribeActual方法不會被調用。
咱們再去使用doOnNext去作一次轉換
代碼三:
Observable<Integer> Observable3 = Observable2.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});複製代碼
點進去doOnNext查看源碼:
源碼五:
private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
//這裏的this是指的上流的Observable2,畢竟調用的他的doOnEach方法
return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
} 複製代碼
繼續跟蹤ObservableDoOnEach
源碼六:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
//這個source是上邊傳入的this,也就是Observable2實例
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
super(source);
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onAfterTerminate = onAfterTerminate;
}
@Override
public void subscribeActual(Observer<? super T> t) {
//這個source是上邊傳入的this,也就是Observable2實例
source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
} 複製代碼
源碼六中的source就是上流的Observabl2事例。
截止到目前Observable3 也沒有被訂閱,因此全部的subscribeActual方法都沒有被調用。
接下來咱們去訂閱Observable3(注意僅僅是訂閱Observable3):
Observable3 .subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});複製代碼
這個時候源碼六中的subscribeActual方法被觸發,subscribeActual
中的邏輯又會觸發Observable2被訂閱,這個時候源碼四中的的subscribeActual
方法被觸發,subscribeActual
中的邏輯又會使得Observable1被訂閱,當Observable1被訂閱的時候,源碼二中的subscribeActual
方法就開始建立發射對象,進行發射。這就完成了一處訂閱,到處訂閱的效果。
到此,咱們只是跟蹤了訂閱的源碼。
咱們接下來跟蹤一下觀察者是怎麼傳遞和執行的!
看源碼六,外部傳入的Observer經過DoOnEachObserver包裝而後傳遞給Observable2。繼續跟蹤又經過源碼四中的MapObserver包裝傳遞到Observable1
最後傳遞到源碼二中的CreateEmitter對象中被調用,這就經過包裝者模式完成數據轉換的層層封裝。
三總結
都是下流被觀察者持有上流被觀察者對象引用,能夠保證subscribeActual方法
遞歸的向上調用。每次調用都把觀察者使用包裝者模式包裝一下本層的邏輯,而後傳遞給上游,直至到源頭流。
在訂閱時,實際上這個順序是逆向的,從下游往上游進行訂閱。數據的傳遞和變換則是正常,方向從上游往下游進行依次處理,最終執行咱們subscribe中傳遞的Observer.onNext()。
以上分析所有是基於同步進行的分析,若是想要印證:
observeOn 後面的全部操做都會在它指定線程工做。subscribeOn 指定的線程是從這個Observable生成一直到
遇到其餘 observeOn。若是程序須要屢次切換線程,使用屢次observeOn是徹底能夠的。而subscribeOn只有最
上方的subscribeOn會起做用。複製代碼
就須要查看ObservableObserveOn被觀察者中的subscribeActual
方法。和ObservableSubscribeOn被觀察者的subscribeActual方法。有興趣的可自行研究。大體就是subscribeOn的原理就是實現一個runnable,而後再runnable中去開啓上流訂閱,這樣全部觀察者的邏輯都和runnable的線程一致。而後指定runnable的線程就能夠改變全部觀察者的線程。observeOn的原理就是獲取下游傳遞過來的觀察者,而後指定下流傳遞過來的被觀察者的線程。僞代碼以下:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//整個訂閱都是在runnable中,因此能夠指定上游的線程和全部觀察者的線程
source.subscribe(parent);
}
}複製代碼
上邊代碼註釋很清楚,指定全部觀察者線程,這就讓全部的代碼經過一行代碼進行了線程切換。直到遇到切換下游線程的代碼observeOn。另外關於指定下游的僞代碼:
Scheduler.Worker w = scheduler.createWorker();
這裏的訂閱是轉換完成再指定下游的線程
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));複製代碼