構建流式應用—RxJS詳解

最近在 Alloyteam Conf 2016 分享了《使用RxJS構建流式前端應用》,會後在線上線下跟你們交流時發現對於 RxJS 的態度呈現出兩大類:有用過的都表達了 RxJS 帶來的優雅編碼體驗,未用過的則反饋太難入門。因此,本文將結合本身對 RxJS 理解,經過 RxJS 的實現原理、基礎實現及實例來一步步分析,提供 RxJS 較爲全面的指引,感覺下使用 RxJS 編碼是怎樣的體驗。html

目錄

  • 常規方式實現搜索功能
  • RxJS · 流 Stream
  • RxJS 實現原理簡析
    • 觀察者模式
    • 迭代器模式
    • RxJS 的觀察者 + 迭代器模式
  • RxJS 基礎實現
    • Observable
    • Observer
  • RxJS · Operators
    • Operators ·入門
    • 一系列的 Operators 操做
  • 使用 RxJS 一步步實現搜索功能
  • 總結

常規方式實現搜索

作一個搜索功能在前端開發中其實並不陌生,通常的實現方式是:監聽文本框的輸入事件,將輸入內容發送到後臺,最終將後臺返回的數據進行處理並展現成搜索結果。前端



    var text = document.querySelector('#text');
    text.addEventListener('keyup', (e) =>{
        var searchText = e.target.value;
        // 發送輸入內容到後臺
        $.ajax({
            url: `search.qq.com/${searchText}`,
            success: data => {
              // 拿到後臺返回數據,並展現搜索結果
              render(data);
            }
        });
    });
複製代碼

上面代碼實現咱們要的功能,但存在兩個較大的問題:react

  1. 多餘的請求
    當想搜索「愛迪生」時,輸入框可能會存在三種狀況,「愛」、「愛迪」、「愛迪生」。而這三種狀況將會發起 3 次請求,存在 2 次多餘的請求。git

  2. 已無用的請求仍然執行
    一開始搜了「愛迪生」,而後立刻改搜索「達爾文」。結果後臺返回了「愛迪生」的搜索結果,執行渲染邏輯後結果框展現了「愛迪生」的結果,而不是當前正在搜索的「達爾文」,這是不正確的。github

減小多餘請求數,能夠用 setTimeout 函數節流的方式來處理,核心代碼以下ajax



    var text = document.querySelector('#text'),
        timer = null;
    text.addEventListener('keyup', (e) =>{
        // 在 250 毫秒內進行其餘輸入,則清除上一個定時器
        clearTimeout(timer);
        // 定時器,在 250 毫秒後觸發
        timer = setTimeout(() => {
            console.log('發起請求..');
        },250)
    })
複製代碼

已無用的請求仍然執行的解決方式,能夠在發起請求前聲明一個當前搜索的狀態變量,後臺將搜索的內容及結果一塊兒返回,前端判斷返回數據與當前搜索是否一致,一致才走到渲染邏輯。最終代碼爲編程



    var text = document.querySelector('#text'),
        timer = null,
        currentSearch = '';

    text.addEventListener('keyup', (e) =>{
        clearTimeout(timer)
        timer = setTimeout(() => {
            // 聲明一個當前所搜的狀態變量
            currentSearch = '書'; 

            var searchText = e.target.value;
            $.ajax({
                url: `search.qq.com/${searchText}`,
                success: data => {
                    // 判斷後臺返回的標誌與咱們存的當前搜索變量是否一致
                    if (data.search === currentSearch) {
                        // 渲染展現
                        render(data);
                    } else {
                        // ..
                    }
                }           
            });
        },250)
    })
複製代碼

上面代碼基本知足需求,但代碼開始顯得亂糟糟。咱們來使用 RxJS 實現上面代碼功能,以下api

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
                    .debounceTime(250)
                    .pluck('target', 'value')
                    .switchMap(url => Http.get(url))
                    .subscribe(data => render(data));複製代碼

