RxJS 簡介:可觀察對象、觀察者與操做符

RxJS 簡介:可觀察對象、觀察者與操做符

對於響應式編程來講,RxJS 是一個難以想象的工具。今天咱們將深刻探討什麼是 Observable(可觀察對象)和 observer(觀察者),而後瞭解如何建立本身的 operator(操做符)。javascript

若是你以前用過 RxJS,想了解它的內部工做原理,以及 Observable、operator 是如何運做的,這篇文章將很適合你閱讀。html

什麼是 Observable(可觀察對象)?

可觀察對象其實就是一個比較特別的函數,它接受一個「觀察者」(observer)對象做爲參數(在這個觀察者對象中有 「next」、「error」、「complete」等方法),以及它會返回一種解除與觀察者關係的邏輯。例如咱們本身實現的時候會使用一個簡單的 「unsubscribe」 函數來實現退訂功能(即解除與觀察者綁定關係的邏輯)。而在 RxJS 中, 它是一個包含 unsubsribe 方法的訂閱對象(Subscription)。前端

可觀察對象會建立觀察者對象(稍後咱們將詳細介紹它),並將它和咱們但願獲取數據值的「東西」鏈接起來。這個「東西」就是生產者(producer),它可能來自於 click 或者 input 之類的 DOM 事件,是數據值的來源。固然,它也能夠是一些更復雜的狀況,好比經過 HTTP 與服務器交流的事件。java

咱們稍後將要本身寫一個可觀察對象,以便更好地理解它!在此以前,讓咱們先看看一個訂閱對象的例子:node

const node = document.querySelector('input[type=text]');

const input$ = Rx.Observable.fromEvent(node, 'input');

input$.subscribe({
  next: (event) => console.log(`你剛剛輸入了 ${event.target.value}!`),
  error: (err) => console.log(`Oops... ${err}`),
  complete: () => console.log(`完成!`)
});複製代碼

這個例子使用了一個 <input type="text"> 節點,並將其傳入 Rx.Observable.fromEvent() 中。當咱們觸發指定的事件名時,它將會返回一個輸入的 Event 的可觀察對象。(所以咱們在 console.log 中用 ${event.target.value} 能夠獲取輸入值)react

當輸入事件被觸發的時候,可觀察對象會將它的值傳給觀察者。android

什麼是 Observer(觀察者)?

觀察者至關容易理解。在前面的例子中,咱們傳入 .subscribe() 中的對象字面量就是觀察者(訂閱對象將會調用咱們的可觀察對象)。ios

.subscribe(next, error, complete) 也是一種合法的語法,可是咱們如今研究的是對象字面量的狀況。git

當一個可觀察對象產生數據值的時候,它會通知觀察者,當新的值被成功捕獲的時候調用 .next(),發生錯誤的時候調用 .error()github

當咱們訂閱一個可觀察對象的時候,它會持續不斷地將值傳遞給觀察者,直到發生如下兩件事:一種是生產者告知沒有更多的值須要傳遞了,這種狀況它會調用觀察者的 .complete() ;一種是咱們(「消費者」)對以後的值再也不感興趣,決定取消訂閱(unsubsribe)。

若是咱們想要對可觀察對象傳來的值進行組成構建(compose),那麼在值傳達最終的 .subscribe() 代碼塊以前,須要通過一連串的可觀察對象(也就是操做符)處理。這個一連串的「鏈」也就是咱們所說的可觀察對象序列。鏈中的每一個操做符都會返回一個新的可觀察對象,讓咱們的序列可以持續進行下去——這也就是咱們所熟知的「流」。

什麼是 Operator(操做符)?

咱們前面提到,可觀察對象可以進行鏈式調用,也就是說咱們能夠像這樣寫代碼:

const input$ = Rx.Observable.fromEvent(node, 'input')
  .map(event => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => {
    // use the `value`
  });複製代碼

這段代碼作了下面一系列事情:

  • 咱們先假定用戶輸入了一個「a」
  • 可觀察對象將會對這個輸入事件做出反應,將值傳給下一個觀察者
  • 「a」被傳給了訂閱了咱們初始可觀察對象的 .map()
  • .map() 會返回一個 event.target.value 的新可觀察對象,而後調用它觀察者對象中的 .next()
  • .next() 將會調用訂閱了 .map().filter(),並將 .map() 處理後的值傳遞給它
  • .filter() 將會返回另外一個可觀察對象,.filter() 過濾後留下 .length 大於等於 2 的值,並將其傳給 .next()
  • 咱們經過 .subscribe() 得到了最終的數據值

這短短的幾行代碼作了這麼多的事!若是你還以爲弄不清,只須要記住:

每當返回一個新的可觀察對象,都會有一個新的觀察者掛載到前一個可觀察對象上,這樣就能經過觀察者的「流」進行傳值,對觀察者生產的值進行處理,而後調用 .next() 方法將處理後的值傳遞給下一個觀察者。

簡單來講,操做符將會不斷地依次返回新的可觀察對象,讓咱們的流可以持續進行。做爲用戶而言,咱們不須要關心何時、什麼狀況下須要建立與使用可觀察對象與觀察者,咱們只須要用咱們的訂閱對象進行鏈式調用就好了。

建立咱們本身的 Observable(可觀察對象)

如今,讓咱們開始寫本身的可觀察對象的實現吧。儘管它不會像 Rx 的實現那麼高級,但咱們仍是對完善它充滿信心。

Observable 構造器

首先,咱們須要建立一個 Observable 構造函數,此構造函數接受且僅接受 subscribe 函數做爲其惟一的參數。每一個 Observable 實例都存儲 subscribe 屬性,稍後能夠由觀察者對象調用它:

