RxJS與Redux結合使用(一):打造本身的redux-observable

背景

Redux 的核心理念是單向數據流,只能經過 dispatch(action) 的方式修改狀態,使用react-redux能夠在組件和redux之間造成下面這麼一個數據流閉環:javascript

view ->  action -> reducer -> state -> view
複製代碼

然而,在實際業務中每每有大量異步場景,最直接的作法是在React組件中發起異步請求,在拿到數據後調用dispatch(action)去數據層修改數據。不過這樣的作法使得視圖層和數據層耦合在一塊兒,會形成後期維護的困難。java

Redux做者建議用中間件來處理異步流,由於在中間件中咱們能夠靈活地控制 dispatch的時機,這對於處理異步場景很是有效。較爲常見的作法主要有兩種:react

  1. 更改action的類型,如redux-thunk,用函數替換了action;
  2. 在middleware中接收到action的時候作出一些對應處理,如redux-saga。

而咱們今天要講的rxjs與redux的結合,採用了第二種方式來處理異步流程。編程

中間件干預處理

市面上已經存在這麼個中間件了:redux-observable。而咱們今天要作的就是帶領你們,一步一步慢慢實現本身的一個redux-observable。redux

這個中間件的原理我能夠簡化爲下面的代碼:api

export default store => next => action => {
    const result = next(action);
    if (action.type === 'ping') {
        store.dispatch({ type: 'pong' })
    }
    return result;
}
複製代碼

原理實在簡單,在next(action)以後去根據action作判斷,作一些異步邏輯,再發起dispatch修改數據便可,而redux-observable也只是在這個基礎之上加入RxJs的一些特性。bash

處理異步邏輯的思路

若是你比較熟悉redux的話,就會知道,redux的中間件就是一個洋蔥模型,上面咱們也說了,咱們會在中間件的後面根據你的action再去從新dispatch一些action,而Rxjs最核心的思想就是將數據都流化。因此你能夠理解爲action在中間件的末端流入一個管道,最後從管道又流出一些action,這些action最終會再次被store dispatch。異步

image

至於在這個管道中進行了什麼樣的變化、操做,那就是Rxjs的管轄範圍,經過Rxjs的強大的操做符,咱們能夠很是優雅地實現異步邏輯。async

因此,須要有個流來承載全部的action,這樣你就能夠經過這個action$來進行fetch:模塊化

action$.pipe(
    switchMap(
        () => fromPromise(fetch('/api/whatever')).pipe(
            map(res => action)
        )
    ),
    catchError(() => {})
)
複製代碼

這樣就將異步邏輯嵌入到流當中。

建立Action流

咱們的核心思想是action in, action out,因此最終流出的action是要從新被store.dispatch消費的,因此action$是一個Observable對象。

同時,在dispatch的時候,action通過中間件,action中須要放入這個action,因此action也是一個observer。

所以,action$既是觀察者又是可觀察對象,是一個Subject對象:

替換中間件的簡單寫法,變成:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  
  action$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
複製代碼

在上面代碼中咱們在middleware中去放入action,而後經過訂閱,store會觸發dispatch。

可是,若是咱們就這麼寫的話,這是個死循環,由於任何action在進入到action後就立馬被消費者store.dipatch(action)執行了,這個action又會在後面的流程中從新被送到action去。

聰明的你應該想到了,咱們對於進入到action$的action尚未進行任何過濾,而這個過濾過程也正是咱們須要的處理異步邏輯的地方。

下面咱們要把這個步驟加上。

流的轉化器Epic

爲了達到action的一個轉化處理,咱們將這個過程抽離出來,這個中間處理的邏輯稱爲Epic,epic的形式大概能夠寫爲:

