RxJS:冷熱模式的比較

Hot vs Cold Observables

理解冷熱兩種模式下的Observables對於掌握Observables來講相當重要,在咱們開始探索這個話題以前,咱們先來讀一讀RxJS官方的定義:json

Cold Observables在被訂閱後運行,也就是說,observables序列僅在subscribe函數被調用後纔會推送數據。與Hot Observables不一樣之處在於,Hot Observables在被訂閱以前就已經開始產生數據,例如mouse move事件。瀏覽器

OK,目前爲止還不錯,讓咱們來舉幾個例子。服務器

首先咱們以一個簡單的只會發射數字1Observable開始。咱們爲這個Observable添加兩個訂閱並打印數據:網絡

let obs = Rx.Observable.create(observer => observer.next(1));

obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));

運行結果以下:async

1st subscriber: 1
2nd subscriber: 1

如今的問題是,代碼中的obs是冷模式仍是熱模式?假設咱們不知道obs是如何建立的,而是經過調用某個getObservableFromSomewhere()得到的,那麼咱們將沒法得知obs是哪一種模式。所以對於通常的訂閱者來講,並非總能知道所處理的是哪種Observable函數

如今回想下上面引用的官方定義,若是obs是冷模式,那麼它被訂閱後纔會產生「新鮮」的數據。爲了體現「新鮮」這一點,咱們用Date.now()來替代數字1,看看會發生什麼。從新運行上面的代碼咱們獲得:this

1st subscriber: 1465990942935
2nd subscriber: 1465990942936

咱們注意到兩次得到的數據並不相同,這意味着observer.next(Date.now())必定被調用了兩次。換句話說,obs在每次訂閱後開始產生數據,這使得它成爲冷模式下的Observablecode

將Cold Observables轉化爲Hot Observable

如今咱們知道obs是冷模式,那麼給它加加溫會如何?server

let obs = Rx.Observable
            .create(observer => observer.next(Date.now()))
            .publish();

obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));

obs.connect();

先不要管publishconnect這兩個操做符是什麼,咱們來看下上面代碼的輸出:事件

1st subscriber: 1465994477014
2nd subscriber: 1465994477014

顯然observer.next(Date.now())只被調用了一次(由於只產生了一次數據),那麼obs已是熱模式了?Emmm,不徹底是。咱們能夠這麼說:如今的obs比冷模式下要「暖」,可是比熱模式還要「涼」一些。說它暖是由於在不一樣的訂閱裏僅僅產生了一次數據,說它涼是由於它並無在被訂閱以前就開始產生數據。

咱們能夠稍微改動一下,將obs.connect()放到到全部訂閱以前:

let obs = Rx.Observable
            .create(observer => observer.next(Date.now()))
            .publish();
obs.connect();

obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));

然而,當咱們運行這段代碼時卻沒有任何輸出。其實這是意料之中的,由於如今的obs已是熱模式,它會在建立後當即開始產生數據而無論有沒有被訂閱過。因此在咱們添加兩個訂閱以前,obs已經完成了最初始(也是惟一一次)的推送,所以隨後的訂閱不會收到新的數據,也就不會被觸發。

爲了理解publish的做用,咱們來建立一個能夠持續產生數據的Observable

let obs = Rx.Observable
            .interval(1000)
            .publish()
            .refCount();

obs.subscribe(v => console.log("1st subscriber:" + v));
setTimeout(()
  // delay for a little more than a second and then add second subscriber
  => obs.subscribe(v => console.log("2nd subscriber:" + v)), 1100);

這比以前的稍微複雜一點,因此咱們一步一步來分析:

  • 首先用inerval(1000)來建立一個每秒產生一個遞增數字的Observable,以0開始。

  • 而後用publish操做符來使不一樣訂閱者之間共享同一個生產環境(熱模式的一個標誌)。

  • 最後使第二個訂閱延遲1秒。

稍後咱們再解釋refCount

運行上面代碼獲得:

1st subscriber:0
1st subscriber:1
2nd subscriber:1
1st subscriber:2
2nd subscriber:2
...

很顯然咱們能夠看到第二個訂閱者並無從0開始,這是由於publish可使不一樣訂閱者共享同一個生產環境,第二個訂閱者和第一個共享了環境,所以它不會得到以前產生的數據。

理解publishrefCountconnect

在上面的例子中,obs到底有多「熱」?咱們修改代碼,使兩次訂閱都延遲2秒。若是obs足夠「熱」,那麼咱們應該看不到數字01,由於它們在被訂閱以前就被髮射出去了。

let obs = Rx.Observable
            .interval(1000)
            .publish()
            .refCount();

setTimeout(() => {
  // delay both subscriptions by 2 seconds
  obs.subscribe(v => console.log("1st subscriber:" + v));
  setTimeout(
    // delay for a little more than a second and then add second subscriber
    () => obs.subscribe(
          v => console.log("2nd subscriber:" + v)), 1100);

},2000);

然而有意思的是,咱們獲得了和修改前徹底同樣的輸出。這意味着咱們的obs更應該被稱爲「暖」模式而不是「熱」模式。這是由於refCount在起做用。publish操做符建立了一個被稱爲ConnectableObservable的序列,該序列對於數據源共享同一個訂閱。然而此時publish操做符尚未訂閱到數據源。它更像一個守門員,保證全部的訂閱都訂閱到ConnectableObservable上面,而不是數據源自己。

connect操做符使ConnectableObservable實際訂閱到數據源。在上面的例子中,咱們用refCount來代替connectrefCount是創建在connect之上的,它可使ConnectableObservable在有第一個訂閱者時訂閱到數據源,並在沒有訂閱者時解除對數據源的訂閱。這實際上對ConnectableObservable的全部訂閱進行了記錄。