能夠明顯看出,基於 RxJS 的實現,代碼十分簡潔!數組

RxJS · 流 Stream

RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions,是一個基於可觀測數據流在異步編程應用中的庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實現,而其餘語言也有相應的實現,如 RxJava、RxAndroid、RxSwift 等。學習 RxJS,咱們須要從可觀測數據流(Streams)提及,它是 Rx 中一個重要的數據類型。框架

是在時間流逝的過程當中產生的一系列事件。它具備時間與事件響應的概念。

rxjs_stream

下雨天時,雨滴隨時間推移逐漸產生,下落時對水面產生了水波紋的影響,這跟 Rx 中的流是很相似的。而在 Web 中,雨滴可能就是一系列的鼠標點擊、鍵盤點擊產生的事件或數據集合等等。

RxJS 基礎實現原理簡析

對流的概念有必定理解後,咱們來說講 RxJS 是怎麼圍繞着流的概念來實現的,講講 RxJS 的基礎實現原理。RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。

觀察者模式

觀察者模式在 Web 中最多見的應該是 DOM 事件的監聽和觸發。

  • 訂閱:經過 addEventListener 訂閱 document.body 的 click 事件。
  • 發佈:當 body 節點被點擊時,body 節點便會向訂閱者發佈這個消息。
document.body.addEventListener('click', function listener(e) {
    console.log(e);
},false);

document.body.click(); // 模擬用戶點擊複製代碼

將上述例子抽象模型,並對應通用的觀察者模型

2016-11-01 9 53 52

迭代器模式

迭代器模式能夠用 JavaScript 提供了 Iterable Protocol 可迭代協議來表示。Iterable Protocol 不是具體的變量類型,而是一種可實現協議。JavaScript 中像 Array、Set 等都屬於內置的可迭代類型,能夠經過 iterator 方法來獲取一個迭代對象,調用迭代對象的 next 方法將獲取一個元素對象,以下示例。

var iterable = [1, 2];

var iterator = iterable[Symbol.iterator]();

iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}

iterator.next(); // => { value: undefined, done: true}複製代碼

元素對象中:value 表示返回值,done 表示是否已經到達最後。

遍歷迭代器可使用下面作法。

var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();

var iterator = iterable();

while(true) {
    try {
        let result = iterator.next();  // 
    } catch (err) {
        handleError(err);  // 
    }
    if (result.done) {
        handleCompleted();  // 
        break;
    }
    doSomething(result.value);
}複製代碼

主要對應三種狀況:

  • 獲取下一個值
    調用 next 能夠將元素一個個地返回,這樣就支持了返回屢次值。

  • 無更多值(已完成)
    當無更多值時,next 返回元素中 done 爲 true。

  • 錯誤處理
    當 next 方法執行時報錯,則會拋出 error 事件,因此能夠用 try catch 包裹 next 方法處理可能出現的錯誤。

RxJS 的觀察者 + 迭代器模式

RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:

  • 訂閱:Observer 經過 Observable 提供的 subscribe() 方法訂閱 Observable。
  • 發佈:Observable 經過回調 next 方法向 Observer 發佈事件。

下面爲 Observable 與 Observer 的僞代碼

// Observer
var Observer = {
    next(value) {
        alert(`收到${value}`);
    }
};

// Observable
function Observable (Observer) {
    setTimeout(()=>{
        Observer.next('A');
    },1000)
}

// subscribe
Observable(Observer);複製代碼

上面實際也是觀察者模式的表現,那麼迭代器模式在 RxJS 中如何體現呢?

在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還能夠提供了另外的兩個方法:error() 和 complete(),與迭代器模式一一對應。

var Observer = {
    next(value) { /* 處理值*/ },
    error(error) { /* 處理異常 */ },
    complete() { /* 處理已完成態 */ }
};複製代碼

