昨天咱們介紹完了各類 Subject,不曉得各位讀者還記不記得在一開始講到 Subject 時,是但願可以讓 Observable 有新訂閱時,能夠共用前一個訂閱的執行而不要從頭開始,以下面的例子javascript
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subject = new Rx.Subject()
subject.subscribe(observerA)
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"複製代碼
上面這段代碼咱們用 subject 訂閱了 source,再把 observerA 跟 observerB 一個個訂閱到 subject,這樣就可讓 observerA 跟 observerB 共用同一個執行。但這樣的寫法會讓代碼看起來太過複雜,咱們能夠用 Observable 的 multicast operator 來簡化這段代碼java
multicast 能夠用來掛載 subject 並回傳一個可連結(connectable)的 observable,以下併發
var source = Rx.Observable.interval(1000)
.take(3)
.multicast(new Rx.Subject());
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
source.subscribe(observerA); // subject.subscribe(observerA)
source.connect(); // source.subscribe(subject)
setTimeout(() => {
source.subscribe(observerB); // subject.subscribe(observerB)
}, 1000);複製代碼
JSBin | JSFiddleui
上面這段代碼咱們透過 multicast 來掛載一個 subject 以後這個 observable(source) 的訂閱其實都是訂閱到 subject 上。spa
source.subscribe(observerA); // subject.subscribe(observerA)複製代碼
必須真的等到 執行 connect()
後纔會真的用 subject 訂閱 source,並開始送出元素,若是沒有執行 connect()
observable 是不會真正執行的。code
source.connect();複製代碼
另外值得注意的是這裏要退訂的話,要把 connect()
回傳的 subscription 退訂纔會真正中止 observable 的執行,以下server
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject()); // 無限的 observable
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
var realSubscription = source.connect();
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
}, 1000);
setTimeout(() => {
subscriptionA.unsubscribe();
subscriptionB.unsubscribe();
// 這裏雖然 A 跟 B 都退訂了,但 source 還會繼續送元素
}, 5000);
setTimeout(() => {
realSubscription.unsubscribe();
// 這裏 source 纔會真正中止送元素
}, 7000);複製代碼
JSBin | JSFiddle對象
上面這段的代碼,必須等到 realSubscription.unsubscribe()
執行完,source 纔會真的結束。ip
雖然用了 multicast 感受會讓咱們處理的對象少一點,但必須搭配 connect 一塊兒使用仍是讓代碼有點複雜,一般咱們會但願有 observer 訂閱時,就當即執行併發送元素,而不要再多執行一個方法(connect),這時咱們就能夠用 refCount。get
refCount 必須搭配 multicast 一塊兒使用,他能夠創建一個只要有訂閱就會自動 connect 的 observable,範例以下
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 訂閱數 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 訂閱數 0 => 2
}, 1000);複製代碼
JSBin | JSFiddle
上面這段代碼,當 source 一被 observerA 訂閱時(訂閱數從 0 變成 1),就會當即執行併發送元素,咱們就不須要再額外執行 connect。
一樣的在退訂時只要訂閱數變成 0 就會自動中止發送
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 訂閱數 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 訂閱數 0 => 2
}, 1000);
setTimeout(() => {
subscriptionA.unsubscribe(); // 訂閱數 2 => 1
subscriptionB.unsubscribe(); // 訂閱數 1 => 0,source 中止發送元素
}, 5000);複製代碼
JSBin | JSFiddle
其實 multicast(new Rx.Subject())
很經常使用到,咱們有一個簡化的寫法那就是 publish,下面這兩段代碼是徹底等價的
var source = Rx.Observable.interval(1000)
.publish()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();複製代碼
加上 Subject 的三種變形
var source = Rx.Observable.interval(1000)
.publishReplay(1)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.ReplaySubject(1))
// .refCount();複製代碼
var source = Rx.Observable.interval(1000)
.publishBehavior(0)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.BehaviorSubject(0))
// .refCount();複製代碼
var source = Rx.Observable.interval(1000)
.publishLast()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.AsyncSubject(1))
// .refCount();複製代碼
另外 publish + refCount 能夠在簡寫成 share
var source = Rx.Observable.interval(1000)
.share();
// var source = Rx.Observable.interval(1000)
// .publish()
// .refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();複製代碼
今天主要講解了 multicast 和 refCount 兩個 operators 能夠幫助咱們既可能的簡化代碼,並同時達到組播的效果。最後介紹 publish 跟 share 幾個簡化寫法,這幾個簡化的寫法是比較常見的,在理解 multicast 跟 refCount 運做方式後就能直接套用到 publish 跟 share 上。
不知道今天讀者們有沒有收穫呢? 若是有任何問題歡迎在下方留言給我,謝謝^^