若是咱們想obs變成真正的熱模式,咱們就須要手動調用connect

let obs = Rx.Observable
            .interval(1000)
            .publish();
obs.connect();

setTimeout(() => {
  obs.subscribe(v => console.log("1st subscriber:" + v));
  setTimeout(
    () => obs.subscribe(v => console.log("2nd subscriber:" + v)), 1000);

},2000);

輸出以下,咱們的兩個訂閱者都會獲得2以後的數字。

1st subscriber:2
1st subscriber:3
2nd subscriber:3

此時的obs是完徹底全的熱模式,它會在建立後當即產生數據而無論有沒有訂閱者。

如何使用

根據上面的討論,咱們知道很難說一個Observable是徹底冷的仍是熱的。實際上咱們還有一些推送數據給訂閱者的方法沒有討論。一般來講,當咱們處理一些會自動產生數據而無論有沒有人監聽的數據源時,咱們應該使用熱模式。當咱們訂閱這樣的Observable時是得不到數據源發射的歷史數據,而只能得到將來產生的值。

一個典型的例子就是mouse move事件。mouse move無論有沒有人監聽它都會發生。當咱們開始監聽它時,咱們只能得到將來產生的mouse move事件。

冷模式下的Observable屬於懶加載。只有當有人訂閱時纔會產生數據。可是別忘了,這不是絕對的。在以前的例子中咱們知道,一個徹底冷模式的Observable對於每一個訂閱者都會獨立的從新生成數據。可是,一個僅在首次訂閱後纔開始產生數據並使以後的訂閱者共享同一個生產環境發送一樣數據的Observable,咱們又該如何稱呼呢?事情變得模棱兩可,簡單的將Observable分爲冷熱是不夠的。

無論怎樣,這兒有一條經驗:當你有一個冷模式的Observable而又想不一樣的訂閱者訂閱它時得到以前產生過的數據時,你可使用publish和它的小夥伴們。

注意:Http下的Observables

Angular2中的Http服務會返回一個冷模式的Observable,這裏咱們須要注意一下其潛在的行爲。

首先咱們來建立一個組件,該組件會從服務器獲取contacts.json文件並渲染爲頁面上的一個列表。

...
@Component({
  ...
  template: `
    <ul>
      <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
    </ul>
    `
  ...
})
export class AppComponent {
  contacts: Observable<Array<any>>;
  constructor (http: Http) {
    this.contacts = http.get('contacts.json')
                        .map(response => response.json().items);
  }
}

在上面的例子中,咱們將一個Observable<Array<any>>傳遞到模板中並使用AsyncPipe

contacts.json文件以下所示:

{
  "items": [
    { "name": "John Conner" },
    { "name": "Arnold Schwarzenegger" }
  ]
}

如今,若是咱們再添加一個列表會怎樣?

1st List
<ul>
  <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
</ul>
2st List
<ul>
  <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
</ul>

運行上述代碼並觀察瀏覽器控制檯的網絡面板,咱們會發現瀏覽器對contacts.json發送了兩次請求,這與咱們用Promise的狀況截然不同。實際上,爲咱們的Observable添加toPromise操做符就能夠消除第二次請求。

不過咱們先別急着回到Promise。用咱們以前學到的冷熱模式,咱們能夠很輕鬆的將例子中的Observable轉化成對全部訂閱都共享同一輩子產環境的Observable,這裏的環境就是HTTP調用。

this.contacts = http.get('contacts.json')
                    .map(response => response.json().items)
                    .publish()
                    .refCount();

這樣就達到了咱們想要的效果嗎?不徹底是。咱們能夠看到此時已經沒有了第二次網絡請求,可是這與使用Promise的狀況還存在一些區別。

假如咱們但願第二個列表延遲500ms再出現,修改代碼以下:

@Component({
  ...
  template: `
    1st List
    <ul>
      <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
    </ul>
    2st List
    <ul>
      <li *ngFor="let contact of contacts2 | async">{{contact.name}}</li>
    </ul>
    `,
  ...
})
export class AppComponent {
  contacts: Observable<Array<any>>;
  contacts2: Observable<Array<any>>;
  constructor (http: Http) {
    this.contacts = http.get('contacts.json')
                        .map(response => response.json().items)
                        .publish()
                        .refCount();

    setTimeout(() => this.contacts2 = this.contacts, 500);
  }
}

注意到咱們的Observable仍是隻有一個,並在500ms後將其賦值給contacts2。運行上面代碼後,咱們發現第二個列表並無出現。

若是你仔細想一想,這實際上是符合邏輯的。當使用publish後咱們將例子中的Observable變爲共享的 - 即熱模式,可是咱們有點加熱過了度。500ms後添加第二個訂閱者時,它只會在有新的數據推送時纔會觸發,咱們指望的是新訂閱者可以得到以前發射過的數據。幸運的是咱們能夠用publishLast替代publish來實現這一需求。

this.contacts = http.get('contacts.json')
                    .map(response => response.json().items)
                    .publishLast()
                    .refCount();

如今運行上面的代碼,咱們能在500ms後看到列表而且沒有發生第二次網絡請求。換句話說,咱們建立了一個可以在被訂閱後觸發並共享第一次發射的數據的Observable

實用技巧

publishrefCount是一個很經常使用的組合,而操做符share能夠同時完成這兩步。

this.contacts = http.get('contacts.json')
  .map(response => response.json().items)
  .share();

Demos

相關文章
相關標籤/搜索