Rxjs中concat, concatAll, concatMap及concatMapTo的理解及應用

咱們知道在Rxjs中以concat開頭的操做符都是用於合併數據流的,它的特色就是將先後兩個數據流串起來,相似於Array.concat方法。 concat家族中有concat, concatAll, concatMap, concatMapTo等操做符,咱們來依次比較這些操做符的區別及應用。promise

concat

首先concat能夠簡單的將兩個數據流先後收尾相接的串起來,例如異步

例1-1函數

// RxJS v6+
import  {  of, concat }  from  'rxjs';
concat(
  of(1,  2,  3),
  // subscribed after first completes
  of(4,  5,  6),
  // subscribed after second completes
  of(7,  8,  9)
)
// log: 1, 2, 3, 4, 5, 6, 7, 8, 9
.subscribe(console.log);

代碼會按序輸出, 這是一個同步的例子,咱們再舉一個異步的例子spa

(stackblitz) 例1-2code

import { concat, merge, defer, from } from 'rxjs'; 

console.log('Start')
const promiseA$ = defer(()=>from(new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove('PromiseA')
  }, 1000)
})))
const promiseB$ = defer(()=>from(new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove('PromiseB')
  }, 1000)
})))

// 會依次間隔一秒打印Start, PromiseA, PromiseB
concat(promiseA$, promiseB$).subscribe(x => console.log(x));

在這個例子中,會每間隔一秒依次打印Start, PromiseA和PromiseB, 即concat會要等前一個promiseA$完成後再訂閱執行promiseB$,這也是concat的主要特色。blog

concatAll

接下來介紹concatAll, 咱們知道帶有All的是高階Observable操做符, concatAll就是concat的處理高階Oberservable的操做符,這個操做符有哪些特色呢?咱們改寫下上一個例子(例1-2)來看下concatAll是怎麼處理高階Oberservable的。接口

stackblitz 例 2-1rxjs

import { concat, defer, from, of } from 'rxjs'; 
import { tap, concatAll } from 'rxjs/operators';

console.log('Start')

const promiseA$ = defer(()=>from(new Promise((reslove, reject)=>{
  console.log('PromiseA is been Subscribed ')
  setTimeout(()=>{
    reslove('PromiseA')
  }, 1000)
})))

const promiseB$ = defer(()=>from(new Promise((reslove, reject)=>{
  console.log('PromiseB is been Subscribed ')
  setTimeout(()=>{
    reslove('PromiseB')
  }, 1000)
})))

// 會依次間隔一秒打印Start, PromiseA, PromiseB
of(promiseA$, promiseB$).pipe(tap(console.log),concatAll()).subscribe(x => console.log(x));

這個例子結果和例1-2是同樣的,但過程有些不一樣,爲了便於觀察,這裏加了不少console.log,咱們看下這段代碼的執行結果事件

  1. 首先 "Start", promiseA$, "PromiseA is been Subscribed", promiseB$ 被順序同時打印出來
  2. 一秒後, " PromiseA ", " PromiseB is been Subscribed " 也被順序同時打印出來
  3. 又過了一秒,"PromiseB"被最後打印

解釋下各個步驟ip

    • of先會把第一個數據流promiseA$拋給下游, 並交由tap打印, 而後promiseA$數據流做爲一個數據被concatAll接受並訂閱,因此"PromiseA is been Subscribed"被打印出來;
    • 這時因爲promiseA$是異步數據流,它尚未完結,所以它內部的數據暫時不會向下遊拋出;
    • 接着of向下遊拋出了下一個數據流promiseB$, 並被tap打印,而後被交給了concatAll, 這時concatAll中的第一個異步流還沒完成,所以它不會訂閱這個promiseB$這個數據流並把它暫存了起來。
  1. 一秒以後, concatAll中的第一個數據流promiseA$完成,向下遊subscribe拋出了數據並被打印, 這時concatAll還記得以前的promiseB$沒有被訂閱,所以訂閱了它並致使" PromiseB is been Subscribed "被打印
  2. 再一秒後,promiseB$完成

一句話總結, concatAll順序接受上游拋出的各個數據流做爲它的數據, 若前面的數據流不能同步的完結,它會暫存後續數據流,當前數據流完成後它纔會訂閱後一個暫存的數據流

使用map+concatAll在數據上銜接數據流

上面兩個例子(例1-2, 例2-1)都有一個問題,就是後一個數據流拿不到前一個數據流拋出的數據,這時由於concat方法接收的參數或者concatAll接收的數據都是各類數據流,這些數據流在數據傳遞上是並行關係,不是上下游關係,只是在執行順序上是先後關係, 因此這些數據流完成後會直接把數據拋給下游 。

如今咱們但願這些數據流有一個上下游關係,後面的數據流能接受前面的數據並和本身產生的數據進行再加工,再拋給下游,這時就須要使用到concatAll,來看下面這個使用concatAll後的改進版。

stackblitz 例3-1

import { concat, defer, from } from 'rxjs'; 
import { concatAll, map, tap } from 'rxjs/operators'; 

console.log('Start')

const promiseA$ = defer(()=>from(new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove('PromiseA')
  }, 1000)
})))