const epic = (action$) => {
    return action$.pipe(
        // 由於全部的action都會過來
        // 因此咱們只須要處理咱們想要的aciton
        filter(action => action.type === 'GET_USER'),
        switchMap(
            // 將fetch也轉化爲流
            () => fromPromise(fetch('/api/user/get', {
                method: 'POST',
                body: {
                    id: 1
                },
            })).pipe(
                map(res => ({ type: 'GET_USER_SUCCESS', payload: res })),
                catchError(error => ({ type: 'GET_USER_FAILED', payload: error }))
            )
        )
    )
}

複製代碼

epic本質是一個函數,在這個函數中,咱們在action$的基礎上,加入了管道控制,產生了另一個流,而這個流就是最終咱們要的,對action進行了控制的action流,上面的fetch只是一個例子,在這個管道中,你能夠處理任意的異步邏輯。

而咱們要作的就是將這個Epic,整合進剛纔的中間中。

作法也很簡單,咱們只須要將訂閱從action$換到新的流上就能夠了:

import { Subject } from 'rxjs/Subject';

export default (store) => {

  const action$ = new Subject();
  const newAction$ = epic(action$);
  
  newAction$.subscribe(store.dispatch);
  
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
複製代碼

這樣,action$在接收到新的action的時候,會流經epic定義的管道,而後纔出發dispatch

多個Epic合併

到此,咱們的中間件已經有初步處理異步邏輯的能力,可是,在現實中,咱們的異步邏輯不可能只有一個,因此epic是會有不少的,而store去訂閱的流只能是一個,因此這麼多的epic產生的流要合併成一個流。

合併流的操做,強大的RxJs天然是有安排的,相信你想到了操做符merge,咱們能夠提供一個combineEpics的函數:

export const combineEpics = (...epics) => {
  const merger = (...args) => merge(
    ...epics.map((epic) => {
      const output$ = epic(...args);
      return output$;
    })
  );
  return merger;
};
複製代碼

上面的代碼不難理解,combineEpics整合了全部傳入的epic,而後返回一個merger,這個merger是利用merge操做符,將全部的epic產生的流合併成一個流。

image

代碼形式爲:

const pingEpic = action$ => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong' })),
);

const getUserEpic = action$ => action$.pipe(
  filter(action => action.type === 'GET_USER'),
  map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })),
);

const rootEpic = combineEpics(pingEpic, getUserEpic);

export default (store) => {
  const action$ = new Subject();
  const newAction$ = rootEpic(action$);

  newAction$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    action$.next(action);
    return result;
  };
};
複製代碼

state獲取

在epic中咱們不可避免地要藉助state裏面的數據進行不一樣的處理,因此咱們是須要獲取到state的,因此你能夠在中間件中的epci執行函數中添加一個參數,將state獲取函數暴露出去:

export default (store) => {
  ...
  const newAction$ = rootEpic(action$, store.getState);
  ...
};
複製代碼

這樣epic裏就能夠用getState()獲取state:

const pingEpic = (action$, getState) => action$.pipe(
  filter(action => action.type === 'ping'),
  map(() => ({ type: 'pong', payload: getState() })),
);
複製代碼

進一步優化:將state也流化

上面的作法是直接去獲取state,這樣的作法是主動獲取,不符合函數響應式編程模式。函數響應式中,state的改變狀態,應該是要能被觀察的。

當state也能被響應觀察,咱們就能夠作更多的功能,例如:當state的某些數據在發生變化的時候,咱們要去進行實時保存。

在傳統模式的作法中,你能夠在中間件中這樣寫:

export default store => next => action => {
    const oldState = store.getState();
    const result = next(action);
    const newState = store.getState();
    // 相似這樣的寫法
    if (newState.xxx !== oldState.xxx) {
        fetch('/api/save', {
            method: 'POST',
            body: {
            
            }
        }).then(() => {}).catch(() => {})
    }
    return result;
}
複製代碼

這個處理邏輯要獨立爲一箇中間件,而若是你將state也流化,你能夠直接使用epic這樣處理:

