查看新版教程,請訪問前端修仙之路javascript
在介紹 Observable 以前,咱們要先了解兩個設計模式:html
這兩個模式是 Observable 的基礎,下面咱們先來介紹一下 Observer Pattern。前端
觀察者模式是軟件設計模式的一種。在此種模式中,一個目標對象管理全部相依於它的觀察者對象,而且在它自己的狀態改變時主動發出通知。這一般透過呼叫各觀察者所提供的方法來實現。此種模式一般被用來實時事件處理系統。 — 維基百科java
觀察者模式又叫發佈訂閱模式(Publish/Subscribe),它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知全部的觀察者對象,使得它們可以自動更新本身。react
咱們可使用平常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,他們之間的關係以下:git
在觀察者模式中也有兩個主要角色:Subject (主題) 和 Observer (觀察者) 。它們分別對應例子中的期刊出版方和訂閱者。接下來咱們來看張圖,從而加深對上面概念的理解。es6
觀察者模式的優勢:github
觀察者模式的缺點:算法
在前端領域,觀察者模式被普遍地使用。最多見的例子就是爲 DOM 對象添加事件監聽,具體示例以下:typescript
<button id="btn">確認</button>
function clickHandler(event) {
console.log('用戶已點擊確認按鈕!');
}
document.getElementById("btn").addEventListener('click', clickHandler);
複製代碼
上面代碼中,咱們經過 addEventListener API 監聽 button 對象上的點擊事件,當用戶點擊按鈕時,會自動執行咱們的 clickHandler
函數。
Subject 類定義:
class Subject {
constructor() {
this.observerCollection = []; // 觀察者集合
}
registerObserver(observer) { // 註冊觀察者
this.observerCollection.push(observer);
}
unregisterObserver(observer) { // 移除觀察者
let index = this.observerCollection.indexOf(observer);
if(index >= 0) this.observerCollection.splice(index, 1);
}
notifyObservers() { // 通知全部觀察者
this.observerCollection.forEach((observer)=>observer.notify());
}
}
複製代碼
Observer 類定義:
class Observer {
constructor(name) {
this.name = name;
}
notify() {
console.log(`${this.name} has been notified.`);
}
}
複製代碼
使用示例:
let subject = new Subject(); // 建立主題對象
let observer1 = new Observer('semlinker'); // 建立觀察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 建立觀察者B - 'lolo'
subject.registerObserver(observer1); // 註冊觀察者A
subject.registerObserver(observer2); // 註冊觀察者B
subject.notifyObservers(); // 通知觀察者
subject.unregisterObserver(observer1); // 移除觀察者A
subject.notifyObservers(); // 驗證是否成功移除
複製代碼
以上代碼成功運行後控制檯的輸出結果:
semlinker has been notified. # 輸出一次
2(unknown) lolo has been notified. # 輸出兩次
複製代碼
須要注意的是,在觀察者模式中,一般狀況下調用註冊觀察者後,會返回一個函數,用於移除監聽,有興趣的讀者,能夠本身嘗試一下。(備註:在 Angular 1.x 中調用 on() 方法後,就會返回一個函數,用於移除監聽)
迭代器(Iterator)模式,又叫作遊標(Cursor)模式。它提供一種方法順序訪問一個聚合對象中的各個元素,而又不須要暴露該對象的內部表示。迭代器模式能夠把迭代的過程從業務邏輯中分離出來,在使用迭代器模式以後,即便不關心對象的內部構造,也能夠按順序訪問其中的每一個元素。
迭代器模式的優勢:
迭代器模式的缺點:
在 ECMAScript 中 Iterator 最先實際上是要採用相似 Python 的 Iterator 規範,就是 Iterator 在沒有元素以後,執行
next
會直接拋出錯誤;但後來通過一段時間討論後,決定採更 functional 的作法,改爲在取得最後一個元素以後執行next
永遠都回傳{ done: true, value: undefined }
一個迭代器對象 ,知道如何每次訪問集合中的一項, 並記錄它的當前在序列中所在的位置。在 JavaScript 中迭代器是一個對象,它提供了一個 next() 方法,返回序列中的下一項。這個方法返回包含 done
和 value
兩個屬性的對象。對象的取值以下:
{ done: false, value: elementValue }
{ done: true, value: undefined }
詳細信息能夠參考 - 可迭代協議和迭代器協議
接下來咱們來建立一個 makeIterator 函數,該函數的參數類型是數組,當調用該函數後,返回一個包含 next() 方法的 Iterator 對象, 其中 next() 方法是用來獲取容器對象中下一個元素。具體示例以下:
function makeIterator(array){
var nextIndex = 0;
return {
next: function(){
return nextIndex < array.length ?
{value: array[nextIndex++], done: false} :
{done: true};
}
}
}
複製代碼
一旦初始化, next() 方法能夠用來依次訪問可迭代對象中的元素:
var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'
console.log(it.next().value); // 'ya'
console.log(it.next().done); // true
複製代碼
在 ES 6 中咱們能夠經過 Symbol.iterator
來建立可迭代對象的內部迭代器,具體示例以下:
let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();
複製代碼
調用 next()
方法來獲取數組中的元素:
> iter.next()
{ value: 'a', done: false }
> iter.next()
{ value: 'b', done: false }
> iter.next()
{ value: 'c', done: false }
> iter.next()
{ value: undefined, done: true }
複製代碼
ES 6 中可迭代的對象:
RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:
若是你想真正瞭解 Observable,最好的方式就是本身寫一個。其實 Observable 就是一個函數,它接受一個 Observer
做爲參數而後返回另外一個函數。
它的基本特徵:
Observer
對象 (包含 next、error、complete 方法的對象) 做爲參數unsubscribe
函數,用於取消訂閱它的做用:
做爲生產者與觀察者之間的橋樑,並返回一種方法來解除生產者與觀察者之間的聯繫,其中觀察者用於處理時間序列上數據流。接下來咱們來看一下 Observable 的基礎實現:
DataSource - 數據源
class DataSource {
constructor() {
let i = 0;
this._id = setInterval(() => this.emit(i++), 200); // 建立定時器
}
emit(n) {
const limit = 10; // 設置數據上限值
if (this.ondata) {
this.ondata(n);
}
if (n === limit) {
if (this.oncomplete) {
this.oncomplete();
}
this.destroy();
}
}
destroy() { // 清除定時器
clearInterval(this._id);
}
}
複製代碼
myObservable
function myObservable(observer) {
let datasource = new DataSource(); // 建立數據源
datasource.ondata = (e) => observer.next(e); // 處理數據流
datasource.onerror = (err) => observer.error(err); // 處理異常
datasource.oncomplete = () => observer.complete(); // 處理數據流終止
return () => { // 返回一個函數用於,銷燬數據源
datasource.destroy();
};
}
複製代碼
使用示例:
const unsub = myObservable({
next(x) { console.log(x); },
error(err) { console.error(err); },
complete() { console.log('done')}
});
/** * 移除註釋,能夠測試取消訂閱 */
// setTimeout(unsub, 500);
複製代碼
具體運行結果,能夠查看線上示例。
上面的示例中,咱們使用一個包含了 next、error、complete 方法的普通 JavaScript 對象來定義觀察者。一個普通的 JavaScript 對象只是一個開始,在 RxJS 5 裏面,爲開發者提供了一些保障機制,來保證一個更安全的觀察者。如下是一些比較重要的原則:
Observer
對象能夠不實現全部規定的方法 (next、error、complete 方法)complete
或者 error
觸發以後再調用 next
方法是沒用的unsubscribe
方法後,任何方法都不能再被調用了complete
和 error
觸發後,unsubscribe
也會自動調用next
、complete
和error
出現異常時,unsubscribe
也會自動調用以保證資源不會浪費next
、complete
和error
是可選的。按需處理便可,沒必要所有處理爲了完成上述目標,咱們得把傳入的匿名 Observer
對象封裝在一個 SafeObserver
裏以提供上述保障。SafeObserver 的具體實現以下:
class SafeObserver {
constructor(destination) {
this.destination = destination;
}
next(value) {
// 還沒有取消訂閱,且包含next方法
if (!this.isUnsubscribed && this.destination.next) {
try {
this.destination.next(value);
} catch (err) {
// 出現異常時,取消訂閱釋放資源,再拋出異常
this.unsubscribe();
throw err;
}
}
}
error(err) {
// 還沒有取消訂閱,且包含error方法
if (!this.isUnsubscribed && this.destination.error) {
try {
this.destination.error(err);
} catch (e2) {
// 出現異常時,取消訂閱釋放資源,再拋出異常
this.unsubscribe();
throw e2;
}
this.unsubscribe();
}
}
complete() {
// 還沒有取消訂閱,且包含complete方法
if (!this.isUnsubscribed && this.destination.complete) {
try {
this.destination.complete();
} catch (err) {
// 出現異常時,取消訂閱釋放資源,再拋出異常
this.unsubscribe();
throw err;
}
this.unsubscribe();
}
}
unsubscribe() { // 用於取消訂閱
this.isUnsubscribed = true;
if (this.unsub) {
this.unsub();
}
}
}
複製代碼
myObservable - 使用 SafeObserver
function myObservable(observer) {
const safeObserver = new SafeObserver(observer); // 建立SafeObserver對象
const datasource = new DataSource(); // 建立數據源
datasource.ondata = (e) => safeObserver.next(e);
datasource.onerror = (err) => safeObserver.error(err);
datasource.oncomplete = () => safeObserver.complete();
safeObserver.unsub = () => { // 爲SafeObserver對象添加unsub方法
datasource.destroy();
};
// 綁定this上下文,並返回unsubscribe方法
return safeObserver.unsubscribe.bind(safeObserver);
}
複製代碼
使用示例:
const unsub = myObservable({
next(x) { console.log(x); },
error(err) { console.error(err); },
complete() { console.log('done')}
});
複製代碼
具體運行結果,能夠查看線上示例。
Operator 是一個函數,它接收一個 Observable 對象,而後返回一個新的 Observable 對象。當咱們訂閱新返回的 Observable 對象時,它內部會自動訂閱前一個 Observable 對象。接下來咱們來實現經常使用的 map 操做符:
Observable 實現:
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const safeObserver = new SafeObserver(observer);
safeObserver.unsub = this._subscribe(safeObserver);
return safeObserver.unsubscribe.bind(safeObserver);
}
}
複製代碼
map 操做符實現:
function map(source, project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return source.subscribe(mapObserver);
});
}
複製代碼
具體運行結果,能夠查看線上示例。
若是把 Operator
都寫成如上那種獨立的函數,咱們鏈式代碼會逐漸變醜:
map(map(myObservable, (x) => x + 1), (x) => x + 2);
複製代碼
對於上面的代碼,想象一下有 五、6 個嵌套着的 Operator
,再加上更多、更復雜的參數,基本上就無法兒看了。
你也能夠試下 Texas Toland 提議的簡單版管道實現,合併壓縮一個數組的Operator
並生成一個最終的Observable
,不過這意味着要寫更復雜的 Operator
,上代碼:JSBin。其實寫完後你會發現,代碼也不怎麼漂亮:
pipe(myObservable, map(x => x + 1), map(x => x + 2));
複製代碼
理想狀況下,咱們想將代碼用更天然的方式鏈起來:
myObservable.map(x => x + 1).map(x => x + 2);
複製代碼
幸運的是,咱們已經有了這樣一個 Observable
類,咱們能夠基於 prototype 在不增長複雜度的狀況下支持多 Operators
的鏈式結構,下面咱們採用prototype方式再次實現一下 Observable
:
Observable.prototype.map = function (project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return this.subscribe(mapObserver);
});
};
複製代碼
如今咱們終於有了一個還不錯的實現。這樣實現還有其餘好處,例如:能夠寫子類繼承 Observable
類,而後在子類中重寫某些內容以優化程序。
接下來咱們來總結一下該部分的內容:Observable 就是函數,它接受 Observer 做爲參數,又返回一個函數。若是你也寫了一個函數,接收一個 Observer 做爲參數,又返回一個函數,那麼,它是異步的、仍是同步的 ?其實都不是,它就只是一個函數。任何函數的行爲都依賴於它的具體實現,因此當你處理一個 Observable 時,就把它當成一個普通函數,裏面沒有什麼黑魔法。當你要構建 Operator 鏈時,你須要作的其實就是生成一個函數將一堆 Observers 連接在一塊兒,而後讓真正的數據依次穿過它們。
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext
observer.next('Lolo');
});
// 訂閱這個 Observable
observable.subscribe(function(value) {
console.log(value);
});
複製代碼
以上代碼運行後,控制檯會依次輸出 'Semlinker' 和 'Lolo' 兩個字符串。
須要注意的是,不少人認爲 RxJS 中的全部操做都是異步的,但其實這個觀念是錯的。RxJS 的核心特性是它的異步處理能力,但它也是能夠用來處理同步的行爲。具體示例以下:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext
observer.next('Lolo');
});
console.log('start');
observable.subscribe(function(value) {
console.log(value);
});
console.log('end');
複製代碼
以上代碼運行後,控制檯的輸出結果:
start
Semlinker
Lolo
end
複製代碼
固然咱們也能夠用它處理異步行爲:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext
observer.next('Lolo');
setTimeout(() => {
observer.next('RxJS Observable');
}, 300);
})
console.log('start');
observable.subscribe(function(value) {
console.log(value);
});
console.log('end');
複製代碼
以上代碼運行後,控制檯的輸出結果:
start
Semlinker
Lolo
end
RxJS Observable
複製代碼
從以上例子中,咱們能夠得出一個結論 - Observable 能夠應用於同步和異步的場合。
RxJS 中提供了不少操做符,用於建立 Observable 對象,經常使用的操做符以下:
上面的例子中,咱們已經使用過了 create 操做符,接下來咱們來看一下 of 操做符:
var source = Rx.Observable.of('Semlinker', 'Lolo');
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
Semlinker
Lolo
complete!
複製代碼
var arr = [1, 2, 3];
var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路"
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
1
2
3
complete!
複製代碼
Rx.Observable.fromEvent(document.querySelector('button'), 'click');
複製代碼
var source = Rx.Observable
.fromPromise(new Promise((resolve, reject) => {
setTimeout(() => {
resolve('Hello RxJS!');
},3000)
}));
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
Hello RxJS!
complete!
複製代碼
var source = Rx.Observable.empty();
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
complete!
複製代碼
empty 操做符返回一個空的 Observable 對象,若是咱們訂閱該對象,它會當即返回 complete 信息。
var source = Rx.Observable.never();
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error);
}
});
複製代碼
never 操做符會返回一個無窮的 Observable,當咱們訂閱它後,什麼事情都不會發生,它是一個一直存在卻什麼都不作的 Observable 對象。
var source = Rx.Observable.throw('Oop!');
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
Throw Error: Oop!
複製代碼
throw 操做如,只作一件事就是拋出異常。
var source = Rx.Observable.interval(1000);
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
0
1
2
...
複製代碼
interval 操做符支持一個數值類型的參數,用於表示定時的間隔。上面代碼表示每隔 1s,會輸出一個遞增的值,初始值從 0 開始。
var source = Rx.Observable.timer(1000, 5000);
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
0 # 1s後
1 # 5s後
2 # 5s後
...
複製代碼
timer 操做符支持兩個參數,第一個參數用於設定發送第一個值需等待的時間,第二個參數表示第一次發送後,發送其它值的間隔時間。此外,timer 操做符也能夠只傳遞一個參數,具體以下:
var source = Rx.Observable.timer(1000);
source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
複製代碼
以上代碼運行後,控制檯的輸出結果:
0
complete!
複製代碼
有些時候對於一些 Observable 對象 (如經過 interval、timer 操做符建立的對象),當咱們不須要的時候,要釋放相關的資源,以免資源浪費。針對這種狀況,咱們能夠調用 Subscription
對象的 unsubscribe
方法來釋放資源。具體示例以下:
var source = Rx.Observable.timer(1000, 1000);
// 取得subscription對象
var subscription = source.subscribe({
next: function(value) {
console.log(value);
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error);
}
});
setTimeout(() => {
subscription.unsubscribe();
}, 5000);
複製代碼
Observer (觀察者) 是一個包含三個方法的對象,每當 Observable 觸發事件時,便會自動調用觀察者的對應方法。
interface Observer<T> {
closed?: boolean; // 標識是否已經取消對Observable對象的訂閱
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
複製代碼
Observer 中的三個方法的做用:
接下來咱們來看個具體示例:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Semlinker');
observer.next('Lolo');
observer.complete();
observer.next('not work');
});
// 建立一個觀察者
var observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error);
},
complete: function() {
console.log('complete');
}
}
// 訂閱已建立的observable對象
observable.subscribe(observer);
複製代碼
以上代碼運行後,控制檯的輸出結果:
Semlinker
Lolo
complete
複製代碼
上面的例子中,咱們能夠看出,complete 方法執行後,next 就會失效,因此不會輸出 not work
。
另外觀察者能夠不用同時包含 next、complete、error 三種方法,它能夠只包含一個 next 方法,具體以下:
var observer = {
next: function(value) {
console.log(value);
}
};
複製代碼
有時候 Observable 多是一個無限的序列,例如 click 事件,對於這種場景,complete 方法就永遠不會被調用。
咱們也能夠在調用 Observable 對象的 subscribe
方法時,依次傳入 next、error、complete 三個函數,來建立觀察者:
observable.subscribe(
value => { console.log(value); },
error => { console.log('Error: ', error); },
() => { console.log('complete'); }
);
複製代碼
Pull 和 Push 是數據生產者和數據的消費者兩種不一樣的交流方式。
在 "拉" 體系中,數據的消費者決定什麼時候從數據生產者那裏獲取數據,而生產者自身並不會意識到何時數據將會被髮送給消費者。
每個 JavaScript 函數都是一個 "拉" 體系,函數是數據的生產者,調用函數的代碼經過 ''拉出" 一個單一的返回值來消費該數據。
const add = (a, b) => a + b;
let sum = add(3, 4);
複製代碼
ES6介紹了 iterator迭代器 和 Generator生成器 —另外一種 "拉" 體系,調用 iterator.next()
的代碼是消費者,可從中拉取多個值。
在 "推" 體系中,數據的生產者決定什麼時候發送數據給消費者,消費者不會在接收數據以前意識到它將要接收這個數據。
Promise(承諾) 是當今 JS 中最多見的 "推" 體系,一個Promise (數據的生產者)發送一個 resolved value (成功狀態的值)來執行一個回調(數據消費者),可是不一樣於函數的地方的是:Promise 決定着什麼時候數據才被推送至這個回調函數。
RxJS 引入了 Observables (可觀察對象),一個全新的 "推" 體系。一個可觀察對象是一個產生多值的生產者,當產生新數據的時候,會主動 "推送給" Observer (觀察者)。
生產者 | 消費者 | |
---|---|---|
pull拉 | 被請求的時候產生數據 | 決定什麼時候請求數據 |
push推 | 按本身的節奏生產數據 | 對接收的數據進行處理 |
接下來咱們來看張圖,從而加深對上面概念的理解:
Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。下方表格對Observable進行了定位。
MagicQ | 單值 | 多值 |
---|---|---|
拉取(Pull) | 函數 | 遍歷器 |
推送(Push) | Promise | Observable |
全部的 Observable 對象必定會等到訂閱後,纔開始執行,若是沒有訂閱就不會執行。
var source = Rx.Observable.from([1,2,3,4,5]);
var example = source.map(x => x + 1);
複製代碼
上面的示例中,由於 example 對象還未被訂閱,因此不會進行運算。這跟數組不同,具體以下:
var source = [1,2,3,4,5];
var example = source.map(x => x + 1);
複製代碼
以上代碼運行後,example 中就包含已運算後的值。
數組中的操做符如:filter、map 每次都會完整執行並返回一個新的數組,纔會繼續下一步運算。具體示例以下:
var source = [1,2,3,4,5];
var example = source
.filter(x => x % 2 === 0) // [2, 4]
.map(x => x + 1) // [3, 5]
複製代碼
關於數組中的 map、filter 的詳細信息,能夠參考 - RxJS Functional Programming
爲了更好地理解數組操做符的運算過程,咱們能夠參考下圖:
雖然 Observable 運算符每次都會返回一個新的 Observable 對象,但每一個元素都是漸進式獲取的,且每一個元素都會通過操做符鏈的運算後才輸出,而不會像數組那樣,每一個階段都得完整運算。具體示例以下:
var source = Rx.Observable.from([1,2,3,4,5]);
var example = source
.filter(x => x % 2 === 0)
.map(x => x + 1)
example.subscribe(console.log);
複製代碼
以上代碼的執行過程以下:
爲了更好地理解 Observable 操做符的運算過程,咱們能夠參考下圖: