觀察者模式又叫發佈訂閱模式(Publish/Subscribe),它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知全部的觀察者對象,使得它們可以自動更新本身。javascript
咱們能夠使用平常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,他們之間的關係以下:html
期刊出版方 - 負責期刊的出版和發行工做java
訂閱者 - 只需執行訂閱操做,新版的期刊發佈後,就會主動收到通知,若是取消訂閱,之後就不會再收到通知react
在觀察者模式中也有兩個主要角色:Subject (主題) 和 Observer (觀察者) 。它們分別對應例子中的期刊出版方和訂閱者。接下來咱們來看張圖,從而加深對上面概念的理解。es6
class Subject { constructor() { this.observerCollection = []; } addObserver(observer) { // 添加觀察者 this.observerCollection.push(observer); } deleteObserver(observer) { // 移除觀察者 let index = this.observerCollection.indexOf(observer); if(index >= 0) this.observerCollection.splice(index, 1); } notifyObservers() { // 通知觀察者 this.observerCollection.forEach((observer)=>observer.notify()); } }
class Observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } }
let subject = new Subject(); // 建立主題對象 let observer1 = new Observer('semlinker'); // 建立觀察者A - 'semlinker' let observer2 = new Observer('lolo'); // 建立觀察者B - 'lolo' subject.addObserver(observer1); // 註冊觀察者A subject.addObserver(observer2); // 註冊觀察者B subject.notifyObservers(); // 通知觀察者 subject.deleteObserver(observer1); // 移除觀察者A subject.notifyObservers(); // 驗證是否成功移除
以上代碼成功運行後控制檯的輸出結果:typescript
semlinker has been notified. lolo has been notified. lolo has been notified.
在介紹 RxJS - Subject 以前,咱們先來看個示例:shell
const interval$ = Rx.Observable.interval(1000).take(3); interval$.subscribe({ next: value => console.log('Observer A get value: ' + value); }); setTimeout(() => { interval$.subscribe({ next: value => console.log('Observer B get value: ' + value); }); }, 1000);
以上代碼運行後,控制檯的輸出結果:瀏覽器
Observer A get value: 0 Observer A get value: 1 Observer B get value: 0 Observer A get value: 2 Observer B get value: 1 Observer B get value: 2
經過以上示例,咱們能夠得出如下結論:app
Observable 對象能夠被重複訂閱函數
Observable 對象每次被訂閱後,都會從新執行
上面的示例,咱們能夠簡單地認爲兩次調用普通的函數,具體參考如下代碼:
function interval() { setInterval(() => console.log('..'), 1000); } interval(); setTimeout(() => { interval(); }, 1000);
Observable 對象的默認行爲,適用於大部分場景。但有些時候,咱們會但願在第二次訂閱的時候,不會從頭開始接收 Observable 發出的值,而是從第一次訂閱當前正在處理的值開始發送,咱們把這種處理方式成爲組播 (multicast),那咱們要怎麼實現呢 ?回想一下咱們剛纔介紹過觀察者模式,你腦海中是否是已經想到方案了。沒錯,咱們能夠經過自定義 Subject 來實現上述功能。
class Subject { constructor() { this.observers = []; } addObserver(observer) { this.observers.push(observer); } next(value) { this.observers.forEach(o => o.next(value)); } error(error){ this.observers.forEach(o => o.error(error)); } complete() { this.observers.forEach(o => o.complete()); } }
const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.addObserver(observerA); // 添加觀察者A interval$.subscribe(subject); // 訂閱interval$對象 setTimeout(() => { subject.addObserver(observerB); // 添加觀察者B }, 1000);
以上代碼運行後,控制檯的輸出結果:
Observer A get value: 0 Observer A get value: 1 Observer B get value: 1 Observer A get value: 2 Observer B get value: 2 Observer A complete! Observer B complete!
經過自定義 Subject,咱們實現了前面提到的功能。接下來咱們進入正題 - RxJS Subject。
首先咱們經過 RxJS Subject 來重寫一下上面的示例:
const interval$ = Rx.Observable.interval(1000).take(3); let subject = new Rx.Subject(); let observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); // 添加觀察者A interval$.subscribe(subject); // 訂閱interval$對象 setTimeout(() => { subject.subscribe(observerB); // 添加觀察者B }, 1000);
/** * Suject繼承於Observable */ export class Subject extends Observable { constructor() { super(); this.observers = []; // 觀察者列表 this.closed = false; this.isStopped = false; this.hasError = false; this.thrownError = null; } next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環調用觀察者next方法,通知觀察者 copy[i].next(value); } } } error(err) { if (this.closed) { throw new ObjectUnsubscribedError(); } this.hasError = true; this.thrownError = err; this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環調用觀察者error方法 copy[i].error(err); } this.observers.length = 0; } complete() { if (this.closed) { throw new ObjectUnsubscribedError(); } this.isStopped = true; const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環調用觀察者complete方法 copy[i].complete(); } this.observers.length = 0; // 清空內部觀察者列表 } }
經過 RxJS Subject 示例和源碼片斷,對於 Subject 咱們能夠得出如下結論:
Subject 既是 Observable 對象,又是 Observer 對象
當有新消息時,Subject 會對內部的 observers 列表進行組播 (multicast)
在 Angular 2 中,咱們能夠利用 RxJS Subject 來實現組件通訊,具體示例以下:
message.service.ts
import { Injectable } from '@angular/core'; import {Observable} from 'rxjs/Observable'; import { Subject } from 'rxjs/Subject'; @Injectable() export class MessageService { private subject = new Subject<any>(); sendMessage(message: string) { this.subject.next({ text: message }); } clearMessage() { this.subject.next(); } getMessage(): Observable<any> { return this.subject.asObservable(); } }
home.component.ts
import { Component } from '@angular/core'; import { MessageService } from '../_services/index'; @Component({ moduleId: module.id, templateUrl: 'home.component.html' }) export class HomeComponent { constructor(private messageService: MessageService) {} sendMessage(): void { // 發送消息 this.messageService.sendMessage('Message from Home Component to App Component!'); } clearMessage(): void { // 清除消息 this.messageService.clearMessage(); } }
app.component.ts
import { Component, OnDestroy } from '@angular/core'; import { Subscription } from 'rxjs/Subscription'; import { MessageService } from './_services/index'; @Component({ moduleId: module.id, selector: 'app', templateUrl: 'app.component.html' }) export class AppComponent implements OnDestroy { message: any; subscription: Subscription; constructor(private messageService: MessageService) { this.subscription = this.messageService.getMessage() .subscribe(message => { this.message = message; }); } ngOnDestroy() { this.subscription.unsubscribe(); } }
以上示例實現的功能是組件之間消息通訊,即 HomeComponent 子組件,向 AppComponent 父組件發送消息。代碼運行後,瀏覽器的顯示結果以下:
由於 Subject 在訂閱時,是把 observer 存放到觀察者列表中,並在接收到新值的時候,遍歷觀察者列表並調用觀察者上的 next
方法,具體以下:
next(value) { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const { observers } = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { // 循環調用觀察者next方法,通知觀察者 copy[i].next(value); } } }
這樣會有一個大問題,若是某個 observer 在執行時出現異常,卻沒進行異常處理,就會影響到其它的訂閱者,具體示例以下:
const source = Rx.Observable.interval(1000); const subject = new Rx.Subject(); const example = subject.map(x => { if (x === 1) { throw new Error('oops'); } return x; }); subject.subscribe(x => console.log('A', x)); example.subscribe(x => console.log('B', x)); subject.subscribe(x => console.log('C', x)); source.subscribe(subject);
以上代碼運行後,控制檯的輸出結果:
A 0 B 0 C 0 A 1 Rx.min.js:74 Uncaught Error: oops
在代碼運行前,你們會認爲觀察者B 會在接收到 1
值時拋出異常,觀察者 A 和 C 仍會正常運行。但實際上,在當前的 RxJS 版本中若觀察者 B 報錯,觀察者 A 和 C 也會中止運行。那麼應該如何解決這個問題呢?目前最簡單的方式就是爲全部的觀察者添加異常處理,更新後的代碼以下:
const source = Rx.Observable.interval(1000); const subject = new Rx.Subject(); const example = subject.map(x => { if (x === 1) { throw new Error('oops'); } return x; }); subject.subscribe( x => console.log('A', x), error => console.log('A Error:' + error) ); example.subscribe( x => console.log('B', x), error => console.log('B Error:' + error) ); subject.subscribe( x => console.log('C', x), error => console.log('C Error:' + error) ); source.subscribe(subject);
JSBin - RxJS Subject Problem Solved Demo
Subject 實際上是觀察者模式的實現,因此當觀察者訂閱 Subject 對象時,Subject 對象會把訂閱者添加到觀察者列表中,每當有 subject 對象接收到新值時,它就會遍歷觀察者列表,依次調用觀察者內部的 next()
方法,把值一一送出。
Subject 之因此具備 Observable 中的全部方法,是由於 Subject 類繼承了 Observable 類,在 Subject 類中有五個重要的方法:
next - 每當 Subject 對象接收到新值的時候,next 方法會被調用
error - 運行中出現異常,error 方法會被調用
complete - Subject 訂閱的 Observable 對象結束後,complete 方法會被調用
subscribe - 添加觀察者
unsubscribe - 取消訂閱 (設置終止標識符、清空觀察者列表)
export class BehaviorSubject extends Subject { constructor(_value) { // 設置初始值 super(); this._value = _value; } get value() { // 獲取當前值 return this.getValue(); } _subscribe(subscriber) { const subscription = super._subscribe(subscriber); if (subscription && !subscription.closed) { subscriber.next(this._value); // 爲新的訂閱者發送當前最新的值 } return subscription; } getValue() { if (this.hasError) { throw this.thrownError; } else if (this.closed) { throw new ObjectUnsubscribedError(); } else { return this._value; } } next(value) { // 調用父類Subject的next方法,同時更新當前值 super.next(this._value = value); } }
有些時候咱們會但願 Subject 能保存當前的最新狀態,而不是單純的進行事件發送,也就是說每當新增一個觀察者的時候,咱們但願 Subject 可以當即發出當前最新的值,而不是沒有任何響應。具體咱們先看一下示例:
var subject = new Rx.Subject(); var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上代碼運行後,控制檯的輸出結果:
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3
經過輸出結果,咱們發如今 observerB 訂閱 Subject 對象後,它再也沒有收到任何值了。由於 Subject 對象沒有再調用 next()
方法。但不少時候咱們會但願 Subject 對象可以保存當前的狀態,當新增訂閱者的時候,自動把當前最新的值發送給訂閱者。要實現這個功能,咱們就須要使用 BehaviorSubject。
BehaviorSubject 跟 Subject 最大的不一樣就是 BehaviorSubject 是用來保存當前最新的值,而不是單純的發送事件。BehaviorSubject 會記住最近一次發送的值,並把該值做爲當前值保存在內部的屬性中。接下來咱們來使用 BehaviorSubject 從新一下上面的示例:
var subject = new Rx.BehaviorSubject(0); // 設定初始值 var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上代碼運行後,控制檯的輸出結果:
Observer A get value: 0 Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 3
export class ReplaySubject extends Subject { constructor(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) { super(); this.scheduler = scheduler; this._events = []; // ReplayEvent對象列表 this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 設置緩衝區大小 this._windowTime = windowTime < 1 ? 1 : windowTime; } next(value) { const now = this._getNow(); this._events.push(new ReplayEvent(now, value)); this._trimBufferThenGetEvents(); super.next(value); } _subscribe(subscriber) { const _events = this._trimBufferThenGetEvents(); // 過濾ReplayEvent對象列表 let subscription; if (this.closed) { throw new ObjectUnsubscribedError(); } ... else { this.observers.push(subscriber); subscription = new SubjectSubscription(this, subscriber); } ... const len = _events.length; // 從新發送設定的最後bufferSize個值 for (let i = 0; i < len && !subscriber.closed; i++) { subscriber.next(_events[i].value); } ... return subscription; } } class ReplayEvent { constructor(time, value) { this.time = time; this.value = value; } }
有些時候咱們但願在 Subject 新增訂閱者後,能向新增的訂閱者從新發送最後幾個值,這時咱們就能夠使用 ReplaySubject ,具體示例以下:
var subject = new Rx.ReplaySubject(2); // 從新發送最後2個值 var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上代碼運行後,控制檯的輸出結果:
Observer A get value: 1 Observer A get value: 2 Observer A get value: 3 Observer B get value: 2 Observer B get value: 3
可能會有人認爲 ReplaySubject(1)
是否是等同於 BehaviorSubject,其實它們是不同的。在建立BehaviorSubject 對象時,是設置初始值,它用於表示 Subject 對象當前的狀態,而 ReplaySubject 只是事件的重放。
export class AsyncSubject extends Subject { constructor() { super(...arguments); this.value = null; this.hasNext = false; this.hasCompleted = false; // 標識是否已完成 } _subscribe(subscriber) { if (this.hasError) { subscriber.error(this.thrownError); return Subscription.EMPTY; } else if (this.hasCompleted && this.hasNext) { // 等到完成後,才發出最後的值 subscriber.next(this.value); subscriber.complete(); return Subscription.EMPTY; } return super._subscribe(subscriber); } next(value) { if (!this.hasCompleted) { // 若未完成,保存當前的值 this.value = value; this.hasNext = true; } } }
AsyncSubject 相似於 last
操做符,它會在 Subject 結束後發出最後一個值,具體示例以下:
var subject = new Rx.AsyncSubject(); var observerA = { next: value => console.log('Observer A get value: ' + value), error: error => console.log('Observer A error: ' + error), complete: () => console.log('Observer A complete!') }; var observerB = { next: value => console.log('Observer B get value: ' + value), error: error => console.log('Observer B error: ' + error), complete: () => console.log('Observer B complete!') }; subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); subject.complete(); setTimeout(() => { subject.subscribe(observerB); // 1秒後訂閱 }, 1000);
以上代碼運行後,控制檯的輸出結果:
Observer A get value: 3 Observer A complete! Observer B get value: 3 Observer B complete!