const saveEpic = (action$, state$) => state$.pipe(
const autoSaveEpic = (action$, state$) =>
  return action$.pipe(
    filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自動保存的開啓
    exhaustMap(() => state$.pipe(
        pluck('xxx'), // 獲取state.xxx
        distinctUntilChanged(), // 先後值不一樣時纔將其發出。
        concatMap((value) => {
            // fetch to save
        }),
        // 自動保存的關閉
        takeUntil(action$.pipe(
            filter(action => action.type === 'AUTO_SAVE_DISABLE')
        ))
    ))
  )
)
複製代碼

若是仔細閱讀這段代碼,能夠發現這樣的方式可使用很是優雅的方式控制這個自動保存,能夠和action$結合使用,快速開關自動保存,能夠利用RxJs的特性解決保存的異步執行延遲問題。

若是你只是單存想要獲取最新state,可使用withLatestFrom操做符:

const countEpic = (action$, state$) => action$.pipe(
  filter(action => action.type === 'count'),
  withLatestFrom(state$),
  switchMap(([action, state]) => {
    return of({ type: 'whatever' });
  })
);
複製代碼

在中間件加入state流:

export default (store) => {
  const action$ = new Subject();
  const state$ = new Subject();
  const source$ = rootEpic(action$, state$);

  source$.subscribe(store.dispatch);
  return next => (action) => {
    const result = next(action);
    state$.next(store.getState());
    action$.next(action);
    return result;
  };
};
複製代碼

注意state.next要先執行,這樣在epic中才會拿到最新的,另外能夠知道一下,redux在init的時候不會通過中間件,因此當你沒有dispatch任何action的時候,state最新值不是默認state。

Action的順序問題

若是你有耐心看到這裏,那麼說明你對於redux結合RxJs的使用已經理解得差很少了,可是這裏仍是有個問題,就是action的生效順序,咱們能夠直接看個例子說明,假設有下面這樣兩個epic:

const epic1 = action$ => action$.pipe(
  filter(action => action.type === 'one'),
  mergeMap(() => of({ type: 'two' }, { type: 'three' })),
);

const epic2 = action$ => action$.pipe(
  filter(action => action.type === 'two'),
  mergeMap(() => of({ type: 'four' })),
);
複製代碼

store.dispatch({ type: 'one' }) 的時候,action的順序爲:

'one' -> 'two' -> 'four' -> 'three'
複製代碼

可見,action的執行順序並非如咱們預期的那樣,在two觸發後就發出了four,這是由於RxJs默認的調度器是同步的,用一段簡單的代碼,上面的效果相似於:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      p.print();
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three
複製代碼

換成上面的代碼的話你就不陌生了吧,也會對於輸出的結果表示確定,可是咱們須要的效果是

'one' -> 'two' -> 'three' -> 'four'
複製代碼

這如何作到?

明顯,須要將調度器換成其餘的,RxJs有這麼幾種調度器:null(同步)、asap、queue、async、animationFrame。最後一種是動畫場景的調度器,直接剔除,默認是第一種,那麼就剩下asap、queue、async。在這個場景下,這三種調度器都是可行的,可是queue在大量的數據的時候對於性能是有利的,因此這裏可使用它。不過,記住,這三種調度器是有區別的,你們有興趣的本身去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延遲爲0的時候接近於同步,在延遲不爲0的時候與async同樣。

中間件中:

const action$ = new Subject().pipe(
    observeOn(queue)
  );
複製代碼

這樣獲得的結果就是:

'one' -> 'two' -> 'three' -> 'four'
複製代碼

若是用簡單的代碼,至關於發生了這樣的變化:

class Print {
  constructor(name, nexts = []) {
    this.name = name;
    this.nexts = nexts;
  }
  print() {
    console.log(this.name);
    this.nexts.forEach((p) => {
      setTimeout(() => p.print(), 0);
    });
  }
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four

複製代碼

總結

本文就講到這裏,此次介紹瞭如何本身實現一個redux-observable,下次會講redux-observable在實戰中的一些應用,例如怎麼相似dva那樣進行模塊化開發、如何統一處理loading、error等。

相關文章
相關標籤/搜索