前言:上一篇文章RxJava2源碼分析(一):基本流程分析,是對RxJava2基本流程的分析,有了上一篇的基礎,這篇就再深刻一點,開始分析一下RxJava2操做符的原理。html
爲了方便理解RxJava2操做符的原理,這裏選擇最經常使用的map
操做符來說解操做符的原理,示例代碼以下java
private void basicUseRxJava() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "我是數字" + integer;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("wizardev", "onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
複製代碼
由於這一篇的內容是在上一篇的基礎上進行講解的,因此在講解操做符以前,先回顧一下前一篇主要的知識,以下:app
一樣,這裏分析源碼的順序依然按照代碼的執行順序,create
方法前文已經分析過了,這裏就直接看map
方法,map
方法的代碼以下ide
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
有了前一篇的經驗,能夠直接從這個方法中得出結論即map
方法返回的是ObservableMap實例,同時將map
方法的參數及Observable自身注入了其構造方法中。源碼分析
如今看下ObservableMap類的源碼,以下ui
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);//調用了其父類的構造方法
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
//省略部分無關代碼
//...
}
複製代碼
從上面的代碼中能夠看出,ObservableMap
繼承至AbstractObservableWithUpstream
,繼續進入AbstractObservableWithUpstream
類中看下源碼,以下this
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
複製代碼
從上面的一段代碼,能夠知道AbstractObservableWithUpstream
類其實就是Observable
類的裝飾類,這個類的做用就是將實例化的Observable
注入進來,做爲其成員變量。分析到這裏能夠得出這幾個類的關係以下spa
注:ObservableMap中的成員變量function
就是咱們寫的這段代碼code
new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "我是數字" + integer;
}
}
複製代碼
好了,到這裏算是將map
方法所作的事情分析完了,下面來看實例代碼的最後一個方法subscribe
.cdn
經過上一篇文章能夠知道subscribe
方法實際調用的是Observable子類的subscribeActual
方法,而這裏調用subscribe
方法的類是ObservableMap
,因此這裏調用的就是ObservableMap
類的subscribeActual
方法。如今來看下ObservableMap類的subscribeActual
方法的源碼,以下(爲了分析方便,這裏將與subscribeActual方法有關的代碼一塊兒貼了出來)
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
從上面的代碼能夠發現ObservableMap
類的subscribeActual
方法又調用了其上游的subscribe
方法,爲了便於理解這裏解釋一下文中所說的上游下游
文中說的「上游」及「下游」實際上是相對而言的,這裏的「上游」是靠近Observable的,如示例代碼中的subscribe是最下游,map是其上游,而map操做符又是crate方法的下游。
這裏上游的subscribe
方法就是ObservableCreate調用的subscribe
方法,實際就是調用ObservableCreate的subscribeActual
方法,接着就是前一篇講過的流程了。
分析到如今能夠得出如下結論
subscribe
方法的調用流程是從下往上的,就是從下游往上游分別調用其subscribe
方法。爲了方便理解,這裏我用時序圖表示subscribe
的調用順序,以下圖
上面的內容分析了RxJava2基本流程加入操做符後的subscribe
方法的執行順序,接着就來看下,數據的接收順序。通過上面的分析能夠知道最終調用的是ObservableCreate類的subscribeActual
方法,這裏與前一篇文章subscribeActual
方法不一樣的就是subscribeActual
方法的參數改變了,這裏的參數是MapObserver
類的實例,再來看下ObservableCreate類的subscribeActual
方法的源碼,以下
//這裏的參數實際是MapObserver類的實例
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//這句代碼的最終調用的就是MapObserver類的onNext方法。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
一些重要的內容已經在上面的代碼中進行了註釋,可能這句source.subscribe(parent);
代碼,很差理解,這裏就來解釋一下這句代碼,由上一篇文章可知,這裏的source
就是示例代碼中的new ObservableOnSubscribe<Integer>()...
實例,這裏就是調用了這個實例的subscribe
方法,而這個方法中的代碼就是調用了其參數的onNext
方法,**最終調用的就是MapObserver類的onNext方法。**如今,來看下MapObserver類的onNext方法的代碼,以下
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
//一、
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//二、
downstream.onNext(v);
}
複製代碼
主要來看下v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
這句代碼,這句代碼中的mapper就是注入到ObservableMap中的成員變量function,詳細內容能夠查看源碼。調用的apply方法,就是示例代碼中的這句代碼
@Override
public String apply(Integer integer) throws Exception {
return "我是數字" + integer;
}
複製代碼
接着,能夠發現又調用了「2」處的代碼downstream.onNext(v);
,這句代碼中的downstream
就是示例代碼中最下游的subscribe
方法中的參數便是下面的代碼
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("wizardev", "onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
複製代碼
因此這句downstream.onNext(v);
調用的就是new Observer<String>()
匿名類中的onNext
方法。關於downstream
的是什麼時候初始化的,能夠從MapObserver的父類BasicFuseableObserver類中知曉。
分析到這裏,又能夠得出一些結論
onNext
方法,首先調用的是function
中的apply
方法,而後再調用下游的onNext
方法並將處理後的參數傳入。 經過分析map
操做符,能夠知道訂閱方法(subscribe)是從下游到上游進行訂閱的,而數據的發射是從上游到下游進行的。這兩個特性不單單是map
操做符的特性,對其餘的操做符一樣適用。爲了講清楚這兩個特性,本文就選了比較具備表明性的map
操做符,若是想了解其餘操做符的原理,就順着這兩個特性分析就好了。
授之以魚,不如授之以漁。本文的目的就是解釋清楚操做符的思想及原理,理解了這種思想及原理,分析其餘的操做符也就不在話下了。