結合迭代器 Iterator 進行理解:

  • next()
    Observer 提供一個 next 方法來接收 Observable 流,是一種 push 形式;而 Iterator 是經過調用 iterator.next() 來拿到值,是一種 pull 的形式。

  • complete()
    當再也不有新的值發出時,將觸發 Observer 的 complete 方法;而在 Iterator 中,則須要在 next 的返回結果中,當返回元素 done 爲 true 時,則表示 complete。

  • error()
    當在處理事件中出現異常報錯時,Observer 提供 error 方法來接收錯誤進行統一處理;Iterator 則須要進行 try catch 包裹來處理可能出現的錯誤。

下面是 Observable 與 Observer 實現觀察者 + 迭代器模式的僞代碼,數據的逐漸傳遞傳遞與影響其實就是流的表現。

// Observer
var Observer = {
    next(value) {
        alert(`收到${value}`);
    },
    error(error) {
        alert(`收到${value}`);
    },
    complete() {
        alert("complete");
    },
};

// Observable
function Observable (Observer) {
    [1,2,3].map(item=>{
        Observer.next(item);
    });

    Observer.complete();
    // Observer.error("error message");
}

// subscribe
Observable(Observer);複製代碼

RxJS 基礎實現

有了上面的概念及僞代碼,那麼在 RxJS 中是怎麼建立 Observable 與 Observer 的呢?

建立 Observable

RxJS 提供 create 的方法來自定義建立一個 Observable,可使用 next 來發出流。

var Observable = Rx.Observable.create(observer => {
    observer.next(2);
    observer.complete();
    return  () => console.log('disposed');
});複製代碼

建立 Observer

Observer 能夠聲明 next、err、complete 方法來處理流的不一樣狀態。

var Observer = Rx.Observer.create(
    x => console.log('Next:', x),
    err => console.log('Error:', err),
    () => console.log('Completed')
);複製代碼

最後將 Observable 與 Observer 經過 subscribe 訂閱結合起來。

var subscription = Observable.subscribe(Observer);複製代碼

RxJS 中流是能夠被取消的,調用 subscribe 將返回一個 subscription,能夠經過調用 subscription.unsubscribe() 將流進行取消,讓流再也不產生。

看了起來挺複雜的?換一個實現形式:

// @Observables 建立一個 Observables
var streamA = Rx.Observable.of(2);

// @Observer streamA$.subscribe(Observer)
streamA.subscribe(v => console.log(v));複製代碼

將上面代碼改用鏈式寫法,代碼變得十分簡潔:

Rx.Observable.of(2).subscribe(v => console.log(v));複製代碼

RxJS · Operators 操做

Operators 操做·入門

Rx.Observable.of(2).subscribe(v => console.log(v));複製代碼

上面代碼至關於建立了一個流(2),最終打印出2。那麼若是想將打印結果翻倍,變成4,應該怎麼處理呢?

方案一?: 改變事件源,讓 Observable 值 X 2

