原文連接: blog.angularindepth.com/rxjs-how-to…html
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!git
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】es6
在個人上篇文章 理解 publish 和 share 操做符中,只是簡單介紹了 refCount 方法。在這篇文章中咱們將深刻介紹。github
簡單回顧一下, RxJS 多播的基本心智模型包括: 一個源 observable,一個訂閱源 observable 的 subject 和多個訂閱 subject 的觀察者。multicast
操做符封裝了基於 subject 的基礎結構並返回擁有 connect
和 refCount
方法的 ConnectableObservable
。bash
顧名思義,refCount
返回的 observable 維護訂閱者的引用計數。函數
當觀察者訂閱有引用計數的 observable 時,引用計數會增長,若是上一個引用計數爲零的話,負責多播基礎結構的 subject 會訂閱源 observable 。而當觀察者取消訂閱時,引用計數則會減小,若是引用計數歸零的話,subject 會取消源 observable 的訂閱。工具
這種引用計數的行爲有兩種用途:post
咱們來詳細介紹每一種狀況,而後創建一些使用 refCount
的通用指南。ui
publish
操做符返回 ConnectableObservable
。調用 ConnectableObservable
的 connect
方法時,負責多播基礎結構的 subject 會訂閱源 observable 並返回 subscription (訂閱)。subject 會保持對源 observable 的訂閱直到調用 subscription 的 unsubscribe
方法。spa
咱們來看下面的示例,觀察者會接收一個值,而後(隱式地)取消對調用過 publish
的 observable 的訂閱:
const source = instrument(Observable.interval(100));
const published = source.publish();
const a = published.take(1).subscribe(observer("a"));
const b = published.take(1).subscribe(observer("b"));
const subscription = published.connect();
複製代碼
本文中的示例都將使用下面的工具函數來讓源 observable 具有日誌功能,以及建立有名稱的觀察者:
function instrument<T>(source: Observable<T>) {
return Observable.create((observer: Observer<T>) => {
console.log("source: subscribing");
const subscription = source
.do(value => console.log(`source: ${value}`))
.subscribe(observer);
return () => {
subscription.unsubscribe();
console.log("source: unsubscribed");
};
}) as Observable<T>;
}
function observer<T>(name: string) {
return {
next: (value: T) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
複製代碼
示例的輸出以下所示:
source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: 1
source: 2
source: 3
...
複製代碼
兩個觀察者都只接收一個值而後完成,完成的同時取消對調用過 publish
的 observable 的訂閱。可是,多播基礎結構仍然保持着對源 observable 的訂閱。
若是不想顯示地執行取消訂閱操做的話,可使用 refCount
:
const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.take(1).subscribe(observer("a"));
const b = counted.take(1).subscribe(observer("b"));
複製代碼
觀察者訂閱使用引用計數的 observable 的話,當引用計數歸零時,負責多播的基礎結構的 subject 會取消源 observable 的訂閱,示例的輸出以下所示:
source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: unsubscribed
複製代碼
當引用計數歸零後,多播的基礎結構除了取消源 observable 的訂閱,當負責引用計數的 observable 再次發生訂閱時,它還會從新訂閱源 observable 。
咱們使用下面的示例來看看當使用已完成的源 observable 時會發生什麼:
const source = instrument(Observable.timer(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
複製代碼
示例中使用 timer
observable 做爲源。它會等待指定的毫秒數後發出 next
和 complete
通知。還有兩個觀察者: a
在源 observable 完成後訂閱,在源 observable 完成後取消訂閱;b
在 a
取消訂閱後訂閱。
示例的輸出以下:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
複製代碼
當 b
訂閱時,引用計數爲零,因此多播的基礎結構會指望 subject 從新訂閱源 observable 。可是,因爲 subject 已經收到了源 observable 的 complete
通知,而且 subject 是沒法複用的,因此實際上並無進行從新訂閱,b
只能收到 complete
通知。
若是使用 publishBehavior(-1)
來代替 publish()
的話,輸出相似,但會包含 BehaviorSubject
的初始值:
observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
複製代碼
一樣的,b
仍是隻能收到 complete
通知。
若是使用 publishReplay(1)
來代替 publish()
的話,狀況會有些變化,輸出以下:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: 0
observer b: complete
複製代碼
一樣的,此次也沒有從新訂閱源 observable,由於 subject 已經完成了。可是,已完成的 ReplaySubject
將通知重放給後來的訂閱者,因此 b
能收到重放的 next
通知和 complete
通知。
若是使用 publishLast()
來代替 publish()
的話,狀況又會有些不一樣,輸出以下:
source: subscribing
source: 0
source: unsubscribed
observer a: 0
observer a: complete
observer b: 0
observer b: complete
複製代碼
一樣的,依然沒有從新訂閱源 observable,由於 subject 已經完成了。可是,AsyncSubject
會將最後收到的 next
通知發給它的訂閱者,因此 a
和 b
都收到的是 next
和 complete
通知。
綜上所述,根據示例咱們能夠發現 publish
以及它的變種:
publish
和 publishBehavior
與 refCount
一塊兒使用時,後來的訂閱者只會收到 complete
通知,這彷佛並非咱們想要的效果。publishReplay
和 publishLast
與 refCount
一塊兒使用時,後來的訂閱者會收到預期的通知。咱們已經看過了從新訂閱已完成的源 observable 時會發生什麼,如今咱們再來看看從新訂閱未完成的源 observable 是怎樣一個狀況。
這個示例中將使用 interval
observable 來替代 timer
observable,它會根據指定的時間間隔重複地發出包含自增數字的 next
通知:
const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
複製代碼
示例的輸出以下所示:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
複製代碼
與使用已完成的源 observable 的示例不一樣的是,負責多播基礎結構的 subject 可以被從新訂閱,因此源 observable 能夠產生新的訂閱。b
所收到的 next
通知即是從新訂閱的證據: 該通知包含數值0,由於從新訂閱已經開啓了全新的 interval
序列。
若是使用 publishBehavior(-1)
來代替 publish()
的話,狀況會有所不一樣,輸出以下所示:
observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
複製代碼
輸出是相似的,能夠清楚地看到從新訂閱開啓了全新的 interval
序列。可是,在收到 interval
的 next
通知前,a
還收到了包含 BehaviorSubject
初始值-1的 next
通知,b
會收到包含 BehaviorSubject
當前值0的 next
通知。
若是使用 publishReplay(1)
來代替 publish()
的話,狀況又會有所不一樣,輸出以下所示:
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
複製代碼
輸出也是相似的,能夠清楚地看到從新訂閱開啓了全新的 interval
序列。可是,b
在收到源 observable 的第一個 next
通知以前會收到重放的 next
通知。
綜上所述,根據示例咱們能夠發現,當對未完成的源 observable 使用 refCount
時,publish
、publishBehavior
和 publishReplay
的行爲都如預期通常,沒有讓人出乎意料之處。
在 RxJS 5.4.0 版本中引入了 shareReplay 操做符。它與 publishReplay().refCount()
十分類似,只是有一個細微的差異。
與 share
相似, shareReplay
傳給 multicast
操做符的也是 subject 的工廠函數。這意味着當從新訂閱源 observable 時,會使用工廠函數來建立出一個新的 subject 。可是,只有當前一個被訂閱 subject 未完成的狀況下,工廠函數纔會返回新的 subject 。
publishReplay
傳給 multicast
操做符的是 ReplaySubject
實例,而不是工廠函數,這是影響行爲不一樣的緣由。
對調用了 publishReplay().refCount()
的 observable 進行從新訂閱,subject 會一直重放它的可重放通知。可是,對調用了 shareReplay()
的 observable 進行從新訂閱,行爲未必如前者同樣,若是 subject 還未完成,會建立一個新的 subject 。因此區別在於,使用調用了 shareReplay()
的 observable 的話,當引用計數歸零時,若是 subject 還未完成的話,可重放的通知會被沖洗掉。
根據咱們看過的這些示例,能夠概括出以下使用準則:
refCount
能夠與 publish
及其變種一塊兒使用,從而自動地取消源 observable 的訂閱。refCount
來自動取消已完成的源 observable 的訂閱時,publishReplay
和 publishLast
的行爲會如預期同樣,可是,對於後來的訂閱,publish
和 publishBehavior
的行爲並沒太大幫助,因此你應該只使用 publish
和 publishBehavior
來自動取消訂閱。refCount
來自動取消未完成的源 observable 的訂閱時,publish
、publishBehavior
和 publishRelay
的行爲都會如預期同樣。shareReplay()
的行爲相似於 publishReplay().refCount()
,在對二者進行選擇時,應該根據在對源 observable 進行從新訂閱時,你是否想要衝洗掉可重放的通知。上面所描述的 shareReplay
的行爲只適用於 RxJS 5.5 以前的版本。在 5.5.0 beta 中,shareReplay
作出了變動: 當引用計數歸零時,操做符再也不取消源 observable 的訂閱。
這項變化當即使得引用計數變得多餘,由於只有當源 observable 完成或報錯時,源 observable 的訂閱纔會取消訂閱。這項變化也意味着只有在處理錯誤時,shareReplay
和 publishReplay().refCount()
纔有所不一樣:
publishReplay().refCount()
返回的 observable 的任何後來訂閱者都將收到錯誤。shareReplay
返回的 observable 的任何後來訂閱者都將產生一個源 observable 的新訂閱。===================================結尾分界線=================================
這是多播三連的最後一篇,也應該是年前更新的最後一篇,在這裏提早祝你們春節快樂,闔家歡樂,18年開開心心學 Rx 。
順便預告下,年後回來咱們會來兩篇實戰型的文章。