RxJS是一個基於可觀測數據流在異步編程應用中的庫。javascript
ReactiveX is a combination of the best ideas from
the Observer pattern, the Iterator pattern, and functional programming
正如官網所說,RxJS是基於觀察者模式,迭代器模式和函數式編程。所以,首先要對這幾個模式有所理解php
window.addEventListener('click', function(){ console.log('click!'); })
JS的事件監聽就是天生的觀察者模式。給window的click事件(被觀察者)綁定了一個listener(觀察者),當事件發生,回調函數就會被觸發css
<!-- more -->html
迭代器模式,提供一種方法順序訪問一個聚合對象中的各類元素,而又不暴露該對象的內部表示。前端
ES6裏的Iterator便可實現:java
let arr = ['a', 'b', 'c']; let iter = arr[Symbol.iterator](); iter.next() // { value: 'a', done: false } iter.next() // { value: 'b', done: false } iter.next() // { value: 'c', done: false } iter.next() // { value: undefined, done: true }
反覆調用迭代對象的next
方法,便可順序訪問git
提到函數式編程,就要提到聲明式編程和命令式編程
函數式編程是聲明式編程的體現github
問題:將數組[1, 2, 3]
的每一個元素乘以2,而後計算總和。ajax
命令式編程編程
const arr = [1, 2, 3]; let total = 0; for(let i = 0; i < arr.length; i++) { total += arr[i] * 2; }
聲明式編程
const arr = [1, 2, 3]; let total = arr.map(x => x * 2).reduce((total, value) => total + value)
聲明式的特色是專一於描述結果自己,不關注到底怎麼到達結果。而命令式就是真正實現結果的步驟
聲明式編程把原始數據通過一系列轉換(map, reduce),最後獲得想要的數據
如今前端流行的MVC框架(Vue,React,Angular),也都是提倡:編寫UI結構時使用聲明式編程,在編寫業務邏輯時使用命令式編程
RxJS裏有兩個重要的概念須要咱們理解:Observable
(可觀察對象)Observer
(觀察者)
var btn = document.getElementById('btn'); var handler = function() { console.log('click'); } btn.addEventListener('click', handler)
上面這個例子裏:btn
這個DOM元素的click
事件就是一個Observablehandler
這個函數就是一個Observer,當btn的click事件被觸發,就會調用該函數
改用RxJS編寫;
Rx.Observable.fromEvent(btn, 'click') .subscribe(() => console.log('click'));
fromEvent
把一個event轉成了一個Observable
,而後它就能夠被訂閱subscribe
了
Observable其實就是數據流stream
流是在時間流逝的過程當中產生的一系列事件。它具備時間與事件響應的概念。
咱們能夠把一切輸入都當作數據流來處理,好比說:
當產生了一個流後,咱們能夠經過操做符(Operator)對這個流進行一系列加工操做,而後產生一個新的流
Rx.Observable.fromEvent(window, 'click') .map(e => 1) .scan((total, now) => total + now) .subscribe(value => { console.log(value) })
map
把流轉換成了一個每次產生1的新流,而後scan
相似reduce
,也會產生一個新流,最後這個流被訂閱。最終實現了:每次點擊累加1的效果
能夠用一個效果圖來表示該過程:
也能夠對若干個數據流進行組合:
例子:咱們要實現下面這個效果:
Rx.Observable.fromEvent(document.querySelector('input[name=plus]'), 'click') .mapTo(1) .merge( Rx.Observable.fromEvent(document.querySelector('input[name=minus]'), 'click') .mapTo(-1) ) .scan((total, now) => total + now) .subscribe(value => { document.querySelector('#counter').innerText = value; })
merge
能夠把兩個數據流整個在一塊兒,效果能夠參考以下:
剛纔那個例子的數據流以下:
以RxJS的寫法,就是把按下加1當成一個數據流,把按下減1當成一個數據流,再經過merge把兩個數據流合併,最後經過scan
操做符,把新流上的數據累加,這就是咱們想要的計數器效果
有時候,咱們的Observable送出的是一個新的Observable:
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.map(e => Rx.Observable.of(1, 2, 3)); source.subscribe(value => { console.log(value) });
這裏,console打印出來的是對象,而不是咱們想要的1,2,3,這是由於map
返回的Rx.Observable.of(1, 2, 3)
自己也是個Observable
用圖表示以下:
click : ------c------------c-------- map(e => Rx.Observable.of(1,2,3)) source : ------o------------o-------- \ \ (123)| (123)|
所以,咱們訂閱到的value值就是一個Observable對象,而不是普通數據1,2,3
我想要的其實不是Observable自己,而是屬於這個Observable裏面的那些東西,如今這個情形就是Observable裏面又有Observable,有兩層,但是我想要讓它變成一層就好,該怎麼辦呢?
這就須要把Observable扁平化
const arr = [1, [2, 3], 4]; // 扁平化後: const flatArr = [1, 2, 3, 4];
concatAll
這個操做符就能夠把Observable扁平化
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.map(e => Rx.Observable.of(1, 2, 3)); var example = source.concatAll(); example.subscribe(value => { console.log(value) })
click : ------c------------c-------- map(e => Rx.Observable.of(1,2,3)) source : ------o------------o-------- \ \ (123)| (123)| concatAll() example: ------(123)--------(123)------------
flatMap
操做符也能夠實現一樣的做用,就是寫法有些不一樣:
var click = Rx.Observable.fromEvent(document.body, 'click'); var source = click.flatMap(e => Rx.Observable.of(1, 2, 3)); source.subscribe(value => { console.log(value) })
click : ------c------------c-------- flatMap(e => Rx.Observable.of(1,2,3)) source: ------(123)--------(123)------------
學完前面幾個操做符,咱們就能夠寫一個簡單的實例了
拖拽的原理是:
<style type="text/css"> html, body { height: 100%; background-color: tomato; position: relative; } #drag { position: absolute; display: inline-block; width: 100px; height: 100px; background-color: #fff; cursor: all-scroll; } </style> <div id="drag"></div>
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown'); const mouseUp = Rx.Observable.fromEvent(body, 'mouseup'); const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');
首先給出3個Observable,分別表明3種事件,咱們但願mousedown的時候監聽mousemove,而後mouseup時中止監聽,因而RxJS能夠這麼寫:
const source = mouseDown .map(event => mouseMove.takeUntil(mouseUp))
takeUntil
操做符能夠在某個條件符合時,發送complete
事件
source: -------e--------------e----- \ \ --m-m-m-m| -m--m-m--m-m|
從圖上能夠看出,咱們還須要把source扁平化,才能獲取所需數據。
完整代碼:
const dragDOM = document.getElementById('drag'); const body = document.body; const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown'); const mouseUp = Rx.Observable.fromEvent(body, 'mouseup'); const mouseMove = Rx.Observable.fromEvent(body, 'mousemove'); mouseDown .flatMap(event => mouseMove.takeUntil(mouseUp)) .map(event => ({ x: event.clientX, y: event.clientY })) .subscribe(pos => { dragDOM.style.left = pos.x + 'px'; dragDOM.style.top = pos.y + 'px'; })
前面的例子,咱們都在討論fromEvent
轉換的Observable,其實還有不少種方法產生一個Observable
,其中create
也是一種常見的方法,能夠用來建立自定義的Observable
var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); console.log('just before subscribe'); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); console.log('just after subscribe');
控制檯執行的結果:
just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4 done
Observable 執行能夠傳遞三種類型的值:
"Next" 通知: 發送一個值,好比數字、字符串、對象,等等。
"Error" 通知: 發送一個 JavaScript 錯誤 或 異常。
"Complete" 通知: 再也不發送任何值。
"Next" 通知是最重要,也是最多見的類型:它們表示傳遞給觀察者的實際數據。"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,而且只會執行其中的一個。
var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete(); } catch (err) { observer.error(err); // 若是捕獲到異常會發送一個錯誤 } });
Observer觀察者只是一組回調函數的集合,每一個回調函數對應一種 Observable 發送的通知類型:next、error 和 complete 。
var observer = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), };
Observer和Observable是經過subscribe方法創建聯繫的
observable.subscribe(observer);
observer訂閱了Observable以後,還能夠取消訂閱
var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // 稍後: subscription.unsubscribe();
unsubscribe陷阱:
let stream$ = new Rx.Observable.create((observer) => { let i = 0; let id = setInterval(() => { console.log('setInterval'); observer.next(i++); },1000) }) let subscription = stream$.subscribe((value) => { console.log('Value', value) }); setTimeout(() => { subscription.unsubscribe(); }, 3000)
3秒後雖然取消了訂閱,可是開啓的setInterval定時器並不會自動清理,咱們須要本身返回一個清理函數
let stream$ = new Rx.Observable.create((observer) => { let i = 0; let id = setInterval(() => { observer.next(i++); },1000) // 返回了一個清理函數 return function(){ clearInterval( id ); } }) let subscription = stream$.subscribe((value) => { console.log('Value', value) }); setTimeout(() => { subscription.unsubscribe() // 在這咱們調用了清理函數 }, 3000)
<input type="text">
function sendRequest(search) { return Rx.Observable.ajax.getJSON(`http://deepred5.com/cors.php?search=${search}`) .map(response => response) } Rx.Observable.fromEvent(document.querySelector('input'), 'keyup') .map(e => e.target.value) .flatMap(search => sendRequest(search)) .subscribe(value => { console.log(value) })
用戶每次在input框每次進行輸入,均會觸發ajax請求,而且每一個ajax返回的值都會被打印一遍
如今須要實現這樣一個功能:
但願用戶在300ms之內中止輸入,才發送請求(防抖),而且console打印出來的值只要最近的一個ajax返回的
Rx.Observable.fromEvent(document.querySelector('input'), 'keyup') .debounceTime(300) .map(e => e.target.value) .switchMap(search => sendRequest(search)) .subscribe(value => { console.log(value) })
debounceTime
表示通過n毫秒後,沒有流入新值,那麼纔將值轉入下一個環節switchMap
能取消上一個已無用的請求,只保留最後的請求結果流,這樣就確保處理展現的是最後的搜索的結果
能夠看到,RxJS對異步的處理是很是優秀的,對異步的結果能進行各類複雜的處理和篩選。
Redux的action都是同步的,因此默認狀況下也只能處理同步數據流。
爲了生成異步action,處理異步數據流,有許多不一樣的解決方案,例如 redux-thunk、redux-promise、redux-saga 等等。
以redux-thunk舉例:
調用一個異步API,首先要先定義三個同步action構造函數,分別表示
而後再定義一個異步action構造函數,該函數再也不是返回普通的對象,而是返回一個函數,在這個函數裏,進行ajax異步操做,而後根據返回的成功和失敗,分別調用前面定義的同步action
actions.js
export const FETCH_STARTED = 'WEATHER/FETCH_STARTED'; export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS'; export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE'; // 普通action構造函數,返回普通對象 export const fetchWeatherStarted = () => ({ type: FETCH_STARTED }); export const fetchWeatherSuccess = (result) => ({ type: FETCH_SUCCESS, result }) export const fetchWeatherFailure = (error) => ({ type: FETCH_FAILURE, error }) // 異步action構造函數,返回一個函數 export const fetchWeather = (cityCode) => { return (dispatch) => { const apiUrl = `/data/cityinfo/${cityCode}.html`; dispatch(fetchWeatherStarted()) return fetch(apiUrl).then((response) => { if (response.status !== 200) { throw new Error('Fail to get response with status ' + response.status); } response.json().then((responseJson) => { dispatch(fetchWeatherSuccess(responseJson.weatherinfo)); }).catch((error) => { dispatch(fetchWeatherFailure(error)); }); }).catch((error) => { dispatch(fetchWeatherFailure(error)); }) }; }
如今若是想要異步請求,只要:
// fetchWeather是個異步action構造函數 dispatch(fetchWeather('23333'));
咱們再來看看redux-observable
:
調用一個異步API,再也不須要定義一個異步action構造函數,全部的action構造函數都只是返回普通的對象
那麼ajax請求在哪裏發送?
答案是在Epic進行異步操做
Epic是redux-observable的核心原語。
它是一個函數,接收 actions 流做爲參數而且返回 actions 流。 Actions 入, actions 出.
export const FETCH_STARTED = 'WEATHER/FETCH_STARTED'; export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS'; export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE'; export const fetchWeather = cityCode => ({ type: FETCH_STARTED, cityCode }); export const fetchWeatherSuccess = result => ( { type: FETCH_SUCCESS, result }; ); export const fetchWeatherFailure = (error) => ( { type: FETCH_FAILURE, error } ) export const fetchWeatherEpic = action$ => action$.ofType(FETCH_STARTED) .mergeMap(action => ajax.getJSON(`/data/cityinfo/${action.cityCode}.html`) .map(response => fetchWeatherSuccess(response.weatherinfo)) // 這個處理異常的action必須使用Observable.of方法轉爲一個observable .catch(error => Observable.of(fetchWeatherFailure(error))) );
如今若是想要異步請求,只要:
// fetchWeather只是個普通的action構造函數 dispatch(fetchWeather('23333'));
相較於thunk中間件,使用redux-observable來處理異步action,有如下優勢:
原生JS傳統解決異步的方式:callback、Generator、Promise、async/await
RxJS解決的是數據流的問題,它可讓批量數據處理起來更方便
能夠想象的一些使用場景:
能夠看出,這種須要對流進行復雜操做的場景更加適合RxJS
公司內部目前的大部分系統,前端就可能不太適合用RxJS,由於大部分是後臺CRUD系統,總體性、實時性的要求都不高,而且也沒有特別複雜的數據流操做
咱們推薦在適合RxJS的地方用RxJS,可是不強求RxJS for everything。RxJS給了咱們另外一種思考和解決問題的方式,但這不必定是必要的