如何實現一個 redux-observable

本文是 《使用 RxJS + Redux 管理應用狀態》系列第二篇文章,將會介紹 redux-observable 的設計哲學和實現思路。返回第一篇:使用 redux-observable 實現組件自治html

本系列的文章地址彙總:前端

Redux

Redux 脫胎於 Elm 架構,其狀態管理視角和流程很是清晰和明確:react

  1. dispatch 了一個 action
  2. reducer 俘獲 action,並根據 action 類型進行不一樣的狀態更新邏輯
  3. 周而復始地進行這個過程

這個過程是同步的,Redux 爲了保護 reducer 的純度是不推薦在 reducer 中處理反作用的(如 HTTP 請求)。所以,就出現了 redux-thunk、redux-saga 這樣的 Redux 中間件去處理反作用。git

這些中間件本質都是俘獲 dispatch 的內容,並在這個過程當中進行反作用處理,最終 dispatch 一個新的 action 給 reducer,讓 reducer 專心作一個純的狀態機。es6

用 observable 管理反作用

假定咱們在 UI 層能派發出一個數據拉取的 FETCH action,拉取數據後,將派發拉取成功的 FETCH_SUCCESS action 或者是數據拉取失敗的 FETCH_ERROR action 到 reducer。github

FETCH
             |
       fetching data...
             |
            / \
           /   \
 FETCH_SUCCESS FETCH_ERROR
複製代碼

若是咱們用 FRP 模式來思考這個過程,FETCH 就不是一個獨立的個體,而是存在於一條會派發 FETCH action 的流上(observable):編程

---- FETCH ---- FETCH ---- 

---- FETCH_SUCCESS ---- FETCH_SUCCESS ----

---- FETCH_ERROR ---- FETCH_ERROR ----
複製代碼

若咱們將 FETCH 流定義爲 fetch$,則 FETCH_SUCCESS 和 FETCH_ERROR 都未來自於 fetch$redux

const fetch$: Observable<FetchAction> = //....
fetch$.pipe(
  switchMap(() => from(api.fetch).pipe(
    // 拉取數據成功
    switchMap(resp => ({
      type: FETCH_SUCCESS,
      payload: {
        // ...
      }
    }),
    // 拉取數據失敗
    catchError(error => of({
      type: FETCH_ERROR,
      payload: {
        // ....
      }
    }))
  ))
)
複製代碼

除此以外,咱們能夠用一個流來承載頁面全部的 action:api

const action$: Observable<Action>
複製代碼

那麼, fetch$ 亦能夠由 action$ 流轉獲得:緩存

const fetch$ = action$.pipe(
  filter(({type}) => type === FETCH)
)
複製代碼

這樣,咱們就造成了使用 observable 流轉 action 的模式:

接下來,咱們嘗試講這個模式整合到 Redux 中,讓 observable 來負責應用的 action 流轉和反作用處理。

構建中間件

Redux 提供的中間件機制能讓咱們干預每一個到來的 action, 藉此處理一些業務邏輯,而後再返還一個 action 給 reducer:

中間件的函數構成以下:

const middleware: Middleware = store => {
  // 初始化中間件
  return next => action => { 
  	// do something
  }
}

const store = createStore(
  rootReducer,
  applyMiddleware(middleware)
)
複製代碼

如今,當中間件初始化時,咱們進行 action$ 。當新的 action 到來時:

  1. 將 action 交給 reducer 處理
  2. action$ 中放入 action
  3. action$ 能夠轉化另外一個的 action 流

所以,action$ 既是觀察者又是可觀察對象,是一個 Subject 對象:

const createMiddleware = (): Middleware => {
  const action$ = new Subject()
  const middleware: Middleware = store => next => action => {
    // 將 action 交給 reducer 處理
    const result = next(action)
    // 將 action 放到 action$ 中進行流轉
    action$.next(action)
    return result
  }
  return middleware
}
複製代碼

流的轉換器

如今,在中間件中,咱們初始化了 action$,可是如何獲得 fetch$ 這些由 action$ 派生的流呢?所以,咱們還須要告知中間件若是經過 action$ 生成更多的流,不妨定義一個轉換器,由它負責 action$ 的流轉,並在當中處理反作用:

interface Transformer {
  (action$: Observable<Action>): Observable<Action>
}

const fetchTransformer: Transformer = (action$) => {
  action$.pipe(
    filter(({type}) => type === FETCH),
    switchMap(() => from(api.fetch).pipe(
      switchMap(resp => ({
        type: FETCH_SUCCESS,
        payload: {
          // ...
        }
      }),
      catchError(error => of({
        type: FETCH_ERROR,
        payload: {
          // ....
        }
      }))
    ))
  )
}
複製代碼

應用中,咱們可能定義不一樣的轉換器,從而獲得派發不一樣 action 的流:

const newActionsStreams: Observable<Action>[] = transformers.map(transformer => transformer(action$))
複製代碼

因爲這些 action 還具備一致的數據結構,所以咱們能夠將這些流進行合併,由合併後的流負責派發 action 到 reducer:

const newAction$ = merge(newActionStreams)
複製代碼

那麼,修改咱們的中間件實現:

const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  // 運行各個 transformer,並將轉換的流進行合併
  const newAction$ = merge(tramsformer.map(transformer => transformer(action$)))
  const middleware: Middleware = store => {
    // 訂閱 newAction$
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      // 將 action 交給 reducer 處理
      const result = next(action)
      // 將 action 放到 action$ 中進行流轉
      action$.next(action)
      return result
    }
  }
  return middleware
}
複製代碼

