forkJoin
, zip
, combineLatest
是rxjs
中的合併操做符,用於對多個流進行合併。不少人第一次接觸rxjs
時每每分不清它們之間的區別,其實這很正常,由於當你準備用來合併的流是那種只會發射一次數據就關閉的流時(好比http
請求),就結果而言這三個操做符沒有任何區別。數組
const ob1 = Rx.Observable.of(1).delay(1000); const ob2 = Rx.Observable.of(2).delay(2000); const ob3 = Rx.Observable.of(3).delay(3000); Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data)); Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data)); Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data)); // [1, 2, 3] // [1, 2, 3] // [1, 2, 3] // 都是在3秒的時候打印
rxjs
中不少操做符功能相近,只有當其操做的流會屢次發射數據時纔會體現出它們之間的區別,下面咱們來詳細解釋forkJoin
, zip
, 和combineLatest
。併發
首先咱們要知道,一個流(或者說Observable
序列)的生命週期中,每次發射數據會發出next
信號(Notification
),結束髮射時會發出complete
信號,發生錯誤時發出error
信號,三個信號分別對應observer
的三個方法。next
信號會因爲發射源的不一樣發射0到屢次;而complete
和error
僅會發射其中一個,且只發射一次,標誌着流的結束。subscribe
接收一個observer
對象用來處理上述三種信號,只傳入一個函數會被認爲是next
方法,所以傳入subscribe
的next
方法會執行0到N次,N爲序列正常發射信號的次數。函數
forkJoin
用forkJoin
合併的流,會在每一個被合併的流都發出結束信號時發射一次也是惟一一次數據。假設咱們有兩個流:code
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2); Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data)); // ["ob1:2", "ob2:1"]
ob1
會在發射完第三個數據時中止發射,ob2
會在發射完第二個數據時中止,而forkJoin
合併後的流會等到ob1
和ob2
都結束時,發射一次數據,也就是觸發一次subscribe
裏的回調,接收到的數據爲ob1
和ob2
發射的最後一次數據的數組。server
zip
zip
工做原理以下,當每一個傳入zip
的流都發射完畢第一次數據時,zip
將這些數據合併爲數組併發射出去;當這些流都發射完第二次數據時,zip
再次將它們合併爲數組併發射。以此類推直到其中某個流發出結束信號,整個被合併後的流結束,再也不發射數據。對象
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2); Rx.Observable.zip(ob1, ob2).subscribe({ next: (data) => console.log(data), complete: () => console.log('complete') }); // ["ob1:0", "ob2:0"] ob1等待ob2發射數據,以後合併 // ["ob1:1", "ob2:1"] 此時ob2結束,整個合併的流也結束 // "complete"
zip
和forkJoin
的區別在於,forkJoin
僅會合並各個子流最後發射的一次數據,觸發一次回調;zip
會等待每一個子流都發射完一次數據而後合併發射,以後繼續等待,直到其中某個流結束(由於此時不能使合併的數據包含每一個子流的數據)。rxjs
combineLatest
combineLatest
與zip
很類似,combineLatest
一開始也會等待每一個子流都發射完一次數據,可是在合併時,若是子流1在等待其餘流發射數據期間又發射了新數據,則使用子流最新發射的數據進行合併,以後每當有某個流發射新數據,再也不等待其餘流同步發射數據,而是使用其餘流以前的最近一次數據進行合併。生命週期
const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3); const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2); Rx.Observable.combineLatest(ob1, ob2).subscribe({ next: (data) => console.log(data), complete: () => console.log('complete') }); // ["ob1:1", "ob2:0"] ob1等待ob2發射,當ob2發射時ob1已經發射了第二次數據,使用ob1的第二次數據 // ["ob1:2", "ob2:0"] ob1繼續發射第三次也是最後一次數據,ob2雖然還未發射,可是能夠使用它上一次的數據 // ["ob1:2", "ob2:1"] ob2發射第二次也是最後一次數據,使ob1上一次的數據。 // "complete"
本期內容結束,下一期會繼續帶來rxjs
的其餘操做符或者概念詳解。ip