以上操做符在版本6中已經只存在靜態方法,不能在pipe中使用。數組
import {concat,merge,zip,combineLatest}
依次將多個observable首尾合併,必須在第一個obs1的數據所有完成,才能進行第二個obs2,若是第一個爲interval(1000),那麼obs2 和obs3 也就永遠沒有機會獲得輸出。異步
concat(of(1,2,3),interval).subscribe(console.log); // 1 2 3 0 1 2 3 ...
merge會第⼀時間訂閱全部的上游Observable,而後對上游的數據採起「先到先得」的策略,任何⼀個Observable只要有數據推下來,就⽴刻轉給下游Observable對象。函數
merge(interval(1000),of(1,2,3)).subscribe(console.log); merge(of(1,2,3),interval(1000)).subscribe(console.log); //兩種狀況的輸出結果同樣,都是先一次性輸出1 2 3 再間隔一秒依次輸出0 1 2 ...
const source1$ = Observable.timer(0, 1000).map(x => x+'A'); const source2$ = Observable.timer(500, 1000).map(x => x+'B'); merge(source1$, source2$).subscribe( console.log, null, () => console.log('complete') ); //0A //0B //1A //1B //2A //2B
merge 的應用場景:咱們知道fromEvent能夠從⽹頁中獲取事件,只惋惜,fromEvent⼀次只能從⼀個DOM元素獲取⼀種類型的事件。⽐如,咱們關⼼某個元素的click事件,同時也關⼼這個元素上的touchend事件,由於在移動設備上touchend事件出現得⽐click更早,這兩個事件的處理是⼀模⼀樣的,可是fromEvent不能同時得到兩個事件的數據流,這時候就要藉助merge的⼒量了,代碼以下:code
const click$ = Rx.Observable.fromEvent(element, 'click'); const touchend$ = Rx.Observable.fromEvent(element, 'touchend'); merge(click$, touchend$).subscribe(eventHandler)
一對一的合併server
const source1$ = Observable.of(1, 2, 3); const source2$ = Observable.of('a', 'b', 'c'); zip(source1$, source2$).subscribe( console.log, null, () => console.log('complete') ); //[ 1, 'a' ] //[ 2, 'b' ] //[ 3, 'c' ] //complete
const source1$ = Observable.timer(500, 1000); const source2$ = Observable.timer(1000, 1000); combineLatest(source1$,source2$).subscribe( console.log, null, () => console.log('complete') ); //[ 0, 0 ] //[ 1, 0 ] //[ 1, 1 ] //[ 2, 1 ] //[ 2, 2 ] //[ 3, 2 ]
const source1$ = Observable.timer(0, 2000).map(x => 100 * x); const source2$ = Observable.timer(500, 1000); source1$.pipe( withLatestFrom(source2$, (a,b)=> a+b); ).subscribe( console.log, null, () => console.log('complete') );
source1$產⽣第⼀個數據0時,withLatestFrom的另⼀個輸⼊Observable對象source2$尚未產⽣數據,因此這個0也被忽略了。對象
解決glitch
例1:blog
const original$ = Observable.timer(0, 1000); const source1$ = original$.map(x => x+'a'); const source2$ = original$.map(x => x+'b'); const result$ = source1$.pipe(withLatestFrom(source2$);) result$.subscribe( console.log, null, () => console.log('complete') );
例2:事件
const event$ = Rx.Observable.fromEvent(document.body, 'click'); const x$ = event$.map(e => e.x); const y$ = event$.map(e => e.y); const result$ = x$.pipe(combineLatest(y$, (x, y) => `x: ${x}, y: ${y}`)).subscribe( (location) => { console.log('#render', location); document.querySelector('#text').innerText = location; } );
race就是「競爭」,多個Observable對象在⼀起,看誰最早產⽣數據,不過這種競爭是⼗分殘酷的,勝者通吃,敗者則失去全部機會。
簡而言之,經過race合併多個observable時,最早吐出數據那個observable會成爲數據源,其它的observable會被淘汰。ip
startWith只有實例操做符的形式,其功能是讓⼀個Observable對象在被訂閱的時候,老是先吐出指定的若⼲個數據。下⾯是使⽤startWith的⽰例代碼element
of(0,1,2).pipe(startWith('a','b')).subscribe(console.log); //先依次吐出 a b 0 1 2
js forkJoin(interval(1000).pipe(take(3)),of(1,2,3),timer(2000,1000).pipe(take(3))).subscribe(console.log); // [2,3,2]
js簡言之:⾼階函數就是產⽣函數的函數;相似,所謂⾼階Observable,指的是產⽣的數據依然是Observable的Observable
concatAll只有⼀個上游Observable對象,這個Observable對象預期是⼀個⾼階Observable對象,concatAll會對其中的內部Observable對象作concat的操做.
interval(1000).pipe( take(2), map(x=>interval(1500).pipe(take(2),map(x=> `${x}:x,y:${y}`))), concatAll() ).subscribe(console.log); // 0:a,b:0 // 0:a,b:1 // 1:a,b:0 // 1:a,b:1
concat 實際運用
fromEvent(document.body,'mousedown').pipe( map( e=>fromEvent(document.body,'mousemove').pipe(map(e=>{return {x:e.clientX,y:e.clientY}}), takeUntil(fromEvent(document.body,'mouseup'))) ), concatAll() ).subscribe(console.log);
mergeAll就是處理⾼階Observable的merge,只是全部的輸⼊Observable來⾃於上游產⽣的內部Observable對象.
interval(1000).pipe( take(2), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), mergeAll() )
mergeAll只要
發現上游產⽣⼀個內部Observable就會⽴刻訂閱,並從中抽取收據,因此在上圖中,第⼆個內部Observable產⽣的數據1:0會出如今第⼀個內部Observable產⽣的數據0:1以前.
interval(1000).pipe( take(2), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), zipAll() ) //[ '0:0', '1:0' ] //[ '0:1', '1:1' ] //complete
combineAll就是處理⾼階Observable的combineLatest,多是由於combine-LatestAll太長了,因此RxJS選擇了combineAll這個名字。
interval(1000).pipe( take(2), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), combeneAll() ) //[ '0:0', '1:0' ] //[ '0:1', '1:0' ] //[ '0:1', '1:1' ] //complete
interval(1000).pipe( take(3), map(x => Observable.interval(1500).map(y => x+':'+y).take(2)), switchAll() ) //1:0 //1:1 //complete
第⼀個Observable對象有機會產⽣數據0:0,可是在第⼆個數據0:1產⽣以前,第⼆個內部Observable對象產⽣,這時發⽣切換,第⼀個內部Observable就退場了。一樣,第⼆個內部Observable只有機會產⽣⼀個數據1:0,而後第三個內部Observable對象產⽣,以後沒有新的內部Observable對象產⽣,因此第三個Observable對象的兩個數據2:0和2:1都進⼊了下游。
interval(1000).pipe( take(3), map(x => Observable.interval(700).map(y => x+':'+y).take(2)), exhaust() )