RxJS: 詳解forkJoin, zip, combineLatest之間的區別

前言

forkJoin, zip, combineLatestrxjs中的合併操做符,用於對多個流進行合併。不少人第一次接觸rxjs時每每分不清它們之間的區別,其實這很正常,由於當你準備用來合併的流是那種只會發射一次數據就關閉的流時(好比http請求),就結果而言這三個操做符沒有任何區別。數組

const ob1 = Rx.Observable.of(1).delay(1000);
const ob2 = Rx.Observable.of(2).delay(2000);
const ob3 = Rx.Observable.of(3).delay(3000);

Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data));

// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]
// 都是在3秒的時候打印

rxjs中不少操做符功能相近,只有當其操做的流會屢次發射數據時纔會體現出它們之間的區別,下面咱們來詳細解釋forkJoin, zip, 和combineLatest併發

一個基本概念

首先咱們要知道,一個流(或者說Observable序列)的生命週期中,每次發射數據會發出next信號(Notification),結束髮射時會發出complete信號,發生錯誤時發出error信號,三個信號分別對應observer的三個方法。next信號會因爲發射源的不一樣發射0到屢次;而completeerror僅會發射其中一個,且只發射一次,標誌着流的結束。
subscribe接收一個observer對象用來處理上述三種信號,只傳入一個函數會被認爲是next方法,所以傳入subscribenext方法會執行0到N次,N爲序列正常發射信號的次數。函數

forkJoin

forkJoin合併的流,會在每一個被合併的流都發出結束信號時發射一次也是惟一一次數據。假設咱們有兩個流:code

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data));
// ["ob1:2", "ob2:1"]

ob1會在發射完第三個數據時中止發射,ob2會在發射完第二個數據時中止,而forkJoin合併後的流會等到ob1ob2都結束時,發射一次數據,也就是觸發一次subscribe裏的回調,接收到的數據爲ob1ob2發射的最後一次數據的數組。server

zip

zip工做原理以下,當每一個傳入zip的流都發射完畢第一次數據時,zip將這些數據合併爲數組併發射出去;當這些流都發射完第二次數據時,zip再次將它們合併爲數組併發射。以此類推直到其中某個流發出結束信號,整個被合併後的流結束,再也不發射數據。對象

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.zip(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:0", "ob2:0"] ob1等待ob2發射數據,以後合併
// ["ob1:1", "ob2:1"] 此時ob2結束,整個合併的流也結束
// "complete"

zipforkJoin的區別在於,forkJoin僅會合並各個子流最後發射的一次數據,觸發一次回調;zip會等待每一個子流都發射完一次數據而後合併發射,以後繼續等待,直到其中某個流結束(由於此時不能使合併的數據包含每一個子流的數據)。rxjs

combineLatest

combineLatestzip很類似,combineLatest一開始也會等待每一個子流都發射完一次數據,可是在合併時,若是子流1在等待其餘流發射數據期間又發射了新數據,則使用子流最新發射的數據進行合併,以後每當有某個流發射新數據,再也不等待其餘流同步發射數據,而是使用其餘流以前的最近一次數據進行合併。生命週期

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.combineLatest(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:1", "ob2:0"] ob1等待ob2發射,當ob2發射時ob1已經發射了第二次數據,使用ob1的第二次數據
// ["ob1:2", "ob2:0"] ob1繼續發射第三次也是最後一次數據,ob2雖然還未發射,可是能夠使用它上一次的數據
// ["ob1:2", "ob2:1"] ob2發射第二次也是最後一次數據,使ob1上一次的數據。
// "complete"

本期內容結束,下一期會繼續帶來rxjs的其餘操做符或者概念詳解。ip

相關文章
相關標籤/搜索