上次給你們分享了cycle.js的內容,這個框架核心模塊的代碼其實只有一百多行,要理解這個看似複雜的框架,其實最核心的是理解它依賴的異步數據流處理框架——rx.js。今天,給你們分享一下rx.js的實現原理,你們有興趣能夠繼續研究它的源碼,會讓你對異步和響應式編程有更深的理解,進而將rx.js、cycle.js或者僅僅是函數式、響應式編程的思想融入到本身手裏的業務中。
爲了更好地理解rx.js,須要先談談異步編程的實現方案。web
makeHttpCall('/items', items => { for (itemId of items) { makeHttpCall(`/items/${itemId}/info`, itemInfo => { makeHttpCall(`/items/${itemInfo.pic}`, img => { showImg(img); }); }); } }); beginUiRendering();
一旦你須要多塊數據時你就陷入了流行的」末日金字塔「或者回調地獄。這段代碼有不少的問題。 其中之一就是風格。當你在這些嵌套的回調函數中添加愈來愈多的邏輯,這段代碼就會變得很複雜很難理解。由於循環還產生了一個更加細微的問題。for循環是同步的控制流語句,這並不能很好的配合異步調用,由於會有延遲,這可能會產生很奇怪的bug。express
makeHttpCall('/items') .then(itemId => makeHttpCall(`/items/${itemId}/info`)) .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`)) .then(showImg);
鏈式調用毫無疑問是一個進步,理解這段代碼的難度顯著降低。然而,儘管Promises在處理這種單值(或單個錯誤)時很是高效,它有也一些侷限性。Promises在處理用戶連續輸入的數據流時效率怎麼樣呢? 這時Promises處理起來也並不高效,由於它沒有事件的刪除、分配、重試等等的語法定義。編程
使用async/await,配合Promise能夠以同步的方式編寫異步代碼,是我如今最喜歡也最經常使用的異步編程方式。好比:上述實如今請求加載完圖片後再顯示圖片的邏輯,也能夠這樣實現:promise
const showLoadedImg = async () => { let getImgInfo = makeHttpCall('/items') .then(itemId => makeHttpCall(`/items/${itemId}/info`)) .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`)); let loadedImg = await getImgInfo; showImg(loadedImg); }
generator我用的不多,以前用過一段時間koa(express原班團隊搞的基於generator的現代web開發框架),不太喜歡代碼在各個模塊間跳來跳去的編寫思路。整個 Generator 函數就是一個封裝的異步任務,或者說是異步任務的容器。異步操做須要暫停的地方,都用 yield 語句註明。調用 Generator 函數,會返回一個內部指針(即遍歷器 )g,調用指針 g 的 next 方法,會移動內部指針(即執行異步任務的第一段),指向第一個遇到的 yield 語句。下面貼一段koa中間件級聯的代碼,你們感覺一下:數據結構
var koa = require('koa'); var app = koa(); // x-response-time app.use(function *(next){ var start = new Date; yield next; var ms = new Date - start; this.set('X-Response-Time', ms + 'ms'); }); // logger app.use(function *(next){ var start = new Date; yield next; var ms = new Date - start; console.log('%s %s - %s', this.method, this.url, ms); }); // response app.use(function *(){ this.body = 'Hello World'; }); app.listen(3000);
上面的例子在頁面中返回 "Hello World",然而當請求開始時,請求先通過 x-response-time 和 logging 中間件,並記錄中間件執行起始時間。 而後將控制權交給 reponse 中間件。當中間件運行到 yield next 時,函數掛起並將控制前交給下一個中間件。當沒有中間件執行 yield next 時,程序棧會逆序喚起被掛起的中間件來執行接下來的代碼。app
RxJS是一個解決異步問題的JS開發庫.它起源於 Reactive Extensions 項目,它帶來了觀察者模式和函數式編程的相結合的最佳實踐。 觀察者模式是一個被實踐證實的模式,基於生產者(事件的建立者)和消費者(事件的監聽者)的邏輯分離關係。框架
何況函數式編程方式的引入,如說明性編程,不可變數據結構,鏈式方法調用會使你極大的簡化代碼量。(和回調代碼方式說再見吧)koa
若是你熟悉函數式編程,請把RxJS理解爲異步化的Underscore.js。異步
RxJS 引入了一個重要的數據類型——流(stream)。async
觀察者模式在 Web 中最多見的應該是 DOM 事件的監聽和觸發。
訂閱:經過 addEventListener 訂閱 document.body 的 click 事件。
發佈:當 body 節點被點擊時,body 節點便會向訂閱者發佈這個消息。
document.body.addEventListener('click', function listener(e) { console.log(e); },false); document.body.click(); // 模擬用戶點擊
迭代器模式能夠用 JavaScript 提供了 Iterable Protocol 可迭代協議來表示。Iterable Protocol 不是具體的變量類型,而是一種可實現協議。JavaScript 中像 Array、Set 等都屬於內置的可迭代類型,能夠經過 iterator 方法來獲取一個迭代對象,調用迭代對象的 next 方法將獲取一個元素對象,以下示例。
var iterable = [1, 2]; var iterator = iterable[Symbol.iterator](); iterator.next(); // => { value: "1", done: false} iterator.next(); // => { value: "2", done: false} iterator.next(); // => { value: undefined, done: true}
元素對象中:value 表示返回值,done 表示是否已經到達最後。
遍歷迭代器可使用下面作法。
var iterable = [1, 2]; var iterator = iterable[Symbol.iterator](); while(true) { let result; try { result = iterator.next(); // <= 獲取下一個值 } catch (err) { handleError(err); // <= 錯誤處理 } if (result.done) { handleCompleted(); // <= 無更多值(已完成) break; } doSomething(result.value); }
主要對應三種狀況:
獲取下一個值
調用 next 能夠將元素一個個地返回,這樣就支持了返回屢次值。
無更多值(已完成)
當無更多值時,next 返回元素中 done 爲 true。
錯誤處理
當 next 方法執行時報錯,則會拋出 error 事件,因此能夠用 try catch 包裹 next 方法處理可能出現的錯誤。
RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:
訂閱:Observer 經過 Observable 提供的 subscribe() 方法訂閱 Observable。
發佈:Observable 經過回調 next 方法向 Observer 發佈事件。
下面爲 Observable 與 Observer 的僞代碼:
// Observer var Observer = { next(value) { alert(`收到${value}`); } }; // Observable function Observable (Observer) { setTimeout(()=>{ Observer.next('A'); },1000) } // subscribe Observable(Observer);
上面實際也是觀察者模式的表現,那麼迭代器模式在 RxJS 中如何體現呢?
在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還能夠提供了另外的兩個方法:error() 和 complete(),與迭代器模式一一對應。
var Observer = { next(value) { /* 處理值*/ }, error(error) { /* 處理異常 */ }, complete() { /* 處理已完成態 */ } };
結合迭代器 Iterator 進行理解:
next()
Observer 提供一個 next 方法來接收 Observable 流,是一種 push 形式;而 Iterator 是經過調用 iterator.next() 來拿到值,是一種 pull 的形式。
complete()
當再也不有新的值發出時,將觸發 Observer 的 complete 方法;而在 Iterator 中,則須要在 next 的返回結果中,當返回元素 done 爲 true 時,則表示 complete。
error()
當在處理事件中出現異常報錯時,Observer 提供 error 方法來接收錯誤進行統一處理;Iterator 則須要進行 try catch 包裹來處理可能出現的錯誤。
下面是 Observable 與 Observer 實現觀察者 + 迭代器模式的僞代碼,數據的逐漸傳遞傳遞與影響其實就是流的表現。
// Observer var Observer = { next(value) { alert(`收到${value}`); }, error(error) { alert(`收到${value}`); }, complete() { alert("complete"); }, }; // Observable function Observable (Observer) { [1,2,3].map(item=>{ Observer.next(item); }); Observer.complete(); // Observer.error("error message"); } // subscribe Observable(Observer);
有了上面的概念及僞代碼,那麼在 RxJS 中是怎麼建立 Observable 與 Observer 的呢?
建立 Observable
RxJS 提供 create 的方法來自定義建立一個 Observable,可使用 next 來發出流。
var Observable = Rx.Observable.create(observer => { observer.next(2); observer.complete(); return () => console.log('disposed'); });
建立 Observer
Observer 能夠聲明 next、err、complete 方法來處理流的不一樣狀態。
var Observer = Rx.Observer.create( x => console.log('Next:', x), err => console.log('Error:', err), () => console.log('Completed') );
最後將 Observable 與 Observer 經過 subscribe 訂閱結合起來。
var subscription = Observable.subscribe(Observer);
關於rx.js的使用和API就再也不贅述了,理解了其實現原理,使用起來就很簡單了!