RxJS 核心概念之Subject

什麼是Subject? 在RxJS中,Subject是一類特殊的Observable,它能夠向多個Observer多路推送數值。普通的Observable並不具有多路推送的能力(每個Observer都有本身獨立的執行環境),而Subject能夠共享一個執行環境。html

Subject是一種能夠多路推送的可觀察對象。與EventEmitter相似,Subject維護着本身的Observer。react

每個Subject都是一個Observable(可觀察對象) 對於一個Subject,你能夠訂閱(subscribe)它,Observer會和往常同樣接收到數據。從Observer的視角看,它並不能區分本身的執行環境是普通Observable的單路推送仍是基於Subject的多路推送。es6

Subject的內部實現中,並不會在被訂閱(subscribe)後建立新的執行環境。它僅僅會把新的Observer註冊在由它自己維護的Observer列表中,這和其餘語言、庫中的addListener機制相似。code

每個Subject也能夠做爲Observer(觀察者) Subject一樣也是一個由next(v)error(e),和 complete()這些方法組成的對象。調用next(theValue)方法後,Subject會向全部已經在其上註冊的Observer多路推送theValueserver

下面的例子中,咱們在Subject上註冊了兩個Observer,而且多路推送了一些數值:htm

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

控制檯輸出結果以下:對象

observerA: 1
observerB: 1
observerA: 2
observerB: 2

既然Subject是一個Observer,你能夠把它做爲subscribe(訂閱)普通Observable時的參數,以下面例子所示:rxjs

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你能夠傳遞Subject來訂閱observable

執行後結果以下:ip

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

經過上面的實現:咱們發現能夠經過Subject將普通的Observable單路推送轉換爲多路推送。這說明了Subject的做用——做爲單路Observable轉變爲多路Observable的橋樑。get

還有幾種特殊的Subject 類型,分別是BehaviorSubjectReplaySubject,和 AsyncSubject

多路推送的Observable

在之後的語境中,每當提到「多路推送的Observable」,咱們特指經過Subject構建的Observable執行環境。不然「普通的Observable」只是一個不會共享執行環境而且被訂閱後才生效的一系列值。

經過使用Subject能夠建立擁有相同執行環境的多路的Observable。

下面展現了多路的運做方式:Subject從普通的Observable訂閱了數據,而後其餘Observer又訂閱了這個Subject,示例以下:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 經過`subject.subscribe({...})`訂閱Subject的Observer:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// 讓Subject從數據源訂閱開始生效:
multicasted.connect();

multicast方法返回一個相似於Observable的可觀察對象,可是在其被訂閱後,它會表現Subject的特性。 multicast 返回的對象同時是ConnectableObservable類型的,擁有connect() 方法。

connect()方法很是的重要,它決定Observable什麼時候開始執行。因爲調用connect()後,Observable開始執行,所以,connect()會返回一個Subscription供調用者來終止執行。

引用計數

經過手動調用connect()返回的Subscription控制執行十分繁雜。一般,咱們但願在有第一個Observer訂閱Subject後自動connnect,當全部Observer都取消訂閱後終止這個Subject。

咱們來分析一下下面例子中subscription的過程:

  1. 第一個Observer 訂閱了多路推送的 Observable

  2. 多路Observable被鏈接

  3. 向第一個Observer發送 值爲0next通知

  4. 第二個Observer訂閱了多路推送的 Observable

  5. 向第一個Observer發送 值爲1next通知

  6. 向第二個Observer發送 值爲1next通知

  7. 第一個Observer取消了對多路推送的Observable的訂閱

  8. 向第二個Observer發送 值爲2next通知

  9. 第二個Observer取消了對多路推送的Observable的訂閱

  10. 取消對多路推送的Observable的鏈接

經過顯式地調用connect(),代碼以下:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); 
}, 2000);

若是你不想顯式地調用connect()方法,能夠在ConnectableObservable類型的Observable上調用refCount()方法。方法會進行引用計數:記錄Observable被訂閱的行爲。當訂閱數從 01refCount() 會調用connect() 方法。到訂閱數從10,他會終止整個執行過程。

refCount 使得多路推送的Observable在被訂閱後自動執行,在全部觀察者取消訂閱後,中止執行。

下面是示例:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

執行輸出結果以下:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

只有ConnectableObservables擁有refCount()方法,調用後會返回一個Observable而不是新的ConnectableObservable。

BehaviorSubject

BehaviorSubject是Subject的一個衍生類,具備「最新的值」的概念。它老是保存最近向數據消費者發送的值,當一個Observer訂閱後,它會即刻從BehaviorSubject收到「最新的值」。

BehaviorSubjects很是適於表示「隨時間推移的值」。舉一個形象的例子,Subject表示一我的的生日,而Behavior則表示一我的的歲數。(生日只是一天,一我的的歲數會保持到下一次生日以前。)

下面例子中,展現瞭如何用 0初始化BehaviorSubject,當Observer訂閱它時,0是第一個被推送的值。緊接着,在第二個Observer訂閱BehaviorSubject以前,它推送了2,雖然訂閱在推送2以後,可是第二個Observer仍然能接受到2

var subject = new Rx.BehaviorSubject(0 /* 初始值 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

輸出結果以下:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 如同於BehaviorSubjectSubject 的子類。經過 ReplaySubject能夠向新的訂閱者推送舊數值,就像一個錄像機ReplaySubject能夠記錄Observable的一部分狀態(過去時間內推送的值)。

.一個ReplaySubject能夠記錄Observable執行過程當中推送的多個值,並向新的訂閱者回放它們。

你能夠指定回放值的數量:

var subject = new Rx.ReplaySubject(3 /* 回放數量 */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

輸出以下:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

除了回放數量,你也能夠以毫秒爲單位去指定「窗口時間」,決定ReplaySubject記錄多久之前Observable推送的數值。下面的例子中,咱們把回放數量設置爲100,把窗口時間設置爲500毫秒:

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

第二個Observer接受到3(600ms), 4(800ms) 和 5(1000ms),這些值均在訂閱以前的500毫秒內推送(窗口長度 1000ms - 600ms = 400ms < 500ms):

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject是Subject的另一個衍生類,Observable僅會在執行完成後,推送執行環境中的最後一個值。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

輸出結果以下:

observerA: 5
observerB: 5

AsyncSubject 與 last() 操做符類似,等待完成通知後推送執行過程的最後一個值。

相關文章
相關標籤/搜索