最近在 Alloyteam Conf 2016 分享了《使用RxJS構建流式前端應用》,會後在線上線下跟你們交流時發現對於 RxJS 的態度呈現出兩大類:有用過的都表達了 RxJS 帶來的優雅編碼體驗,未用過的則反饋太難入門。因此,本文將結合本身對 RxJS 理解,經過 RxJS 的實現原理、基礎實現及實例來一步步分析,提供 RxJS 較爲全面的指引,感覺下使用 RxJS 編碼是怎樣的體驗。html
作一個搜索功能在前端開發中其實並不陌生,通常的實現方式是:監聽文本框的輸入事件,將輸入內容發送到後臺,最終將後臺返回的數據進行處理並展現成搜索結果。前端
var text = document.querySelector('#text');
text.addEventListener('keyup', (e) =>{
var searchText = e.target.value;
// 發送輸入內容到後臺
$.ajax({
url: `search.qq.com/${searchText}`,
success: data => {
// 拿到後臺返回數據,並展現搜索結果
render(data);
}
});
});
複製代碼
上面代碼實現咱們要的功能,但存在兩個較大的問題:react
多餘的請求
當想搜索「愛迪生」時,輸入框可能會存在三種狀況,「愛」、「愛迪」、「愛迪生」。而這三種狀況將會發起 3 次請求,存在 2 次多餘的請求。git
已無用的請求仍然執行
一開始搜了「愛迪生」,而後立刻改搜索「達爾文」。結果後臺返回了「愛迪生」的搜索結果,執行渲染邏輯後結果框展現了「愛迪生」的結果,而不是當前正在搜索的「達爾文」,這是不正確的。github
減小多餘請求數,能夠用 setTimeout 函數節流的方式來處理,核心代碼以下ajax
var text = document.querySelector('#text'),
timer = null;
text.addEventListener('keyup', (e) =>{
// 在 250 毫秒內進行其餘輸入,則清除上一個定時器
clearTimeout(timer);
// 定時器,在 250 毫秒後觸發
timer = setTimeout(() => {
console.log('發起請求..');
},250)
})
複製代碼
已無用的請求仍然執行的解決方式,能夠在發起請求前聲明一個當前搜索的狀態變量,後臺將搜索的內容及結果一塊兒返回,前端判斷返回數據與當前搜索是否一致,一致才走到渲染邏輯。最終代碼爲編程
var text = document.querySelector('#text'),
timer = null,
currentSearch = '';
text.addEventListener('keyup', (e) =>{
clearTimeout(timer)
timer = setTimeout(() => {
// 聲明一個當前所搜的狀態變量
currentSearch = '書';
var searchText = e.target.value;
$.ajax({
url: `search.qq.com/${searchText}`,
success: data => {
// 判斷後臺返回的標誌與咱們存的當前搜索變量是否一致
if (data.search === currentSearch) {
// 渲染展現
render(data);
} else {
// ..
}
}
});
},250)
})
複製代碼
上面代碼基本知足需求,但代碼開始顯得亂糟糟。咱們來使用 RxJS 實現上面代碼功能,以下api
var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
.debounceTime(250)
.pluck('target', 'value')
.switchMap(url => Http.get(url))
.subscribe(data => render(data));複製代碼
能夠明顯看出,基於 RxJS 的實現,代碼十分簡潔!數組
RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions,是一個基於可觀測數據流在異步編程應用中的庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實現,而其餘語言也有相應的實現,如 RxJava、RxAndroid、RxSwift 等。學習 RxJS,咱們須要從可觀測數據流(Streams)提及,它是 Rx 中一個重要的數據類型。框架
流是在時間流逝的過程當中產生的一系列事件。它具備時間與事件響應的概念。
下雨天時,雨滴隨時間推移逐漸產生,下落時對水面產生了水波紋的影響,這跟 Rx 中的流是很相似的。而在 Web 中,雨滴可能就是一系列的鼠標點擊、鍵盤點擊產生的事件或數據集合等等。
對流的概念有必定理解後,咱們來說講 RxJS 是怎麼圍繞着流的概念來實現的,講講 RxJS 的基礎實現原理。RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。
觀察者模式在 Web 中最多見的應該是 DOM 事件的監聽和觸發。
document.body.addEventListener('click', function listener(e) {
console.log(e);
},false);
document.body.click(); // 模擬用戶點擊複製代碼
將上述例子抽象模型,並對應通用的觀察者模型
迭代器模式能夠用 JavaScript 提供了 Iterable Protocol 可迭代協議來表示。Iterable Protocol 不是具體的變量類型,而是一種可實現協議。JavaScript 中像 Array、Set 等都屬於內置的可迭代類型,能夠經過 iterator 方法來獲取一個迭代對象,調用迭代對象的 next 方法將獲取一個元素對象,以下示例。
var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();
iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}
iterator.next(); // => { value: undefined, done: true}複製代碼
元素對象中:value 表示返回值,done 表示是否已經到達最後。
遍歷迭代器可使用下面作法。
var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();
var iterator = iterable();
while(true) {
try {
let result = iterator.next(); //
} catch (err) {
handleError(err); //
}
if (result.done) {
handleCompleted(); //
break;
}
doSomething(result.value);
}複製代碼
主要對應三種狀況:
獲取下一個值
調用 next 能夠將元素一個個地返回,這樣就支持了返回屢次值。
無更多值(已完成)
當無更多值時,next 返回元素中 done 爲 true。
錯誤處理
當 next 方法執行時報錯,則會拋出 error 事件,因此能夠用 try catch 包裹 next 方法處理可能出現的錯誤。
RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:
下面爲 Observable 與 Observer 的僞代碼
// Observer
var Observer = {
next(value) {
alert(`收到${value}`);
}
};
// Observable
function Observable (Observer) {
setTimeout(()=>{
Observer.next('A');
},1000)
}
// subscribe
Observable(Observer);複製代碼
上面實際也是觀察者模式的表現,那麼迭代器模式在 RxJS 中如何體現呢?
在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還能夠提供了另外的兩個方法:error() 和 complete(),與迭代器模式一一對應。
var Observer = {
next(value) { /* 處理值*/ },
error(error) { /* 處理異常 */ },
complete() { /* 處理已完成態 */ }
};複製代碼
結合迭代器 Iterator 進行理解:
next()
Observer 提供一個 next 方法來接收 Observable 流,是一種 push 形式;而 Iterator 是經過調用 iterator.next() 來拿到值,是一種 pull 的形式。
complete()
當再也不有新的值發出時,將觸發 Observer 的 complete 方法;而在 Iterator 中,則須要在 next 的返回結果中,當返回元素 done 爲 true 時,則表示 complete。
error()
當在處理事件中出現異常報錯時,Observer 提供 error 方法來接收錯誤進行統一處理;Iterator 則須要進行 try catch 包裹來處理可能出現的錯誤。
下面是 Observable 與 Observer 實現觀察者 + 迭代器模式的僞代碼,數據的逐漸傳遞傳遞與影響其實就是流的表現。
// Observer
var Observer = {
next(value) {
alert(`收到${value}`);
},
error(error) {
alert(`收到${value}`);
},
complete() {
alert("complete");
},
};
// Observable
function Observable (Observer) {
[1,2,3].map(item=>{
Observer.next(item);
});
Observer.complete();
// Observer.error("error message");
}
// subscribe
Observable(Observer);複製代碼
有了上面的概念及僞代碼,那麼在 RxJS 中是怎麼建立 Observable 與 Observer 的呢?
RxJS 提供 create 的方法來自定義建立一個 Observable,可使用 next 來發出流。
var Observable = Rx.Observable.create(observer => {
observer.next(2);
observer.complete();
return () => console.log('disposed');
});複製代碼
Observer 能夠聲明 next、err、complete 方法來處理流的不一樣狀態。
var Observer = Rx.Observer.create(
x => console.log('Next:', x),
err => console.log('Error:', err),
() => console.log('Completed')
);複製代碼
最後將 Observable 與 Observer 經過 subscribe 訂閱結合起來。
var subscription = Observable.subscribe(Observer);複製代碼
RxJS 中流是能夠被取消的,調用 subscribe 將返回一個 subscription,能夠經過調用 subscription.unsubscribe() 將流進行取消,讓流再也不產生。
看了起來挺複雜的?換一個實現形式:
// @Observables 建立一個 Observables
var streamA = Rx.Observable.of(2);
// @Observer streamA$.subscribe(Observer)
streamA.subscribe(v => console.log(v));複製代碼
將上面代碼改用鏈式寫法,代碼變得十分簡潔:
Rx.Observable.of(2).subscribe(v => console.log(v));複製代碼
Rx.Observable.of(2).subscribe(v => console.log(v));複製代碼
上面代碼至關於建立了一個流(2),最終打印出2。那麼若是想將打印結果翻倍,變成4,應該怎麼處理呢?
方案一?: 改變事件源,讓 Observable 值 X 2
Rx.Observable.of(2 * 2 /* ).subscribe(v => console.log(v));複製代碼
方案二?: 改變響應方式,讓 Observer 處理 X 2
Rx.Observable.of(2).subscribe(v => console.log(v * 2 /* ));複製代碼
優雅方案: RxJS 提供了優雅的處理方式,能夠在事件源(Observable)與響應者(Observer)之間增長操做流的方法。
Rx.Observable.of(2)
.map(v => v * 2) /*
.subscribe(v => console.log(v));複製代碼
map 操做跟數組操做的做用是一致的,不一樣的這裏是將流進行改變,而後將新的流傳出去。在 RxJS 中,把這類操做流的方式稱之爲 Operators(操做)。RxJS提供了一系列 Operators,像map、reduce、filter 等等。操做流將產生新流,從而保持流的不可變性,這也是 RxJS 中函數式編程的一點體現。關於函數式編程,這裏暫很少講,能夠看看另一篇文章 《談談函數式編程》
到這裏,咱們知道了,流從產生到最終處理,可能通過的一些操做。即 RxJS 中 Observable 將通過一系列 Operators 操做後,到達 Observer。
Operator1 Operator2
Observable ----|-----------|-------> Observer複製代碼
RxJS 提供了很是多的操做,像下面這些。
Aggregate,All,Amb,ambArray,ambWith,AssertEqual,averageFloat,averageInteger,averageLong,blocking,blockingFirst,blockingForEach,blockingSubscribe,Buffer,bufferWithCount,bufferWithTime,bufferWithTimeOrCount,byLine,cache,cacheWithInitialCapacity,case,Cast,Catch,catchError,catchException,collect,concatWith,Connect,connect_forever,cons,Contains,doAction,doAfterTerminate,doOnComplete,doOnCompleted,doOnDispose,doOnEach,doOnError,doOnLifecycle,doOnNext,doOnRequest,dropUntil,dropWhile,ElementAt,ElementAtOrDefault,emptyObservable,fromNodeCallback,fromPromise,fromPublisher,fromRunnable,Generate,generateWithAbsoluteTime,generateWithRelativeTime,Interval,intervalRange,into,latest (Rx.rb version of Switch),length,mapTo,mapWithIndex,Materialize,Max,MaxBy,mergeArray,mergeArrayDelayError,mergeWith,Min,MinBy,multicastWithSelector,nest,Never,Next,Next (BlockingObservable version),partition,product,retryWhen,Return,returnElement,returnValue,runAsync,safeSubscribe,take_with_time,takeFirst,TakeLast,takeLastBuffer,takeLastBufferWithTime,windowed,withFilter,withLatestFrom,zipIterable,zipWith,zipWithIndex複製代碼
關於每個操做的含義,能夠查看官網進行了解。operators 具備靜態(static)方法和實例( instance)方法,下面使用 Rx.Observable.xx 和 Rx.Observable.prototype.xx 來簡單區分,舉幾個例子。
Rx.Observable.of
of 能夠將普通數據轉換成流式數據 Observable。如上面的 Rx.Observable.of(2)。
Rx.Observable.fromEvent
除了數值外,RxJS 還提供了關於事件的操做,fromEvent 能夠用來監聽事件。當事件觸發時,將事件 event 轉成可流動的 Observable 進行傳輸。下面示例表示:監聽文本框的 keyup 事件,觸發 keyup 能夠產生一系列的 event Observable。
var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
.subscribe(e => console.log(e));複製代碼
Rx.Observable.prototype.map
map 方法跟咱們日常使用的方式是同樣的,不一樣的只是這裏是將流進行改變,而後將新的流傳出去。上面示例已有涉及,這裏再也不多講。
Rx.Observable.of(2)
.map(v => 10 * v)
.subscribe(v => console.log(v));複製代碼
Rx 提供了許多的操做,爲了更好的理解各個操做的做用,咱們能夠經過一個可視化的工具 marbles 圖 來輔助理解。如 map 方法對應的 marbles 圖以下
箭頭能夠理解爲時間軸,上面的數據通過中間的操做,轉變成下面的模樣。
Rx.Observable.prototype.mergeMap
mergeMap 也是 RxJS 中經常使用的接口,咱們來結合 marbles 圖(flatMap(alias))來理解它
上面的數據流中,產生了新的分支流(流中流),mergeMap 的做用則是將分支流調整回主幹上,最終分支上的數據流都通過主幹的其餘操做,其實也是將流中流進行扁平化。
Rx.Observable.prototype.switchMap
switchMap 與 mergeMap 都是將分支流疏通到主幹上,而不一樣的地方在於 switchMap 只會保留最後的流,而取消拋棄以前的流。
除了上面提到的 marbles,也能夠 ASCII 字符的方式來繪製可視化圖表,下面將結合 Map、mergeMap 和 switchMap 進行對比來理解。
@Map @mergeMap @switchMap
↗ ↗ ↗ ↗
-A------B--> a2 b2 a2 b2
-2A-----2B-> / / / /
/ / / /
a1 b1 a1 b1
/ / / /
-A-B-----------> -A-B---------->
--a1-b1-a2-b2--> --a1-b1---b2-->複製代碼
mergeMap 和 switchMap 中,A 和 B 是主幹上產生的流,a一、a2 爲 A 在分支上產生,b一、b2 爲 B 在分支上產生,可看到,最終將歸併到主幹上。switchMap 只保留最後的流,因此將 A 的 a2 拋棄掉。
Rx.Observable.prototype.debounceTime
debounceTime 操做能夠操做一個時間戳 TIMES,表示通過 TIMES 毫秒後,沒有流入新值,那麼纔將值轉入下一個操做。
RxJS 中的操做符是知足咱們之前的開發思惟的,像 map、reduce 這些。另外,不管是 marbles 圖仍是用 ASCII 字符圖這些可視化的方式,都對 RxJS 的學習和理解有很是大的幫助。
RxJS 提供許多建立流或操做流的接口,應用這些接口,咱們來一步步將搜索的示例進行 Rx 化。
使用 RxJS 提供的 fromEvent 接口來監聽咱們輸入框的 keyup 事件,觸發 keyup 將產生 Observable。
var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
.subscribe(e => console.log(e));複製代碼
這裏咱們並不想輸出事件,而想拿到文本輸入值,請求搜索,最終渲染出結果。涉及到兩個新的 Operators 操做,簡單理解一下:
Rx.Observable.prototype.pluck('target', 'value')
將輸入的 event,輸出成 event.target.value。
Rx.Observable.prototype.mergeMap()
將請求搜索結果輸出回給 Observer 上進行渲染。
var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
.pluck('target', 'value') //
.mergeMap(url => Http.get(url)) //
.subscribe(data => render(data))複製代碼
上面代碼實現了簡單搜索呈現,但一樣存在一開始說起的兩個問題。那麼如何減小請求數,以及取消已無用的請求呢?咱們來了解 RxJS 提供的其餘 Operators 操做,來解決上述問題。
Rx.Observable.prototype.debounceTime(TIMES)
表示通過 TIMES 毫秒後,沒有流入新值,那麼纔將值轉入下一個環節。這個與前面使用 setTimeout 來實現函數節流的方式有一致效果。
Rx.Observable.prototype.switchMap()
使用 switchMap 替換 mergeMap,將能取消上一個已無用的請求,只保留最後的請求結果流,這樣就確保處理展現的是最後的搜索的結果。
最終實現以下,與一開始的實現進行對比,能夠明顯看出 RxJS 讓代碼變得十分簡潔。
var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
.debounceTime(250) //
.pluck('target', 'value')
.switchMap(url => Http.get(url)) //
.subscribe(data => render(data))複製代碼
本篇做爲 RxJS 入門篇到這裏就結束,關於 RxJS 中的其餘方面內容,後續再拎出來進一步分析學習。
RxJS 做爲一個庫,能夠與衆多框架結合使用,但並非每一種場合都須要使用到 RxJS。複雜的數據來源,異步多的狀況下才能更好凸顯 RxJS 做用,這一塊能夠看看民工叔寫的《流動的數據——使用 RxJS 構造複雜單頁應用的數據邏輯》 相信會有更好的理解。點擊查看更多文章>>
附:
RxJS(JavaScript) github.com/Reactive-Ex…
RxJS(TypeScript ) github.com/ReactiveX/r…