[譯] RxJS: 如何使用 refCount

原文連接: blog.angularindepth.com/rxjs-how-to…html

本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!git

若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】es6

在個人上篇文章 理解 publish 和 share 操做符中,只是簡單介紹了 refCount 方法。在這篇文章中咱們將深刻介紹。github

refCount 的做用是什麼?

簡單回顧一下, RxJS 多播的基本心智模型包括: 一個源 observable,一個訂閱源 observable 的 subject 和多個訂閱 subject 的觀察者。multicast 操做符封裝了基於 subject 的基礎結構並返回擁有 connectrefCount 方法的 ConnectableObservablebash

顧名思義,refCount 返回的 observable 維護訂閱者的引用計數。函數

當觀察者訂閱有引用計數的 observable 時,引用計數會增長,若是上一個引用計數爲零的話,負責多播基礎結構的 subject 會訂閱源 observable 。而當觀察者取消訂閱時,引用計數則會減小,若是引用計數歸零的話,subject 會取消源 observable 的訂閱。工具

這種引用計數的行爲有兩種用途:post

  • 當全部觀察者都取消訂閱後,自動取消 subject 對源 observable 的訂閱
  • 當全部觀察者都取消訂閱後,自動取消 subject 對源 observable 的訂閱,而後當再有觀察者訂閱該引用計數的 observable 時,subject 從新訂閱源 observable

咱們來詳細介紹每一種狀況,而後創建一些使用 refCount 的通用指南。ui

使用 refCount 自動取消訂閱

publish 操做符返回 ConnectableObservable 。調用 ConnectableObservableconnect 方法時,負責多播基礎結構的 subject 會訂閱源 observable 並返回 subscription (訂閱)。subject 會保持對源 observable 的訂閱直到調用 subscription 的 unsubscribe 方法。spa

咱們來看下面的示例,觀察者會接收一個值,而後(隱式地)取消對調用過 publish 的 observable 的訂閱:

const source = instrument(Observable.interval(100));
const published = source.publish();
const a = published.take(1).subscribe(observer("a"));
const b = published.take(1).subscribe(observer("b"));
const subscription = published.connect();
複製代碼

本文中的示例都將使用下面的工具函數來讓源 observable 具有日誌功能,以及建立有名稱的觀察者:

function instrument<T>(source: Observable<T>) {
  return Observable.create((observer: Observer<T>) => {
    console.log("source: subscribing");
    const subscription = source
      .do(value => console.log(`source: ${value}`))
      .subscribe(observer);
    return () => {
      subscription.unsubscribe();
      console.log("source: unsubscribed");
    };
  }) as Observable<T>;
}

function observer<T>(name: string) {
  return {
    next: (value: T) => console.log(`observer ${name}: ${value}`),
    complete: () => console.log(`observer ${name}: complete`)
  };
}
複製代碼

示例的輸出以下所示:

source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: 1
source: 2
source: 3
...
複製代碼

兩個觀察者都只接收一個值而後完成,完成的同時取消對調用過 publish 的 observable 的訂閱。可是,多播基礎結構仍然保持着對源 observable 的訂閱。

若是不想顯示地執行取消訂閱操做的話,可使用 refCount:

const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.take(1).subscribe(observer("a"));
const b = counted.take(1).subscribe(observer("b"));
複製代碼

觀察者訂閱使用引用計數的 observable 的話,當引用計數歸零時,負責多播的基礎結構的 subject 會取消源 observable 的訂閱,示例的輸出以下所示:

source: subscribing
source: 0
observer a: 0
observer a: complete
observer b: 0
observer b: complete
source: unsubscribed
複製代碼

從新訂閱已完成的 observables

當引用計數歸零後,多播的基礎結構除了取消源 observable 的訂閱,當負責引用計數的 observable 再次發生訂閱時,它還會從新訂閱源 observable 。

咱們使用下面的示例來看看當使用已完成的源 observable 時會發生什麼:

