Subject是什麼? RxJS的Subject是Observable的一個特殊類型,他能夠將流中的值廣播給衆多觀察者(Observer)。
通常的Observalbe流是單一廣播制(每個訂閱流的Observer擁有一個獨立的執行過程)。react
一個Subject相似一道Observable數據流,可是能夠對多個Observer進行多點廣播。這就像事件觸發器(EventEmitter):維護了一個偵聽器的列表。this
每個Subject就是一個Observable流。 對於給定的Subject,你能夠訂閱它(subscribe
),提供一個Observer,以後將會正常的接收傳遞來的數據。從Observer的角度來講,它是沒法分辨一個流中的值是來源於單一廣播機制的Observable流仍是一個Subject流。rest
在Subject內部,訂閱(subscribe
)不會引發一個新的接收數據的過程。相似於其餘庫或語言中的註冊事件偵聽器(addListener
),它會直接把給定的Observer放入到一個註冊列表中。code
每個Subject也是一個觀察者(Observer
)。 擁有next(v)
、error(e)
和complete()
方法。往Subject中填充數據,只須要調用next(theValue)
便可,它將會把數據廣播給全部已註冊的Observer。 server
如下的例子中,咱們設定了2個訂閱Subject流的Observer,而後咱們填充一些數據到Subject:htm
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(1); subject.next(2);
獲得了以下輸出:rxjs
observerA: 1 observerB: 1 observerA: 2 observerB: 2
由於Subject是一個Observer,所以你也能夠將它做爲任何Observable的subscribe()
的參數,訂閱這個Observable流,就像下面這樣:事件
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // You can subscribe providing a Subject
運行的結果:ip
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
在上面的方法中,咱們使用Subject將一個單點廣播的Observable流轉換爲多點廣播。這也佐證了,Subject是能夠將任何Observable流共享給多個Observer的惟一途徑。
除了Subject,還有一些衍生出的專門的Subject:BehaviorSubject
,ReplaySubject
和AsyncSubject
。
相比於只能推送消息給單個的Observer的「單路Observable流」,利用具備多個訂閱者的Subject,「多路傳播的Observable流」能夠有多個通知通道。
多路傳播的Observable在後臺經過使用Subject讓多個Observers可以從同一個Observable流中獲取數據。
在後臺,multicast操做符是這樣工做的:Obersver訂閱潛在的Subject,而Subject又訂閱了源Observable流。下面的例子和以前使用observable.subscribe(subject)
的狀況相似:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // These are, under the hood, `subject.subscribe({...})`: multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); // This is, under the hood, `source.subscribe(subject)`: multicasted.connect();
multicast流返回了一個看似普通的Observable流,可是當訂閱的時候他表現的與Subject相似。這個流被稱做ConnectableObservable
流,本質是一個Observable流,但擁有connect()
方法。
connect()
在內部執行了source.subscribe(subject)
,而且返回了一個你能夠取消Observable流執行的Subscription
。所以,當可被共享的Observable流開始時,connect()
方法對於精確的斷定執行過程很重要。
手動的調用connect()
和執行Subscription
每每是很累人的。咱們固然但願能夠在第一個Observer訂閱的時候就自動的執行connect()
,而且最好在最後一個Observer取消訂閱(unsubscribe)的時候能自動取消流的執行。
考慮一下,處於下列操做順序時的表現狀況:
第一個Observer訂閱了多路傳播的Observable流
多路傳播的Observable流呈被鏈接狀態
調用next()
傳0給第一個Observer
第二個Observer訂閱多路傳播Observable流
調用next()
傳1給第一個Observer
調用next()
傳1給第二個Observer
第一個Observer取消訂閱
調用next()
傳2給第二個Observer
第二個Observer取消訂閱
多路傳播Observable流的鏈接狀況是未被訂閱狀態
爲了顯式的調用connect()
實現這個過程,咱們編寫以下代碼:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log('observerA: ' + v) }); // We should call `connect()` here, because the first // subscriber to `multicasted` is interested in consuming values subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); // We should unsubscribe the shared Observable execution here, // because `multicasted` would have no more subscribers after this setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // for the shared Observable execution }, 2000);
若是咱們想避免顯式的調用connect()
,咱們可使用ConnectableObservable的refCount()
方法(引用計數),他返回了一個存有衆多訂閱者的Observable流。當訂閱者的數量從0增長到1時,將會自動調用connect()
,開始共享流。
當訂閱者的數量從1變爲0,即將處於未訂閱狀態時,將會自動中止下一步的執行。
refCount
使多路傳播Observable流在第一個訂閱者出現時自動啓動,在最後一個訂閱者離開時自動中止。
請看下面的例子:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; // This calls `connect()`, because // it is the first subscriber to `refCounted` console.log('observerA subscribed'); subscription1 = refCounted.subscribe({ next: (v) => console.log('observerA: ' + v) }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 600); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 1200); // This is when the shared Observable execution will stop, because // `refCounted` would have no more subscribers after this setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); }, 2000);
執行事後的輸出是:
observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
refCount()
方法只存在於ConnectableObservable
中,他返回一個Observable流,而不是另外一個ConnectableObservable流。
BehaviorSubject
是一類特異的Subject。具備返回「當前值」的特性。它存儲了流中最新的值並把它推送給本身的用戶,不論它的新舊與否,都可以當即收到推送的這個「當前值」。
BehaviorSubject 很是有利於表示「變化中的值」。舉例來講,每一年都有生日是一道Subject數據流,可是一我的的年齡倒是一個BehaviorSubject流。
來看下面的例子,BehaviorSubject以0爲值進行初始化,第一個訂閱的Observer將會直接收到這個值。當2被填充入流以後,第二個Observer訂閱流時,儘管時間較晚,也會收到最新值2。
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(3);
輸出以下:
observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
ReplaySubject 很像BehaviorSubject,他會把時間線中較老的值推送給新的訂閱者們,並且他還能夠記錄Observable流中一段時間的值。
ReplaySubject可以記錄Observable流中的多個值,並將它們推送給新的訂閱者。
建立ReplaySubject時,你能夠指定須要回放多少個值,像這樣:
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5);
輸出以下:
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
在設定數據量大小以外,你還能夠指定一個以毫秒爲單位的窗口時間,用來肯定記錄的數據所在的時間區間(數據有多老)。
在下面的例子中,咱們使用了一個較大的數據量設定,同時還設定了500毫秒的窗口時間。
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); }, 1000);
運行結果顯示,第二個Observer在訂閱以後,得到了數據流中最後500毫秒事件內產生的3,4和5三個值。
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 /************/ observerB: 3 observerB: 4 observerB: 5 /************/ observerA: 6 observerB: 6 ...
AsyncSubject是Subject的另外一個變化,他會在流發出complete
通知時,將數據流中的最後一個值推送給全部訂閱流的Observer。
var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log('observerB: ' + v) }); subject.next(5); subject.complete();
輸出爲:
With output:
observerA: 5 observerB: 5
AsyncSubject很是相似last()
操做符,它會等待complete
通知,並在那時推送流中的數據值。