更新內容 - Multicasting Operators 更新時間 - 2017-03-30javascript
咱們把描繪 Observable 的圖稱爲 Marble diagrams,咱們用 -
來表示一小段時間,這些 -
串起來就表示一個 Observable 對象。html
----------------
X 表示有錯誤發生java
---------------X
|
表示 Observable 結束react
----------------|
在時間序列中,咱們可能會持續發出值,若是值是數字則直接用阿拉伯數字表示,其它數據類型使用相近的英文符號表示,接下來咱們看一下 interval
操做符對應的 marble 圖:es6
var source = Rx.Observable.interval(1000);
source 對應的 marble 圖:web
-----0-----1-----2-----3--...
當 observable 同步發送值時,如使用 of 操做符建立以下 Observable 對象:typescript
var source = Rx.Observable.of(1,2,3,4);
source 對應的 marble 圖:shell
(1234)|
小括號表示同步發生。緩存
另外 marble 圖也可以表示 operator 的先後轉換關係,例如:併發
var source = Rx.Observable.interval(1000); var newest = source.map(x => x + 1);
對應的 marble 圖以下:
source: -----0-----1-----2-----3--... map(x => x + 1) newest: -----1-----2-----3-----4--...
經過 marble 圖,能夠幫助咱們更好地理解 operator。
詳細的信息能夠參考 - RxMarbles
public repeat(count: number): Observable
repeat 操做符做用:
重複 count 次,源 Observable 發出的值。
repeat 操做符示例:
var source = Rx.Observable.from(['a','b','c']) .zip(Rx.Observable.interval(500), (x,y) => x); var example = source.repeat(2); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----a----b----c| repeat(2) example: ----a----b----c----a----b----c|
以上代碼運行後,控制檯的輸出結果:
a b c a b c complete
public map(project: function(value: T, index: number): R, thisArg: any): Observable<R>
map 操做符做用:
對 Observable 對象發出的每一個值,使用指定的 project 函數,進行映射處理。
map 操做符示例:
var source = Rx.Observable.interval(1000); var newest = source.map(x => x + 2); newest.subscribe(console.log);
示例 marble 圖:
source: -----0-----1-----2-----3--... map(x => x + 2) newest: -----2-----3-----4-----5--...
以上代碼運行後,控制檯的輸出結果:
2 3 4 ...
public mapTo(value: any): Observable
mapTo 操做符做用:
對 Observable 對象發出的每一個值,映射成固定的值。
mapTo 操做符示例:
var source = Rx.Observable.interval(1000); var newest = source.mapTo(2); newest.subscribe(console.log);
示例 marble 圖:
source: -----0-----1-----2-----3--... mapTo(2) newest: -----2-----2-----2-----2--...
以上代碼運行後,控制檯的輸出結果:
2 2 2 ...
public scan(accumulator: function(acc: R, value: T, index: number): R, seed: T | R): Observable<R>
scan 操做符做用:
對 Observable 發出值,執行 accumulator 指定的運算,能夠簡單地認爲是 Observable 版本的 Array.prototype.reduce 。
scan 操做符示例:
var source = Rx.Observable.from('hello') .zip(Rx.Observable.interval(600), (x, y) => x); var example = source.scan((origin, next) => origin + next, ''); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----h----e----l----l----o| scan((origin, next) => origin + next, '') example: ----h----(he)----(hel)----(hell)----(hello)|
以上代碼運行後,控制檯的輸出結果:
h he hel hell hello complete
(備註:scan 與 reduce 最大的差異就是 scan 最終返回的必定是一個 Observable 對象,而 reduce 的返回類型不是固定的)
public buffer(closingNotifier: Observable<any>): Observable<T[]>
buffer 操做符做用:
緩衝源 Observable 對象已發出的值,直到 closingNotifier 觸發後,才統一輸出緩存的元素。
buffer 操做符示例:
var source = Rx.Observable.interval(300); var source2 = Rx.Observable.interval(1000); var example = source.buffer(source2); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4--5--6--7.. source2: ---------0---------1--------... buffer(source2) example: ---------([0,1,2])---------([3,4,5])
以上代碼運行後,控制檯的輸出結果:
[0,1,2] [3,4,5] [6,7,8] ....
public bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler: Scheduler): Observable<T[]>
bufferTime 操做符做用:
設定源 Observable 對象已發出的值的緩衝時間。
bufferTime 操做符示例:
var source = Rx.Observable.interval(300); var example = source.bufferTime(1000); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4--5--6--7.. bufferTime(1000) example: ---------([0,1,2])---------([3,4,5])
以上代碼運行後,控制檯的輸出結果:
[0,1,2] [3,4,5] [6,7,8] ....
public bufferCount(bufferSize: number, startBufferEvery: number): Observable<T[]>
bufferCount 操做符做用:
緩衝源 Observable對象已發出的值,直到大小達到給定的最大 bufferSize 。
bufferCount 操做符示例:
var source = Rx.Observable.interval(300); var example = source.bufferCount(3); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4--5--6--7.. bufferCount(3) example: ---------([0,1,2])---------([3,4,5])
以上代碼運行後,控制檯的輸出結果:
[0,1,2] [3,4,5] [6,7,8] ....
public concatMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
concatMap 操做符做用:
對每一個 Observable 對象發出的值,進行映射處理,並進行合併。該操做符也會先處理前一個 Observable 對象,在處理下一個 Observable 對象。
concatMap 操做符示例:
var source = Rx.Observable.fromEvent(document.body, 'click'); var example = source.concatMap( e => Rx.Observable.interval(100).take(3)); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : -----------c--c------------------... concatMap(c => Rx.Observable.interval(100).take(3)) example: -------------0-1-2-0-1-2---------...
以上代碼運行後,控制檯的輸出結果:
0 1 2 0 1 2
concatMap 其實就是 map 加上 concatAll 的簡化寫法。
public switchMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any): Observable
switchMap 操做符做用:
對源 Observable 對象發出的值,作映射處理。如有新的 Observable 對象出現,會在新的 Observable 對象發出新值後,退訂前一個未處理完的 Observable 對象。
switchMap 操做符示例:
var source = Rx.Observable.fromEvent(document.body, 'click'); var example = source.switchMap( e => Rx.Observable.interval(100).take(3)); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : -----------c--c-----------------... concatMap(c => Rx.Observable.interval(100).take(3)) example: -------------0--0-1-2-----------...
以上代碼運行後,控制檯的輸出結果:
0 0 1 2
public filter(predicate: function(value: T, index: number): boolean, thisArg: any): Observable
filter 操做符做用:
對 Observable 對象發出的每一個值,做爲參數調用指定的 predicate 函數,若該函數的返回值爲 true,則表示保留該項,若返回值爲 false,則捨棄該值。
filter 操做符示例:
var source = Rx.Observable.interval(1000); var newest = source.filter(x => x % 2 === 0); newest.subscribe(console.log);
示例 marble 圖:
source: -----0-----1-----2-----3-----4-... filter(x => x % 2 === 0) newest: -----0-----------2-----------4-...
以上代碼運行後,控制檯的輸出結果:
0 2 4 ...
public take(count: number): Observable<T>
take 操做符做用:
用於獲取 Observable 對象發出的前 n 項值,取完後就結束。
take 操做符示例:
var source = Rx.Observable.interval(1000); var example = source.take(3); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : -----0-----1-----2-----3--.. take(3) example: -----0-----1-----2|
以上代碼運行後,控制檯的輸出結果:
0 1 2 complete
public first(predicate: function(value: T, index: number, source: Observable<T>): boolean, resultSelector: function(value: T, index: number): R, defaultValue: R): Observable<T | R>
first 操做符做用:
用於獲取 Observable 對象發出的第一個元素,取完後就結束。
first 操做符示例:
var source = Rx.Observable.interval(1000); var example = source.first(); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : -----0-----1-----2-----3--.. first() example: -----0|
以上代碼運行後,控制檯的輸出結果:
0 complete
public takeUntil(notifier: Observable): Observable<T>
takeUntil 操做符做用:
當 takeUntil 傳入的 notifier 發出值時,源 Observable 對象就會直接進入完成狀態。
takeUntil 操做符示例:
var source = Rx.Observable.interval(1000); var click = Rx.Observable.fromEvent(document.body, 'click'); var example = source.takeUntil(click); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : -----0-----1-----2------3-- click : ----------------------c---- takeUntil(click) example: -----0-----1-----2----|
以上代碼運行後,控制檯的輸出結果:
0 1 2 complete
public skip(count: Number): Observable
skip 操做符做用:
跳過源 Observable 對象前 count 項,並返回新的 Observable 對象。
skip 操做符示例:
var source = Rx.Observable.interval(1000); var example = source.skip(3); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2----3----4----5--.... skip(3) example: -------------------3----4----5--...
以上代碼運行後,控制檯的輸出結果:
3 4 5 ...
public takeLast(count: number): Observable<T>
takeLast 操做符做用:
獲取源 Observable 對象發出的,後面 count 項的值。
takeLast 操做符示例:
var source = Rx.Observable.interval(1000).take(6); var example = source.takeLast(2); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2----3----4----5| takeLast(2) example: ------------------------------(45)|
以上代碼運行後,控制檯的輸出結果:
4 5 complete
public last(predicate: function): Observable
last 操做符做用:
獲取源 Observable 對象發出的最後一項的值。
last 操做符示例:
var source = Rx.Observable.interval(1000).take(6); var example = source.last(); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2----3----4----5| last() example: ------------------------------(5)|
以上代碼運行後,控制檯的輸出結果:
5 complete
public debounceTime(dueTime: number, scheduler: Scheduler): Observable
debounceTime 操做符做用:
在設定的時間跨度內,若源 Observable 對象沒有再發出新值,則返回最近一次發出的值。
debounceTime 操做符示例:
var source = Rx.Observable.interval(300).take(5); var example = source.debounceTime(1000); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4| debounceTime(1000) example: --------------4|
以上代碼運行後,控制檯的輸出結果:
4 complete
debounceTime 的工做方式是每次收到元素時,它會先把元素緩存住並等待給定的時間,若是等待時間內沒有收到新的元素,則返回最新的緩存值。若是等待時間內,又收到新的元素,則會替換以前緩存的元素,並從新開始計時。
public throttleTime(duration: number, scheduler: Scheduler): Observable<T>
throttleTime 操做符做用:
從源 Observable 對象發出第一個值開始,忽略等待時間內發出的值,等待時間事後再發出新值。與 debounceTime 不一樣的是,throttleTime 一開始就會發出值,在等待時間內不會發出任何值,等待時間事後又會發出新的值。
throttleTime 示例:
var source = Rx.Observable.interval(300).take(5); var example = source.throttleTime(1000); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4| throttleTime(1000) example: --0------------4|
以上代碼運行後,控制檯的輸出結果:
0 4 complete
throttle 比較像是控制行爲的最高頻率,也就是說若是咱們設定 1000 ms,那麼該事件最大頻率就是每秒觸發一次而不會過快。debounce 則比較像是必須等待的時間,要等必定的時間過了纔會收到元素。
public distinct(keySelector: function, flushes: Observable): Observable
distinct 操做符的做用:
過濾源 Observable 發出的值,確保不會發出重複出現的值。
distinct 操做符示例:
var source = Rx.Observable.from(['a', 'b', 'c', 'a', 'b']) .zip(Rx.Observable.interval(300), (x, y) => x); var example = source.distinct() example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --a--b--c--a--b| distinct() example: --a--b--c------|
以上代碼運行後,控制檯的輸出結果:
a b c complete
distinct 內部會建立一個 Set 集合,當接收到元素時,會判斷 Set 集合中,是否已存在相同的值,若是已存在的話,就不會發出值。若不存在的話,會把值存入到 Set 集合中併發出該值。因此儘可能不要直接把 distinct 操做符應用在無限的 Observable 對象中,這樣會致使 Set 集合愈來愈大。針對這種場景,你們能夠設置 distinct 的第二個參數 (清除已保存的數據),或使用 distinctUntilChanged。
public distinctUntilChanged(compare: function): Observable
distinctUntilChanged 操做符做用:
過濾源 Observable 發出的值,若當前發出的值與前一次值不一致,則發出該值。
distinctUntilChanged 操做符示例:
var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b']) .zip(Rx.Observable.interval(300), (x, y) => x); var example = source.distinctUntilChanged() example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --a--b--c--c--b| distinctUntilChanged() example: --a--b--c-----b|
以上代碼運行後,控制檯的輸出結果:
a b c b complete
distinctUntilChanged 跟 distinct 同樣會把相同的元素過濾掉,但 distinctUntilChanged 只會跟最後一次送出的元素比較,不會每一個比較。
public concat(other: ObservableInput, scheduler: Scheduler): Observable
concat 操做符做用:
把多個 Observable 對象合併爲一個 Observable 對象,Observable 對象會依次執行,即需等前一個 Observable 對象完成後,纔會繼續訂閱下一個。
concat 操做符示例:
var source = Rx.Observable.interval(1000).take(3); var source2 = Rx.Observable.of(3) var source3 = Rx.Observable.of(4,5,6) var example = source.concat(source2, source3); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2| source2: (3)| source3: (456)| concat() example: ----0----1----2(3456)|
以上代碼運行後,控制檯的輸出結果:
0 # source 1 # source 2 # source 3 # source2 4 # source3 5 # source3 6 # source3 complete # example
public concatAll(): Observable
concatAll 操做符做用:
合併多個 Observable 對象,並在上一個 Observable 對象完成後訂閱下一個 Observable 對象。
concatAll 操做符示例:
var obs1 = Rx.Observable.interval(1000).take(5); var obs2 = Rx.Observable.interval(500).take(2); var obs3 = Rx.Observable.interval(2000).take(1); var source = Rx.Observable.of(obs1, obs2, obs3); var example = source.concatAll(); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : (o1 o2 o3)| \ \ \ --0--1--2--3--4| -0-1| ----0| concatAll() example: --0--1--2--3--4-0-1----0|
以上代碼運行後,控制檯的輸出結果:
0 # o1 1 # o1 2 # o1 3 # o1 4 # o1 0 # o2 1 # o2 0 # o3 complete # o3
public startWith(values: ...T, scheduler: Scheduler): Observable
startWith 操做符做用:
在開始發出源 Observable 數據以前發出已設置的參數值,並返回新的 Observable 對象。
startWith 操做符示例:
var source = Rx.Observable.interval(1000); var example = source.startWith(0); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2----3--... startWith(0) example: (0)----0----1----2----3--...
以上代碼運行後,控制檯的輸出結果:
0 0 1 2 ...
(備註:startWith 的值一開始是同步發出的,該操做符經常使用於保存程序的初始狀態)
public merge(other: ObservableInput, concurrent: number, scheduler: Scheduler): Observable
merge 操做符做用:
合併 Observable 對象,並按給定的時序發出對應值。
merge 操做符示例:
var source = Rx.Observable.interval(500).take(3); var source2 = Rx.Observable.interval(300).take(6); var example = source.merge(source2); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2| source2: --0--1--2--3--4--5| merge() example: --0-01--21-3--(24)--5|
以上代碼運行後,控制檯的輸出結果:
0 # source2 0 # source 1 # source2 2 # source2 1 # source 3 # source2 2 # source 4 # source2 5 # source2 complete
(備註:注意與 concat 操做符的區別,concat 會在前一個 Observable 對象執行完後,再訂閱下一個 Observable 對象)
public mergeAll(concurrent: number): Observable
mergeAll 操做符做用:
將高階 Observable 對象轉換爲一階Observable 對象,並同時處理全部的 Observable 對象。
mergeAll 操做符示例:
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.map(e => Rx.Observable.interval(1000)); var example = source.mergeAll(); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
click : ---------c-c------------------c--.. map(e => Rx.Observable.interval(1000)) source : ---------o-o------------------o--.. \ \ \----0----1--... \ ----0----1----2----3----4--... ----0----1----2----3----4--... mergeAll() example: ----------------00---11---22---33---(04)4--...
以上代碼運行後,控制檯的輸出結果:
00 11 22 33 04 4
mergeAll 不會像 switch 那樣退訂原有的 Observable 對象,而是會並行處理多個 Observable 對象。
public combineLatest(other: ObservableInput, project: function): Observable
combineLatest 操做符做用:
用於合併輸入的 Observable 對象,當源 Observable 對象和 other Observable 對象都發出值後,纔會調用 project 函數。
combineLatest 操做符示例:
var source = Rx.Observable.interval(500).take(3); var newest = Rx.Observable.interval(300).take(6); var example = source.combineLatest(newest, (x, y) => x + y); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2| newest : --0--1--2--3--4--5| combineLatest(newest, (x, y) => x + y); example: ----01--23-4--(56)--7|
以上代碼運行後,控制檯的輸出結果:
0 1 2 3 4 5 6 7 complete
combineLatest 示例執行過程 (project -> (x, y) => x + y):
newest 發出 0
,但此時 source 並未發出任何值,因此不會調用 project 函數
source 發出 0
,此時 newest 最後一次發出的值爲 0 ,調用 project 函數,返回值爲 0
newest 發出 1
,此時 source 最後一次發出的值爲 0,調用 project 函數,返回值爲 1
newest 發出 2
,此時 source 最後一次發出的值爲 0,調用 project 函數,返回值爲 2
source 發出 1
,此時 newest 最後一次發出的值爲 2 ,調用 project 函數,返回值爲 3
newest 發出 3
,此時 source 最後一次發出的值爲 1,調用 project 函數,返回值爲 4
source 發出 2
,此時 newest 最後一次發出的值爲 3 ,調用 project 函數,返回值爲 5
newest 發出 4
,此時 source 最後一次發出的值爲 2,調用 project 函數,返回值爲 6
newest 發出 5
,此時 source 最後一次發出的值爲 2,調用 project 函數,返回值爲 7
newest 和 source 都結束了,因此 example 也結束了。
public static zip(observables: *,project: Function): Observable<R>
zip 操做符做用:
根據每一個輸入 Observable 對象的輸出順序,產生一個新的 Observable 對象。
zip 操做符示例:
var source = Rx.Observable.interval(500).take(3); var newest = Rx.Observable.interval(300).take(6); var example = source.zip(newest, (x, y) => x + y); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----0----1----2| newest : --0--1--2--3--4--5| zip(newest, (x, y) => x + y) example: ----0----2----4|
以上代碼運行後,控制檯的輸出結果:
0 2 4 complete
zip 示例執行過程 (project -> (x, y) => x + y):
newest 發出第一個值 0
,此時 source 並未發出任何值,因此不會調用 project 函數
source 發出第一個值 0
,此時 newest 以前發出的第一個值爲 0,調用 project 函數,返回值爲 0
newest 發出第二個值 1
,此時 source 並未發出第二個值,因此不會調用 project 函數
newest 發出第三個值 2
,此時 source 並未發出第三個值,因此不會調用 project 函數
source 發出第二個值 1
,此時 newest 以前發出的第二個值爲 1,調用 project 函數,返回值爲 2
newest 發出第四個值 3
,此時 source 並未發出第四個值,因此不會調用 project 函數
source 發出第三個值 2
,此時 newest 以前發出的第三個值爲 2,調用 project 函數,返回值爲 4
source 對象結束,example 對象也同時結束,由於 source 對象與 newest 對象不會再有相同次序的值
public withLatestFrom(other: ObservableInput, project: Function): Observable
withLatestFrom 操做符做用:
當源 Observable 發出新值的時候,根據 project 函數,合併 other Observable 對象此前發出的最新值。
withLatestFrom 操做符示例:
var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x); var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x); var example = main.withLatestFrom(some, (x, y) => { return y === 1 ? x.toUpperCase() : x; }); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
main : ----h----e----l----l----o| some : --0--1--0--0--0--1| withLatestFrom(some, (x, y) => y === 1 ? x.toUpperCase() : x); example: ----h----e----l----L----O|
以上代碼運行後,控制檯的輸出結果:
h e l L O complete
withLatestFrom 示例執行過程 (project -> (x, y) => y === 1 ? x.toUpperCase() : x) ):
main 發出 h
,此時 some 上一次發出的值爲 0,調用 project 函數,返回值爲 h
main 發出 e
,此時 some 上一次發出的值爲 0,調用 project 函數,返回值爲 e
main 發出 l
,此時 some 上一次發出的值爲 0,調用 project 函數,返回值爲 l
main 發出 l
,此時 some 上一次發出的值爲 1,調用 project 函數,返回值爲 L
main 發出 o
,此時 some 上一次發出的值爲 1,調用 project 函數,返回值爲 O
public switch(): Observable<T>
switch 操做符做用:
切換爲最新的 Observable 數據源,並退訂前一個 Observable 數據源。
switch 操做符示例:
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.map(e => Rx.Observable.interval(1000)); var example = source.switch(); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
click : ---------c-c------------------c--.. map(e => Rx.Observable.interval(1000)) source : ---------o-o------------------o--.. \ \ \----0----1--... \ ----0----1----2----3----4--... ----0----1----2----3----4--... switch() example: -----------------0----1----2--------0----1--...
以上代碼運行後,控制檯的輸出結果:
0 1 2 0 1 ...
從 switch 操做符示例的 marble 圖,能夠看得出來第一次點擊事件與第二次點擊事件時間點太靠近了,致使第一個 Observable 還來不及發出值就直接被退訂了,當每次點擊後建立新的 Observable 對象,就會自動退訂前一次建立的 Observable 對象。
switch 操做符會在新的 Observable 對象建立完後,直接使用新的 Observable 對象,並會自動退訂以前舊的 Observable 對象。
public delay(delay: number | Date, scheduler: Scheduler): Observable
delay 操做符做用:
延遲源 Observable 對象,發出第一個元素的時間點。
delay 操做符使用示例:
var source = Rx.Observable.interval(300).take(5); var example = source.delay(500); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4| delay(500) example: -------0--1--2--3--4|
以上代碼運行後,控制檯的輸出結果:
0 # 500ms後發出 1 2 3 4 complete
public delayWhen(delayDurationSelector: function(value: T): Observable, subscriptionDelay: Observable): Observable
delayWhen 操做符做用:
delayWhen 的做用跟 delay 操做符相似,最大的區別是 delayWhen 會影響每一個元素,並且調用的時候須要設置 delayDurationSelector 函數,該函數的返回值是 Observable 對象。
delayWhen 操做符示例:
var source = Rx.Observable.interval(300).take(5); var example = source .delayWhen( x => Rx.Observable.interval(100 * x).take(1)); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : --0--1--2--3--4| .delayWhen(x => Rx.Observable.interval(100 * x).take(1)); example: --0---1----2-----3------4|
以上代碼運行後,控制檯的輸出結果:
0 1 2 3 4 complete
public multicast(subjectOrSubjectFactory: Function | Subject, selector: Function): Observable
multicast 操做符做用:
用於掛載 Subject 對象,並返回一個可連接 (connectable) 的 Observable 對象。
multicast 操做符示例:
var source = Rx.Observable.interval(1000) .take(3) .multicast(new Rx.Subject()); var observerA = { next: value => console.log('A next: ' + value), error: error => console.log('A error: ' + error), complete: () => console.log('A complete!') }; var observerB = { next: value => console.log('B next: ' + value), error: error => console.log('B error: ' + error), complete: () => console.log('B complete!') }; source.subscribe(observerA); // subject.subscribe(observerA) source.connect(); // source.subscribe(subject) setTimeout(() => { source.subscribe(observerB); // subject.subscribe(observerA) }, 1000);
以上代碼運行後,控制檯的輸出結果:
A next: 0 A next: 1 B next: 1 A next: 2 B next: 2 A complete! B complete!
上面示例中,咱們經過 multicast
掛載 Subject 對象以後返回了 source 對象,該對象經過 subscribe
添加的觀察者,都是添加到 Subject 對象內部的觀察者列表中。此外當調用 source 對象的 connect()
方法後纔會真正的訂閱 source 對象,若是沒有執行 connect()
,source 不會真正執行。若是要真正退訂觀察者,應該使用如下方式:
var realSubscription = source.connect(); ... realSubscription.unsubscribe();
refCount 必須搭配 multicast
一塊兒使用,在調用 multicast
操做符後,接着調用 refCount()
。這樣只要有訂閱就會自動進行 connect (連接) 操做。具體示例以下:
var source = Rx.Observable.interval(1000) .do(x => console.log('send: ' + x)) .multicast(new Rx.Subject()) .refCount(); var observerA = { next: value => console.log('A next: ' + value), error: error => console.log('A error: ' + error), complete: () => console.log('A complete!') }; var observerB = { next: value => console.log('B next: ' + value), error: error => console.log('B error: ' + error), complete: () => console.log('B complete!') }; var subscriptionA = source.subscribe(observerA); // 訂閱數 0 => 1 var subscriptionB; setTimeout(() => { subscriptionB = source.subscribe(observerB); // 訂閱數 1 => 2 }, 1000);
上面示例中,當 source 對象被觀察者 A 訂閱時,就會當即執行併發送值,咱們就不須要再額外執行 connect 操做。一樣只要訂閱數變成 0,就會自動中止發送。具體示例以下:
var source = Rx.Observable.interval(1000) .do(x => console.log('send: ' + x)) .multicast(new Rx.Subject()) .refCount(); var observerA = { next: value => console.log('A next: ' + value), error: error => console.log('A error: ' + error), complete: () => console.log('A complete!') }; var observerB = { next: value => console.log('B next: ' + value), error: error => console.log('B error: ' + error), complete: () => console.log('B complete!') } var subscriptionA = source.subscribe(observerA); // 訂閱數 0 => 1 var subscriptionB; setTimeout(() => { subscriptionB = source.subscribe(observerB); // 訂閱數 1 => 2 }, 1000); setTimeout(() => { subscriptionA.unsubscribe(); // 訂閱數 2 => 1 subscriptionB.unsubscribe(); // 訂閱數 1 => 0,source 中止發送元素 }, 5000);
以上代碼運行後,控制檯的輸出結果:
send: 0 A next: 0 send: 1 A next: 1 B next: 1 send: 2 A next: 2 B next: 2 send: 3 A next: 3 B next: 3 send: 4 A next: 4 B next: 4
public publish(selector: Function): *
publish 操做符做用:
用於掛載 Subject 對象,並返回一個可連接 (connectable) 的 Observable 對象。即 publish 操做符與 multicast(new Rx.Subject())
是等價的。
var source = Rx.Observable.interval(1000) .publish() .refCount(); var source = Rx.Observable.interval(1000) .multicast(new Rx.Subject()) .refCount();
var source = Rx.Observable.interval(1000) .publishReplay(1) .refCount(); var source = Rx.Observable.interval(1000) .multicast(new Rx.ReplaySubject(1)) .refCount();
var source = Rx.Observable.interval(1000) .publishBehavior(0) .refCount(); var source = Rx.Observable.interval(1000) .multicast(new Rx.BehaviorSubject(0)) .refCount();
var source = Rx.Observable.interval(1000) .publishLast() .refCount(); var source = Rx.Observable.interval(1000) .multicast(new Rx.AsyncSubject(1)) .refCount();
public share(): Observable<T>
share 操做符做用:
share 操做符是 publish + refCount 的簡寫。
share 操做符示例:
var source = Rx.Observable.interval(1000) .share(); var source = Rx.Observable.interval(1000) .publish() .refCount(); var source = Rx.Observable.interval(1000) .multicast(new Rx.Subject()) .refCount();
public catch(selector: function): Observable
catch 操做符做用:
用於捕獲異常,同時能夠返回一個 Observable 對象,用於發出新的值。
catch 操做符示例:
var source = Rx.Observable.from(['a','b','c','d',2]) .zip(Rx.Observable.interval(500), (x,y) => x); var example = source.map(x => x.toUpperCase()) .catch(error => Rx.Observable.of('h')); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----a----b----c----d----2| map(x => x.toUpperCase()) ----a----b----c----d----X| catch(error => Rx.Observable.of('h')) example: ----A----B----C----D----h|
以上代碼運行後,控制檯的輸出結果:
A B C D h complete
當錯誤發生時,咱們能夠返回一個 empty 的 Observable 對象來直接結束源 Observable 對象。
public retry(count: number): Observable
retry 操做符做用:
發生錯誤後,重試 count 次數
retry 操做符示例:
var source = Rx.Observable.from(['a','b','c','d',2]) .zip(Rx.Observable.interval(500), (x,y) => x); var example = source.map(x => x.toUpperCase()) .retry(1); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----a----b----c----d----2| map(x => x.toUpperCase()) ----a----b----c----d----X| retry(1) example: ----A----B----C----D--------A----B----C----D----X|
以上代碼運行後,控制檯的輸出結果:
A B C D A B C D Error: TypeError: x.toUpperCase is not a function
public retryWhen(notifier: function(errors: Observable): Observable): Observable
retryWhen 操做符做用:
捕獲異常 Observable 對象,進行異常處理後,可從新訂閱源 Observable 對象。
retryWhen 操做符示例:
var source = Rx.Observable.from(['a','b','c','d',2]) .zip(Rx.Observable.interval(500), (x,y) => x); var example = source.map(x => x.toUpperCase()) .retryWhen(errorObs => errorObs.delay(1000)); example.subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log('Error: ' + err); }, complete: () => { console.log('complete'); } });
示例 marble 圖:
source : ----a----b----c----d----2| map(x => x.toUpperCase()) ----a----b----c----d----X| retryWhen(errorObs => errorObs.delay(1000)) example: ----A----B----C----D-------------------A----B----C----D----...
以上代碼運行後,控制檯的輸出結果:
A B C D ...