RxJs(1) 常見Operators

數據操做類

一、map

Observable 的 map 方法使用上跟陣列的 map 是同樣的數組

var source = interval(1000);
var newest = source.map(x => x + 2);

newest.subscribe(console.log);
// 2
// 3
// 4
// 5..
複製代碼

二、mapTo

mapTo 能夠把傳進來的值改爲一個固定的值,以下異步

var source = interval(1000);
var newest = source.mapTo(2);

newest.subscribe(console.log);
// 2
// 2
// 2
// 2..
複製代碼

三、filter

filter 在使用上也跟陣列的相同,傳入一個 callback function,這個 function 會傳入每一個被送出的元素,而且回傳一個 boolean 值,若是爲 true 的話就會保留,若是爲 false 就會被濾掉,以下函數

var source = interval(1000);
var newest = source.filter(x => x % 2 === 0);

newest.subscribe(console.log);
// 0
// 2
// 4
// 6..

複製代碼

四、scan

數據累加計算ui

var main = from('hello').pipe(
  // 依次打印hello的每一個字母
  zip(interval(500), (x, y) => x)
)
const example = main.pipe(
  // scan第二個參數爲初始值
  scan(
  (origin,next)=> origin + next
  )
)
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// h
// he
// hel
// hell
// hello
// complete
複製代碼

五、repeat

不少時候若是Observable沒有發生錯誤,我門也但願能夠重複發起訂閱,這個時候就要用到repeat方法了,repeat用法和retry基本同樣。spa

var example = from(['a','b','c']).pipe(
  zip(interval(500), (x,y) => x),
  repeat()
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

六、groupBy

根據一個或多個列對結果集進行分組code

var people = [
  { name: "Anna", score: 100, subject: "English" },
  { name: "Anna", score: 90, subject: "Math" },
  { name: "Anna", score: 96, subject: "Chinese" },
  { name: "Jerry", score: 100, subject: "Math" },
  { name: "Jerry", score: 80, subject: "English" },
  { name: "Jerry", score: 90, subject: "Chinese" }
];
var example = from(people).pipe(
  groupBy(item => item.name),
  map(group =>
    group.pipe(
      reduce((acc, cur) => ({
        name: cur.name,
        score: acc.score + cur.score
      }))
    )
  ),
  mergeAll()
);
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// {name: "Anna", score: 286}
// {name: "Jerry", score: 270}
// complete
複製代碼

選擇器類

一、take

take 是一個很簡單的 operator,顧名思義就是取前幾個元素後就結束,以下對象

var source = interval(1000);
var example = source.take(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete
複製代碼

二、first

first 會取 observable 送出的第 1 個元素以後就直接結束,行爲跟 take(1) 一致。rxjs

var source = interval(1000);
var example = source.first();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

// 0
// complete
複製代碼

三、takeLast

除了能夠用 take 取前幾個以外,咱們也能夠倒過來取最後幾個,範例以下:事件

var source = interval(1000).take(6);
var example = source.takeLast(2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 4
// 5
// complete
複製代碼

四、last

跟 take(1) 相同,咱們有一個 takeLast(1) 的簡化寫法,那就是 last() 用來取得最後一個元素。ip

var source = interval(1000).take(6);
var example = source.last();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 5
// complete
複製代碼

控制數據流額類

一、takeUntil

他能夠在某件事情發生時,讓一個 observable 直送出 完成(complete)訊息,範例以下

var source = interval(1000);
var click = fromEvent(document.body, 'click');
var example = source.takeUntil(click);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// complete (點擊body了
複製代碼

這裡咱們一開始先用 interval 創建一個 observable,這個 observable 每隔 1 秒會送出一個從 0 開始遞增的數值,接著咱們用 takeUntil,傳入另外一個 observable。 當 takeUntil 傳入的 observable 發送值時,本來的 observable 就會直接進入完成(complete)的狀態,而且發送完成訊息。也就是說上面這段程式碼的行爲,會先每 1 秒印出一個數字(從 0 遞增)直到咱們點擊 body 爲止,他纔會送出 complete 訊息。

二、concatAll

一、這裡能夠看到 source observable 內部每次發送的值也是 observable,這時咱們用 concatAll 就能夠把 source 攤平成 example

var click = fromEvent(document.body, 'click');
var source = click.map(e => of(1,2,3));

var example = source.concatAll();
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
複製代碼

二、concatAll 會處理 source 先發出來的 observable,必須等到這個 observable 結束,纔會再處理下一個 source 發出來的 observable,範例以下。

var obs1 = interval(1000).take(5);
var obs2 = interval(500).take(2);
var obs3 = interval(2000).take(1);

var source = of(obs1, obs2, obs3);

var example = source.concatAll();

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
複製代碼

三、skip

skip用於略過前幾個送出元素

var source = interval(1000);
var example = source.skip(3);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...
複製代碼

本來從 0 開始的就會變成從 3 開始,可是記得本來元素的等待時間仍然存在,也就是說此範例第一個取得的元素須要等 4 秒。

四、concat

concat 能夠把多個 observable 實例合併成一個,concat和concatAll效果是同樣的,區別在於 concat要傳遞參數,參數必須是Observable類型。範例以下:

var source = interval(1000).pipe(take(3));
var source2 = of(3)
var source3 = of(4,5,6)
var example = source.pipe(
  concat(source2, source3)
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete

複製代碼

五、startWith

startWith 能夠在 observable 的一開始塞要發送的元素,有點像 concat 但參數不是 observable 而是要發送的元素,使用範例以下

var source = interval(1000);
var example = source.startWith(0);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...
複製代碼

六、merge

merge使用方式和concat同樣,區別就是merge處理的Observable是異步執行的,在時間序上是同時在跑。

const source1 = interval(1000).pipe(take(3));
const source2 = of(3);
const source3 = of (4,5);
const example = source1.pipe(merge(source2,source3))
example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});

// 3
// 4
// 5
// 0
// 1
// 2
// complete
複製代碼

七、delay和delayWhen

delay會將observable第一次發出訂閱的時間延遲,以下:

const example = interval(100).pipe(take(5),delay(1000));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
複製代碼

delayWhen和delay不一樣,他的延遲時間由參數函數決定,而且會將主訂閱對象發出的值做爲參數:

var example = interval(300).pipe(
  take(5),
  delayWhen(
    x => empty().pipe(delay(100 * x * x)))
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

上邊的例子會將第一次source發出的值做爲參數傳給delayWhen的函數做爲參數,只有在參數對象中的Observable發出訂閱的值,主訂閱對象纔會繼續發出訂閱的值。

八、debounceTime

debounce 在每次收到元素,他會先把元素 cache 住並等待一段時間,若是這段時間沒有收到任何元素,則把元素送出;若是這段時間又收到新的元素,則會把本來 cache 住的元素釋放掉並從新計時,不斷反覆。

var example = interval(300).pipe(take(5),debounceTime(1000));
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// 4
// complete
複製代碼

九、throttle

跟 debounce 的不一樣是 throttle 會先放送出元素,等到有元素被送出就會沉默一段時間,等到時間過了又會繼續發送元素,防止某個事件頻繁觸發,影響效率。

var example = interval(300).pipe(
  take(5),
  throttleTime(1000)
);
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

十、distinct和distinctUntilChanged

distinct會和已經拿到的數據比較過濾掉 重複的元素以下:

var example = from(['a', 'b', 'c', 'a', 'b']).pipe(
  zip(interval(300), (x, y) => x),
  distinct()
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// a
// b
// c
// complete

複製代碼

distinct第一個參數是個函數,函數返回值就是distinct比較的值:

var source = from([{ value: 'a' }, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }]).pipe(
  zip(interval(300), (x, y) => x)
)
var example = source.pipe(
  distinct((x) => {
    return x.value
  })
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete
複製代碼

可是distinct底層是建立一個set來輔助去重,若是數據很大,可能致使set過大,這個時候就須要設置distinct第二個參數來刷新set,第二個 參數是個observable到發起訂閱的時候就會清空set

var flushes = interval(1300);

var example = from(['a', 'b', 'c', 'a', 'c']).pipe(
  zip(interval(300), (x, y) => x),
  distinct(
    null,flushes
  )
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// a
// b
// c
// c
// complete
複製代碼

distinctUntilChanged與distinct不一樣之處就是,distinctUntilChanged只會比較相鄰兩次輸入,例子以下:

var example = from(['a', 'b', 'c', 'c', 'b']).pipe(
  .zip(interval(300), (x, y) => x),
  distinctUntilChanged()
)

example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete
複製代碼

協調多個Observable類

一、combineLatest

協調過個observable,參數Observable中有一個發生變化都會發起訂閱(前提是每一個observable都有值)。 一、當conbineLatest沒有傳入第二個參數,返回的訂閱值是個數組

// timerOne 在1秒時發出第一個值,而後每4秒發送一次
const timerOne = timer(1000, 4000);
// timerTwo 在2秒時發出第一個值,而後每4秒發送一次
const timerTwo = timer(2000, 4000);
// timerThree 在3秒時發出第一個值,而後每4秒發送一次
const timerThree = timer(3000, 4000);

// 當一個 timer 發出值時,將每一個 timer 的最新值做爲一個數組發出
const combined = combineLatest(timerOne, timerTwo, timerThree);

const subscribe = combined.subscribe(latestValues => {
  // 從 timerValOne、timerValTwo 和 timerValThree 中獲取最新發出的值
    const [timerValOne, timerValTwo, timerValThree] = latestValues;
  /* 示例: timerOne first tick: 'Timer One Latest: 1, Timer Two Latest:0, Timer Three Latest: 0 timerTwo first tick: 'Timer One Latest: 1, Timer Two Latest:1, Timer Three Latest: 0 timerThree first tick: 'Timer One Latest: 1, Timer Two Latest:1, Timer Three Latest: 1 */
    console.log(
      `Timer One Latest: ${timerValOne}, Timer Two Latest: ${timerValTwo}, Timer Three Latest: ${timerValThree}`
    );
  }
);
複製代碼

二、conbineLatest能夠傳入第二個參數,在發給Observabler進行數據處理。

var source = interval(500).take(3);
var newest = interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
複製代碼

對於上面的例子來說,由於咱們這裡合併了兩個 observable,因此後面的 callback function 就接收 x, y 兩個參數,x 會接收從 source 發送出來的值,y 會接收從 newest 發送出來的值。 最後一個重點就是不論是 source 仍是 newest 送出值來,只要另外一方曾有送出過值(有最後的值),就會執行 callback 並送出新的值,因此這段程式是這樣運行的: newest 送出了 0,但此時 source 並無送出過任何值,因此不會執行 callback。 source 送出了 0,此時 newest 最後一次送出的值爲 0,把這兩個數傳入 callback 獲得 0。 newest 送出了 1,此時 source 最後一次送出的值爲 0,把這兩個數傳入 callback 獲得 1。 newest 送出了 2,此時 source 最後一次送出的值爲 0,把這兩個數傳入 callback 獲得 2。 source 送出了 1,此時 newest 最後一次送出的值爲 2,把這兩個數傳入 callback 獲得 3。 newest 送出了 3,此時 source 最後一次送出的值爲 1,把這兩個數傳入 callback 獲得 4。 source 送出了 2,此時 newest 最後一次送出的值爲 3,把這兩個數傳入 callback 獲得 5。 source 結束,但 newest 還沒結束,因此 example 還不會結束。 newest 送出了 4,此時 source 最後一次送出的值爲 2,把這兩個數傳入 callback 獲得 6。 newest 送出了 5,此時 source 最後一次送出的值爲 2,把這兩個數傳入 callback 獲得 7。 newest 結束,由於 source 也結束了,因此 example 結束。

二、zip

每一個 observable 的相同 index 元素會一塊兒被傳入 callback

var source = interval(500).take(3);
var newest = interval(300).take(6);

var example = source.zip(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete
複製代碼

對於上面的例子來說,zip 會等到 source 跟 newest 都送出了第一個元素,再傳入 callback,下次則等到 source 跟 newest 都送出了第二個元素再一塊兒傳入 callback,因此運行的步驟以下: newest 送出了第一個值 0,但此時 source 並無送出第一個值,因此不會執行 callback。 source 送出了第一個值 0,newest 以前送出的第一個值爲 0,把這兩個數傳入 callback 獲得 0。 newest 送出了第二個值 1,但此時 source 並無送出第二個值,因此不會執行 callback。 newest 送出了第三個值 2,但此時 source 並無送出第三個值,因此不會執行 callback。 source 送出了第二個值 1,newest 以前送出的第二個值爲 1,把這兩個數傳入 callback 獲得 2。 newest 送出了第四個值 3,但此時 source 並無送出第四個值,因此不會執行 callback。 source 送出了第三個值 2,newest 以前送出的第三個值爲 2,把這兩個數傳入 callback 獲得 4。 source 結束 example 就直接結束,由於 source 跟 newest 不會再有對應順位的值。

三、withLatestFrom

withLatestFrom和combineLatest用法很相似,withLatestFrom主要特色是隻有在,主Observable發起值的時候纔會發動訂閱,不過若是副Observable沒有發送過值,也不會發起訂閱,例子以下:

var main = from('hello').pipe(
  zip(interval(500), (x, y) => x)
)
var some = from([0,1,0,0,0,1]).pipe(
  zip(interval(300), (x, y) => x)
)

var example = main.pipe(
withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
})
)

example.subscribe({
  next: value => {
    console.log(value);
  },
  error: err => {
    console.log("Error: " + err);
  },
  complete: () => {
    console.log("complete");
  }
});
// h
// e
// l
// L
// O
// complete
複製代碼

withLatestFrom 會在 main 送出值的時候執行 callback,但請注意若是 main 送出值時 some 以前沒有送出過任何值 callback 仍然不會執行! 這裡咱們在 main 送出值時,去判斷 some 最後一次送的值是否是 1 來決定是否要切換大小寫,執行步驟以下: main 送出了 h,此時 some 上一次送出的值爲 0,把這兩個參數傳入 callback 獲得 h。 main 送出了 e,此時 some 上一次送出的值爲 0,把這兩個參數傳入 callback 獲得 e。 main 送出了 l,此時 some 上一次送出的值爲 0,把這兩個參數傳入 callback 獲得 l。 main 送出了 l,此時 some 上一次送出的值爲 1,把這兩個參數傳入 callback 獲得 L。 main 送出了 o,此時 some 上一次送出的值爲 1,把這兩個參數傳入 callback 獲得 O。

四、concatMap

concatMap就是map加上concatAll

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  map(e => interval(1000).pipe(take(3))),
  concatAll()
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

轉化成concatMap就是以下這樣:

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  concatMap(
    e => interval(100).pipe(take(3))
  )
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

五、mergeMap

mergeMap一樣是mergeAll加上map

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  mergeMap(
    e => interval(100).take(3)
  )
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

六、switchMap

switch在rxjs6中只有switchMap switch對比merge和concat有個特色就是附屬observable發起訂閱後會馬上解綁主observable。

var source = fromEvent(document.body, 'click');
var example = source.pipe(
  .switchMap(
    e => interval(100).pipe(take(3))
  )
)
example.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});
複製代碼

改變數據流結構

一、concatAll
var obs1 = interval(1000).pipe(take(5));
var obs2 = interval(500).pipe(take(2));
var obs3 = interval(2000).pipe(take(1));
var source = of(obs1, obs2, obs3);
var example = source.pipe(concatAll());
example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 0
// 1
// 0
// complete
複製代碼

上邊的例子中會一個個按照順序執行obs一、obs二、obs3

mergeAll

mergeAll和concatAll用法基本一致,區別在於mergeAll是並行處理Observable,實例以下:

var click = fromEvent(document.body, 'click');
var source = click.pipe(
  map(_ => interval(1000)),
  mergeAll()
);

source.subscribe({
  next: (value) => { console.log(value); },
  error: (err) => { console.log('Error: ' + err); },
  complete: () => { console.log('complete'); }
});

複製代碼

mergeAll使用特殊的一點就是mergeAll能夠傳遞一個參數,這個參數表示最大並行處理數量,當處理的observable數量大於這個數字的時候,就須要等待在處理的observable有完成的纔會分配資源處理。mergeAll(1)的效果就和concatAll效果同樣。

相關文章
相關標籤/搜索