Redux 的核心理念是單向數據流,只能經過 dispatch(action) 的方式修改狀態,使用react-redux能夠在組件和redux之間造成下面這麼一個數據流閉環:javascript
view -> action -> reducer -> state -> view
複製代碼
然而,在實際業務中每每有大量異步場景,最直接的作法是在React組件中發起異步請求,在拿到數據後調用dispatch(action)去數據層修改數據。不過這樣的作法使得視圖層和數據層耦合在一塊兒,會形成後期維護的困難。java
Redux做者建議用中間件來處理異步流,由於在中間件中咱們能夠靈活地控制 dispatch的時機,這對於處理異步場景很是有效。較爲常見的作法主要有兩種:react
而咱們今天要講的rxjs與redux的結合,採用了第二種方式來處理異步流程。編程
市面上已經存在這麼個中間件了:redux-observable。而咱們今天要作的就是帶領你們,一步一步慢慢實現本身的一個redux-observable。redux
這個中間件的原理我能夠簡化爲下面的代碼:api
export default store => next => action => {
const result = next(action);
if (action.type === 'ping') {
store.dispatch({ type: 'pong' })
}
return result;
}
複製代碼
原理實在簡單,在next(action)以後去根據action作判斷,作一些異步邏輯,再發起dispatch修改數據便可,而redux-observable也只是在這個基礎之上加入RxJs的一些特性。bash
若是你比較熟悉redux的話,就會知道,redux的中間件就是一個洋蔥模型,上面咱們也說了,咱們會在中間件的後面根據你的action再去從新dispatch一些action,而Rxjs最核心的思想就是將數據都流化。因此你能夠理解爲action在中間件的末端流入一個管道,最後從管道又流出一些action,這些action最終會再次被store dispatch。異步
至於在這個管道中進行了什麼樣的變化、操做,那就是Rxjs的管轄範圍,經過Rxjs的強大的操做符,咱們能夠很是優雅地實現異步邏輯。async
因此,須要有個流來承載全部的action,這樣你就能夠經過這個action$來進行fetch:模塊化
action$.pipe(
switchMap(
() => fromPromise(fetch('/api/whatever')).pipe(
map(res => action)
)
),
catchError(() => {})
)
複製代碼
這樣就將異步邏輯嵌入到流當中。
咱們的核心思想是action in, action out,因此最終流出的action是要從新被store.dispatch消費的,因此action$是一個Observable對象。
同時,在dispatch的時候,action通過中間件,action也是一個observer。
所以,action$既是觀察者又是可觀察對象,是一個Subject對象:
替換中間件的簡單寫法,變成:
import { Subject } from 'rxjs/Subject';
export default (store) => {
const action$ = new Subject();
action$.subscribe(store.dispatch);
return next => (action) => {
const result = next(action);
action$.next(action);
return result;
};
};
複製代碼
在上面代碼中咱們在middleware中去放入action,而後經過訂閱,store會觸發dispatch。
可是,若是咱們就這麼寫的話,這是個死循環,由於任何action在進入到action去。
聰明的你應該想到了,咱們對於進入到action$的action尚未進行任何過濾,而這個過濾過程也正是咱們須要的處理異步邏輯的地方。
下面咱們要把這個步驟加上。
爲了達到action的一個轉化處理,咱們將這個過程抽離出來,這個中間處理的邏輯稱爲Epic,epic的形式大概能夠寫爲:
const epic = (action$) => {
return action$.pipe(
// 由於全部的action都會過來
// 因此咱們只須要處理咱們想要的aciton
filter(action => action.type === 'GET_USER'),
switchMap(
// 將fetch也轉化爲流
() => fromPromise(fetch('/api/user/get', {
method: 'POST',
body: {
id: 1
},
})).pipe(
map(res => ({ type: 'GET_USER_SUCCESS', payload: res })),
catchError(error => ({ type: 'GET_USER_FAILED', payload: error }))
)
)
)
}
複製代碼
epic本質是一個函數,在這個函數中,咱們在action$的基礎上,加入了管道控制,產生了另一個流,而這個流就是最終咱們要的,對action進行了控制的action流,上面的fetch只是一個例子,在這個管道中,你能夠處理任意的異步邏輯。
而咱們要作的就是將這個Epic,整合進剛纔的中間中。
作法也很簡單,咱們只須要將訂閱從action$換到新的流上就能夠了:
import { Subject } from 'rxjs/Subject';
export default (store) => {
const action$ = new Subject();
const newAction$ = epic(action$);
newAction$.subscribe(store.dispatch);
return next => (action) => {
const result = next(action);
action$.next(action);
return result;
};
};
複製代碼
這樣,action$在接收到新的action的時候,會流經epic定義的管道,而後纔出發dispatch
到此,咱們的中間件已經有初步處理異步邏輯的能力,可是,在現實中,咱們的異步邏輯不可能只有一個,因此epic是會有不少的,而store去訂閱的流只能是一個,因此這麼多的epic產生的流要合併成一個流。
合併流的操做,強大的RxJs天然是有安排的,相信你想到了操做符merge,咱們能夠提供一個combineEpics的函數:
export const combineEpics = (...epics) => {
const merger = (...args) => merge(
...epics.map((epic) => {
const output$ = epic(...args);
return output$;
})
);
return merger;
};
複製代碼
上面的代碼不難理解,combineEpics整合了全部傳入的epic,而後返回一個merger,這個merger是利用merge操做符,將全部的epic產生的流合併成一個流。
代碼形式爲:
const pingEpic = action$ => action$.pipe(
filter(action => action.type === 'ping'),
map(() => ({ type: 'pong' })),
);
const getUserEpic = action$ => action$.pipe(
filter(action => action.type === 'GET_USER'),
map(() => ({ type: 'GET_USER_SUCCESS', payload: { user: { name: 'kang' } } })),
);
const rootEpic = combineEpics(pingEpic, getUserEpic);
export default (store) => {
const action$ = new Subject();
const newAction$ = rootEpic(action$);
newAction$.subscribe(store.dispatch);
return next => (action) => {
const result = next(action);
action$.next(action);
return result;
};
};
複製代碼
在epic中咱們不可避免地要藉助state裏面的數據進行不一樣的處理,因此咱們是須要獲取到state的,因此你能夠在中間件中的epci執行函數中添加一個參數,將state獲取函數暴露出去:
export default (store) => {
...
const newAction$ = rootEpic(action$, store.getState);
...
};
複製代碼
這樣epic裏就能夠用getState()獲取state:
const pingEpic = (action$, getState) => action$.pipe(
filter(action => action.type === 'ping'),
map(() => ({ type: 'pong', payload: getState() })),
);
複製代碼
上面的作法是直接去獲取state,這樣的作法是主動獲取,不符合函數響應式編程模式。函數響應式中,state的改變狀態,應該是要能被觀察的。
當state也能被響應觀察,咱們就能夠作更多的功能,例如:當state的某些數據在發生變化的時候,咱們要去進行實時保存。
在傳統模式的作法中,你能夠在中間件中這樣寫:
export default store => next => action => {
const oldState = store.getState();
const result = next(action);
const newState = store.getState();
// 相似這樣的寫法
if (newState.xxx !== oldState.xxx) {
fetch('/api/save', {
method: 'POST',
body: {
}
}).then(() => {}).catch(() => {})
}
return result;
}
複製代碼
這個處理邏輯要獨立爲一箇中間件,而若是你將state也流化,你能夠直接使用epic這樣處理:
const saveEpic = (action$, state$) => state$.pipe(
const autoSaveEpic = (action$, state$) =>
return action$.pipe(
filter(action => action.type === 'AUTO_SAVE_ENABLE'), // 自動保存的開啓
exhaustMap(() => state$.pipe(
pluck('xxx'), // 獲取state.xxx
distinctUntilChanged(), // 先後值不一樣時纔將其發出。
concatMap((value) => {
// fetch to save
}),
// 自動保存的關閉
takeUntil(action$.pipe(
filter(action => action.type === 'AUTO_SAVE_DISABLE')
))
))
)
)
複製代碼
若是仔細閱讀這段代碼,能夠發現這樣的方式可使用很是優雅的方式控制這個自動保存,能夠和action$結合使用,快速開關自動保存,能夠利用RxJs的特性解決保存的異步執行延遲問題。
若是你只是單存想要獲取最新state,可使用withLatestFrom操做符:
const countEpic = (action$, state$) => action$.pipe(
filter(action => action.type === 'count'),
withLatestFrom(state$),
switchMap(([action, state]) => {
return of({ type: 'whatever' });
})
);
複製代碼
在中間件加入state流:
export default (store) => {
const action$ = new Subject();
const state$ = new Subject();
const source$ = rootEpic(action$, state$);
source$.subscribe(store.dispatch);
return next => (action) => {
const result = next(action);
state$.next(store.getState());
action$.next(action);
return result;
};
};
複製代碼
注意state最新值不是默認state。
若是你有耐心看到這裏,那麼說明你對於redux結合RxJs的使用已經理解得差很少了,可是這裏仍是有個問題,就是action的生效順序,咱們能夠直接看個例子說明,假設有下面這樣兩個epic:
const epic1 = action$ => action$.pipe(
filter(action => action.type === 'one'),
mergeMap(() => of({ type: 'two' }, { type: 'three' })),
);
const epic2 = action$ => action$.pipe(
filter(action => action.type === 'two'),
mergeMap(() => of({ type: 'four' })),
);
複製代碼
當 store.dispatch({ type: 'one' })
的時候,action的順序爲:
'one' -> 'two' -> 'four' -> 'three'
複製代碼
可見,action的執行順序並非如咱們預期的那樣,在two觸發後就發出了four,這是由於RxJs默認的調度器是同步的,用一段簡單的代碼,上面的效果相似於:
class Print {
constructor(name, nexts = []) {
this.name = name;
this.nexts = nexts;
}
print() {
console.log(this.name);
this.nexts.forEach((p) => {
p.print();
});
}
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, four, three
複製代碼
換成上面的代碼的話你就不陌生了吧,也會對於輸出的結果表示確定,可是咱們須要的效果是
'one' -> 'two' -> 'three' -> 'four'
複製代碼
這如何作到?
明顯,須要將調度器換成其餘的,RxJs有這麼幾種調度器:null(同步)、asap、queue、async、animationFrame。最後一種是動畫場景的調度器,直接剔除,默認是第一種,那麼就剩下asap、queue、async。在這個場景下,這三種調度器都是可行的,可是queue在大量的數據的時候對於性能是有利的,因此這裏可使用它。不過,記住,這三種調度器是有區別的,你們有興趣的本身去google一下,只提示:asap是Micro Task、async是Macro Task、queue在延遲爲0的時候接近於同步,在延遲不爲0的時候與async同樣。
中間件中:
const action$ = new Subject().pipe(
observeOn(queue)
);
複製代碼
這樣獲得的結果就是:
'one' -> 'two' -> 'three' -> 'four'
複製代碼
若是用簡單的代碼,至關於發生了這樣的變化:
class Print {
constructor(name, nexts = []) {
this.name = name;
this.nexts = nexts;
}
print() {
console.log(this.name);
this.nexts.forEach((p) => {
setTimeout(() => p.print(), 0);
});
}
}
const three = new Print('three');
const four = new Print('four');
const two = new Print('two', [four]);
const one = new Print('one', [two, three]);
one.print(); // one, two, three, four
複製代碼
本文就講到這裏,此次介紹瞭如何本身實現一個redux-observable,下次會講redux-observable在實戰中的一些應用,例如怎麼相似dva那樣進行模塊化開發、如何統一處理loading、error等。