系列文章:html
本文基於 RxJava 2.1.9java
距離前兩篇文章已通過去三個月之久了,終於補上第三篇了。第三篇預期就是針對某一個操做符的源碼進行解析,選擇了 Observable.zip
的緣由一是司裏這塊用的比較多,再一個筆者以爲這個操做符十分強大,想去探索一番 zip 操做符是如何實現這樣的騷操做,若是讀者還不瞭解 zip 操做符,建議查看文檔並上手一番,文檔地址:Zip · ReactiveX文檔中文翻譯react
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
public class Test {
@SuppressWarnings("ResultOfMethodCallIgnored")
public static void main(String[] args) {
Observable.zip(first(), second(), zipper())
.subscribe(System.out::println);
}
private static ObservableSource<String> first() {
return Observable.create(emitter -> {
Thread.sleep(1000);
emitter.onNext("11");
emitter.onNext("12");
emitter.onNext("13");
}
);
}
private static ObservableSource<String> second() {
return Observable.create(emitter -> {
emitter.onNext("21");
Thread.sleep(2000);
emitter.onNext("22");
Thread.sleep(3000);
emitter.onNext("23");
}
);
}
private static BiFunction<String, String, String> zipper() {
return (s1, s2) -> s1 + "," + s2;
}
}
複製代碼
hello world 級別的代碼就是爲了 hello world. —— 魯迅git
如上所示,操做過 zip 操做符的讀者們應該都知道,會在一秒後輸出【11,21】,緊接着兩秒後輸出【12,22】,再緊接着三秒後輸出【13,23】。數組
通過前兩篇文章的閱讀,筆者相信讀者們能很快地找到 ObservableZip
這個類,這個類就是實現具體 zip 操做的核心類,一樣地,直接針對該類的 subscribeActual(Observer)
解析,簡化後源碼以下:併發
public void subscribeActual(Observer<? super R> s) {
// sources 是上游 ObservableSource 數組
// 在本案例中也就是上面 first() 和 second() 方法傳回的 ObservableSource
ObservableSource<? extends T>[] sources = this.sources;
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(s, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
複製代碼
簡化後能夠看到仍是很簡單的,因此下步就是了解 ZipCoordinator
類和其 subscribe()
方法的實現了,ZipCoordinator
構造函數和 ZipCoordinator#subscribe()
代碼簡化以下 ——app
ZipCoordinator(Observer<? super R> actual, int count) {
this.actual = actual;
this.observers = new ZipObserver[count];
this.row = (T[])new Object[count];
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
actual.onSubscribe(this);
for (int i = 0; i < len; i++) {
sources[i].subscribe(s[i]);
}
}
複製代碼
大體作了如下幾件事:ide
ZipCoordinator#subscribe()
中初始化了 ZipObserver 數組並讓上游 ObservableSource 分別訂閱了對應的 ZipObserver。通過前面的文章分析咱們知道,上游的 onNext(T)
方法會觸發下游的 onNext(T)
方法,因此下一步來看看 ZipObserver 的 onNext(T)
方法實現 ——函數
@Override
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
複製代碼
能夠看到,源碼十分的簡單,一是入隊,二是調用 ZipCoordinator#drain()
方法,精簡以下 ——高併發
public void drain() {
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = actual;
// row 在咱們前面提到過
final T[] os = row;
for (; ; ) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
boolean d = z.done;
T v = z.queue.poll();
boolean empty = v == null;
if (!empty) {
os[i] = v;
} else {
emptyCount++;
}
} else {
// ...
}
i++;
}
if (emptyCount != 0) {
break;
}
R v = zipper.apply(os.clone();
a.onNext(v);
Arrays.fill(os, null);
}
}
複製代碼
先從實際場景解析流程,再來總結 ——
第一個事件應該是上游 first()
返回的 ObservableSource 中發射的【11】,最終在 ZipObserver#onNext(T)
方法中,該事件首先被塞入隊列,再觸發上述的 ZipCoordinator#drain()
,在 drain()
方法中會進入 ZipObserver 的遍歷 ——
for 循環跳出後,因爲 emptyCount 不爲0,死循環結束。
第二個事件也是由 first()
發射過來的(【12】), 當第二個事件發射過來的時候——
一樣地,第三個事件(【13】)發射過來的時候,走一樣的邏輯。
可是1000毫秒後,第「四」個事件由 second()
發射(也就是【21】)的時候,事情就不同了——
for 循環跳出後,通過 zipper 操做合併後兩個事件被傳輸給下游 Observer 的 onNext(T)
中,此時打印臺就輸出了【11,21】了。固然,最後還會將 os 數組中元素所有填充爲 null,爲下一次數據填充作準備。
因此實際上 zip 操做符的原理在於就是依靠隊列+數組,當一個事件被髮射過來的時候,首先進入隊列,再去查看數組的每一個元素是否爲空 ——
直到最後,斷定 emptyCount 是否不爲0,不爲0則意味着數組沒有被填滿,某些隊列中尚未值,因此只能結束這次操做,等待下一次上游發射事件了。而若是 emptyCount 爲0,那麼說明數組中的值被填滿了,這意味着符合觸發下游 Observer#onNext(T)
的要求了,固然,不要忘了將數組內部元素置 null,爲下次數據填充作準備。
媽個雞,是否是還沒懂?筆者也以爲挺難懂的,誰要跟我這麼說我也聽不懂啊!畫圖吧 ——
第一次事件由「第一個」事件源發出:
當【11】入隊後,數組開始遍歷,數組 0 的位置試圖將第一個隊列 poll 的值填入,此時爲【11】;數組 1 的位置試圖將第二個隊列 poll 的值填入,可是此時爲 null,因此最終結束操做,等待下一次上游的事件發射。
第二次事件仍然是由「第一個」事件源發出的 ——
當【12】入隊後,數組開始遍歷,數組 0 位置已經被填入值,數組 1 的位置試圖將第二個隊列 poll 的值填入,可是此時爲 null,結束操做。
另外一種狀況則是第二次事件是由「第二個」事件源發出:
當【21】入隊後,數組開始遍歷,數組 0 位置已經被填入值,數組 1 的位置試圖將第二個隊列 poll 的值填入,此時爲【21】。循環結束後,emptyCount 依舊爲0,符合條件,觸發下游 Observer#onNext(T)
,而後將數組中元素置 null,爲下一次數據填充作準備。
ZipCoordinator 爲了應對高併發引入了 CAS,同時也利用 CAS 優化 ZipCoordinator#drain()
實現,另外若是各位讀者對 rxjava 有必定的瞭解,必定知道有一些和 zip 一類的操做符被稱爲組合操做符,而裏面的 concat 操做符的實現,和 zip 操做符的實現有着殊途同歸之妙,感興趣的讀者能夠去自行去源碼中一探究竟,感覺下 rxjava 的魅力。