RxJS 是響應式編程 (reactive programming) 強大的工具,今天咱們將深刻介紹 Observables 和 Observers 的內容,以及介紹如何建立本身的操做符 (operators)。javascript
若是你以前已經使用過 RxJS,並但願瞭解 Observable 及 Operators (操做符) 的內部工做原理,那麼這篇文章很是適合你。html
Observable 就是一個擁有如下特性的函數:java
它接收一個 observer
對象做爲參數,該對象中包含 next
、error
和 complete
方法node
它返回一個函數,用於在銷燬 Observable 時,執行清理操做react
在咱們實現的示例中,咱們將定義一個簡單的 unsubscribe
函數來實現取消訂閱的功能。然而在 RxJS 中,返回的是 Subcription
對象,該對象中包含一個 unsubscribe
方法。es6
一個 Observable 對象設置觀察者 (observer),並將它與生產者關聯起來。該生產者多是 DOM 元素產生的 click
或 input
事件,也多是更復雜的事件,如 HTTP。typescript
爲了更好地理解 Observable,咱們來自定義 Observable。首先,咱們先來看一個訂閱的例子:編程
const node = document.querySelector('input[type=text]'); const input$ = Rx.Observable.fromEvent(node, 'input'); input$.subscribe({ next: (event) => console.log(`You just typed ${event.target.value}!`), error: (err) => console.log(`Oops... ${err}`), complete: () => console.log(`Complete!`) });
該示例中,Rx.Observable.formEvent()
方法接收一個 input
元素和事件名做爲參數,而後返回一個 $input
Observable 對象。接下來咱們使用 subscribe()
方法來定於該 Observable 對象。當觸發 input
事件後,對應的值將會傳遞給 observer 對象。segmentfault
Observer (觀察者) 很是簡單,在上面的示例中,觀察者是一個普通的對象,該對象會做爲 subscribe()
方法的參數。此外 subscribe(next, error, complete)
也是一個有效的語法,但在本文中咱們將討論對象字面量的形式。安全
當 Observable 對象產生新值的時候,咱們能夠經過調用 next()
方法來通知對應的觀察者。若出現異常,則會調用觀察者的 error()
方法。當咱們訂閱 Observable 對象後,只要有新的值,都會通知對應的觀察者。但在如下兩種狀況下,新的值不會再通知對應的觀察者:
已調用 observer 對象的 complete()
方法
消費者對數據再也不感興趣,執行取消訂閱操做
此外在執行最終的 subscribe()
訂閱操做前,咱們傳遞的值能夠通過一系列的鏈式處理操做。執行對應操做的東西叫操做符,每一個操做符執行完後會返回一個新的 Observable 對象,而後繼續咱們的處理流程。
正如上面所說的,Observable 對象可以執行鏈式操做,具體以下所示:
const input$ = Rx.Observable.fromEvent(node, 'input') .map(event => event.target.value) .filter(value => value.length >= 2) .subscribe(value => { // use the `value` });
上面代碼的執行流程以下:
假設用戶在輸入框中輸入字符 a
Observable 對象響應對應的 input
事件,而後把值傳遞給 observer
map()
操做符返回一個新的 Observable 對象
filter()
操做符執行過濾操做,而後又返回一個新的 Observable 對象
最後咱們經過調用 subscribe()
方法,來獲取最終的值
簡而言之,Operator 就是一個函數,它接收一個 Observable 對象,而後返回一個新的 Observable 對象。當咱們訂閱新返回的 Observable 對象時,它內部會自動訂閱前一個 Observable 對象。
function Observable(subscribe) { this.subscribe = subscribe; }
每一個 subscribe
回調函數被賦值給 this.subscribe
屬性,該回調函數將會被咱們或其它 Observable 對象調用。
在咱們深刻介紹前,咱們先來看一個簡單的示例。以前咱們已經建立完 Observable
函數,如今咱們能夠調用咱們的觀察者 (observer),而後傳遞數值 1,而後訂閱它:
const one$ = new Observable((observer) => { observer.next(1); observer.complete(); }); one$.subscribe({ next: (value) => console.log(value) // 1 });
即咱們訂閱咱們建立的 Observable 實例,而後經過 subscribe()
方法調用經過構造函數設置的回調函數。
下面就是咱們須要的基礎結構,即在 Observable 對象上須要新增一個靜態方法 fromEvent
:
Observable.fromEvent = (element, name) => { };
接下來咱們將參考 RxJS 爲咱們提供的方法來實現自定義的 fromEvent()
方法:
const node = document.querySelector('input'); const input$ = Observable.fromEvent(node, 'input');
按照上面的使用方式,咱們的 fromEvent()
方法須要接收兩個參數,同時須要返回一個新的 Observable 對象,具體以下:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { }); };
接下來咱們來實現事件監聽功能:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => {}, false); }); };
那麼咱們的 observer
參數來自哪裏? 其實 observer
對象就是包含 next
、error
和 complete
方法的對象字面量。
須要注意的是,咱們的 observer 參數不會被傳遞,直到
subscribe()
方法被調用。這意味着addEventListener()
方法不會被調用,除非你訂閱該 Observable 對象。
當咱們調用 subscribe()
方法,以前設置的 this.subscribe
回調函數會被調用,對應的參數是咱們定義的 observer 對象字面量,接下來將使用新的值,做爲 next()
方法的參數,調用該方法。
很好,那接下來咱們要作什麼?以前版本咱們只是設置了監聽,但沒有調用 observer 對象的 next()
方法,接下來讓咱們來修復這個問題:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { element.addEventListener(name, (event) => { observer.next(event); }, false); }); };
如你所知,當銷燬 Observables 對象時,須要調用一個函數用來執行清理操做。針對目前的場景,在銷燬時咱們須要移除事件監聽:
Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); };
咱們沒有調用 complete()
方法,由於該 Observable 對象處理的 DOM 相關的事件,在時間維度上它們多是無終止的。
如今讓咱們來驗證一下最終實現的功能:
const node = document.querySelector('input'); const p = document.querySelector('p'); function Observable(subscribe) { this.subscribe = subscribe; } Observable.fromEvent = (element, name) => { return new Observable((observer) => { const callback = (event) => observer.next(event); element.addEventListener(name, callback, false); return () => element.removeEventListener(name, callback, false); }); }; const input$ = Observable.fromEvent(node, 'input'); const unsubscribe = input$.subscribe({ next: (event) => { p.innerHTML = event.target.value; } }); // automatically unsub after 5s setTimeout(unsubscribe, 5000);
建立咱們本身的操做符應該會更容易一些,如今咱們瞭解 Observable
和 Observable
背後的概念。咱們將在 Observable 的原型對象上添加一個方法:
Observable.prototype.map = function (mapFn) { };
該方法的功能與 JavaScript 中的 Array.prototype.map
方法相似:
const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value);
因此咱們須要應用回調函數並調用它,這用於獲取咱們所須要的數據。在咱們這樣作以前,咱們須要流中的最新值。這裏是巧妙的部分,在 map()
操做符中,咱們須要訪問 Observable
實例。由於 map
方法在原型上,咱們能夠經過如下方式訪問 Observable 實例:
Observable.prototype.map = function (mapFn) { const input = this; };
接下來咱們在返回的 Observable 對象中執行 input
對象的訂閱操做:
Observable.prototype.map = function(mapFn) { const input = this; return new Observable((observer) => { return input.subscribe(); }); };
咱們返回了
input.subscribe()
方法執行的結果,由於當咱們執行取消訂閱操做時,將會依次調用每一個 Observable 對象取消訂閱的方法。
最後咱們來完善一下 map
操做符的內部代碼:
Observable.prototype.map = function (mapFn) { const input = this; return new Observable((observer) => { return input.subscribe({ next: (value) => observer.next(mapFn(value)), error: (err) => observer.error(err), complete: () => observer.complete() }); }); };
如今咱們已經能夠執行鏈式操做了:
const input$ = Observable.fromEvent(node, 'input') .map(event => event.target.value); input$.subscribe({ next: (value) => { p.innerHTML = value; } });
Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。
MagicQ | 單值 | 多值 |
---|---|---|
拉取(Pull) | 函數 | 遍歷器 |
推送(Push) | Promise | Observable |
Promise
返回單個值
不可取消的
Observable
隨着時間的推移發出多個值
能夠取消的
支持 map、filter、reduce 等操做符
延遲執行,當訂閱的時候纔會開始執行
上面的示例中,咱們使用一個包含了 next、error、complete 方法的普通 JavaScript 對象來定義觀察者。一個普通的 JavaScript 對象只是一個開始,在 RxJS 5 裏面,爲開發者提供了一些保障機制,來保證一個更安全的觀察者。如下是一些比較重要的原則:
傳入的 Observer
對象能夠不實現全部規定的方法 (next、error、complete 方法)
在 complete
或者 error
觸發以後再調用 next
方法是沒用的
調用 unsubscribe
方法後,任何方法都不能再被調用了
complete
和 error
觸發後,unsubscribe
也會自動調用
當 next
、complete
和error
出現異常時,unsubscribe
也會自動調用以保證資源不會浪費
next
、complete
和error
是可選的。按需處理便可,沒必要所有處理
爲了完成上述目標,咱們得把傳入的匿名 Observer
對象封裝在一個 SafeObserver
裏以提供上述保障。
若想進一步瞭解詳細信息,請參考 Observable詳解 文章中 "自定義 Observable" 章節的內容。