[譯]RxJS06——Subject

原文:http://reactivex.io/rxjs/manu...html

Subject是什麼? RxJS的Subject是Observable的一個特殊類型,他能夠將流中的值廣播給衆多觀察者(Observer)。
通常的Observalbe流是單一廣播制(每個訂閱流的Observer擁有一個獨立的執行過程)。react

一個Subject相似一道Observable數據流,可是能夠對多個Observer進行多點廣播。這就像事件觸發器(EventEmitter):維護了一個偵聽器的列表。this

每個Subject就是一個Observable流。 對於給定的Subject,你能夠訂閱它(subscribe),提供一個Observer,以後將會正常的接收傳遞來的數據。從Observer的角度來講,它是沒法分辨一個流中的值是來源於單一廣播機制的Observable流仍是一個Subject流。rest

在Subject內部,訂閱(subscribe)不會引發一個新的接收數據的過程。相似於其餘庫或語言中的註冊事件偵聽器(addListener),它會直接把給定的Observer放入到一個註冊列表中。code

每個Subject也是一個觀察者(Observer)。 擁有next(v)error(e)complete()方法。往Subject中填充數據,只須要調用next(theValue)便可,它將會把數據廣播給全部已註冊的Observer。 server

如下的例子中,咱們設定了2個訂閱Subject流的Observer,而後咱們填充一些數據到Subject:htm

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

獲得了以下輸出:rxjs

observerA: 1
observerB: 1
observerA: 2
observerB: 2

由於Subject是一個Observer,所以你也能夠將它做爲任何Observable的subscribe()的參數,訂閱這個Observable流,就像下面這樣:事件

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

運行的結果:ip

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

在上面的方法中,咱們使用Subject將一個單點廣播的Observable流轉換爲多點廣播。這也佐證了,Subject是能夠將任何Observable流共享給多個Observer的惟一途徑。

除了Subject,還有一些衍生出的專門的Subject:BehaviorSubject,ReplaySubjectAsyncSubject

多路傳播的Observable流 Multicasted Observables

相比於只能推送消息給單個的Observer的「單路Observable流」,利用具備多個訂閱者的Subject,「多路傳播的Observable流」能夠有多個通知通道。

多路傳播的Observable在後臺經過使用Subject讓多個Observers可以從同一個Observable流中獲取數據。

在後臺,multicast操做符是這樣工做的:Obersver訂閱潛在的Subject,而Subject又訂閱了源Observable流。下面的例子和以前使用observable.subscribe(subject)的狀況相似:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast流返回了一個看似普通的Observable流,可是當訂閱的時候他表現的與Subject相似。這個流被稱做ConnectableObservable流,本質是一個Observable流,但擁有connect()方法。

connect()在內部執行了source.subscribe(subject),而且返回了一個你能夠取消Observable流執行的Subscription。所以,當可被共享的Observable流開始時,connect()方法對於精確的斷定執行過程很重要。

引用計數 Reference counting

手動的調用connect()和執行Subscription每每是很累人的。咱們固然但願能夠在第一個Observer訂閱的時候就自動的執行connect(),而且最好在最後一個Observer取消訂閱(unsubscribe)的時候能自動取消流的執行。

考慮一下,處於下列操做順序時的表現狀況:

  1. 第一個Observer訂閱了多路傳播的Observable流

  2. 多路傳播的Observable流呈被鏈接狀態

  3. 調用next()傳0給第一個Observer

  4. 第二個Observer訂閱多路傳播Observable流

  5. 調用next()傳1給第一個Observer

  6. 調用next()傳1給第二個Observer

  7. 第一個Observer取消訂閱

  8. 調用next()傳2給第二個Observer

  9. 第二個Observer取消訂閱

  10. 多路傳播Observable流的鏈接狀況是未被訂閱狀態

爲了顯式的調用connect()實現這個過程,咱們編寫以下代碼:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

若是咱們想避免顯式的調用connect(),咱們可使用ConnectableObservable的refCount()方法(引用計數),他返回了一個存有衆多訂閱者的Observable流。當訂閱者的數量從0增長到1時,將會自動調用connect(),開始共享流。
當訂閱者的數量從1變爲0,即將處於未訂閱狀態時,將會自動中止下一步的執行。

refCount使多路傳播Observable流在第一個訂閱者出現時自動啓動,在最後一個訂閱者離開時自動中止。

請看下面的例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

執行事後的輸出是:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount()方法只存在於ConnectableObservable中,他返回一個Observable流,而不是另外一個ConnectableObservable流。

BehaviorSubject

BehaviorSubject是一類特異的Subject。具備返回「當前值」的特性。它存儲了流中最新的值並把它推送給本身的用戶,不論它的新舊與否,都可以當即收到推送的這個「當前值」。

BehaviorSubject 很是有利於表示「變化中的值」。舉例來講,每一年都有生日是一道Subject數據流,可是一我的的年齡倒是一個BehaviorSubject流。

來看下面的例子,BehaviorSubject以0爲值進行初始化,第一個訂閱的Observer將會直接收到這個值。當2被填充入流以後,第二個Observer訂閱流時,儘管時間較晚,也會收到最新值2。

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

輸出以下:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject 很像BehaviorSubject,他會把時間線中較老的值推送給新的訂閱者們,並且他還能夠記錄Observable流中一段時間的值。

ReplaySubject可以記錄Observable流中的多個值,並將它們推送給新的訂閱者。

建立ReplaySubject時,你能夠指定須要回放多少個值,像這樣:

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

輸出以下:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

在設定數據量大小以外,你還能夠指定一個以毫秒爲單位的窗口時間,用來肯定記錄的數據所在的時間區間(數據有多老)。
在下面的例子中,咱們使用了一個較大的數據量設定,同時還設定了500毫秒的窗口時間。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

運行結果顯示,第二個Observer在訂閱以後,得到了數據流中最後500毫秒事件內產生的3,4和5三個值。

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
/************/
observerB: 3
observerB: 4
observerB: 5
/************/
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject是Subject的另外一個變化,他會在流發出complete通知時,將數據流中的最後一個值推送給全部訂閱流的Observer。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

輸出爲:
With output:

observerA: 5
observerB: 5

AsyncSubject很是相似last()操做符,它會等待complete通知,並在那時推送流中的數據值。

相關文章
相關標籤/搜索