rxjs入門6之合併數據流

一 concat,merge,zip,combineLatest等合併類操做符

以上操做符在版本6中已經只存在靜態方法,不能在pipe中使用。數組

import {concat,merge,zip,combineLatest}
1.concat (obs1,obs2,obs3) 首尾相連

依次將多個observable首尾合併,必須在第一個obs1的數據所有完成,才能進行第二個obs2,若是第一個爲interval(1000),那麼obs2 和obs3 也就永遠沒有機會獲得輸出。異步

concat(of(1,2,3),interval).subscribe(console.log);
// 1   2   3    0  1  2  3  ...
2.merge 先到先得快速經過

merge會第⼀時間訂閱全部的上游Observable,而後對上游的數據採起「先到先得」的策略,任何⼀個Observable只要有數據推下來,就⽴刻轉給下游Observable對象。函數

merge(interval(1000),of(1,2,3)).subscribe(console.log);
merge(of(1,2,3),interval(1000)).subscribe(console.log);
//兩種狀況的輸出結果同樣,都是先一次性輸出1 2 3  再間隔一秒依次輸出0 1 2 ...
const source1$ = Observable.timer(0, 1000).map(x => x+'A');
const source2$ = Observable.timer(500, 1000).map(x => x+'B');
merge(source1$, source2$).subscribe(
console.log,
null,
() => console.log('complete')
);
//0A
//0B
//1A
//1B
//2A
//2B

merge 的應用場景:咱們知道fromEvent能夠從⽹頁中獲取事件,只惋惜,fromEvent⼀次只能從⼀個DOM元素獲取⼀種類型的事件。⽐如,咱們關⼼某個元素的click事件,同時也關⼼這個元素上的touchend事件,由於在移動設備上touchend事件出現得⽐click更早,這兩個事件的處理是⼀模⼀樣的,可是fromEvent不能同時得到兩個事件的數據流,這時候就要藉助merge的⼒量了,代碼以下:code

const click$ = Rx.Observable.fromEvent(element, 'click');
const touchend$ = Rx.Observable.fromEvent(element, 'touchend');
merge(click$, touchend$).subscribe(eventHandler)
3.zip :拉鍊式組合

一對一的合併server

  • zip會把上游的數據轉化爲數組形式,每⼀個上游Observable貢獻的數據會在對應數組中佔⼀席之地.
  • 默認的輸出格式爲數組格式,可經過第二個參數進行參數格式組裝
  • 簡而言之:不論是同步產生數據仍是異步產生的數據,都會每次依次從須要合併的observable中取一個數據合併成一個數組輸出,當某一個observer再也不吐出數據了,則終止合併,執行complete函數
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of('a', 'b', 'c');
zip(source1$, source2$).subscribe(
    console.log,
    null,
    () => console.log('complete')
);
//[ 1, 'a' ]
//[ 2, 'b' ]
//[ 3, 'c' ]
//complete

4.combineLatest:合併最後一個數據
  • 輸出的數組中元素個數與合併的observable的個數相等。
  • 在合併的observable中,只有最後一個元素爲下游,前面的參數若是有同步的數據,同步數據中只有最後一個數據能進入數據流
  • 默認的輸出格式爲數組格式,可經過第二個參數進行參數格式組裝
  • 第一次執行,當上遊產生了數據,下游還沒來得及產生數據時,就會等待。第二輪時候,不論是上游或者下游產生一個數據,都會執行輸出,還沒來得及產生數據的observable就輸出原來產生的數據,以下彈珠圖
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
combineLatest(source1$,source2$).subscribe(
    console.log,
    null,
    () => console.log('complete')
);
//[ 0, 0 ]
//[ 1, 0 ]
//[ 1, 1 ]
//[ 2, 1 ]
//[ 2, 2 ]
//[ 3, 2 ]

5.withLatestFrom
  • withLatestFrom 爲管道中使用的方法。默認的輸出格式爲數組格式,可經過第二個參數進行參數格式組裝
  • withLatestFrom只有實例操做符的形式,⽽且全部輸⼊Observable的地位並不相同,調⽤withLatestFrom的那個Observable對象起到主導數據產⽣節奏的做⽤,做爲參數的Observable對象只能貢獻數據,不能控制產⽣數據的時機。
const source1$ = Observable.timer(0, 2000).map(x => 100 * x);
const source2$ = Observable.timer(500, 1000);
 source1$.pipe(
    withLatestFrom(source2$, (a,b)=> a+b);
).subscribe(
    console.log,
    null,
    () => console.log('complete')
);


source1$產⽣第⼀個數據0時,withLatestFrom的另⼀個輸⼊Observable對象source2$尚未產⽣數據,因此這個0也被忽略了。對象

解決glitch
例1:blog

const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x+'a');
const source2$ = original$.map(x => x+'b');
const result$ = source1$.pipe(withLatestFrom(source2$);)
result$.subscribe(
console.log,
null,
() => console.log('complete')
);

例2:事件

const event$ = Rx.Observable.fromEvent(document.body, 'click');
const x$ = event$.map(e => e.x);
const y$ = event$.map(e => e.y);
const result$ = x$.pipe(combineLatest(y$, (x, y) => `x: ${x}, y: ${y}`)).subscribe(
    (location) => {
        console.log('#render', location);
        document.querySelector('#text').innerText = location;
    }
);
race :勝者通吃

race就是「競爭」,多個Observable對象在⼀起,看誰最早產⽣數據,不過這種競爭是⼗分殘酷的,勝者通吃,敗者則失去全部機會。
簡而言之,經過race合併多個observable時,最早吐出數據那個observable會成爲數據源,其它的observable會被淘汰。ip

startWith

startWith只有實例操做符的形式,其功能是讓⼀個Observable對象在被訂閱的時候,老是先吐出指定的若⼲個數據。下⾯是使⽤startWith的⽰例代碼element

of(0,1,2).pipe(startWith('a','b')).subscribe(console.log);
//先依次吐出 a b 0 1 2
forkJoin
  • forkJoin能夠接受多個Observable對象做爲參數,forkJoin產⽣的Observable對象也頗有特色,它只會產⽣⼀個數據,由於它會等待全部參數Observable對象的最後⼀個數據,也就是說,只有當全部Observable對象都完結,肯定不會有新的數據產⽣的時候,forkJoin就會把全部輸⼊Observable對象產⽣的最後⼀個數據合併成給下游惟⼀的數據。
  • forkJoin就是RxJS界的Promise.all,Promise.all等待全部輸⼊的Promise對象成功以後把結果合併,forkJoin等待全部輸⼊的Observable對象完結以後把最後⼀個數據合併。
  • 返回數組形式,數組中元素個數爲合併的observable的個數
    js forkJoin(interval(1000).pipe(take(3)),of(1,2,3),timer(2000,1000).pipe(take(3))).subscribe(console.log); // [2,3,2]js

高階Observable

簡言之:⾼階函數就是產⽣函數的函數;相似,所謂⾼階Observable,指的是產⽣的數據依然是Observable的Observable

1.concatAll

concatAll只有⼀個上游Observable對象,這個Observable對象預期是⼀個⾼階Observable對象,concatAll會對其中的內部Observable對象作concat的操做.

interval(1000).pipe(
    take(2),
    map(x=>interval(1500).pipe(take(2),map(x=> `${x}:x,y:${y}`))),
    concatAll()
).subscribe(console.log);
// 0:a,b:0
// 0:a,b:1
// 1:a,b:0
// 1:a,b:1

concat 實際運用

fromEvent(document.body,'mousedown').pipe(
      map(
        e=>fromEvent(document.body,'mousemove').pipe(map(e=>{return {x:e.clientX,y:e.clientY}}), takeUntil(fromEvent(document.body,'mouseup')))
      ),
      concatAll()
    ).subscribe(console.log);
mergeAll

mergeAll就是處理⾼階Observable的merge,只是全部的輸⼊Observable來⾃於上游產⽣的內部Observable對象.

interval(1000).pipe(
    take(2),
    map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
    mergeAll()
)


mergeAll只要
發現上游產⽣⼀個內部Observable就會⽴刻訂閱,並從中抽取收據,因此在上圖中,第⼆個內部Observable產⽣的數據1:0會出如今第⼀個內部Observable產⽣的數據0:1以前.

zipAll
interval(1000).pipe(
    take(2),
    map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
    zipAll()
)
//[ '0:0', '1:0' ]
//[ '0:1', '1:1' ]
//complete
combineAll

combineAll就是處理⾼階Observable的combineLatest,多是由於combine-LatestAll太長了,因此RxJS選擇了combineAll這個名字。

interval(1000).pipe(
    take(2),
    map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
    combeneAll()
)
//[ '0:0', '1:0' ]
//[ '0:1', '1:0' ]
//[ '0:1', '1:1' ]
//complete
switchAll
  • switch的含義就是「切換」,老是切換到最新的內部Observable對象獲取數據。每當switch的上游⾼階Observable產⽣⼀個內部Observable對象,switch都會⽴刻訂閱最新的內部Observable對象上,若是已經訂閱了以前的內部Observable對象,就會退訂那個過期的內部Observable對象,這個「⽤上新的,捨棄舊的」動做,就是切換。
  • 應用場景:也就是外層的數據產生快於內層的數據產生的速度形成數據積壓,需求又可以捨棄原來的舊的外層的數據不讓其舊的外層數據再傳遞到內層產生數據了。
    簡而言之,當外層新產生數據時,不管內部數據產生狀況如何都做廢,從新計算數據流
interval(1000).pipe(
    take(3),
    map(x => Observable.interval(1500).map(y => x+':'+y).take(2)),
    switchAll()
)
//1:0
//1:1
//complete


第⼀個Observable對象有機會產⽣數據0:0,可是在第⼆個數據0:1產⽣以前,第⼆個內部Observable對象產⽣,這時發⽣切換,第⼀個內部Observable就退場了。一樣,第⼆個內部Observable只有機會產⽣⼀個數據1:0,而後第三個內部Observable對象產⽣,以後沒有新的內部Observable對象產⽣,因此第三個Observable對象的兩個數據2:0和2:1都進⼊了下游。

exhaust
  • 在耗盡當前內部Observable的數據以前不會切換到下⼀個內部Observable對象
  • 一樣是鏈接⾼階Observable產⽣的內部Observable對象,可是exhaust的策略和switch相反,當內部Observable對象在時間上發⽣重疊時,情景就是前⼀個內部Observable尚未完結,⽽新的Observable又已經產⽣,到底應該選擇哪⼀個做爲數據源?switch選擇新產⽣的內部Observable對象,exhaust則選擇前⼀個內部Observable對象.
interval(1000).pipe(
    take(3),
    map(x => Observable.interval(700).map(y => x+':'+y).take(2)),
    exhaust()
)

相關文章
相關標籤/搜索