本文是 《使用 RxJS + Redux 管理應用狀態》系列第二篇文章,將會介紹 redux-observable 的設計哲學和實現思路。返回第一篇:使用 redux-observable 實現組件自治html
本系列的文章地址彙總:前端
Redux 脫胎於 Elm 架構,其狀態管理視角和流程很是清晰和明確:react
這個過程是同步的,Redux 爲了保護 reducer 的純度是不推薦在 reducer 中處理反作用的(如 HTTP 請求)。所以,就出現了 redux-thunk、redux-saga 這樣的 Redux 中間件去處理反作用。git
這些中間件本質都是俘獲 dispatch 的內容,並在這個過程當中進行反作用處理,最終 dispatch 一個新的 action 給 reducer,讓 reducer 專心作一個純的狀態機。es6
假定咱們在 UI 層能派發出一個數據拉取的 FETCH
action,拉取數據後,將派發拉取成功的 FETCH_SUCCESS
action 或者是數據拉取失敗的 FETCH_ERROR
action 到 reducer。github
FETCH
|
fetching data...
|
/ \
/ \
FETCH_SUCCESS FETCH_ERROR
複製代碼
若是咱們用 FRP 模式來思考這個過程,FETCH 就不是一個獨立的個體,而是存在於一條會派發 FETCH action 的流上(observable):編程
---- FETCH ---- FETCH ----
---- FETCH_SUCCESS ---- FETCH_SUCCESS ----
---- FETCH_ERROR ---- FETCH_ERROR ----
複製代碼
若咱們將 FETCH 流定義爲 fetch$
,則 FETCH_SUCCESS 和 FETCH_ERROR 都未來自於 fetch$
:redux
const fetch$: Observable<FetchAction> = //....
fetch$.pipe(
switchMap(() => from(api.fetch).pipe(
// 拉取數據成功
switchMap(resp => ({
type: FETCH_SUCCESS,
payload: {
// ...
}
}),
// 拉取數據失敗
catchError(error => of({
type: FETCH_ERROR,
payload: {
// ....
}
}))
))
)
複製代碼
除此以外,咱們能夠用一個流來承載頁面全部的 action:api
const action$: Observable<Action>
複製代碼
那麼, fetch$
亦能夠由 action$
流轉獲得:緩存
const fetch$ = action$.pipe(
filter(({type}) => type === FETCH)
)
複製代碼
這樣,咱們就造成了使用 observable 流轉 action 的模式:
接下來,咱們嘗試講這個模式整合到 Redux 中,讓 observable 來負責應用的 action 流轉和反作用處理。
Redux 提供的中間件機制能讓咱們干預每一個到來的 action, 藉此處理一些業務邏輯,而後再返還一個 action 給 reducer:
中間件的函數構成以下:
const middleware: Middleware = store => {
// 初始化中間件
return next => action => {
// do something
}
}
const store = createStore(
rootReducer,
applyMiddleware(middleware)
)
複製代碼
如今,當中間件初始化時,咱們進行 action$
。當新的 action 到來時:
action$
中放入 actionaction$
能夠轉化另外一個的 action 流所以,action$
既是觀察者又是可觀察對象,是一個 Subject 對象:
const createMiddleware = (): Middleware => {
const action$ = new Subject()
const middleware: Middleware = store => next => action => {
// 將 action 交給 reducer 處理
const result = next(action)
// 將 action 放到 action$ 中進行流轉
action$.next(action)
return result
}
return middleware
}
複製代碼
如今,在中間件中,咱們初始化了 action$
,可是如何獲得 fetch$
這些由 action$
派生的流呢?所以,咱們還須要告知中間件若是經過 action$
生成更多的流,不妨定義一個轉換器,由它負責 action$
的流轉,並在當中處理反作用:
interface Transformer {
(action$: Observable<Action>): Observable<Action>
}
const fetchTransformer: Transformer = (action$) => {
action$.pipe(
filter(({type}) => type === FETCH),
switchMap(() => from(api.fetch).pipe(
switchMap(resp => ({
type: FETCH_SUCCESS,
payload: {
// ...
}
}),
catchError(error => of({
type: FETCH_ERROR,
payload: {
// ....
}
}))
))
)
}
複製代碼
應用中,咱們可能定義不一樣的轉換器,從而獲得派發不一樣 action 的流:
const newActionsStreams: Observable<Action>[] = transformers.map(transformer => transformer(action$))
複製代碼
因爲這些 action 還具備一致的數據結構,所以咱們能夠將這些流進行合併,由合併後的流負責派發 action 到 reducer:
const newAction$ = merge(newActionStreams)
複製代碼
那麼,修改咱們的中間件實現:
const createMiddleware = (...transformers): Middleware => {
const action$ = new Subject()
// 運行各個 transformer,並將轉換的流進行合併
const newAction$ = merge(tramsformer.map(transformer => transformer(action$)))
const middleware: Middleware = store => {
// 訂閱 newAction$
newAction$.subscribe(action => store.dispatch(action))
return next => action => {
// 將 action 交給 reducer 處理
const result = next(action)
// 將 action 放到 action$ 中進行流轉
action$.next(action)
return result
}
}
return middleware
}
複製代碼
ofType
operator因爲咱們老是須要 filter(action => action.type === SOME_TYPE)
來過濾 action,所以能夠封裝一個 operator 來優化這個過程:
const ofType: OperatorFunction<Observable<Action>, Observable<Action>> = (type: String) => pipe(
filter(action => action.type === type)
)
複製代碼
const fetchTransformer: Transformer = (action$) {
return action$.pipe(
filter(({type}) => type === FETCH),
switchMap(() => from(api.fetch)),
// ...
)
}
複製代碼
再考慮到咱們可能不僅過濾一個 action type,所以能夠優化咱們的 ofType
operator 爲:
const ofType: OperatorFunction<Observable<Action>, Observable<Action>> =
(...types: String[]) => pipe(
filter((action: Action) => types.indexOf(action.type) > -1)
)
複製代碼
const counterTransformer: Transformer = (action$) {
return action$.pipe(
ofType(INCREMENT, DECREMENT),
// ...
)
}
複製代碼
下面這個測試用例將用來測試咱們的中間件是否可以工做了:
it('should transform action', () => {
const reducer: Reducer = (state = 0, action) => {
switch(action.type) {
case 'PONG':
return state + 1
default:
return state
}
}
const transformer: Transformer = (action$) => {
return action$.pipe(
ofType('PING'),
mapTo({type: 'PONG'})
)
)
}
const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
store.dispatch({type: 'PING'})
expect(store.getState()).to.be.equal(1)
})
複製代碼
在 action 的流轉過程可能還須要得到應用狀態,例如,fetch$
中獲取數據前,須要封裝請求參數,部分參數可能來自於應用狀態。所以,咱們能夠考慮爲每一個 transformer 再傳遞當前的 store 對象,使它能拿到當前的應用狀態:
interface Transformer {
(action$: Observable<Action>, store: Store): Observable<Action>
}
// ...
const createMiddleware = (...transformers): Middleware => {
const action$ = new Subject()
const middleware: Middleware = store => {
// 將 store 也傳遞給 transformer
const newAction$ = merge(tramsformer.map(transformer => transformer(action$, store)))
newAction$.subscribe(action => store.dispatch(action))
return next => action => {
const result = next(action)
action$.next(action)
return result
}
}
return middleware
}
複製代碼
如今,當須要取用狀態的時候,就經過 store.getState()
拿取:
const fetchTransformer: Transformer = (action$, store) {
return action$.pipe(
filter(({type}) => type === FETCH),
switchMap(() => {
const { query, page, pageSize } = store.getState()
const params = { query, page, pageSize }
return from(api.fetch, params)
}),
// ...
)
}
複製代碼
在響應式編程體系下,一切數據源都應當是可被觀察的,而上面咱們對狀態的取值確是主動的(proactive)的,正確的方式是應當觀察狀態的變化,並在變化時做出決策:
爲此,相似 action$
,咱們也將 state 流化,使得應用狀態成爲一個可觀察對象,並將 state$
傳遞給 transformer:
interface Transformer {
(action$: Observable<Action>, state$: Observable<State>): Observable<Action>
}
// ...
const createMiddleware = (...transformers): Middleware => {
const action$ = new Subject()
const state$ = new Subject()
const middleware: Middleware = store => {
// 由各個 transformer 得到應用的 action$
const newAction$ = merge(tramsformer.map(transformer => transformer(action$, state$)))
// 新的 action 到來時,將其又 dispatch 到 Redux 生態
newAction$.subscribe(action => store.dispatch(action))
return next => action => {
// 將 action 交給 reducer
const result = next(action)
// 得到 reducer 處理後的新狀態
state$.next(state)
// 將 action 放入 action$
action$.next(action)
return result
}
}
return middleware
}
複製代碼
當業務流程須要狀態時,就能夠自由組合 state$
獲得:
const fetchTransformer: Transformer = (action$, state$) {
return action$.pipe(
filter(({type}) => type === FETCH),
withLatestFrom(state$),
switchMap(([action, state]) => {
const { query, page, pageSize } = state
const params = { query, page, pageSize }
return from(api.fetch, params)
}),
// ...
)
}
複製代碼
乍看之下,彷佛不如 store.getState()
來的方便,爲了得到當前狀態,咱們還額外引入了一個 operator withLatestFrom
。可是,要注意到,咱們引入 state$
不僅爲了得到狀態和統一模式,更重要是爲了觀察狀態。
舉個例子,咱們有一個備忘錄組件,每次內容變更時,咱們就存儲一下草稿。若是咱們能觀察狀態變更,經過響應式編程模式,當狀態變更時,自動造成草稿存儲的業務:
const saveDraft$: Observable<Action> = state$.pipe(
// 選出當前
pluck('content'),
// 只有當內容變更時才考慮存儲草稿
distinctUntilChanged(),
// 只在 1 s 內保存一次
throttleTime(1000),
// 調用服務存儲草稿
switchMap(content => from(api.saveDraft(content)))
// ....
)
複製代碼
你們也能夠在回顧系列第一篇所介紹的內容,正是因爲 redux-observable 在 1.0 版本引入了 state$
,咱們才得以解耦組件的業務關係,實現單個組件的自治。
如今,咱們能夠測試一下如今的中間件,看可否觀察應用狀態了:
it('should observe state', () => {
const reducer: Reducer = (state = {step: 10, counter: 0}, action) => {
switch(action.type) {
case 'PONG':
return {
...state,
counter: action.counter
}
default:
return state
}
}
const transformer: Transformer = (action$, state$) => {
return action$.pipe(
ofType('PING'),
withLatestFrom(state$, (action, state) => state.step + state.counter),
map(counter => ({type: 'PONG', counter}))
)
)
}
const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
store.dispatch({type: 'PING'})
expect(store.getState().counter).to.be.equal(10)
})
複製代碼
遺憾的是,這個測試用例將不會經過,經過調試發現,當咱們 dispatch 了 PING action 後,withLatestFrom
沒有拿到最近一次的 state。這是爲何呢?原來是由於 Redux 的 init action 並無暴露給中間件進行攔截,所以,應用的初始狀態沒能被送入 state$
中,觀察者沒法觀察到初始狀態。
爲了解決這個問題,在建立了 store 後,咱們能夠嘗試 dispatch 一個無心義的 action 給中間件,強制將初始狀態先送入 state$
中:
const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// 派發一個 action 去得到初始狀態
store.dispatch({type: '@@INIT_STATE'})
複製代碼
這個方式雖然能讓測試經過,但缺不是很優雅,咱們讓用戶手動去派發一個無心義的 action,這會讓用戶感受很困惑。所以,咱們考慮爲中間件單獨設置一個 API,用以在 store 建立後,完成一些任務:
// 設置一個 store 副本
let cachedStore: Store
const createMiddleware = (...transformers): Middleware => {
const action$ = new Subject()
const state$ = new Subject()
const newAction$ = merge(transformers.map(transformer => transformer(action$, state$)))
const middleware: Middleware = store => {
cachedStore = store
return next => action => {
// 將 action 交給 reducer
const result = next(action)
// 得到 reducer 處理後的新狀態
state$.next(state)
// 將 action 放入 action$
action$.next(action)
return result
}
}
middleware.run = function() {
// 1. 開始對 action 的訂閱
newAction$.subscribe(cachedStore.dispatch)
// 2. 將初始狀態傳遞給 state$
state$.next(cachedStore.getState())
}
return middleware
}
複製代碼
如今,咱們爲中間件提供了一個 run
方法,來讓中間件在 store 建立之後完成一些工做。當咱們建立好 store 後,運行 run
方法來運行中間件:
const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// 運行咱們的中間件
middleware.run()
複製代碼
再考慮一個更加場景,各個 transformer 之間可能存在關聯,各個 trasformer 也可能直接發出 action,而不須要依賴於 action$
:
it('should queue synchronous actions', () => {
const reducer = (state = [], action) => state.concat(action)
const transformer1 = (action$, state$) => action$.pipe(
ofType('FIRST'),
mergeMap(() => of({ type: 'SECOND' }, { type: 'THIRD'} ))
)
const transformer2 = (action$, state$) => action$.pipe(
ofType('SECOND'),
mapTo({type: 'FORTH'})
)
const middleware = createMiddleware(transformer1, transformer2)
const store = createStore(reducer, applyMiddleware(middleware))
middleware.run()
const actions = store.getState()
actions.shift() // remove redux init action
expect(actions).to.deep.equal([
{ type: 'FIRST' },
{ type: 'SECOND' },
{ type: 'THIRD' },
{ type: 'FORTH' }
])
})
複製代碼
在這個測試用例中,咱們看到的 action 序列是:
FIRST
SECOND
THIRD
FORTH
複製代碼
可是,在當前的實現中,你將獲得:
FIRST
SECOND
FORTH
THIRD
複製代碼
這並不符合預期。可是,問題又出在哪裏呢?咱們分析下程序執行過程:
問題顯然就出在第 二、3 步,若是第 2 步中,咱們控制 observable 吐出值的速度,將同時到來的 second 和 third action 緩存到隊列,並依次執行,就能獲得咱們指望的輸出。
幸運的是,RxJS 中提供了 observeOn
這個 operator 來控制數據源發出值的節奏。其第一個參數接收一個調度器,用於告知數據源以怎樣的速錄調度任務,這裏咱們將使用 Queue Scheduler 將各個 action 緩存到隊列,當此時再無 action 時,各個 action 出隊並被調度:
export const createEpicMiddleware = (...epics) => {
const action$ = new Subject().pipe(observeOn(queueScheduler)) as Subject<Action>
// ...
return middleware
}
複製代碼
如今,再次運行測試用例,你講看到符合指望的 action 序列:
FIRST
SECOND
THIRD
FORTH
複製代碼
這是由於:
store.dispatch(first)
,派生出 second action 及 third action 的 observablestore.dispatch(second)
,派生出 forth action 的 observablestore.dispatch(third)
store.dispatch(forth)
截止目前,咱們的中間件已經容許咱們經過 FRP 模式梳理應用狀態了,這個中間件的實現已經很是相似於 redux-observable 的實現了。固然,你們生產環境仍是用更流行,更穩定的 redux-observable,本文旨在幫助你們更好的理解如何在 Redux 中集成 RxJS 更好的管理狀態,經過一步一步對中間件的優化,也讓你們理解了了 redux-observable 的設計哲學和實現原理。本文實現的 mini redux-observable 我也放到了個人 github 上,包含了一些測試用例和一個小的 demo。
接下來,咱們將探索將 redux-observable 以及 FRP 這套模式集成到 dva 架構的前端框架中,dva 架構幫助砍掉 Redux 冗長的樣板代碼,而 redux-observable 則專一於反作用處理。