原文連接: netbasal.com/rxjs-subjec…javascript
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!java
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】git
我已經發表過一篇關於 Subject 的文章 (中文),但此次我想嘗試一種不一樣的方式。github
要理解 Subject
是什麼的最簡單的方式就是從新建立一個。咱們來建立一個簡易版的 Subject
。api
注意: 下面的示例只是爲了闡述概念,還不足以應用於實際開發之中,還有它們並非 Rx 中 Subjects 的真正完整實現。數組
咱們來看看真相。ide
Subject 既是 Observable,又是 Observer 。ui
這表示它擁有全部的操做符 (map
、filter
,等等) 而且你能夠訂閱它。this
class MySubject extends Rx.Observable {
constructor() {
super();
}
}
複製代碼
這是第一部分所需的一切了。它能夠經過擴展 Observable
類成爲 Observable
。spa
這表示它必須實現 next()
,error()
和 complete()
方法。
class MySubject extends Rx.Observable {
constructor() {
super();
}
next() {}
error() {}
complete() {}
}
複製代碼
好了,咱們來看下一個真相。
Subject 能夠扮演源 observable 和 衆多觀察者之間的橋樑或代理,使得多個觀察者能夠共享同一個 observable 執行。
class MySubject extends Rx.Observable {
constructor() {
super();
this.observers = [];
}
subscribe(observer) {
this.observers.push(observer);
}
next(value) {
this.observers.forEach(observer => observer.next(value));
}
error(error) {
this.observers.forEach(observer => observer.error(error));
}
complete() {
this.observers.forEach(observer => observer.complete());
}
}
複製代碼
當你調用 subscribe()
方法時,僅僅是將 observer
添加到一個數組中。next()
、error()
和 completed()
方法會調用數組中每一個 observer
的對應方法。
來使用咱們的 Subject 。
const interval$ = Rx.Observable.interval(1000).take(7);
const subject = new MySubject();
subject.map(value => `Observer one ${value}`).subscribe(value => {
console.log(value);
});
interval$.subscribe(subject);
setTimeout(() => {
subject.map(value => `Observer two ${value}`).subscribe(value => {
console.log(value);
});
}, 2000);
複製代碼
當使用 Subject
時,不管你什麼時候 subscribe
, 你永遠都會獲得相同的執行,這點不一樣於典型的 observable,每次 subscribe
都會開啓有個新的執行。(在咱們的案例中,這表示你會有兩個不相關的 intervals)
Subject 讓你同享相同的 observable 執行
咱們來總結一下這裏發生了什麼。
當對 subject 調用 subscribe
時,只是將 observer
添加到數組中。
當 subject
扮演 observer
時,每當源 observable (在咱們的案例中是指 interval
) 發出值時,它會調用數組中每一個 observer
的 next()
方法。
如今讓咱們來嘗試實現 BehaviorSubject
的簡易版。
咱們來看看真相。
BehaviorSubject
須要一個初始值,由於它必須始終返回一個訂閱值,即便它還沒接收到 next()
調用。getValue()
方法來獲取 subject 的最新值。class MyBehaviorSubject extends Rx.Observable {
constructor(initialValue) {
super();
this.observers = [];
if (typeof initialValue === 'undefined') {
throw new Error('You need to provide initial value');
}
this.lastValue = initialValue;
}
subscribe(observer) {
this.observers.push(observer);
observer.next(this.lastValue);
}
next(value) {
this.lastValue = value;
this.observers.forEach(observer => observer.next(value));
}
getValue() {
return this.lastValue;
}
}
複製代碼
來使用咱們的 BehaviorSubject
。
const subject = new MyBehaviorSubject('initialValue');
subject.map(value => `Observer one ${value}`).subscribe(function(value) {
console.log(value);
});
subject.next('New value');
setTimeout(() => {
subject.map(value => `Observer two ${value}`).subscribe(function(value) {
console.log(value);
});
}, 2000);
複製代碼
如今讓咱們來嘗試實現 ReplaySubject
的簡易版。
咱們來看看真相.
ReplaySubject
表示一個對象既是 observable 序列,又是 observer 。class MyReplaySubject extends Rx.Observable {
constructor(bufferSize) {
super();
this.observers = [];
this.bufferSize = bufferSize;
this.lastValues = [];
}
subscribe(observer) {
this.lastValues.forEach(val => observer.next(val));
this.observers.push(observer);
}
next(value) {
if (this.lastValues.length === this.bufferSize) {
this.lastValues.shift();
}
this.lastValues.push(value);
this.observers.forEach(observer => observer.next(value));
}
}
複製代碼
來使用咱們的 ReplaySubject
。
const subject = new MyReplaySubject(3);
subject.next('One');
subject.next('Two');
subject.next('Three');
subject.next('Four');
setTimeout(() => {
subject.map(value => `Later Observer ${value}`).subscribe(function(value) {
console.log(value);
});
}, 2000);
複製代碼
ReplaySubject
、BehaviorSubject
?next()
、error()
和 completed()
方法。