// 這是一個會返回數據流promiseB$的函數
const promiseB = data => from(new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove(`${data} then PromiseB`)
  }, 1000)
}))

// map會將把上游完成後的數據經過promiseB轉換成promiseB$數據流
// 並傳遞給concatAll, concatAll將promiseB$鏈接下游數據流
// 這裏將在兩秒後打印出 PromiseA then PromiseB
promiseA$.pipe(
  map(promiseB),
  concatAll()
).subscribe(x => console.log(x))

能夠看到這個例子的寫法與以前例子的寫法有較大的不一樣,各數據流不是一個並行的寫法,而是有一個鏈式的先後關係。

promiseB是一個會返回from數據流的函數, 爲了描述方便, 咱們如今把promiseB這個方法返回的數據流稱爲promiseB$。

在這個例子中其實就是一個把promiseB$這個數據流匯入到promiseA$的過程,咱們仔細來看:

  1. promiseA$自己就是一個異步數據流, 它將在1s後完成並執行下游
  2. map獲取上游的數據並將其轉換成一個包含上游數據的數據流promiseB$,並將promiseB$拋給了下游的的concatAll
  3. 這時對concatAll來講它接收到的數據就是數據流promiseB$,那麼concatAll會訂閱promiseB$,並將其完成後的數據拋給下游。

能夠這麼理解, map+concatAll的組合就是幫助promiseB$銜接上下游的

OK,咱們用一張手畫僞彈珠草圖來描述下上面這個例子的區別
concat&concatAll.jpg
能夠看到concat中 數據是不會在各個子數據流中傳遞的,統一拋給了下游;

在concatAll的圖例中,map將B做爲數據拋給了concatAll,B在concatAll中被訂閱, 完成後產生的數據直接拋給了下游。

concatMap

爲了將兩個數據流在數據上進行銜接須要用到map+concatAll這個固定的組合,有沒有方法能簡潔的就把這個事幹了的呢?那就是concatMap, map+concatAll的語法糖。

stackblitz 例 4-1

import { concat, defer, from } from 'rxjs'; 
import { concatMap, map, tap } from 'rxjs/operators'; 

console.log('Start')

const promiseA$ = defer(()=>from(new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove('PromiseA')
  }, 1000)
})))

// 這是一個會返回數據流promiseB$的函數
const promiseB = data => new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove(`${data} then PromiseB`)
  }, 1000)
})

// concatMap 能夠接收一個返回Promise的函數或者是數據流
// 這裏將在兩秒後打印出 PromiseA then PromiseB
promiseA$.pipe(
  concatMap(promiseB)
).subscribe(x => console.log(x))

這個例子中用cancatMap代替了 map+concatAll, 能夠看到效果是同樣的。

值得注意的是咱們發現promiseB函數和上面的例子(例3-1)也產生了不一樣,固然promiseB若是和例3-1是同樣的話在這裏也正常能運行。在這裏promiseB去掉了defer和from的包裹,這是由於concatMap能夠接受一個Promise返回做爲一個新的數據流。這樣代碼會比以前的例子更加簡潔一點。

第二個參數

concatMap還能夠接受第二個參數, 第二個參數是一個回調函數, 這個回調函數會在當前數據流完成後被當即調用。這個回調函數接受的第一個參數是當前數據流接受的數據參數,第二個參數是當前數據流處理後返回的數據。

考慮這個場景,咱們有一個頁面,頁面初始化的時候有個初始化接口獲取數據,頁面上有個下一步按鈕,這個按鈕觸發的事件須要使用初始化接口的數據。由於事件流返回的數據就是事件自己,此時咱們不須要這個數據,咱們只需事件發生後獲取以前拋出的數據,這時咱們可使用第二個回調函數:

stackblitz 例4-2

import { concat, defer, from, fromEvent } from 'rxjs'; 
import { tap, concatMap } from 'rxjs/operators'; 

// 使用promise模擬數據請求過程
const req$ = defer(()=>from(new Promise((reslove, reject)=>{
  setTimeout(()=>{
    reslove('This is init data')
  }, 1000)
})))

// 事件流
const button$ = _ => fromEvent(document.getElementById('button'), 'click')

// 點擊按鈕後輸出請求內容
// 這裏會打印出 This is init data
req$.pipe(
  concatMap(button$, (data, event) => data)
).subscribe(x => console.log(x))

能夠看到點擊按鈕後會輸出請求數據,而參數event就是事件流返回的點擊事件自己了。

concatMapTo

若是說cancatMap至關於map+concatAll, 那concatMapTo就至關於mapTo + concatAll了,就是把上游的數據統一映射成下游的數據。

對於concatMapTo須要注意這些

  1. concatMapTo接受的第一個參數和concatMap不一樣, 它直接接受一個數據流,並把這個數據流產生的數據直接拋給下游
  2. 它的第二個參數與concatMap相同

因此對於上一個例子(例4-2)咱們能夠有個小改進,就是把button$外圍包裹的函數去掉,直接把fromEvent給concatMapTo就好了。

做者說

這是我第一篇公開的文章, 文章裏面有什麼不對的歡迎你們挑刺~

相關文章
相關標籤/搜索