function Observable(subscribe) {
  this.subscribe = subscribe;
}複製代碼

每一個分配給 this.subscribesubscribe 回調都將會被咱們或者其它的可觀察對象調用。這樣咱們下面作的事情就有意義了。

Observer 示例

在深刻探討實際狀況以前,咱們先看一看基礎的例子。

如今咱們已經配好了可觀察對象函數,能夠調用咱們的觀察者,將 1 這個值傳給它並訂閱它:

const one$ = new Observable((observer) => {
  observer.next(1);
  observer.complete();
});

one$.subscribe({
  next: (value) => console.log(value) // 1
});複製代碼

咱們訂閱了 Observable 實例,將咱們的 observer(對象字面量)傳入構造器中(以後它會被分配給 this.subscribe)。

Observable.fromEvent

如今咱們已經完成了建立本身的 Observable 的基礎步驟。下一步是爲 Observable 添加 static 方法:

Observable.fromEvent = (element, name) => {

};複製代碼

咱們將像使用 RxJS 同樣使用咱們的 Observable:

const node = document.querySelector('input');

const input$ = Observable.fromEvent(node, 'input');複製代碼

這意味着咱們須要返回一個新的 Observable,而後將函數做爲參數傳遞給它:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {

  });
};複製代碼

這段代碼將咱們的函數傳入了構造器中的 this.subscribe。接下來,咱們須要將事件監聽設置好:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {}, false);
  });
};複製代碼

那麼這個 observer 參數是什麼呢?它又是從哪裏來的呢?

這個 observer 其實就是攜帶 nexterrorcomplete 的對象字面量。

這塊其實頗有意思。observer.subscribe() 被調用以前都不會被傳遞,所以 addEventListener 在 Observable 被「訂閱」以前都不會被執行。

一旦調用 subscribe,也就會調用 Observable 構造器內的 this.subscribe 。它將會調用咱們傳入 new Observable(callback) 的 callback,同時也會依次將值傳給咱們的觀察者。這樣,當 Observable 作完一件事的時候,它就會用更新過的值調用咱們觀察者中的 .next() 方法。

那麼以後呢?咱們已經獲得了初始化好的事件監聽器,可是尚未調用 .next()。下面完成它:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    element.addEventListener(name, (event) => {
      observer.next(event);
    }, false);
  });
};複製代碼

咱們都知道,可觀察對象在被銷燬前須要一個「處理後事」的函數,在咱們這個例子中,咱們須要移除事件監聽:

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};複製代碼

由於這個 Observable 還在處理 DOM API 和事件,所以咱們還不會去調用 .complete()。這樣在技術上就有無限的可用性。

試一試吧!下面是咱們已經寫好的完整代碼:

const node = document.querySelector('input');
const p = document.querySelector('p');

function Observable(subscribe) {
  this.subscribe = subscribe;
}

Observable.fromEvent = (element, name) => {
  return new Observable((observer) => {
    const callback = (event) => observer.next(event);
    element.addEventListener(name, callback, false);
    return () => element.removeEventListener(name, callback, false);
  });
};

const input$ = Observable.fromEvent(node, 'input');

const unsubscribe = input$.subscribe({
  next: (event) => {
    p.innerHTML = event.target.value;
  }
});

// 5 秒以後自動取消訂閱
setTimeout(unsubscribe, 5000);複製代碼

在線示例:

創造咱們本身的 Operator(操做符)

在咱們理解了可觀察對象與觀察者對象的概念以後,咱們能夠更輕鬆地去創造咱們本身的操做符了。咱們在 Observable 對象原型中加上一個新的方法:

Observable.prototype.map=function(mapFn){

};複製代碼

這個方法將會像 JavaScript 中的 Array.prototype.map 同樣使用,不過它能夠對任何值用:

const input$ = Observable.fromEvent(node, 'input')
    .map(event => event.target.value);複製代碼

因此咱們要取得回調函數,並調用它,返回咱們指望獲得的數據。在這以前,咱們須要拿到流中最新的數據值。

下面該作什麼就比較明瞭了,咱們要獲得調用了這個 .map() 操做符的 Observable 實例的引用入口。咱們是在原型鏈上編程,所以能夠直接這麼作:

Observable.prototype.map = function (mapFn) {
  const input = this;
};複製代碼

找找樂子吧!如今咱們能夠在返回的 Obeservable 中調用 subscribe:

Observable.prototype.map = function (mapFn) {
  const input = this;
  return new Observable((observer) => {
      return input.subscribe();
  });
};複製代碼

咱們要返回 input.subscribe() ,由於在咱們退訂的時候,非訂閱對象將會順着鏈一直轉下去,解除每一個 Observable 的訂閱。

這個訂閱對象將容許咱們把以前 Observable.fromEvent 傳來的值傳遞下去,由於它返回了構造器中含有 subscribe 原型的新的 Observable 對象。咱們能夠輕鬆地訂閱它對數據值作出的任何更新!最後,完成經過 map 調用咱們的 mapFn() 的功能:

Observable.prototype.map = function (mapFn) {
  const input = this;
  return new Observable((observer) => {
    return input.subscribe({
      next: (value) => observer.next(mapFn(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
};複製代碼

如今咱們能夠進行鏈式調用了!

const input$ = Observable.fromEvent(node, 'input')
  .map(event => event.target.value);

input$.subscribe({
  next: (value) => {
    p.innerHTML = value;
  }
});複製代碼

注意到最後一個 .subscribe() 再也不和以前同樣傳入 Event 對象,而是傳入了一個 value 了嗎?這說明你成功地建立了一個可觀察對象流。

再試試:

但願這篇文章對你來講還算有趣~:)


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOSReact前端後端產品設計 等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃

相關文章
相關標籤/搜索