上一篇博客提到了幾種響應式的方案,以及它們的缺點。本文將介紹Observable
以及它的一個實現,以及它在處理響應式時相對於上篇博客中的方案的巨大優點(推薦兩篇博客對比閱讀)。react
Observable
是一個集合了觀察者模式、迭代器模式和函數式的庫,提供了基於事件流的強大的異步處理能力,而且已在Stage 1
草案中。本文介紹的Rxjs
是Observable
的一個實現,它是ReactiveX衆多語言中的JavaScript
版本。ajax
在JavaScript
中,咱們可使用T | null
去處理一個單值,使用Iterator
去處理多個值得狀況,使用Promise
處理異步的單個值,而Observable
則填補了缺失的「異步多個值」。redux
單個值 | 多個值 | |
---|---|---|
同步 | T | null |
Iterator<T> |
異步 | Promise<T> |
Observable<T> |
Rxjs
上文提到使用Event Emitter
作響應式處理,在Rxjs
中稍有些不一樣:安全
/* const change$ = new Subject(); <Input change$={change$} /> <Search change$={change$} /> */
class Input extends Component {
state = {
value: ''
};
onChange = e => {
this.props.change$.next(e.target.value);
};
componentDidMount() {
this.subscription = this.props.change$.subscribe(value => {
this.setState({
value
});
});
}
componentWillUnmount() {
this.subscription.ubsubscribe();
}
render() {
const { value } = this.state;
return <input value={value} onChange={this.onChange} />; } } class Search extends Component { // ... componentDidMount() { this.subscription = this.props.change$.subscribe(value => { ajax(/* ... */).then(list => this.setState({ list }) ); }); } componentWillUnmount() { this.subscription.ubsubscribe(); } render() { const { list } = this.state; return <ul>{list.map(item => <li key={item.id}>{item.value}</li>)}</ul>; } } 複製代碼
在這裏,咱們雖然也須要手動釋放對事件的訂閱,可是得益於Rxjs
的設計,咱們不須要像Event Emitter
那樣去存下回調函數的實例,用於釋放訂閱,所以咱們很容易就能夠經過高階組件解決這個問題。例如:併發
const withObservables = observables => ChildComponent => {
return class extends Component {
constructor(props) {
super(props);
this.subscriptions = {};
this.state = {};
Object.keys(observables).forEach(key => {
this.subscriptions[key] = observables[key].subscribe(value => {
this.setState({
[key]: value
});
});
});
}
onNext = (key, value) => {
observables[key].next(value);
};
componentWillUnmount() {
Object.keys(this.subscriptions).forEach(key => {
this.subscriptions[key].unsubscribe();
});
}
render() {
return (
<ChildComponent {...this.props} {...this.state} onNext={this.onNext} /> ); } }; }; 複製代碼
這樣在須要聚合多個數據源時,也不會像Event Emitter
那樣手動釋放資源形成麻煩。同時,在Rxjs
中咱們還有專用於聚合數據源的方法:異步
Observable.combineLatest(foo$, bar$)
.pipe(
// ...
);
複製代碼
顯然相對於Event Emitter
的方式十分高效,同時它相對於Mobx
也有巨大的優點。在Mobx
中,咱們提到須要聚合多個數據源的時候,採用autoRun
的方式容易收集到沒必要要的依賴,使用observe
則不夠高效。在Rxjs
中,顯然不會有這些問題,combineLatest
能夠以很簡練的方式聲明須要聚合的數據源,同時,得益於Rxjs
設計,咱們不須要像Mobx
一個一個去調用observe
返回的析構,只須要處理每個subscribe
返回的subscription
:函數
class Foo extends Component {
constructor(props) {
super(props);
this.subscription = Observable.combineLatest(foo$, bar$)
.pipe(
// ...
)
.subscribe(() => {
// ...
});
}
componentWillUnmount() {
this.subscription.unsubscribe();
}
}
複製代碼
Rxjs
使用操做符去描述各類行爲,每個操做符會返回一個新的Observable
,咱們能夠對它進行後續的操做。例如,使用map
操做符就能夠實現對數據的轉換:ui
foo$.map(event => event.target.value);
複製代碼
Rxjs 5.5
以後全部的Observable
上都引入了一個pipe
方法,接收若干個操做符,pipe
方法會返回一個Observable
。所以,咱們能夠很容易配合tree shaking
實現對操做符的按需引入,而不是把整個Rxjs
引入進來:this
import { map } from 'rxjs/operators';
foo$.pipe(map(event => event.target.value));
複製代碼
推薦使用這種寫法。spa
在討論面向對象的響應式的響應式中,咱們提到對於異步的問題,面向對象的方式很差處理。在Observable
中咱們能夠經過switchMap
操做符處理異步問題,一個異步搜索看起來會是這樣:
input$.pipe(switchMap(keyword => Observable.ajax(/* ... */)));
複製代碼
在處理異步單值時,咱們可使用Promise
,而Observable
用於處理異步多個值,咱們能夠很容易把一個Promise
轉成一個Observable
,從而複用已有的異步代碼:
input$.pipe(switchMap(keyword => fromPromise(search(/* ... */))));
複製代碼
switchMap
接受一個返回Observable
的函數做爲參數,下游的流就會切到這個返回的Observable
。 而要聚合多個數據源並作異步處理時:
combineLatest(foo$, bar$).pipe(
switchMap(keyword => fromPromise(someAsyncOperation(/* ... */)))
);
複製代碼
同時,因爲標準制定的Promise
是沒有cancel
方法的,有時候咱們要取消異步方法的時候就有些麻煩(主要是爲了解決一些併發安全問題)。switchMap
當上遊有新值到來時,會忽略結束已有未完成的Observable
而後調用函數返回一個新的Observable
,咱們只使用一個函數就解決了併發安全問題。固然,咱們能夠根據實際須要選用switchMap
、mergeMap
、concatMap
、exhaustMap
等。
而對於時間軸的操做,Rxjs
也有巨大優點。上篇博客中提到當咱們須要延時 5 秒作操做時,不管是Event Emitter
仍是面向對象的方式都力不從心,而在Rxjs
中咱們只須要一個delay
操做符便可解決問題:
input$.pipe(
delay(5000) // 下游會在input$值到來後5秒才接到數據
);
複製代碼
在實際開發過程當中,事件不能解決全部問題,咱們每每會須要存儲數據,而Observable
被設計成用於處理事件,所以它有不少符合事件直覺的設計。
Observable
被設計爲懶(lazy
)的,噹噹沒有訂閱者時,一個流不會執行。對於事件而言,沒有事件的消費者那麼不執行也不會有問題。而在 GUI 中,訂閱者多是View
:
class View extends Component {
state = {
input: ''
};
componentDidMount() {
this.subscription = input$.subscribe(input => {
this.setState({
input
});
});
}
componentWillUnmount() {
this.subscription.unsubscribe();
}
render() {
// ...
}
}
複製代碼
因爲這個View
可能不存在,例如路由被切走了,那麼咱們的事件源就沒有了訂閱者,他就不會運行。可是咱們但願在路由被且走後,後臺的數據依然會繼續。
對於事件而言,在事件發生以後的訂閱者不會受到訂閱以前的邏輯。例如在EventEmitter
中:
eventEmitter.emit('hello', 1);
// ...
eventEmitter.on('hello', function listener() {});
複製代碼
因爲listener
是在hello
事件發生後在監聽的,不會收到值爲1
的事件。可是這在處理數據的時候會形成麻煩,咱們的數據在View
被卸載(例如路由切走)後丟失。
同時,因爲Observable
沒有提供直接取到內部狀態的方法,當咱們使用Observable
處理數據時,咱們不方便隨時拿到數據。那有辦法解決這個問題,從而使Observable
強大抽象能力去賦能數據層呢?
回到Redux
。Redux
的事件(Action)實際上是一個事件流,那麼咱們就能夠很天然地把Redux
的事件流融入到Rxjs
流中:
() => next => {
const action$ = new Subject();
return action => {
action$.next(action);
// ...
};
};
複製代碼
經過這樣的封裝,redux-observable就能讓咱們把Observable
強大的事件描述和處理能力和Redux
結合。咱們能夠很是方便地根據Action
去處理反作用:
action$.pipe(
ofType('ACTION_1'),
switchMap(() => {
// ...
}),
map(res => ({
type: 'ACTION_2',
payload: res
}))
);
action$.pipe(
ofType('ACTION_3'),
mergeMap(() => {
// ...
}),
map(res => ({
type: 'ACTION_4',
payload: res
}))
);
複製代碼
Redux Observable
使咱們能夠結合Redux
和Observable
。在這裏,Action
被視做一個流,ofType
至關於filter(action => action.type === 'SOME_ACTION')
,從而獲得須要監聽的Action
,得益於Redux
的設計,咱們能夠經過監聽Action
去完成反作用的處理或者監聽數據變化。最後這個流返回一個新的Action
流,Redux Observable
會把這個新的Action
流中的Action
dispatch
出去。由此,咱們在使用Redux
存儲數據的基礎上得到了Rxjs
對異步事件的強大處理能力。
本文首發於(https://tech.youzan.com/reactive2/)[有贊技術博客]。