原文連接: blog.angularindepth.com/rxjs-unders…html
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!git
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】es6
照片取自 Unsplash,做者 Kimberly Farmer 。github
我常常會被問及 publish
操做符的相關問題:bash
publish 和 share 之間的區別是什麼?dom
如何導入 refCount 操做符?函數
什麼時候使用 AsyncSubject?post
咱們來解答這些問題,並讓你瞭解到更多內容,首先從基礎入手。ui
多播是一個術語,它用來描述由單個 observable 發出的每一個通知會被多個觀察者所接收的狀況。一個 observable 是否具有多播的能力取決於它是熱的仍是冷的。spa
熱的和冷的 observable 的特徵在於 observable 通知的生產者是在哪建立的。在 Ben Lesh 的 熱的 Vs 冷的 Observables 一文中,他詳細討論了二者間的差別,這些差別能夠概括以下:
timer
observable 就是冷的,每次訂閱時都會建立一個新的定時器。fromEvent
建立的 observable 就是熱的,產生事件的元素存在於 DOM 之中,它不是觀察者訂閱時所建立的。冷的 observables 是單播的,每一個觀察者所接收到的通知都是來自不一樣的生產者,生產者是觀察者訂閱時所建立的。
熱的 observables 是多播的,每一個觀察者所接收到的通知都是來自同一個生產者。
有些時候,須要冷的 observable 具備多播的行爲,RxJS 引入了 Subject
類使之成爲可能。
Subject 便是 observable,又是 observer (觀察者)。經過使用觀察者來訂閱 subject,而後 subject 再訂閱冷的 observable,可讓冷的 observable 變成熱的。這是 RxJS 引入 subjects 的主要用途,在 Ben Lesh 的 關於 RxJS 中的 Subject 一文中,他指出:
多播是 RxJS 中 Subjects 的主要用法。
咱們來看下面的示例:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
const source = Observable.defer(() => Observable.of(
Math.floor(Math.random() * 100)
));
function observer(name: string) {
return {
next: (value: number) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
const subject = new Subject<number>();
subject.subscribe(observer("a"));
subject.subscribe(observer("b"));
source.subscribe(subject);
複製代碼
示例中的 source
是冷的。每次觀察者訂閱 source
時,傳給 defer
的工廠函數會建立一個發出隨機數後完成的 observable 。
要讓 source
變成多播的,須要觀察者訂閱 subject,而後 subject 再訂閱 source
。source
只會看到一個訂閱 ( subscription ),它也只生成一個包含隨機數的 next
通知和一個 complete
通知。Subject 會將這些通知發送給它的觀察者,輸出以下所示:
observer a: 42
observer b: 42
observer a: complete
observer b: complete
複製代碼
此示例能夠做爲 RxJS 多播的基本心智模型: 一個源 observable,一個訂閱源 observable 的 subject 和多個訂閱 subject 的觀察者。
RxJS 引入了 multicast
操做符,它能夠應用於 observable ,使其變成熱的。此操做符封裝了 subject 用於多播 observable 時所涉及的基礎結構。
在看 multicast
操做符以前,咱們使用一個簡單實現的 multicast
函數來替代上面示例中的 subject :
function multicast<T>(source: Observable<T>) {
const subject = new Subject<T>();
source.subscribe(subject);
return subject;
}
const m = multicast(source);
m.subscribe(observer("a"));
m.subscribe(observer("b"));
複製代碼
代碼改變後,示例的輸出以下:
observer a: complete
observer b: complete
複製代碼
這並非咱們想要的結果。在函數內部訂閱 subject 使得 subject 在被觀察者訂閱以前就已經收到了 next
和 complete
通知,因此觀察者只能收到 complete
通知。
這是可避免的,任何鏈接多播基礎結構的函數的調用者須要可以在 subject 訂閱源 observable 時進行控制。RxJS 的 multicast
操做符經過返回一個特殊的 observable 類型 ConnectableObservable 來實現的。
ConnectableObservable 封裝了多播的基礎結構,但它不會當即訂閱源 observable ,只有當它的 connect
方法調用時,它纔會訂閱源 observable 。
咱們來使用 multicast
操做符:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/multicast";
const source = Observable.defer(() => Observable.of(
Math.floor(Math.random() * 100)
));
function observer(name: string) {
return {
next: (value: number) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
const m = source.multicast(new Subject<number>());
m.subscribe(observer("a"));
m.subscribe(observer("b"));
m.connect();
複製代碼
代碼改變後,如今觀察者能夠收到 next
通知了:
observer a: 54
observer b: 54
observer a: complete
observer b: complete
複製代碼
調用 connect
時,傳入 multicast
操做符的 subject 會訂閱源 observable,而 subject 的觀察者會收到多播通知,這正符合 RxJS 多播的基本心智模型。
ConnectableObservable 還有另一個方法 refCount
,它能夠用來肯定源 observable 什麼時候產生了訂閱。
refCount
看上去就像是操做符,也就是說,它是在 observable 上調用的方法而且返回另外一個 observable,可是它只是 ConnectableObservable
的方法並且不須要導入。顧名思義,refCount
返回 observable, 它負責維護已產生的訂閱的引用計數。
當觀察者訂閱負責引用計數的 observable 時,引用計數會增長,若是前一個引用計數爲0的話,負責多播基礎結構的 subject 會訂閱源 observable 。當觀察者取消訂閱時,引用計數會減小,若是引用計數歸零的話,subject 會取消對源 observable 的訂閱。
咱們來使用 refCount
:
const m = source.multicast(new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
複製代碼
代碼改變後,輸出以下所示:
observer a: 42
observer a: complete
observer b: complete
複製代碼
只有第一個觀察者收到了 next
通知。咱們來看看緣由。
示例中的源 observable 會當即發出通知。也就是說,一旦訂閱了,源 observable 就會發出 next
和 complete
通知,complete
通知致使在第二個觀察者訂閱以前第一個就取消了訂閱。當第一個取消訂閱時,引用計數會歸零,因此負責多播基礎結構的 subject 也會取消源 observable 的訂閱。
當第二個觀察者訂閱時,subject 會再次訂閱源 observable,但因爲 subject 已經收到了 complete
通知,因此它沒法被重用。
向 multicast
傳入 subject 的工廠函數能夠解決此問題:
const m = source.multicast(() => new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
複製代碼
代碼改變後,每次源 observable 被訂閱時,都會建立一個新的 subject,輸出以下所示:
observer a: 42
observer a: complete
observer b: 54
observer b: complete
複製代碼
由於源 observable 會當即發出通知,因此觀察者收到的通知是分開的。將 source
進行修改,以便延遲通知:
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/multicast";
const source = Observable.defer(() => Observable.of(
Math.floor(Math.random() * 100)
)).delay(0);
複製代碼
觀察者依然會收到多播通知,輸出以下所示:
observer a: 42
observer b: 42
observer a: complete
observer b: complete
複製代碼
總結一下,上述示例展現了 multicast
操做符的如下特色:
connect
方法以用於肯定源 observable 什麼時候產生了訂閱;refCount
方法以用於自動管理源 observable 的訂閱;refCount
,必須傳入 Subject
的工廠函數,而不是 Subject
實例;接下來咱們來看 publish
和 share
操做符,以及 publish
的變種,看看它們是如何在 multicast
操做符所提供的基礎之上創建的。
咱們經過下面的示例來看看 publish
操做符:
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/publish";
function random() {
return Math.floor(Math.random() * 100);
}
const source = Observable.concat(
Observable.defer(() => Observable.of(random())),
Observable.defer(() => Observable.of(random())).delay(1)
);
function observer(name: string) {
return {
next: (value: number) => console.log(`observer ${name}: ${value}`),
complete: () => console.log(`observer ${name}: complete`)
};
}
const p = source.publish();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
複製代碼
示例中的源 observable 會當即發出一個隨機數,通過短暫的延遲後發出另外一個隨機數,而後完成。這個示例可讓咱們看到訂閱者在 connect
調用前、connect
調用後以及調用過 publish 的 observable 完成後訂閱分別會發生什麼。
publish
操做符是對 multicast
操做符進行了一層薄薄的封裝。它會調用 multicast
並傳入 Subject
。
示例的輸出以下所示:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
複製代碼
觀察者收到的通知可概括以下:
a
是在 connect
調用前訂閱的,因此它能收到兩個 next
通知和 complete
通知。b
是在 connect
調用後訂閱的,此時第一個當即發送的 next
通知已經發出過了,因此它只能收到第二個 next
通知和 complete
通知。c
是在源 observable 完成後訂閱的,因此它只能收到 complete
通知。使用 refCount
來代替 connect
:
const p = source.publish().refCount();
p.subscribe(observer("a"));
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
複製代碼
示例的輸出以下所示:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
複製代碼
輸出跟使用 connect
時的相似。這是爲何?
b
沒有收到第一個 next
通知是由於源 observable 的第一個 next
通知是當即發出的,因此只有 a
能收到。
c
是在調用過 publish 的 observable 完成後訂閱的,因此訂閱的引用計數已是0,此時將會再生成一個訂閱。可是,publish
傳給 multicast
的是 subject,而不是工廠函數,由於 subjects 沒法被複用,因此 c
只能收到 complete
通知。
publish
和 multicast
操做符都接受一個可選的 selector
函數,若是指定了此函數,操做符的行爲將會有很大的不一樣。這將在另外一篇文章 multicast 操做符的祕密中詳細介紹。
publish
操做符有幾個變種,它們都以一種相似的方式對 multicast
進行了包裝,傳入的是 subjects,而不是工廠函數。可是,它們傳入的是不一樣類型的 subjects 。
publish
變種使用的特殊類型的 subjects 包括:
BehaviorSubject
ReplaySubject
AsyncSubject
關於如何使用這些特殊類型的 subjects 的答案是: 每一個變種都與一個特殊類型的 subject 相關聯,當你須要的行爲相似於某個 publish
變種時,就使用相對應的 subject 。咱們來看看這些變種的行爲是怎樣的。
publishBehavior
傳給 multicast
的是 BehaviorSubject
,而不是 Subject
。BehaviorSubject
相似於 Subject
,但若是 subject 的訂閱發生在源 observable 發出 next
通知以前,那麼 subject 會發出包含初始值的 next
通知。
咱們更改下示例,給生成隨機數的源 observable 加上短暫的延遲,這樣它就不會當即發出隨機數:
const delayed = Observable.timer(1).switchMapTo(source);
const p = delayed.publishBehavior(-1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
複製代碼
示例的輸出以下所示:
observer a: -1
observer b: -1
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: complete
複製代碼
觀察者收到的通知可概括以下:
a
是在 connect
調用前訂閱的,因此它能收到帶有 subject 的初始值的 next
通知、源 observable 的兩個 next
通知和 complete
通知。b
是在 connect
調用後但在 subject 收到源 observable 的第一個 next
通知前訂閱的,因此它能收到帶有 subject 的初始值的 next
通知、源 observable 的兩個 next
通知和 complete
通知。c
是在源 observable 完成後訂閱的,因此它只能收到 complete
通知。publishReplay
傳給 multicast
的是 ReplaySubject
,而不是 Subject
。顧名思義,每當觀察者訂閱時,ReplaySubject
會重放指定數量的 next
通知。
const p = source.publishReplay(1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
複製代碼
使用了 publishReplay
,示例的輸出以下所示:
observer a: 42
observer b: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete
複製代碼
觀察者收到的通知可概括以下:
a
是在 connect
調用前訂閱的,此時 subject 尚未收到 next
通知,因此 a
能收到源 observable 的兩個 next
通知和 complete
通知。b
是在 connect
調用後訂閱的,此時 subject 已經收到了源 observable 的第一個 next
通知,因此 b
能收到重放的 next
通知、源 observable 的第二個 next
通知和 complete
通知。c
是在源 observable 完成後訂閱的,因此它能收到重放的 next
通知和 complete
通知。來看看 c
的行爲,很明顯,不一樣於 publish
操做符,publishReplay
操做符適合使用 refCount
方法,由於觀察者在源 observable 完成後訂閱依然能收到任意數量的重放的 next
通知。
publishLast
傳給 multicast
的是 AsyncSubject
,而不是 Subject
。AsyncSubject
是最特別的特殊類型 subjects 。只有當它完成時,纔會發出 next
通知 (若是有 next
通知的話) 和 complete
通知,這個 next
通知是源 observable 中的最後一個 next
通知。
const p = source.publishLast();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
複製代碼
使用了 publishLast
,示例的輸出以下所示:
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 54
observer c: complete
複製代碼
觀察者收到的通知可概括以下:
a
和 b
都是在源 observable 完成前訂閱的,但直到源 observable 完成它們才能收到通知,它們能收到帶有第二個隨機數的 next
通知和 complete
通知。c
是在源 observable 完成後訂閱的,它能收到帶有第二個隨機數的 next
通知和 complete
通知。與 publishReplay
相似,publishLast
操做符適合使用 refCount
方法,由於觀察者在源 observable 完成後訂閱依然能收到任意數量的重放的 next
通知。
share
操做符相似於使用 publish().refCount()
。可是,share
傳給 multicast
的是工廠函數,這意味着在引用計數爲0以後發生訂閱的話,會建立一個新的 Subject
來訂閱源 observable 。
const s = source.share();
s.subscribe(observer("a"));
s.subscribe(observer("b"));
setTimeout(() => s.subscribe(observer("c")), 10);
複製代碼
使用了 share
,示例的輸出以下所示:
observer a: 42
observer a: 54
observer b: 54
observer a: complete
observer b: complete
observer c: 6
observer c: 9
observer c: complete
複製代碼
觀察者收到的通知可概括以下:
a
訂閱後當即收到第一個 next
通知,隨後是第二個 next
通知和 complete
通知。b
只能收到第二個 next
通知和 complete
通知。c
是在源 observable 完成後訂閱的,會建立一個新的 subject 來訂閱源 observable,它會當即收到第一個 next
通知,隨後是第二個 next
通知和 complete
通知。在上面這些示例中,咱們介紹了 publish
和 share
操做符,當源 observable 完成時,a
和 b
會自動取消訂閱。若是源 observable 報錯,它們也一樣會自動取消訂閱。publish
和 share
操做符還有另一個不一樣點:
publish
返回的 observable 的任何未來的訂閱者都將收到 error
通知。share
返回的 observable 的任何未來的訂閱者會生成源 observable 的一個新訂閱,由於錯誤會自動取消任何訂閱者的訂閱,將其引用計數歸零。就這樣了,本文到此結束。咱們介紹了六個操做符,但它們全是經過一種相似的方式來實現的,它們全都符合同一個基本的心智模型: 一個源 observable、一個訂閱源 observable 的 subject 和多個訂閱 subject 的觀察者。
本文只是簡略地介紹了 refCount
方法。想要深刻了解,請參見 RxJS: 如何使用 refCount。