Rx.Observable.of(2 * 2 /* ).subscribe(v => console.log(v));複製代碼

方案二?: 改變響應方式,讓 Observer 處理 X 2

Rx.Observable.of(2).subscribe(v => console.log(v * 2 /* ));複製代碼

優雅方案: RxJS 提供了優雅的處理方式,能夠在事件源(Observable)與響應者(Observer)之間增長操做流的方法。

Rx.Observable.of(2)
             .map(v => v * 2) /* 
             .subscribe(v => console.log(v));複製代碼

map 操做跟數組操做的做用是一致的,不一樣的這裏是將流進行改變,而後將新的流傳出去。在 RxJS 中,把這類操做流的方式稱之爲 Operators(操做)。RxJS提供了一系列 Operators,像map、reduce、filter 等等。操做流將產生新流,從而保持流的不可變性,這也是 RxJS 中函數式編程的一點體現。關於函數式編程,這裏暫很少講,能夠看看另一篇文章 《談談函數式編程》

到這裏,咱們知道了,流從產生到最終處理,可能通過的一些操做。即 RxJS 中 Observable 將通過一系列 Operators 操做後,到達 Observer。

Operator1   Operator2
Observable ----|-----------|-------> Observer複製代碼

一系列的 Operators 操做

RxJS 提供了很是多的操做,像下面這些。

Aggregate,All,Amb,ambArray,ambWith,AssertEqual,averageFloat,averageInteger,averageLong,blocking,blockingFirst,blockingForEach,blockingSubscribe,Buffer,bufferWithCount,bufferWithTime,bufferWithTimeOrCount,byLine,cache,cacheWithInitialCapacity,case,Cast,Catch,catchError,catchException,collect,concatWith,Connect,connect_forever,cons,Contains,doAction,doAfterTerminate,doOnComplete,doOnCompleted,doOnDispose,doOnEach,doOnError,doOnLifecycle,doOnNext,doOnRequest,dropUntil,dropWhile,ElementAt,ElementAtOrDefault,emptyObservable,fromNodeCallback,fromPromise,fromPublisher,fromRunnable,Generate,generateWithAbsoluteTime,generateWithRelativeTime,Interval,intervalRange,into,latest (Rx.rb version of Switch),length,mapTo,mapWithIndex,Materialize,Max,MaxBy,mergeArray,mergeArrayDelayError,mergeWith,Min,MinBy,multicastWithSelector,nest,Never,Next,Next (BlockingObservable version),partition,product,retryWhen,Return,returnElement,returnValue,runAsync,safeSubscribe,take_with_time,takeFirst,TakeLast,takeLastBuffer,takeLastBufferWithTime,windowed,withFilter,withLatestFrom,zipIterable,zipWith,zipWithIndex複製代碼

關於每個操做的含義,能夠查看官網進行了解。operators 具備靜態(static)方法和實例( instance)方法,下面使用 Rx.Observable.xx 和 Rx.Observable.prototype.xx 來簡單區分,舉幾個例子。

Rx.Observable.of
of 能夠將普通數據轉換成流式數據 Observable。如上面的 Rx.Observable.of(2)。

Rx.Observable.fromEvent
除了數值外,RxJS 還提供了關於事件的操做,fromEvent 能夠用來監聽事件。當事件觸發時,將事件 event 轉成可流動的 Observable 進行傳輸。下面示例表示:監聽文本框的 keyup 事件,觸發 keyup 能夠產生一系列的 event Observable。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .subscribe(e => console.log(e));複製代碼

Rx.Observable.prototype.map
map 方法跟咱們日常使用的方式是同樣的,不一樣的只是這裏是將流進行改變,而後將新的流傳出去。上面示例已有涉及,這裏再也不多講。

Rx.Observable.of(2)
             .map(v => 10 * v)
             .subscribe(v => console.log(v));複製代碼

Rx 提供了許多的操做,爲了更好的理解各個操做的做用,咱們能夠經過一個可視化的工具 marbles 圖 來輔助理解。如 map 方法對應的 marbles 圖以下

map

箭頭能夠理解爲時間軸,上面的數據通過中間的操做,轉變成下面的模樣。

Rx.Observable.prototype.mergeMap
mergeMap 也是 RxJS 中經常使用的接口,咱們來結合 marbles 圖(flatMap(alias))來理解它

rxjs_flatmap

上面的數據流中,產生了新的分支流(流中流),mergeMap 的做用則是將分支流調整回主幹上,最終分支上的數據流都通過主幹的其餘操做,其實也是將流中流進行扁平化。

Rx.Observable.prototype.switchMap
switchMap 與 mergeMap 都是將分支流疏通到主幹上,而不一樣的地方在於 switchMap 只會保留最後的流,而取消拋棄以前的流。

除了上面提到的 marbles,也能夠 ASCII 字符的方式來繪製可視化圖表,下面將結合 Map、mergeMap 和 switchMap 進行對比來理解。

@Map             @mergeMap            @switchMap
                         ↗  ↗                 ↗  ↗
-A------B-->           a2 b2                a2 b2  
-2A-----2B->          /  /                 /  /  
                    /  /                 /  /
                  a1 b1                a1 b1
                 /  /                 /  /
                -A-B----------->     -A-B---------->
                --a1-b1-a2-b2-->     --a1-b1---b2-->複製代碼

mergeMap 和 switchMap 中,A 和 B 是主幹上產生的流,a一、a2 爲 A 在分支上產生,b一、b2 爲 B 在分支上產生,可看到,最終將歸併到主幹上。switchMap 只保留最後的流,因此將 A 的 a2 拋棄掉。

Rx.Observable.prototype.debounceTime
debounceTime 操做能夠操做一個時間戳 TIMES,表示通過 TIMES 毫秒後,沒有流入新值,那麼纔將值轉入下一個操做。

rxjs_debounce

RxJS 中的操做符是知足咱們之前的開發思惟的,像 map、reduce 這些。另外,不管是 marbles 圖仍是用 ASCII 字符圖這些可視化的方式,都對 RxJS 的學習和理解有很是大的幫助。

使用 RxJS 一步步實現搜索示例

RxJS 提供許多建立流或操做流的接口,應用這些接口,咱們來一步步將搜索的示例進行 Rx 化。

使用 RxJS 提供的 fromEvent 接口來監聽咱們輸入框的 keyup 事件,觸發 keyup 將產生 Observable。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .subscribe(e => console.log(e));複製代碼

這裏咱們並不想輸出事件,而想拿到文本輸入值,請求搜索,最終渲染出結果。涉及到兩個新的 Operators 操做,簡單理解一下:

  • Rx.Observable.prototype.pluck('target', 'value')
    將輸入的 event,輸出成 event.target.value。

  • Rx.Observable.prototype.mergeMap()
    將請求搜索結果輸出回給 Observer 上進行渲染。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .pluck('target', 'value') // 
             .mergeMap(url => Http.get(url)) // 
             .subscribe(data => render(data))複製代碼

上面代碼實現了簡單搜索呈現,但一樣存在一開始說起的兩個問題。那麼如何減小請求數,以及取消已無用的請求呢?咱們來了解 RxJS 提供的其餘 Operators 操做,來解決上述問題。

  • Rx.Observable.prototype.debounceTime(TIMES)
    表示通過 TIMES 毫秒後,沒有流入新值,那麼纔將值轉入下一個環節。這個與前面使用 setTimeout 來實現函數節流的方式有一致效果。

  • Rx.Observable.prototype.switchMap()
    使用 switchMap 替換 mergeMap,將能取消上一個已無用的請求,只保留最後的請求結果流,這樣就確保處理展現的是最後的搜索的結果。

最終實現以下,與一開始的實現進行對比,能夠明顯看出 RxJS 讓代碼變得十分簡潔。

var text = document.querySelector('#text');
Rx.Observable.fromEvent(text, 'keyup')
             .debounceTime(250) // 
             .pluck('target', 'value')
             .switchMap(url => Http.get(url)) // 
             .subscribe(data => render(data))複製代碼

總結

本篇做爲 RxJS 入門篇到這裏就結束,關於 RxJS 中的其餘方面內容,後續再拎出來進一步分析學習。
RxJS 做爲一個庫,能夠與衆多框架結合使用,但並非每一種場合都須要使用到 RxJS。複雜的數據來源,異步多的狀況下才能更好凸顯 RxJS 做用,這一塊能夠看看民工叔寫的《流動的數據——使用 RxJS 構造複雜單頁應用的數據邏輯》 相信會有更好的理解。點擊查看更多文章>>

附:
RxJS(JavaScript) github.com/Reactive-Ex…
RxJS(TypeScript ) github.com/ReactiveX/r…

相關文章
相關標籤/搜索