[譯]RxJS文檔03——剖析Observable

原文:http://reactivex.io/rxjs/manu...html

Rx.Observalbe.create()或者建立操做符,能夠 建立(created) Observable流。
Observer則能夠 訂閱(subscribed) 這個流。
經過 執行(execute) next()error()complete()能夠向訂閱者推送不一樣的通知。
以後,執行過程可能被 處理掉(disposed)
這四個方面都被集成在Observable實例當中,可是也有一些方面與其餘類型有關,好比ObserverSubscriptionreact

Observable的核心關注點是:編程

  • 建立Observable流安全

  • 訂閱Observable流異步

  • 執行Observable流ide

  • 終止Observable流函數

建立Observable流

Rx.Observable.create能夠說是Observable構造函數的別名,他能夠接受一個參數:subscribe函數。性能

如下的例子建立了一個Observable流,每秒鐘向Observer發出一個字符串類性值hiatom

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

Observables流可使用create()建立,可是一般咱們會使用所謂的建立操做符,像of(),from(),interval()等等。code

在上面的例子中,訂閱函數(subscribe function)是描述Observalbe最重要的部分。那麼,讓我來看看何謂訂閱。

訂閱Observable流

在例子中,Observalbe的實例observable能夠被訂閱,像這樣:

observable.subscribe(x => console.log(x));

也許你會注意到,observable.subscribe()subscribe函數在Rx.Observable.create(function subscribe(observer){...})中使用了相同的名字,這並非巧合。
在庫中,他們是不一樣的,但在實際使用中,你能夠認爲他們在概念上是相等的。

Observable不在多個Observer之間共享subscribe。當調用observable.subscribe()並獲得觀察者時,在Rx.Observable.create(function subscribe(observer){...})中傳入的函數將會被執行。每次執行observable.subscribe()都會觸發一個單獨針對當前Observer的運行邏輯。

訂閱一個Observable流就像調用一個函數,流中的數據將會被傳遞給回調函數中。

一個subscribe函數被調用將會開啓一個Observable執行流(Observable execution),向觀察者們輸出流中的值或者事件。

執行Observable流

代碼Rx.Observable.create(function subscribe(observer){...})表明了一個「Observable流」,因爲惰性計算,只用當有Observer訂閱流時,函數纔會被執行。
執行過程當中隨着時間線產生多個數據,方式是同步或異步二選一。

有三個類型的值會在執行流中發出:

  • "Next" 通知:發出一個值,好比數字,字符串,對象等等。

  • "Error"通知:發出一個js錯誤或者異常。

  • Complete通知:不發出任何值,表示流的結束。

Next通知是最重要也是最經常使用的類型:他表明了實際推送給Observer的值。ErrorComplete通知只會在執行流中發出一次,要麼是Error,要麼是Complete

用正表達式的規則能夠很好的表達這種所謂的Observable語法和約定:

next*(error|complete)?

在一個Observable執行流中,會發出0到無限個Next通知。而一旦Error或者Complete通知被髮出,執行流將不會再推送任何消息。

下面的例子展現了一個推送了3個NextComplete的流:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observables會嚴格遵照Observable約定,因此下面的代碼將不會推送值4:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // Is not delivered because it would violate the contract
});

在訂閱函數中使用try/catch捕獲可能拋出的異常,也是一個很不錯的作法:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // delivers an error if it caught one
  }
});

終止Observable流

Observable流的執行時間線多是無限長的,但一般咱們只用到有限的時間段和觀察者處理業務,所以,咱們須要一種中斷流執行的API。
因爲一個執行過程對於每一個Observer是獨有的,一旦Observer接收到值,那麼也必然須要一種中斷執行的方式,從而能夠節省計算性能和內存空間。

observable.subscribe()被調用,Observer將被附加到新建立的Observable執行過程當中,同時返回了一個對象,Subscription

var subscription = observable.subscribe(x => console.log(x));

Subscription表明了一個持續執行的過程,而且有一套最小化的API容許你中斷流的執行過程。能夠從這裏進一步瞭解Subscription類型。如下例子展現了使用subscription.unsubscribe()中斷持續執行的過程:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

當你訂閱流就能夠獲取一個Subscription,表明了持續執行的過程。調用unsubscribe()就能夠中斷執行過程。

當咱們使用create()建立一個Observable流時,每個Observable都必須定義它如何處理獲取到的資源的處理方式。你能夠經過在subscribe()函數中返回一個自定義的unsubscribe函數,達到這個目的。

舉個例子,如下展現瞭如何中斷一個使用setInterval()執行interval的過程:

var observable = Rx.Observable.create(function subscribe(observer) {
  // Keep track of the interval resource
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

就像observable.subscribe()相似Observable.create(function subscribe(){..})同樣,咱們從subscribe()返回的unsubscribe()也概念性的等同於subscription.unsubscribe()
事實上,若是咱們移除與響應式編程相關的概念,剩下的就是直白的js代碼了:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

咱們使用Rx,包括ObservableObserverSubscription,其緣由就是爲了使用這些安全(就如Observable約定的)和可組合的操做符。

相關文章
相關標籤/搜索