優化:ofType operator

因爲咱們老是須要 filter(action => action.type === SOME_TYPE) 來過濾 action,所以能夠封裝一個 operator 來優化這個過程:

const ofType: OperatorFunction<Observable<Action>, Observable<Action>> = (type: String) => pipe(
  filter(action => action.type === type)
)
複製代碼
const fetchTransformer: Transformer = (action$) {
  return action$.pipe(
    filter(({type}) => type === FETCH),
    switchMap(() => from(api.fetch)),
    // ...
  )
}
複製代碼

再考慮到咱們可能不僅過濾一個 action type,所以能夠優化咱們的 ofType operator 爲:

const ofType: OperatorFunction<Observable<Action>, Observable<Action>> = 
  (...types: String[]) => pipe(
    filter((action: Action) => types.indexOf(action.type) > -1)
  )
複製代碼
const counterTransformer: Transformer = (action$) {
  return action$.pipe(
    ofType(INCREMENT, DECREMENT),
    // ...
  )
}
複製代碼

下面這個測試用例將用來測試咱們的中間件是否可以工做了:

it('should transform action', () => {
   const reducer: Reducer = (state = 0, action) => {
    switch(action.type) {
      case 'PONG':
        return state + 1
      default:
        return state
    }
  }

  const transformer: Transformer = (action$) => {
    return action$.pipe(
        ofType('PING'),
        mapTo({type: 'PONG'})
      )
    )
  }

  const middleware = createMiddleware(transformer)
  const store = createStore(reducer, applyMiddleware(middleware))
  store.dispatch({type: 'PING'})
  expect(store.getState()).to.be.equal(1)
})
複製代碼

優化:得到 state

在 action 的流轉過程可能還須要得到應用狀態,例如,fetch$ 中獲取數據前,須要封裝請求參數,部分參數可能來自於應用狀態。所以,咱們能夠考慮爲每一個 transformer 再傳遞當前的 store 對象,使它能拿到當前的應用狀態:

interface Transformer {
  (action$: Observable<Action>, store: Store): Observable<Action>
}

// ...

const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  const middleware: Middleware = store => {
    // 將 store 也傳遞給 transformer
    const newAction$ = merge(tramsformer.map(transformer => transformer(action$, store)))
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      const result = next(action)
      action$.next(action)
      return result
    }
  }
  return middleware
}
複製代碼

如今,當須要取用狀態的時候,就經過 store.getState() 拿取:

const fetchTransformer: Transformer = (action$, store) {
  return action$.pipe(
    filter(({type}) => type === FETCH),
    switchMap(() => {
      const { query, page, pageSize } = store.getState()
      const params = { query, page, pageSize }
      return from(api.fetch, params)
    }),
    // ...
  )
}
複製代碼

優化:觀察狀態

在響應式編程體系下,一切數據源都應當是可被觀察的,而上面咱們對狀態的取值確是主動的(proactive)的,正確的方式是應當觀察狀態的變化,並在變化時做出決策:

爲此,相似 action$,咱們也將 state 流化,使得應用狀態成爲一個可觀察對象,並將 state$ 傳遞給 transformer:

interface Transformer {
  (action$: Observable<Action>, state$: Observable<State>): Observable<Action>
}

// ...

const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  const state$ = new Subject()
  const middleware: Middleware = store => {
    // 由各個 transformer 得到應用的 action$
    const newAction$ = merge(tramsformer.map(transformer => transformer(action$, state$)))
    // 新的 action 到來時,將其又 dispatch 到 Redux 生態
    newAction$.subscribe(action => store.dispatch(action))
    return next => action => {
      // 將 action 交給 reducer
      const result = next(action)
      // 得到 reducer 處理後的新狀態
      state$.next(state)
		  // 將 action 放入 action$
      action$.next(action)
      return result
    }
  }
  return middleware
}
複製代碼

