原文連接: medium.com/@benlesh/ho…
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】javascript
TL;DR: 當不想一遍又一遍地建立生產者( producer )時,你須要熱的 Observable 。java
// 冷的
var cold = new Observable((observer) => {
var producer = new Producer();
// observer 會監聽 producer
});複製代碼
// 熱的
var producer = new Producer();
var hot = new Observable((observer) => {
// observer 會監聽 producer
});複製代碼
我最新的文章經過構建 Observable 來學習 Observable 主要是爲了說明 Observable 只是函數。這篇文章的目標是爲了揭開 Observable 自身的神祕面紗,但它並無真正深刻到 Observable 讓初學者最容易困惑的問題: 「熱」與「冷」的概念。git
Observables 是將觀察者和生產者聯繫起來的函數。 僅此而已。它們並不必定要創建生產者,它們只需創建觀察者來監聽生產者,而且一般會返回一個拆卸機制來刪除該監聽器。github
生產者是 Observable 值的來源。它能夠是 Web Socket、DOM 事件、迭代器或在數組中循環的某種東西。基本上,這是你用來獲取值的任何東西,並將它們傳遞給 observe.next(value)
。設計模式
若是底層的生產者是在訂閱期間建立並激活的,那麼 Observable 就是「冷的」。這意味着,若是 Observables 是函數,而生產者是經過調用該函數建立並激活的。數組
下面的示例 Observable 是「冷的」,由於它在訂閱函數(在訂閱該 Observable 時調用)中建立並監聽了 WebSocket :socket
const source = new Observable((observer) => {
const socket = new WebSocket('ws://someurl');
socket.addEventListener('message', (e) => observer.next(e));
return () => socket.close();
});複製代碼
因此任何 source
的訂閱都會獲得本身的 WebSocket 實例,當取消訂閱時,它會關閉 socket 。這意味着 source
是真正的單播,由於生產者只會發送給一個觀察者。這是用來闡述概念的基礎 JSBin 示例。函數
若是底層的生產者是在 訂閱¹ 外部建立或激活的,那麼 Observable 就是「熱的」。學習
若是咱們沿用上面的示例並將 WebSocket 的建立移至 Observable 的外部,那麼 Observable 就會變成「熱的」:ui
const socket = new WebSocket('ws://someurl');
const source = new Observable((observer) => {
socket.addEventListener('message', (e) => observer.next(e));
});複製代碼
如今任何 source
的訂閱都會共享同一個 WebSocket 實例。它實際上會多播給全部訂閱者。但還有個小問題: 咱們再也不使用 Observable 來運行拆卸 socket 的邏輯。這意味着像錯誤和完成這樣的通知再也不會爲咱們來關閉 socket ,取消訂閱也同樣。因此咱們真正想要的實際上是使「冷的」 Observable 變成「熱的」。這是用來展現基礎概念的 JSBin 示例。
從上面展現冷的 Observable 的第一個示例中,你能夠發現全部冷的 Observables 可能都會些問題。就拿一件事來講,若是你不止一次訂閱了 Observable ,而這個 Observable 自己建立一些稀缺的資源,好比 WebSocket 鏈接,你不想一遍又一遍地建立這個 WebSocket 鏈接。實際上真的很容易建立了一個 Observable 的多個訂閱而卻沒有意識到。假如說你想要在 WebSocket 訂閱外部過濾全部的「奇數」和「偶數」值。在此場景下最終你會建立兩個訂閱:
source.filter(x => x % 2 === 0)
.subscribe(x => console.log('even', x));
source.filter(x => x % 2 === 1)
.subscribe(x => console.log('odd', x));複製代碼
在將「冷的」 Observable 變成「熱的」以前,咱們須要介紹一個新的類型: Rx Subject 。它有以下特性:
next
的任何值。subscribe()
傳遞給它的 Observers 都會被添加到內部的觀察者列表。next
值給它,值會從它 observable 那面出來。Rx 中的 Subject 之因此叫作 「Subject」 是由於上面的第3點。在 GoF (譯註: 大名鼎鼎的《設計模式》一書) 的觀察者模式中,「Subjects」 一般是有 addObserver
的類。在這裏,咱們的 addObserver
方法就是 subscribe
。這是用來展現 Rx Subject 的基礎行爲的 JSBin 示例。
瞭解了上面的 Rx Subject 後,咱們可使用一些功能性的程序將任何「冷的」 Observable 變成「熱的」:
function makeHot(cold) {
const subject = new Subject();
cold.subscribe(subject);
return new Observable((observer) => subject.subscribe(observer));
}複製代碼
咱們的新方法 makeHot
接收任何冷的 Observable 並經過建立由所獲得的 Observable 共享的 Subject 將其變成熱的。這是用來演示 JSBin 示例。
還有一點問題,就是沒有追蹤源的訂閱,因此當想要拆卸時該如何作呢?咱們能夠添加一些引用計數來解決這個問題:
function makeHotRefCounted(cold) {
const subject = new Subject();
const mainSub = cold.subscribe(subject);
let refs = 0;
return new Observable((observer) => {
refs++;
let sub = subject.subscribe(observer);
return () => {
refs--;
if (refs === 0) mainSub.unsubscribe();
sub.unsubscribe();
};
});
}複製代碼
如今咱們有了一個熱的 Observable ,當它的全部訂閱結束時,咱們用來引用計數的 refs
會變成0,咱們將取消冷的源 Observable 的訂閱。這是用來演示的 JSBin 示例。
publish()
或 share()
你可能不該該使用上面說起的任何 makeHot
函數,而是應該使用像 publish()
和 share()
這樣的操做符。將冷的 Observable 變成熱的有不少種方式和手段,在 Rx 中有高效和簡潔的方式來完成此任務。關於 Rx 中能夠作此事的各類操做符能夠寫上一整篇文章,但這不是文本的目標。本文的目標是鞏固概念,什麼是「熱的」和「冷的」 Observable 以及它們的真正意義。
在 RxJS 5 中,操做符 share()
會產生一個熱的,引用計數的 Observable ,它能夠在失敗時重試,或在成功時重複執行。由於 Subjects 一旦發生錯誤、完成或取消訂閱,便沒法複用,因此 share()
操做符會重複利用已完結的 Subjects,以使結果 Observable 啓用從新訂閱。
這是 JSBin 示例,演示了在 RxJS 5 中使用 share()
將源 Observable 變熱,以及它能夠重試。
鑑於上述一切,人們可以看到 Observable 是怎樣的,它只是一個函數,實際上能夠同時是「熱的」和「冷的」。或許它觀察了兩個生產者?一個是它建立的而另外一個是它複用的?這可能不是個好主意,但有極少數狀況下多是必要的。例如,多路複用的 WebSocket 必須共享 socket ,但同時發送本身的訂閱並過濾出數據流。
若是在 Observable 中複用了生產者的共享引用,它就是「熱的」,若是在 Observable 中建立了新的生產者,它就是「冷的」。若是二者都作了…。那它究竟是什麼?我猜是「暖的」。
¹ (注意: 生產者在訂閱內部「激活」,直到將來某個時間點才「建立」出來,這種作法是有些奇怪,但使用代理的話,這也是可能的。) 一般「熱的」 Observables 的生產者是在訂閱外部建立和激活的。
² 熱的 Observables 一般是多播的,可是它們可能正在監聽一個只支持一個監聽器的生產者。在這一點上將其稱之爲「多播」有點勉強。