RxJS - Observables, observers 和 operators 簡介

RxJS 是響應式編程 (reactive programming) 強大的工具,今天咱們將深刻介紹 Observables 和 Observers 的內容,以及介紹如何建立本身的操做符 (operators)。javascript

若是你以前已經使用過 RxJS,並但願瞭解 Observable 及 Operators (操做符) 的內部工做原理,那麼這篇文章很是適合你。html

什麼是 Observable

Observable 就是一個擁有如下特性的函數:java

  • 它接收一個 observer 對象做爲參數,該對象中包含 nexterrorcomplete 方法node

  • 它返回一個函數,用於在銷燬 Observable 時,執行清理操做react

在咱們實現的示例中,咱們將定義一個簡單的 unsubscribe 函數來實現取消訂閱的功能。然而在 RxJS 中,返回的是 Subcription 對象,該對象中包含一個 unsubscribe 方法。es6

一個 Observable 對象設置觀察者 (observer),並將它與生產者關聯起來。該生產者多是 DOM 元素產生的 clickinput 事件,也多是更復雜的事件,如 HTTP。typescript

爲了更好地理解 Observable,咱們來自定義 Observable。首先,咱們先來看一個訂閱的例子:編程

const node = document.querySelector('input[type=text]');

const input$ = Rx.Observable.fromEvent(node, 'input');

input$.subscribe({
  next: (event) => console.log(`You just typed ${event.target.value}!`),
  error: (err) => console.log(`Oops... ${err}`),
  complete: () => console.log(`Complete!`)
});

該示例中,Rx.Observable.formEvent() 方法接收一個 input 元素和事件名做爲參數,而後返回一個 $input Observable 對象。接下來咱們使用 subscribe() 方法來定於該 Observable 對象。當觸發 input 事件後,對應的值將會傳遞給 observer 對象。segmentfault

什麼是 Observer

Observer (觀察者) 很是簡單,在上面的示例中,觀察者是一個普通的對象,該對象會做爲 subscribe() 方法的參數。此外 subscribe(next, error, complete) 也是一個有效的語法,但在本文中咱們將討論對象字面量的形式。安全

當 Observable 對象產生新值的時候,咱們能夠經過調用 next() 方法來通知對應的觀察者。若出現異常,則會調用觀察者的 error() 方法。當咱們訂閱 Observable 對象後,只要有新的值,都會通知對應的觀察者。但在如下兩種狀況下,新的值不會再通知對應的觀察者:

  • 已調用 observer 對象的 complete() 方法

  • 消費者對數據再也不感興趣,執行取消訂閱操做

此外在執行最終的 subscribe() 訂閱操做前,咱們傳遞的值能夠通過一系列的鏈式處理操做。執行對應操做的東西叫操做符,每一個操做符執行完後會返回一個新的 Observable 對象,而後繼續咱們的處理流程。

什麼是 Operator

正如上面所說的,Observable 對象可以執行鏈式操做,具體以下所示:

const input$ = Rx.Observable.fromEvent(node, 'input')
  .map(event => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => {
    // use the `value`
});

上面代碼的執行流程以下:

  • 假設用戶在輸入框中輸入字符 a

  • Observable 對象響應對應的 input 事件,而後把值傳遞給 observer

  • map() 操做符返回一個新的 Observable 對象

  • filter() 操做符執行過濾操做,而後又返回一個新的 Observable 對象

  • 最後咱們經過調用 subscribe() 方法,來獲取最終的值

簡而言之,Operator 就是一個函數,它接收一個 Observable 對象,而後返回一個新的 Observable 對象。當咱們訂閱新返回的 Observable 對象時,它內部會自動訂閱前一個 Observable 對象。

自定義 Observable

Observable 構造函數

function Observable(subscribe) {
  this.subscribe = subscribe;
}

每一個 subscribe 回調函數被賦值給 this.subscribe 屬性,該回調函數將會被咱們或其它 Observable 對象調用。

Observer 示例

在咱們深刻介紹前,咱們先來看一個簡單的示例。以前咱們已經建立完 Observable 函數,如今咱們能夠調用咱們的觀察者 (observer),而後傳遞數值 1,而後訂閱它:

const one$ = new Observable((observer) => {
  observer.next(1);
  observer.complete();
});

one$.subscribe({
  next: (value) => console.log(value) // 1
});

即咱們訂閱咱們建立的 Observable 實例,而後經過 subscribe() 方法調用經過構造函數設置的回調函數。

Observable.fromEvent

下面就是咱們須要的基礎結構,即在 Observable 對象上須要新增一個靜態方法 fromEvent

Observable.fromEvent = (element, name) => { };

接下來咱們將參考 RxJS 爲咱們提供的方法來實現自定義的 fromEvent() 方法:

const node = document.querySelector('input');
const input$ = Observable.fromEvent(node, 'input');

按照上面的使用方式,咱們的 fromEvent() 方法須要接收兩個參數,同時須要返回一個新的 Observable 對象,具體以下:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    
  });
};

