瀏覽新版,請訪問 RxJS Observable
在介紹 Observable 以前,咱們要先了解兩個設計模式:javascript
這兩個模式是 Observable 的基礎,下面咱們先來介紹一下 Observer Pattern。html
觀察者模式是 軟件設計模式的一種。在此種模式中,一個目標對象管理全部相依於它的觀察者對象,而且在它自己的狀態改變時主動發出通知。這一般透過呼叫各觀察者所提供的方法來實現。此種模式一般被用來實時事件處理系統。 — 維基百科
觀察者模式又叫發佈訂閱模式(Publish/Subscribe),它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知全部的觀察者對象,使得它們可以自動更新本身。前端
咱們可使用平常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,他們之間的關係以下:java
在觀察者模式中也有兩個主要角色:Subject (主題) 和 Observer (觀察者) 。它們分別對應例子中的期刊出版方和訂閱者。接下來咱們來看張圖,從而加深對上面概念的理解。react
觀察者模式的優勢:git
觀察者模式的缺點:es6
在前端領域,觀察者模式被普遍地使用。最多見的例子就是爲 DOM 對象添加事件監聽,具體示例以下:github
<button id="btn">確認</button> function clickHandler(event) { console.log('用戶已點擊確認按鈕!'); } document.getElementById("btn").addEventListener('click', clickHandler);
上面代碼中,咱們經過 addEventListener API 監聽 button 對象上的點擊事件,當用戶點擊按鈕時,會自動執行咱們的 clickHandler
函數。算法
Subject 類定義:typescript
class Subject { constructor() { this.observerCollection = []; } registerObserver(observer) { this.observerCollection.push(observer); } unregisterObserver(observer) { let index = this.observerCollection.indexOf(observer); if(index >= 0) this.observerCollection.splice(index, 1); } notifyObservers() { this.observerCollection.forEach((observer)=>observer.notify()); } }
Observer 類定義:
class Observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } }
使用示例:
let subject = new Subject(); // 建立主題對象 let observer1 = new Observer('semlinker'); // 建立觀察者A - 'semlinker' let observer2 = new Observer('lolo'); // 建立觀察者B - 'lolo' subject.registerObserver(observer1); // 註冊觀察者A subject.registerObserver(observer2); // 註冊觀察者B subject.notifyObservers(); // 通知觀察者 subject.unregisterObserver(observer1); // 移除觀察者A subject.notifyObservers(); // 驗證是否成功移除
以上代碼成功運行後控制檯的輸出結果:
semlinker has been notified. # 輸出一次 2(unknown) lolo has been notified. # 輸出兩次
須要注意的是,在觀察者模式中,一般狀況下調用註冊觀察者後,會返回一個函數,用於移除監聽,有興趣的讀者,能夠本身嘗試一下。(備註:在 Angular 1.x 中調用 $scope.$on() 方法後,就會返回一個函數,用於移除監聽)
迭代器(Iterator)模式,又叫作遊標(Cursor)模式。它提供一種方法順序訪問一個聚合對象中的各個元素,而又不須要暴露該對象的內部表示。迭代器模式能夠把迭代的過程從業務邏輯中分離出來,在使用迭代器模式以後,即便不關心對象的內部構造,也能夠按順序訪問其中的每一個元素。
迭代器模式的優勢:
迭代器模式的缺點:
在 ECMAScript 中 Iterator 最先實際上是要採用相似 Python 的 Iterator 規範,就是 Iterator 在沒有元素以後,執行next
會直接拋出錯誤;但後來通過一段時間討論後,決定採更 functional 的作法,改爲在取得最後一個元素以後執行next
永遠都回傳{ done: true, value: undefined }
一個迭代器對象 ,知道如何每次訪問集合中的一項, 並記錄它的當前在序列中所在的位置。在 JavaScript 中迭代器是一個對象,它提供了一個 next() 方法,返回序列中的下一項。這個方法返回包含 done
和 value
兩個屬性的對象。對象的取值以下:
{ done: false, value: elementValue }
{ done: true, value: undefined }
詳細信息能夠參考 - 可迭代協議和迭代器協議
接下來咱們來建立一個 makeIterator 函數,該函數的參數類型是數組,當調用該函數後,返回一個包含 next() 方法的 Iterator 對象, 其中 next() 方法是用來獲取容器對象中下一個元素。具體示例以下:
function makeIterator(array){ var nextIndex = 0; return { next: function(){ return nextIndex < array.length ? {value: array[nextIndex++], done: false} : {done: true}; } } }
一旦初始化, next() 方法能夠用來依次訪問可迭代對象中的元素:
var it = makeIterator(['yo', 'ya']); console.log(it.next().value); // 'yo' console.log(it.next().value); // 'ya' console.log(it.next().done); // true
在 ES 6 中咱們能夠經過 Symbol.iterator
來建立可迭代對象的內部迭代器,具體示例以下:
let arr = ['a', 'b', 'c']; let iter = arr[Symbol.iterator]();
調用 next()
方法來獲取數組中的元素:
> iter.next() { value: 'a', done: false } > iter.next() { value: 'b', done: false } > iter.next() { value: 'c', done: false } > iter.next() { value: undefined, done: true }
ES 6 中可迭代的對象:
RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:
Proposal Observable Implementations
若是你想真正瞭解 Observable,最好的方式就是本身寫一個。其實 Observable 就是一個函數,它接受一個 Observer
做爲參數而後返回另外一個函數。
它的基本特徵:
Observer
對象 (包含 next、error、complete 方法的對象) 做爲參數unsubscribe
函數,用於取消訂閱它的做用:
做爲生產者與觀察者之間的橋樑,並返回一種方法來解除生產者與觀察者之間的聯繫,其中觀察者用於處理時間序列上數據流。接下來咱們來看一下 Observable 的基礎實現:
DataSource - 數據源
class DataSource { constructor() { let i = 0; this._id = setInterval(() => this.emit(i++), 200); // 建立定時器 } emit(n) { const limit = 10; // 設置數據上限值 if (this.ondata) { this.ondata(n); } if (n === limit) { if (this.oncomplete) { this.oncomplete(); } this.destroy(); } } destroy() { // 清除定時器 clearInterval(this._id); } }
myObservable
function myObservable(observer) { let datasource = new DataSource(); // 建立數據源 datasource.ondata = (e) => observer.next(e); // 處理數據流 datasource.onerror = (err) => observer.error(err); // 處理異常 datasource.oncomplete = () => observer.complete(); // 處理數據流終止 return () => { // 返回一個函數用於,銷燬數據源 datasource.destroy(); }; }
使用示例:
const unsub = myObservable({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done')} }); /** * 移除註釋,能夠測試取消訂閱 */ // setTimeout(unsub, 500);
具體運行結果,能夠查看線上示例。
上面的示例中,咱們使用一個包含了 next、error、complete 方法的普通 JavaScript 對象來定義觀察者。一個普通的 JavaScript 對象只是一個開始,在 RxJS 5 裏面,爲開發者提供了一些保障機制,來保證一個更安全的觀察者。如下是一些比較重要的原則:
Observer
對象能夠不實現全部規定的方法 (next、error、complete 方法)complete
或者 error
觸發以後再調用 next
方法是沒用的unsubscribe
方法後,任何方法都不能再被調用了complete
和 error
觸發後,unsubscribe
也會自動調用next
、complete
和error
出現異常時,unsubscribe
也會自動調用以保證資源不會浪費next
、complete
和error
是可選的。按需處理便可,沒必要所有處理爲了完成上述目標,咱們得把傳入的匿名 Observer
對象封裝在一個 SafeObserver
裏以提供上述保障。SafeObserver 的具體實現以下:
class SafeObserver { constructor(destination) { this.destination = destination; } next(value) { // 還沒有取消訂閱,且包含next方法 if (!this.isUnsubscribed && this.destination.next) { try { this.destination.next(value); } catch (err) { // 出現異常時,取消訂閱釋放資源,再拋出異常 this.unsubscribe(); throw err; } } } error(err) { // 還沒有取消訂閱,且包含error方法 if (!this.isUnsubscribed && this.destination.error) { try { this.destination.error(err); } catch (e2) { // 出現異常時,取消訂閱釋放資源,再拋出異常 this.unsubscribe(); throw e2; } this.unsubscribe(); } } complete() { // 還沒有取消訂閱,且包含complete方法 if (!this.isUnsubscribed && this.destination.complete) { try { this.destination.complete(); } catch (err) { // 出現異常時,取消訂閱釋放資源,再拋出異常 this.unsubscribe(); throw err; } this.unsubscribe(); } } unsubscribe() { // 用於取消訂閱 this.isUnsubscribed = true; if (this.unsub) { this.unsub(); } } }
myObservable - 使用 SafeObserver
function myObservable(observer) { const safeObserver = new SafeObserver(observer); // 建立SafeObserver對象 const datasource = new DataSource(); // 建立數據源 datasource.ondata = (e) => safeObserver.next(e); datasource.onerror = (err) => safeObserver.error(err); datasource.oncomplete = () => safeObserver.complete(); safeObserver.unsub = () => { // 爲SafeObserver對象添加unsub方法 datasource.destroy(); }; // 綁定this上下文,並返回unsubscribe方法 return safeObserver.unsubscribe.bind(safeObserver); }
使用示例:
const unsub = myObservable({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done')} });
具體運行結果,能夠查看線上示例。
Operator 是一個函數,它接收一個 Observable 對象,而後返回一個新的 Observable 對象。當咱們訂閱新返回的 Observable 對象時,它內部會自動訂閱前一個 Observable 對象。接下來咱們來實現經常使用的 map 操做符:
Observable 實現:
class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const safeObserver = new SafeObserver(observer); safeObserver.unsub = this._subscribe(safeObserver); return safeObserver.unsubscribe.bind(safeObserver); } }
map 操做符實現:
function map(source, project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)), error: (err) => observer.error(err), complete: () => observer.complete() }; return source.subscribe(mapObserver); }); }
具體運行結果,能夠查看線上示例。
若是把 Operator
都寫成如上那種獨立的函數,咱們鏈式代碼會逐漸變醜:
map(map(myObservable, (x) => x + 1), (x) => x + 2);
對於上面的代碼,想象一下有 五、6 個嵌套着的 Operator
,再加上更多、更復雜的參數,基本上就無法兒看了。
你也能夠試下 Texas Toland 提議的簡單版管道實現,合併壓縮一個數組的Operator
並生成一個最終的Observable
,不過這意味着要寫更復雜的 Operator
,上代碼:JSBin。其實寫完後你會發現,代碼也不怎麼漂亮:
pipe(myObservable, map(x => x + 1), map(x => x + 2));
理想狀況下,咱們想將代碼用更天然的方式鏈起來:
myObservable.map(x => x + 1).map(x => x + 2);
幸運的是,咱們已經有了這樣一個 Observable
類,咱們能夠基於 prototype 在不增長複雜度的狀況下支持多 Operators
的鏈式結構,下面咱們採用prototype方式再次實現一下 Observable
:
Observable.prototype.map = function (project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)), error: (err) => observer.error(err), complete: () => observer.complete() }; return this.subscribe(mapObserver); }); };
如今咱們終於有了一個還不錯的實現。這樣實現還有其餘好處,例如:能夠寫子類繼承 Observable
類,而後在子類中重寫某些內容以優化程序。
接下來咱們來總結一下該部分的內容:Observable 就是函數,它接受 Observer 做爲參數,又返回一個函數。若是你也寫了一個函數,接收一個 Observer 做爲參數,又返回一個函數,那麼,它是異步的、仍是同步的 ?其實都不是,它就只是一個函數。任何函數的行爲都依賴於它的具體實現,因此當你處理一個 Observable 時,就把它當成一個普通函數,裏面沒有什麼黑魔法。當你要構建 Operator 鏈時,你須要作的其實就是生成一個函數將一堆 Observers 連接在一塊兒,而後讓真正的數據依次穿過它們。
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext observer.next('Lolo'); }); // 訂閱這個 Observable observable.subscribe(function(value) { console.log(value); });
以上代碼運行後,控制檯會依次輸出 'Semlinker' 和 'Lolo' 兩個字符串。
須要注意的是,不少人認爲 RxJS 中的全部操做都是異步的,但其實這個觀念是錯的。RxJS 的核心特性是它的異步處理能力,但它也是能夠用來處理同步的行爲。具體示例以下:
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext observer.next('Lolo'); }); console.log('start'); observable.subscribe(function(value) { console.log(value); }); console.log('end');
以上代碼運行後,控制檯的輸出結果:
start Semlinker Lolo end
固然咱們也能夠用它處理異步行爲:
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext observer.next('Lolo'); setTimeout(() => { observer.next('RxJS Observable'); }, 300); }) console.log('start'); observable.subscribe(function(value) { console.log(value); }); console.log('end');
以上代碼運行後,控制檯的輸出結果:
start Semlinker Lolo end RxJS Observable
從以上例子中,咱們能夠得出一個結論 - Observable 能夠應用於同步和異步的場合。
RxJS 中提供了不少操做符,用於建立 Observable 對象,經常使用的操做符以下:
上面的例子中,咱們已經使用過了 create 操做符,接下來咱們來看一下其它的操做符:
var source = Rx.Observable.of('Semlinker', 'Lolo'); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log(error); } });
以上代碼運行後,控制檯的輸出結果:
Semlinker Lolo complete!
var arr = [1, 2, 3]; var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路" source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log(error); } });
以上代碼運行後,控制檯的輸出結果:
1 2 3 complete!
Rx.Observable.fromEvent(document.querySelector('button'), 'click');
var source = Rx.Observable .fromPromise(new Promise((resolve, reject) => { setTimeout(() => { resolve('Hello RxJS!'); },3000) })); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log(error); } });
以上代碼運行後,控制檯的輸出結果:
Hello RxJS! complete!
var source = Rx.Observable.empty(); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log(error); } });
以上代碼運行後,控制檯的輸出結果:
complete!
empty 操做符返回一個空的 Observable 對象,若是咱們訂閱該對象,它會當即返回 complete 信息。
var source = Rx.Observable.never(); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log(error); } });
never 操做符會返回一個無窮的 Observable,當咱們訂閱它後,什麼事情都不會發生,它是一個一直存在卻什麼都不作的 Observable 對象。
var source = Rx.Observable.throw('Oop!'); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log('Throw Error: ' + error); } });
以上代碼運行後,控制檯的輸出結果:
Throw Error: Oop!
throw 操做如,只作一件事就是拋出異常。
var source = Rx.Observable.interval(1000); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log('Throw Error: ' + error); } });
以上代碼運行後,控制檯的輸出結果:
0 1 2 ...
interval 操做符支持一個數值類型的參數,用於表示定時的間隔。上面代碼表示每隔 1s,會輸出一個遞增的值,初始值從 0 開始。
var source = Rx.Observable.timer(1000, 5000); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log('Throw Error: ' + error); } });
以上代碼運行後,控制檯的輸出結果:
0 # 1s後 1 # 5s後 2 # 5s後 ...
timer 操做符支持兩個參數,第一個參數用於設定發送第一個值需等待的時間,第二個參數表示第一次發送後,發送其它值的間隔時間。此外,timer 操做符也能夠只傳遞一個參數,具體以下:
var source = Rx.Observable.timer(1000); source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log('Throw Error: ' + error); } });
以上代碼運行後,控制檯的輸出結果:
0 complete!
有些時候對於一些 Observable 對象 (如經過 interval、timer 操做符建立的對象),當咱們不須要的時候,要釋放相關的資源,以免資源浪費。針對這種狀況,咱們能夠調用 Subscription
對象的 unsubscribe
方法來釋放資源。具體示例以下:
var source = Rx.Observable.timer(1000, 1000); // 取得subscription對象 var subscription = source.subscribe({ next: function(value) { console.log(value); }, complete: function() { console.log('complete!'); }, error: function(error) { console.log('Throw Error: ' + error); } }); setTimeout(() => { subscription.unsubscribe(); }, 5000);
Observer (觀察者) 是一個包含三個方法的對象,每當 Observable 觸發事件時,便會自動調用觀察者的對應方法。
interface Observer<T> { closed?: boolean; // 標識是否已經取消對Observable對象的訂閱 next: (value: T) => void; error: (err: any) => void; complete: () => void; }
Observer 中的三個方法的做用:
接下來咱們來看個具體示例:
var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); observer.next('Lolo'); observer.complete(); observer.next('not work'); }); // 建立一個觀察者 var observer = { next: function(value) { console.log(value); }, error: function(error) { console.log(error); }, complete: function() { console.log('complete'); } } // 訂閱已建立的observable對象 observable.subscribe(observer);
以上代碼運行後,控制檯的輸出結果:
Semlinker Lolo complete
上面的例子中,咱們能夠看出,complete 方法執行後,next 就會失效,因此不會輸出 not work
。
另外觀察者能夠不用同時包含 next、complete、error 三種方法,它能夠只包含一個 next 方法,具體以下:
var observer = { next: function(value) { console.log(value); } };
有時候 Observable 多是一個無限的序列,例如 click 事件,對於這種場景,complete 方法就永遠不會被調用。
咱們也能夠在調用 Observable 對象的 subscribe
方法時,依次傳入 next、error、complete 三個函數,來建立觀察者:
observable.subscribe( value => { console.log(value); }, error => { console.log('Error: ', error); }, () => { console.log('complete'); } );
Pull 和 Push 是數據生產者和數據的消費者兩種不一樣的交流方式。
在 "拉" 體系中,數據的消費者決定什麼時候從數據生產者那裏獲取數據,而生產者自身並不會意識到何時數據將會被髮送給消費者。
每個 JavaScript 函數都是一個 "拉" 體系,函數是數據的生產者,調用函數的代碼經過 ''拉出" 一個單一的返回值來消費該數據。
const add = (a, b) => a + b; let sum = add(3, 4);
ES6介紹了 iterator迭代器 和 Generator生成器 — 另外一種 "拉" 體系,調用 iterator.next()
的代碼是消費者,可從中拉取多個值。
在 "推" 體系中,數據的生產者決定什麼時候發送數據給消費者,消費者不會在接收數據以前意識到它將要接收這個數據。
Promise(承諾) 是當今 JS 中最多見的 "推" 體系,一個Promise (數據的生產者)發送一個 resolved value (成功狀態的值)來執行一個回調(數據消費者),可是不一樣於函數的地方的是:Promise 決定着什麼時候數據才被推送至這個回調函數。
RxJS 引入了 Observables (可觀察對象),一個全新的 "推" 體系。一個可觀察對象是一個產生多值的生產者,當產生新數據的時候,會主動 "推送給" Observer (觀察者)。
生產者 | 消費者 | |
---|---|---|
pull拉 | 被請求的時候產生數據 | 決定什麼時候請求數據 |
push推 | 按本身的節奏生產數據 | 對接收的數據進行處理 |
接下來咱們來看張圖,從而加深對上面概念的理解:
Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。
MagicQ | 單值 | 多值 |
---|---|---|
拉取(Pull) | 函數 | 遍歷器 |
推送(Push) | Promise | Observable |
Promise
Observable
全部的 Observable 對象必定會等到訂閱後,纔開始執行,若是沒有訂閱就不會執行。
var source = Rx.Observable.from([1,2,3,4,5]); var example = source.map(x => x + 1);
上面的示例中,由於 example 對象還未被訂閱,因此不會進行運算。這跟數組不同,具體以下:
var source = [1,2,3,4,5]; var example = source.map(x => x + 1);
以上代碼運行後,example 中就包含已運算後的值。
數組中的操做符如:filter、map 每次都會完整執行並返回一個新的數組,纔會繼續下一步運算。具體示例以下:
var source = [1,2,3,4,5]; var example = source .filter(x => x % 2 === 0) // [2, 4] .map(x => x + 1) // [3, 5]
關於數組中的 map、filter 的詳細信息,能夠參考 - RxJS Functional Programming
爲了更好地理解數組操做符的運算過程,咱們能夠參考下圖:
雖然 Observable 運算符每次都會返回一個新的 Observable 對象,但每一個元素都是漸進式獲取的,且每一個元素都會通過操做符鏈的運算後才輸出,而不會像數組那樣,每一個階段都得完整運算。具體示例以下:
var source = Rx.Observable.from([1,2,3,4,5]); var example = source .filter(x => x % 2 === 0) .map(x => x + 1) example.subscribe(console.log);
以上代碼的執行過程以下:
爲了更好地理解 Observable 操做符的運算過程,咱們能夠參考下圖: