什麼是Subject? 在RxJS中,Subject是一類特殊的Observable,它能夠向多個Observer多路推送數值。普通的Observable並不具有多路推送的能力(每個Observer都有本身獨立的執行環境),而Subject能夠共享一個執行環境。html
Subject是一種能夠多路推送的可觀察對象。與EventEmitter相似,Subject維護着本身的Observer。react
每個Subject都是一個Observable(可觀察對象) 對於一個Subject,你能夠訂閱(subscribe
)它,Observer會和往常同樣接收到數據。從Observer的視角看,它並不能區分本身的執行環境是普通Observable的單路推送仍是基於Subject的多路推送。es6
Subject的內部實現中,並不會在被訂閱(subscribe
)後建立新的執行環境。它僅僅會把新的Observer註冊在由它自己維護的Observer列表中,這和其餘語言、庫中的addListener
機制相似。code
每個Subject也能夠做爲Observer(觀察者) Subject一樣也是一個由next(v)
,error(e)
,和 complete()
這些方法組成的對象。調用next(theValue)
方法後,Subject會向全部已經在其上註冊的Observer多路推送theValue
。server
下面的例子中,咱們在Subject上註冊了兩個Observer,而且多路推送了一些數值:htm
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(1); subject.next(2);
控制檯輸出結果以下:對象
observerA: 1 observerB: 1 observerA: 2 observerB: 2
既然Subject是一個Observer,你能夠把它做爲subscribe
(訂閱)普通Observable時的參數,以下面例子所示:rxjs
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // 你能夠傳遞Subject來訂閱observable
執行後結果以下:ip
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
經過上面的實現:咱們發現能夠經過Subject將普通的Observable單路推送轉換爲多路推送。這說明了Subject的做用——做爲單路Observable轉變爲多路Observable的橋樑。get
還有幾種特殊的Subject
類型,分別是BehaviorSubject
,ReplaySubject
,和 AsyncSubject
。
在之後的語境中,每當提到「多路推送的Observable」,咱們特指經過Subject構建的Observable執行環境。不然「普通的Observable」只是一個不會共享執行環境而且被訂閱後才生效的一系列值。
經過使用Subject能夠建立擁有相同執行環境的多路的Observable。
下面展現了多路
的運做方式:Subject從普通的Observable訂閱了數據,而後其餘Observer又訂閱了這個Subject,示例以下:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // 經過`subject.subscribe({...})`訂閱Subject的Observer: multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); // 讓Subject從數據源訂閱開始生效: multicasted.connect();
multicast
方法返回一個相似於Observable的可觀察對象,可是在其被訂閱後,它會表現Subject的特性。 multicast
返回的對象同時是ConnectableObservable
類型的,擁有connect()
方法。
connect()
方法很是的重要,它決定Observable什麼時候開始執行。因爲調用connect()
後,Observable開始執行,所以,connect()
會返回一個Subscription
供調用者來終止執行。
經過手動調用connect()
返回的Subscription控制執行十分繁雜。一般,咱們但願在有第一個Observer訂閱Subject後自動connnect
,當全部Observer都取消訂閱後終止這個Subject。
咱們來分析一下下面例子中subscription的過程:
第一個Observer 訂閱了多路推送的 Observable
多路Observable被鏈接
向第一個Observer發送 值爲0
的next
通知
第二個Observer訂閱了多路推送的 Observable
向第一個Observer發送 值爲1
的next
通知
向第二個Observer發送 值爲1
的next
通知
第一個Observer取消了對多路推送的Observable的訂閱
向第二個Observer發送 值爲2
的next
通知
第二個Observer取消了對多路推送的Observable的訂閱
取消對多路推送的Observable的鏈接
經過顯式地調用connect()
,代碼以下:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); }, 2000);
若是你不想顯式地調用connect()
方法,能夠在ConnectableObservable類型的Observable上調用refCount()
方法。方法會進行引用計數:記錄Observable被訂閱的行爲。當訂閱數從 0
到 1
時refCount()
會調用connect()
方法。到訂閱數從1
到 0
,他會終止整個執行過程。
refCount
使得多路推送的Observable在被訂閱後自動執行,在全部觀察者取消訂閱後,中止執行。
下面是示例:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; console.log('observerA subscribed'); subscription1 = refCounted.subscribe({ next: (v) => console.log('observerA: ' + v) }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); }, 2000);
執行輸出結果以下:
observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
只有ConnectableObservables擁有refCount()
方法,調用後會返回一個Observable
而不是新的ConnectableObservable。
BehaviorSubject
是Subject的一個衍生類,具備「最新的值」的概念。它老是保存最近向數據消費者發送的值,當一個Observer訂閱後,它會即刻從BehaviorSubject
收到「最新的值」。
BehaviorSubjects很是適於表示「隨時間推移的值」。舉一個形象的例子,Subject表示一我的的生日,而Behavior則表示一我的的歲數。(生日只是一天,一我的的歲數會保持到下一次生日以前。)
下面例子中,展現瞭如何用 0
初始化BehaviorSubject,當Observer訂閱它時,0
是第一個被推送的值。緊接着,在第二個Observer訂閱BehaviorSubject以前,它推送了2
,雖然訂閱在推送2
以後,可是第二個Observer仍然能接受到2
:
var subject = new Rx.BehaviorSubject(0 /* 初始值 */); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(3);
輸出結果以下:
observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
ReplaySubject
如同於BehaviorSubject
是 Subject
的子類。經過 ReplaySubject
能夠向新的訂閱者推送舊數值,就像一個錄像機ReplaySubject
能夠記錄Observable的一部分狀態(過去時間內推送的值)。
.一個ReplaySubject
能夠記錄Observable執行過程當中推送的多個值,並向新的訂閱者回放它們。
你能夠指定回放值的數量:
var subject = new Rx.ReplaySubject(3 /* 回放數量 */); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5);
輸出以下:
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
除了回放數量,你也能夠以毫秒爲單位去指定「窗口時間」,決定ReplaySubject記錄多久之前Observable推送的數值。下面的例子中,咱們把回放數量設置爲100
,把窗口時間設置爲500
毫秒:
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 1000);
第二個Observer接受到3
(600ms), 4
(800ms) 和 5
(1000ms),這些值均在訂閱以前的500
毫秒內推送(窗口長度 1000ms - 600ms = 400ms < 500ms):
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 observerB: 3 observerB: 4 observerB: 5 observerA: 6 observerB: 6 ...
AsyncSubject是Subject的另一個衍生類,Observable僅會在執行完成後,推送執行環境中的最後一個值。
var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5); subject.complete();
輸出結果以下:
observerA: 5 observerB: 5
AsyncSubject 與 last()
操做符類似,等待完成通知後推送執行過程的最後一個值。