接下來咱們來實現事件監聽功能:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {}, false);
  });
};

那麼咱們的 observer 參數來自哪裏? 其實 observer 對象就是包含 nexterrorcomplete 方法的對象字面量。

須要注意的是,咱們的 observer 參數不會被傳遞,直到 subscribe() 方法被調用。這意味着 addEventListener() 方法不會被調用,除非你訂閱該 Observable 對象。

當咱們調用 subscribe() 方法,以前設置的 this.subscribe 回調函數會被調用,對應的參數是咱們定義的 observer 對象字面量,接下來將使用新的值,做爲 next() 方法的參數,調用該方法。

很好,那接下來咱們要作什麼?以前版本咱們只是設置了監聽,但沒有調用 observer 對象的 next() 方法,接下來讓咱們來修復這個問題:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {
      observer.next(event);
    }, false);
  });
};

如你所知,當銷燬 Observables 對象時,須要調用一個函數用來執行清理操做。針對目前的場景,在銷燬時咱們須要移除事件監聽:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};

咱們沒有調用 complete() 方法,由於該 Observable 對象處理的 DOM 相關的事件,在時間維度上它們多是無終止的。

如今讓咱們來驗證一下最終實現的功能:

const node = document.querySelector('input');
const p = document.querySelector('p');

function Observable(subscribe) {
  this.subscribe = subscribe;
}

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};

const input$ = Observable.fromEvent(node, 'input');

const unsubscribe = input$.subscribe({
  next: (event) => {
    p.innerHTML = event.target.value;
  }
});

// automatically unsub after 5s
setTimeout(unsubscribe, 5000);

自定義操做符

建立咱們本身的操做符應該會更容易一些,如今咱們瞭解 ObservableObservable 背後的概念。咱們將在 Observable 的原型對象上添加一個方法:

Observable.prototype.map = function (mapFn) { };

該方法的功能與 JavaScript 中的 Array.prototype.map 方法相似:

const input$ = Observable.fromEvent(node, 'input')
        .map(event => event.target.value);

因此咱們須要應用回調函數並調用它,這用於獲取咱們所須要的數據。在咱們這樣作以前,咱們須要流中的最新值。這裏是巧妙的部分,在 map() 操做符中,咱們須要訪問 Observable 實例。由於 map 方法在原型上,咱們能夠經過如下方式訪問 Observable 實例:

Observable.prototype.map = function (mapFn) {
  const input = this;
};

接下來咱們在返回的 Observable 對象中執行 input 對象的訂閱操做:

Observable.prototype.map = function(mapFn) {
  const input = this;
  return new Observable((observer) => {
    return input.subscribe();
  });
};

咱們返回了 input.subscribe() 方法執行的結果,由於當咱們執行取消訂閱操做時,將會依次調用每一個 Observable 對象取消訂閱的方法。

最後咱們來完善一下 map 操做符的內部代碼:

Observable.prototype.map = function (mapFn) {
  const input = this;
  return new Observable((observer) => {
    return input.subscribe({
      next: (value) => observer.next(mapFn(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
};

如今咱們已經能夠執行鏈式操做了:

const input$ = Observable.fromEvent(node, 'input')
  .map(event => event.target.value);

input$.subscribe({
  next: (value) => {
    p.innerHTML = value;
  }
});

我有話說

Observable 與 Promise 有什麼區別?

Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。

MagicQ 單值 多值
拉取(Pull) 函數 遍歷器
推送(Push) Promise Observable
  • Promise

    • 返回單個值

    • 不可取消的

  • Observable

    • 隨着時間的推移發出多個值

    • 能夠取消的

    • 支持 map、filter、reduce 等操做符

    • 延遲執行,當訂閱的時候纔會開始執行

什麼是 SafeObserver ?

上面的示例中,咱們使用一個包含了 next、error、complete 方法的普通 JavaScript 對象來定義觀察者。一個普通的 JavaScript 對象只是一個開始,在 RxJS 5 裏面,爲開發者提供了一些保障機制,來保證一個更安全的觀察者。如下是一些比較重要的原則:

  • 傳入的 Observer 對象能夠不實現全部規定的方法 (next、error、complete 方法)

  • complete 或者 error 觸發以後再調用 next 方法是沒用的

  • 調用 unsubscribe 方法後,任何方法都不能再被調用了

  • completeerror 觸發後,unsubscribe 也會自動調用

  • nextcompleteerror 出現異常時,unsubscribe 也會自動調用以保證資源不會浪費

  • nextcompleteerror是可選的。按需處理便可,沒必要所有處理

爲了完成上述目標,咱們得把傳入的匿名 Observer 對象封裝在一個 SafeObserver 裏以提供上述保障。

若想進一步瞭解詳細信息,請參考 Observable詳解 文章中 "自定義 Observable" 章節的內容。

參考資源

相關文章
相關標籤/搜索