當業務流程須要狀態時,就能夠自由組合 state$ 獲得:

const fetchTransformer: Transformer = (action$, state$) {
  return action$.pipe(
    filter(({type}) => type === FETCH),
    withLatestFrom(state$),
    switchMap(([action, state]) => {
      const { query, page, pageSize } = state
      const params = { query, page, pageSize }
      return from(api.fetch, params)
    }),
    // ...
  )
}
複製代碼

乍看之下,彷佛不如 store.getState() 來的方便,爲了得到當前狀態,咱們還額外引入了一個 operator withLatestFrom。可是,要注意到,咱們引入 state$ 不僅爲了得到狀態和統一模式,更重要是爲了觀察狀態。

舉個例子,咱們有一個備忘錄組件,每次內容變更時,咱們就存儲一下草稿。若是咱們能觀察狀態變更,經過響應式編程模式,當狀態變更時,自動造成草稿存儲的業務:

const saveDraft$: Observable<Action> = state$.pipe(
  // 選出當前
	pluck('content'),
  // 只有當內容變更時才考慮存儲草稿
  distinctUntilChanged(),
  // 只在 1 s 內保存一次
  throttleTime(1000),
  // 調用服務存儲草稿
  switchMap(content => from(api.saveDraft(content)))
  // ....
)
複製代碼

你們也能夠在回顧系列第一篇所介紹的內容,正是因爲 redux-observable 在 1.0 版本引入了 state$,咱們才得以解耦組件的業務關係,實現單個組件的自治。

優化:響應初始狀態

如今,咱們能夠測試一下如今的中間件,看可否觀察應用狀態了:

it('should observe state', () => {
   const reducer: Reducer = (state = {step: 10, counter: 0}, action) => {
    switch(action.type) {
      case 'PONG':
        return {
          ...state,
          counter: action.counter
        }
      default:
        return state
    }
  }

  const transformer: Transformer = (action$, state$) => {
    return action$.pipe(
        ofType('PING'),
      	withLatestFrom(state$, (action, state) => state.step + state.counter),
        map(counter => ({type: 'PONG', counter}))
      )
    )
  }

  const middleware = createMiddleware(transformer)
  const store = createStore(reducer, applyMiddleware(middleware))
  store.dispatch({type: 'PING'})
  expect(store.getState().counter).to.be.equal(10)
})
複製代碼

遺憾的是,這個測試用例將不會經過,經過調試發現,當咱們 dispatch 了 PING action 後,withLatestFrom 沒有拿到最近一次的 state。這是爲何呢?原來是由於 Redux 的 init action 並無暴露給中間件進行攔截,所以,應用的初始狀態沒能被送入 state$ 中,觀察者沒法觀察到初始狀態。

爲了解決這個問題,在建立了 store 後,咱們能夠嘗試 dispatch 一個無心義的 action 給中間件,強制將初始狀態先送入 state$ 中:

const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// 派發一個 action 去得到初始狀態
store.dispatch({type: '@@INIT_STATE'})
複製代碼

這個方式雖然能讓測試經過,但缺不是很優雅,咱們讓用戶手動去派發一個無心義的 action,這會讓用戶感受很困惑。所以,咱們考慮爲中間件單獨設置一個 API,用以在 store 建立後,完成一些任務:

// 設置一個 store 副本
let cachedStore: Store
const createMiddleware = (...transformers): Middleware => {
  const action$ = new Subject()
  const state$ = new Subject()
  const newAction$ = merge(transformers.map(transformer => transformer(action$, state$)))
  
  const middleware: Middleware = store => {
    cachedStore = store
    
    return next => action => {
      // 將 action 交給 reducer
      const result = next(action)
      // 得到 reducer 處理後的新狀態
      state$.next(state)
		  // 將 action 放入 action$
      action$.next(action)
      return result
    }
  }
  
  middleware.run = function() {
    // 1. 開始對 action 的訂閱
    newAction$.subscribe(cachedStore.dispatch)
    // 2. 將初始狀態傳遞給 state$
    state$.next(cachedStore.getState())
  }
  return middleware
}
複製代碼

如今,咱們爲中間件提供了一個 run 方法,來讓中間件在 store 建立之後完成一些工做。當咱們建立好 store 後,運行 run 方法來運行中間件:

const middleware = createMiddleware(transformer)
const store = createStore(reducer, applyMiddleware(middleware))
// 運行咱們的中間件
middleware.run()
複製代碼

優化:相互關聯的 transformer

再考慮一個更加場景,各個 transformer 之間可能存在關聯,各個 trasformer 也可能直接發出 action,而不須要依賴於 action$

