衆所周知RxJava有許多優勢好比強大的鏈式調用,方便的線程調度,可是我對其原理仍是瞭解的太少了,所以打算閱讀下源碼,先從一個最基本的例子開始java
這個例子只是爲了示例,正常狀況下也不會這麼寫數組
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onNext(t: Int) {
println("onNext $t")
}
override fun onError(e: Throwable) {
println("onError")
}
})
}
複製代碼
輸出結果:緩存
onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 3
onComplete
複製代碼
那麼爲何會按這個順序輸出呢?從代碼中也能夠看出從始至終也只調用了create、subscribe兩個方法,先來看看create的源碼bash
// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製代碼
能夠看出當咱們沒有設置onObservableAssembly時其實就是直接建立了一個ObservableCreate實例返回,接着看看subscribe併發
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
複製代碼
能夠看到內部就是調用了subscribeActual方法,而這個方法是個抽象方法,ObservableCreate實現了該方法app
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
複製代碼
內部主要就是先建立了一個CreateEmitter實例,而後調用observer的onSubscribe方法,最後再調用source的subscribe方法,這就解釋了onObserverSubscribe和onSourceSubscribe的輸出,而source的subscribe方法又調用了三次onNext方法和一次onComplete方法,先看看onNextide
// CreateEmitter.java
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
複製代碼
若是還沒dispose那麼直接就調用了observer的onNext,這也就解釋了onNext 一、onNext 二、onNext 3三個輸出接着看onComplete函數
// CreateEmitter.java
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
複製代碼
若是還沒dispose就直接調用observer的onComplete,直接解釋了onComplete的輸出,咱們注意到Observer還有一個onError回調,該方法能夠經過調用emitter.onError手動觸發oop
// CreateEmitter.java
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
複製代碼
能夠看到當還沒被dispose就會調用到observer的onError方法,至此這個基本demo的源碼已經分析完畢。 總結下上述代碼其實就分爲以下幾步ui
上述實例的總體流程圖以下
下面來從源碼的角度研究研究RxJava中的幾個基本方法
首先從最基本的map方法開始
map方法將每一個onNext事件都調用所傳入的Function實例的apply方法來達到轉化數據源的效果,以下圖所示
示例代碼以下所示
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.map {
it + 1
}
.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onNext(t: Int) {
println("onNext $t")
}
})
}
複製代碼
輸出結果:
onObserverSubscribe
onSourceSubscribe
onNext 2
onNext 3
onNext 4
onComplete
複製代碼
很明顯map方法會對全部的next的數據作一次變化這裏是加1,接着看看map的源碼實現
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
內部建立了一個ObservableMap實例並將當前的Observable實例和Function實例傳入,根據本文一開始的分析當調用Observable的subscribe方法其實會調用subscribeActual方法
// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
建立了MapObserver實例將Observer實例進行包裝而後調用source.subscribe,這個source其實就是上一級Observable實例本例中對應ObservableCreate實例,接着根據上文的分析會調用該MapObserver實例的onNext三次而後調用一次onComplete
// MapObserver.java
public void onNext(T t) {
// 初始化的時候done爲false
if (done) {
return;
}
// 初始化的時候就是NONE
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
複製代碼
咱們能夠看到內部調用了mapper.apply方法,接着將拿到的結果v當作參數調用downstream的onNext方法,注意這裏的downStream就是外界建立的一個Observer對象。上述實例是總體流程圖以下。注:綠色框表示對象,藍色框表示方法調用,括號內爲簡稱
綜上咱們能夠知道map經過代理下游Observer實例完成數據轉換,接着看看flatMap的源碼實現
flatMap方法用於將上游的每個onNext事件都轉換成一個Observable實例,以下圖所示
fun main() {
Observable.create {
emitter: ObservableEmitter<Int> ->
println("onSourceSubscribe")
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
.flatMap {
Observable.just(it, it + 1)
}
.subscribe(object : Observer<Int> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onObserverSubscribe")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onNext(t: Int) {
println("onNext $t")
}
})
}
複製代碼
輸出結果:
onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 2
onNext 3
onNext 3
onNext 4
onComplete
複製代碼
很顯然flatMap將每個事件好比1轉換成一個擁有一、2兩個事件的Observable實例,來看看其源碼實現
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼
默認delayErrors爲false表示當一個事件出現異常就會中止整個事件序列,默認併發數爲Int的最大值,默認緩存大小爲128,而後根據這些參數和當前Observable實例構建出一個ObservableFlatMap實例,咱們看看其subscribeActual方法
// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼
內部又經過這些參數和下游的Observer實例構建了一個MergeObserver實例,直接看看其onSubscribe方法
// MergeObserver.java
public void onSubscribe(Disposable d) {
// 只會回調一次下游的onSubscribe方法
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
複製代碼
若是已經有上游了就不作任何處理否則進行上游的賦值,而後回調了下游也就是自定義的那個Observer的onSubscribe方法,接着看看其onNext方法是怎麼把一個輸入源轉化成一個Observable的
public void onNext(T t) {
ObservableSource<? extends U> p;
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
subscribeInner(p);
}
複製代碼
先是調用了傳入的apply方法將每一個onNext數據源轉化爲Observable實例,接着調用subscribeInner方法
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
...
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
複製代碼
爲每一個Observable對象都建立了一個InnerObserver實例,而後將其放入到一個數組中去,最後調用subscribe方法進行訂閱,因爲apply方法返回了一個ObservableFromArray實例,因此看看其subscribeActual方法
// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
複製代碼
observer指代InnerObserver,看看其onSubscribe方法
public void onSubscribe(Disposable d) {
// 第一次調用會返回true,d就是FromArrayDisposable實例其派生自QueueDisposable
if (DisposableHelper.setOnce(this, d)) {
if (d instanceof QueueDisposable) {
QueueDisposable<U> qd = (QueueDisposable<U>) d;
// 至關於傳入了7
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
done = true;
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
// 很明顯7 & 1 != 0
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
複製代碼
這裏暫時還無法理解這個fusionMode(混合模式)是幹什麼用的,接着會調用到MergeObserver的drain方法
void drain() {
// 只會執行一次,循環將全部事件取出
if (getAndIncrement() == 0) {
drainLoop();
}
}
// 當取消了或者出現了錯誤並其dealyErrors爲false時會將全部InnerObserver都dispose掉
boolean checkTerminate() {
if (cancelled) {
return true;
}
Throwable e = errors.get();
if (!delayErrors && (e != null)) {
disposeAll();
e = errors.terminate();
if (e != ExceptionHelper.TERMINATED) {
downstream.onError(e);
}
return true;
}
return false;
}
void drainLoop() {
// 這裏的downstream就是外界自定義的Observer實例
final Observer<? super U> child = this.downstream;
for (;;) {
...
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
o = q.poll();
} catch (Throwable ex) {
...
if (checkTerminate()) {
return;
}
continue sourceLoop;
}
// 每取出一個便會調用下游的onNext方法
child.onNext(o);
}
// 會把一個Observable源全部的數據都取完了之後纔會進入下一個
if (o == null) {
break;
}
}
...
}
..
}
}
複製代碼
drainLoop內部會從數組中一個個取出InnerObserver實例,並取出所對應的數據源而後每取出一個回調下游Observer的onNext方法,下面用一張圖來總結下實例的流程
zip方法經過一個函數將多個Observables的發射物結合到一塊兒,基於這個函數的結果爲每一個結合體發射單個數據項。以下圖所示
下面是一個使用zip操做符的一個簡單例子
fun main() {
Observable.zip(getObservable1(), getObservable2(), zipper())
.subscribe(object : Observer<String> {
override fun onComplete() {
println("onComplete")
}
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
}
override fun onNext(t: String) {
println("onNext $t")
}
override fun onError(e: Throwable) {
println("onError $e")
}
})
}
fun getObservable1(): ObservableSource<String> {
return Observable.create {
it.onNext("A")
it.onNext("B")
it.onNext("C")
Thread.sleep(1000)
}
}
fun getObservable2(): ObservableSource<String> {
return Observable.create {
it.onNext("1")
Thread.sleep(1000)
it.onNext("2")
Thread.sleep(1000)
it.onNext("3")
}
}
fun zipper(): BiFunction<String, String, String> {
return BiFunction { s1, s2 -> s1 + s2 }
}
複製代碼
輸出結果以下,onSubscribe輸出後過一秒輸出A1,再過一秒輸出A2,再過一秒輸出A3
onSubscribe
onNext A1
onNext B2
onNext C3
複製代碼
那麼爲何輸出結果會是這個樣子的呢?來看看zip的源碼實現
public static <T1, T2, R> Observable<R> zip( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
複製代碼
根據源碼能夠看出zip
方法其實最終建立了一個ObservableZip
實例,直接看其subscribeActual
// ObservableZip
public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = sources.length;
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
ZipCoordinator(Observer<? super R> actual,
Function<? super Object[], ? extends R> zipper,
int count, boolean delayError) {
this.downstream = actual;
this.zipper = zipper;
this.observers = new ZipObserver[count];
this.row = (T[])new Object[count];
this.delayError = delayError;
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
this.lazySet(0);
downstream.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(s[i]);
}
}
複製代碼
能夠看出ZipCoordinator的subscribe
內部建立了輸入源大小的ZipObserver實例,而後調用每一個輸入源的subscribe
方法,這樣當輸入源發送事件時就會調用ZipObserver的onNext
方法
// ZipObserver.java
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
複製代碼
主要看看ZipCoordinator的drain
方法
public void drain() {
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = downstream;
final T[] os = row;
final boolean delayError = this.delayError;
for (;;) {
for (;;) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
// 主要是錯誤判斷
}
i++;
}
if (emptyCount != 0) {
break;
}
R v = zipper.apply(os.clone());
a.onNext(v);
Arrays.fill(os, null);
}
}
}
複製代碼
結合示例,首先Obserable1會發送一個A事件,將其放入到了一個隊列中去,接着drain
遍歷全部的ZipObserver,第一個ZipObserver能夠從隊列中事件將其賦值給os[0]
,第二個取不到所以emptyCount++
,而後退出循環。接着Observable1又發送了一個B事件,再將其放入隊列中,而後執行drain
,此次由於os[0]
已經不爲null因此不會從隊列中取,os[1]
仍是null,退出循環繼續執行,接着Observable1再次發送一個C事件,這個跟B事件處理邏輯同樣,再接着Observable2會發送一個1事件,將其放入隊列,執行drain
將os[1]
賦值成1,因爲emptyCount
等於0,所以會執行zipper.apply
,這個方法內部會回調傳入的BiFunction的apply
方法(示例中僅僅進行了字符串拼接),獲取到結果A1,回調下游的onNext方法,而後將row這個數組置空,接着線程睡眠1秒,而後再次發送事件2,將其放入隊列中,執行drain
,方法內部遍歷兩個ZipObserver而且都能從隊列中取到事件,因此emptyCount
等於0,接着就會執行apply
而後獲取到結果B2,調用下游的onNext
,後面Observable2的3事件也跟2事件同樣就不說了。
經過分析map和flatMap兩個方法能夠總結出以下幾個的結論