30 天精通 RxJS(17): Observable Operators - switch, mergeAll, concatAll

今天咱們要講三個 operators,這三個 operators 都是用來處理 Higher Order Observable。所謂的 Higher Order Observable 就是指一個 Observable 送出的元素仍是一個 Observable,就像是二維數組同樣,一個數組中的每一個元素都是數組。若是用泛型來表達就像是javascript

Observable<Observable<T>>複製代碼

一般咱們須要的是第二層 Observable 送出的元素,因此咱們但願能夠把二維的 Observable 改爲一維的,像是下面這樣java

Observable<Observable<T>> => Observable<T>複製代碼

其實想要作到這件事有三個方法 switch、mergeAll 和 concatAll,其中 concatAll 咱們在以前的文章已經稍微講過了,今天這篇文章會講解這三個 operators 各自的效果跟差別。數組

Operators

concatAll

咱們在講簡易拖拽的示例時就有講過這個 operator,concatAll 最重要的重點就是他會處理完前一個 observable 纔會在處理下一個 observable,讓咱們來看一個示例bash

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// (點擊後)
// 0
// 1
// 2
// 3
// 4
// 5 ...複製代碼

JSBinui

上面這段代碼,當咱們點擊畫面時就會開始送出數值,若是用 Marble Diagram 表示以下spa

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     concatAll()
example: ----------------0----1----2----3----4--..複製代碼

從 Marble Diagram 能夠看得出來,當咱們點擊一下 click 事件會被轉成一個 observable 而這個 observable 會每一秒送出一個遞增的數值,當咱們用 concatAll 以後會把二維的 observable 攤平成一維的 observable,但 concatAll 會一個一個處理,必定是等前一個 observable 完成(complete)纔會處理下一個 observable,由於如今送出 observable 是無限的永遠不會完成(complete),就致使他永遠不會處理第二個送出的 observable!code

咱們再看一個例子事件

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});複製代碼

如今咱們把送出的 observable 限制只取前三個元素,用 Marble Diagram 表示以下ip

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \
                    \ ----0----1----2|   ----0----1----2|
                     ----0----1----2|
                     concatAll()
example: ----------------0----1----2----0----1----2--..複製代碼

這裏咱們把送出的 observable 變成有限的,只會送出三個元素,這時就能看得出來 concatAll 無論兩個 observable 送出的時間多麼相近,必定會先處理前一個 observable 再處理下一個。get

switch

switch 一樣能把二維的 observable 攤平成一維的,但他們在行爲上有很大的不一樣,咱們來看下面這個示例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.switch();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});複製代碼

JSBin

用 Marble Diagram 表示以下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: -----------------0----1----2--------0----1--...複製代碼

switch 最重要的就是他會在新的 observable 送出後直接處理新的 observable 無論前一個 observable 是否完成,每當有新的 observable 送出就會直接把舊的 observable 退訂(unsubscribe),永遠只處理最新的 observable!

因此在這上面的 Marble Diagram 能夠看得出來第一次送出的 observable 跟第二次送出的 observable 時間點太相近,致使第一個 observable 還來不及送出元素就直接被退訂了,當下一次送出 observable 就又會把前一次的 observable 退訂。

mergeAll

咱們以前講過 merge 他可讓多個 observable 同時送出元素,mergeAll 也是一樣的道理,它會把二維的 observable 轉成一維的,而且可以同時處理全部的 observable,讓咱們來看這個示例

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));

var example = source.mergeAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});複製代碼

上面這段代碼用 Marble Diagram 表示以下

click  : ---------c-c------------------c--.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: ----------------00---11---22---33---(04)4--...複製代碼

從 Marble Diagram 能夠看出來,全部的 observable 是並行(Parallel)處理的,也就是說 mergeAll 不會像 switch 同樣退訂(unsubscribe)原先的 observable 而是並行處理多個 observable。以咱們的示例來講,當咱們點擊越多下,最後送出的頻率就會越快。

另外 mergeAll 能夠傳入一個數值,這個數值表明他能夠同時處理的 observable 數量,咱們來看一個例子

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000).take(3));

var example = source.mergeAll(2);
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});複製代碼

這裏咱們送出的 observable 改爲取前三個,而且讓 mergeAll 最多隻能同時處理 2 個 observable,用 Marble Diagram 表示以下

click  : ---------c-c----------o----------.. 
        map(e => Rx.Observable.interval(1000))
source : ---------o-o----------c----------..
                   \ \          \----0----1----2|     
                    \ ----0----1----2|  
                     ----0----1----2|
                     mergeAll(2)
example: ----------------00---11---22---0----1----2--..複製代碼

當 mergeAll 傳入參數後,就會等處理中的其中一個 observable 完成,再去處理下一個。以咱們的例子來講,前面兩個 observabel 能夠被並行處理,但第三個 observable 必須等到第一個 observable 結束後,纔會開始。

咱們能夠利用這個參數來決定要同時處理幾個 observable,若是咱們傳入 1 其行爲就會跟 concatAll 是如出一轍的,這點在原始碼能夠看到他們是徹底相同的。

今日小結

今天介紹了三個能夠處理 High Order Observable 的方法,並講解了三個方法的差別,不知道讀者有沒有收穫呢? 若是有任何問題歡迎在下方留言給我,感謝!

相關文章
相關標籤/搜索