it('should queue synchronous actions', () => {
    const reducer = (state = [], action) => state.concat(action)
    const transformer1 = (action$, state$) => action$.pipe(
      ofType('FIRST'),
      mergeMap(() => of({ type: 'SECOND' }, { type: 'THIRD'} ))
    )
    const transformer2 = (action$, state$) => action$.pipe(
        ofType('SECOND'),
        mapTo({type: 'FORTH'})
    )
    
    const middleware = createMiddleware(transformer1, transformer2)
    const store = createStore(reducer, applyMiddleware(middleware))
    middleware.run()
    
    const actions = store.getState()
    actions.shift() // remove redux init action
    expect(actions).to.deep.equal([
      { type: 'FIRST' },
      { type: 'SECOND' },
      { type: 'THIRD' },
      { type: 'FORTH' }
    ])
})
複製代碼

在這個測試用例中,咱們看到的 action 序列是:

FIRST
SECOND
THIRD
FORTH
複製代碼

可是,在當前的實現中,你將獲得:

FIRST
SECOND
FORTH
THIRD
複製代碼

這並不符合預期。可是,問題又出在哪裏呢?咱們分析下程序執行過程:

  1. 發出 first action
  2. 調度 first action,派生出 second action 及 third action 的 observable
  3. 調度 second action,派生出 forth action 的 observable
  4. 調度 forth action
  5. 調度 third action

問題顯然就出在第 二、3 步,若是第 2 步中,咱們控制 observable 吐出值的速度,將同時到來的 second 和 third action 緩存到隊列,並依次執行,就能獲得咱們指望的輸出。

幸運的是,RxJS 中提供了 observeOn 這個 operator 來控制數據源發出值的節奏。其第一個參數接收一個調度器,用於告知數據源以怎樣的速錄調度任務,這裏咱們將使用 Queue Scheduler 將各個 action 緩存到隊列,當此時再無 action 時,各個 action 出隊並被調度:

export const createEpicMiddleware = (...epics) => {
  const action$ = new Subject().pipe(observeOn(queueScheduler)) as Subject<Action>
  
  // ...
  
  return middleware
}
複製代碼

如今,再次運行測試用例,你講看到符合指望的 action 序列:

FIRST
SECOND
THIRD
FORTH
複製代碼

這是由於:

  1. 發出 first action
  2. 調度 first action,入隊
  3. 此時沒有 action,first action 出隊,store.dispatch(first),派生出 second action 及 third action 的 observable
  4. second action 入隊,third action 入隊
  5. 此時沒有等待的 action,則 second action 出隊,store.dispatch(second),派生出 forth action 的 observable
  6. forth action 入隊
  7. 此時沒有等待的 action,隊首元素 third action 出隊,store.dispatch(third)
  8. forth action 出隊,store.dispatch(forth)

總結

截止目前,咱們的中間件已經容許咱們經過 FRP 模式梳理應用狀態了,這個中間件的實現已經很是相似於 redux-observable 的實現了。固然,你們生產環境仍是用更流行,更穩定的 redux-observable,本文旨在幫助你們更好的理解如何在 Redux 中集成 RxJS 更好的管理狀態,經過一步一步對中間件的優化,也讓你們理解了了 redux-observable 的設計哲學和實現原理。本文實現的 mini redux-observable 我也放到了個人 github 上,包含了一些測試用例和一個小的 demo。

接下來,咱們將探索將 redux-observable 以及 FRP 這套模式集成到 dva 架構的前端框架中,dva 架構幫助砍掉 Redux 冗長的樣板代碼,而 redux-observable 則專一於反作用處理。


參考資料

關於本系列

  • 本系列將從介紹 redux-observable 1.0 開始,闡述本身在結合 RxJS 到 Redux 中的心得體會。涉及內容會有 redux-observable 實踐介紹,redux-observable 實現原理探究,最後會介紹下本身當前基於 redux-observble + dva architecture 的一個 state 管理框架 reobservable。
  • 本系列不是 RxJS 或者 Redux 入門,再也不講述他們的基礎概念,宣揚他們的核心優點。若是你搜索 RxJS 不當心進到了這個系列,對 RxJS 和 FRP 程序設計產生了興趣,那麼入門我會推薦:
  • 本系列更不是教程,只是介紹本身在 Redux 中應用 RxJS 的一些思路,但願更多人能指出當中存在的誤區,或者交流更優雅的實踐。
  • 由衷的感謝實踐路上一些師兄的幫助,尤爲感謝騰訊雲的 questguo 學長在模式上的指導。reobservable 脫胎於騰訊雲 questguo 主導的 React 框架 —— TCFF,期待將來 TCFF 的開源。
  • 感謝小雨的設計支援。
相關文章
相關標籤/搜索