const source = instrument(Observable.timer(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
複製代碼

示例中使用 timer observable 做爲源。它會等待指定的毫秒數後發出 nextcomplete 通知。還有兩個觀察者: a 在源 observable 完成後訂閱,在源 observable 完成後取消訂閱;ba 取消訂閱後訂閱。

示例的輸出以下:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
複製代碼

b 訂閱時,引用計數爲零,因此多播的基礎結構會指望 subject 從新訂閱源 observable 。可是,因爲 subject 已經收到了源 observable 的 complete 通知,而且 subject 是沒法複用的,因此實際上並無進行從新訂閱,b 只能收到 complete 通知。

若是使用 publishBehavior(-1) 來代替 publish() 的話,輸出相似,但會包含 BehaviorSubject 的初始值:

observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: complete
複製代碼

一樣的,b 仍是隻能收到 complete 通知。

若是使用 publishReplay(1) 來代替 publish() 的話,狀況會有些變化,輸出以下:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer a: complete
observer b: 0
observer b: complete
複製代碼

一樣的,此次也沒有從新訂閱源 observable,由於 subject 已經完成了。可是,已完成的 ReplaySubject 將通知重放給後來的訂閱者,因此 b 能收到重放的 next 通知和 complete 通知。

若是使用 publishLast() 來代替 publish() 的話,狀況又會有些不一樣,輸出以下:

source: subscribing
source: 0
source: unsubscribed
observer a: 0
observer a: complete
observer b: 0
observer b: complete
複製代碼

一樣的,依然沒有從新訂閱源 observable,由於 subject 已經完成了。可是,AsyncSubject 會將最後收到的 next 通知發給它的訂閱者,因此 ab 都收到的是 nextcomplete 通知。

綜上所述,根據示例咱們能夠發現 publish 以及它的變種:

  • 當源 observable 完成時,負責多播基礎結構的 subject 也會完成,並且這會阻止對源 observable 的從新訂閱。
  • publishpublishBehaviorrefCount 一塊兒使用時,後來的訂閱者只會收到 complete 通知,這彷佛並非咱們想要的效果。
  • publishReplaypublishLastrefCount 一塊兒使用時,後來的訂閱者會收到預期的通知。

從新訂閱未完成的 observables

咱們已經看過了從新訂閱已完成的源 observable 時會發生什麼,如今咱們再來看看從新訂閱未完成的源 observable 是怎樣一個狀況。

這個示例中將使用 interval observable 來替代 timer observable,它會根據指定的時間間隔重複地發出包含自增數字的 next 通知:

const source = instrument(Observable.interval(100));
const counted = source.publish().refCount();
const a = counted.subscribe(observer("a"));
setTimeout(() => a.unsubscribe(), 110);
setTimeout(() => counted.subscribe(observer("b")), 120);
複製代碼

示例的輸出以下所示:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
複製代碼

與使用已完成的源 observable 的示例不一樣的是,負責多播基礎結構的 subject 可以被從新訂閱,因此源 observable 能夠產生新的訂閱。b 所收到的 next 通知即是從新訂閱的證據: 該通知包含數值0,由於從新訂閱已經開啓了全新的 interval 序列。

若是使用 publishBehavior(-1) 來代替 publish() 的話,狀況會有所不一樣,輸出以下所示:

observer a: -1
source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
複製代碼

輸出是相似的,能夠清楚地看到從新訂閱開啓了全新的 interval 序列。可是,在收到 intervalnext 通知前,a 還收到了包含 BehaviorSubject 初始值-1的 next 通知,b 會收到包含 BehaviorSubject 當前值0的 next 通知。

若是使用 publishReplay(1) 來代替 publish() 的話,狀況又會有所不一樣,輸出以下所示:

source: subscribing
source: 0
observer a: 0
source: unsubscribed
observer b: 0
source: subscribing
source: 0
observer b: 0
source: 1
observer b: 1
...
複製代碼

輸出也是相似的,能夠清楚地看到從新訂閱開啓了全新的 interval 序列。可是,b 在收到源 observable 的第一個 next 通知以前會收到重放的 next 通知。

綜上所述,根據示例咱們能夠發現,當對未完成的源 observable 使用 refCount 時,publishpublishBehaviorpublishReplay 的行爲都如預期通常,沒有讓人出乎意料之處。

shareReplay 的做用是什麼?

在 RxJS 5.4.0 版本中引入了 shareReplay 操做符。它與 publishReplay().refCount() 十分類似,只是有一個細微的差異。

share 相似, shareReplay 傳給 multicast 操做符的也是 subject 的工廠函數。這意味着當從新訂閱源 observable 時,會使用工廠函數來建立出一個新的 subject 。可是,只有當前一個被訂閱 subject 未完成的狀況下,工廠函數纔會返回新的 subject 。

publishReplay 傳給 multicast 操做符的是 ReplaySubject 實例,而不是工廠函數,這是影響行爲不一樣的緣由。

對調用了 publishReplay().refCount() 的 observable 進行從新訂閱,subject 會一直重放它的可重放通知。可是,對調用了 shareReplay() 的 observable 進行從新訂閱,行爲未必如前者同樣,若是 subject 還未完成,會建立一個新的 subject 。因此區別在於,使用調用了 shareReplay() 的 observable 的話,當引用計數歸零時,若是 subject 還未完成的話,可重放的通知會被沖洗掉。

不徹底使用準則

根據咱們看過的這些示例,能夠概括出以下使用準則:

  • refCount 能夠與 publish 及其變種一塊兒使用,從而自動地取消源 observable 的訂閱。
  • 當使用 refCount 來自動取消已完成的源 observable 的訂閱時,publishReplaypublishLast 的行爲會如預期同樣,可是,對於後來的訂閱,publishpublishBehavior 的行爲並沒太大幫助,因此你應該只使用 publishpublishBehavior 來自動取消訂閱。
  • 當使用 refCount 來自動取消未完成的源 observable 的訂閱時,publishpublishBehaviorpublishRelay 的行爲都會如預期同樣。
  • shareReplay() 的行爲相似於 publishReplay().refCount(),在對二者進行選擇時,應該根據在對源 observable 進行從新訂閱時,你是否想要衝洗掉可重放的通知。

上面所描述的 shareReplay 的行爲只適用於 RxJS 5.5 以前的版本。在 5.5.0 beta 中,shareReplay 作出了變動: 當引用計數歸零時,操做符再也不取消源 observable 的訂閱。

這項變化當即使得引用計數變得多餘,由於只有當源 observable 完成或報錯時,源 observable 的訂閱纔會取消訂閱。這項變化也意味着只有在處理錯誤時,shareReplaypublishReplay().refCount() 纔有所不一樣:

  • 若是源 observable 報錯,publishReplay().refCount() 返回的 observable 的任何後來訂閱者都將收到錯誤。
  • 可是,shareReplay 返回的 observable 的任何後來訂閱者都將產生一個源 observable 的新訂閱。

===================================結尾分界線=================================

這是多播三連的最後一篇,也應該是年前更新的最後一篇,在這裏提早祝你們春節快樂,闔家歡樂,18年開開心心學 Rx 。

順便預告下,年後回來咱們會來兩篇實戰型的文章。

相關文章
相關標籤/搜索