RxJs 核心概念之Observable

Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。下方表格對Observable進行了定位(爲解決基於推送的多值問題):javascript

MagicQ 單值 多值
拉取(Pull) 函數 遍歷器
推送(Push) Promise Observable

:當observable被訂閱後,會當即(同步地)推送123 三個值;1秒以後,繼續推送4這個值,最後結束(推送結束通知):html

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

爲獲得observable推送的值,咱們須要訂閱(subscribe)這個Observable:java

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

程序執行後,將在控制檯輸出以下結果:react

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

拉取(Pull) V.S. 推送(Push)

拉取推送是數據生產者和數據消費者之間通訊的兩種不一樣機制。es6

何爲拉取? 在拉取系統中,老是由消費者決定什麼時候從生產者那裏得到數據。生產者對數據傳遞給消費者的時間毫無感知(被動的生產者,主動的消費者)。正則表達式

JavaScript函數是典型的拉取系統:函數是數據的生產者,對函數進行調用的代碼(消費者)從函數調用後的返回值中拉取單值進行消費。安全

// 函數是數據的生產者
let getLuckyNumber = function() {
    return 7;
};

/* let代碼段是數據的消費者,
 * getLuckyNumber對調用時間毫無感知。 
 */
let luckNumber = getLuckyNumber();

ES2015 引入了的 生成器函數 | 遍歷器 (function*)一樣是基於拉取的系統: 調用 iterator.next()的代碼段是消費者,它能夠從生成器函數中拉取多個值。app

function* getLessThanTen() {
  var i = 0;
  while(i < 11) {
    yield i++;
  }
}

// 生產者
let iterator = getLessThanTen();

// 消費者
iterator.next(); // Object {value: 0, done: false}
iterator.next(); // Object {value: 1, done: false}
MagicQ 生產者 消費者
拉取 被動: 在被請求時產生數據 主動: 決定什麼時候請求數據
推送 主動: 控制數據的產生邏輯 被動: 得到數據後進行響應

何爲推送? 在推送系統中生產者決定什麼時候向消費者傳遞數據,消費者對什麼時候收到數據毫無感知(被動的消費者)。異步

現代JavaScript中Promise是典型的推送系統。做爲數據生產者的Promise經過resolve()向數據消費者——回調函數傳遞數據:與函數不一樣,Promise決定向回調函數推送值的時間。async

RxJS在 JavaScript 中引入了Observable(可觀察對象)這個新的推送系統。Observable是多數據值的生產者,向Observer(被動的消費者)推送數據。

  • 函數 調用後同步計算並返回單一值

  • 生成器函數 | 遍歷器 遍歷過程當中同步計算並返回0個到無窮多個值

  • Promise 異步執行中返回或者不返回單一值

  • Observable 同步或者異步計算並返回0個到無窮多個值

Observable 是函數概念的拓展

Observable既不像EventEmitter,也不像是Promise。Observable 中的 Subject 進行多路推送時與 EventEmitter 行爲上有些相似,可是實際上Observable與EventEmitter並不相同。

Observable 更像是一個不須要傳入參數的函數,它拓展了函數的概念使得它能夠返回多個值。

看看下面的例子:

function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

輸出結果以下:

"Hello"
42
"Hello"
42

經過Observable能夠實現一樣的行爲:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

輸出結果相同:

"Hello"
42
"Hello"
42

不論Observable仍是函數都是在運行時進行求值計算的。若是不調用函數,console.log('Hello')就不會執行;若是若是不subscribe(訂閱)Observable,console.log('Hello')也不會執行。此外,調用或者訂閱都是獨立的:兩次調用產生兩個獨立的做用域,兩次訂閱一樣會產生兩個獨立的做用域。EventEmitter老是在同一個做用域中,發射前也不會在乎本身是否已經被訂閱;Observable不會被共享而產生反作用,而且老是在被訂閱時才執行。

訂閱Observable與調用函數相似。

一些人認爲Observable老是是異步的,這個觀點並不正確,若是在控制檯log函數中調用函數:

console.log('before');
console.log(foo.call());
console.log('after');

顯然能夠看到如下輸出:

"before"
"Hello"
42
"after"

Observable的行爲徹底同樣:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

輸出結果爲:

"before"
"Hello"
42
"after"

訂閱 foo徹底是同步的,與函數的調用同樣。

Observable能夠異步或者同步地產生數據。

那Observable 與函數的不一樣之處在哪裏? Observable能夠在一個時間過程當中‘返回’多個值,而函數卻不能。在函數中你不能夠這麼作:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // 這個語句永遠不會被執行。
}

雖然函數只能有一個返回值,可是在Observable中你徹底能夠這麼作:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // 返回另外一個值
  observer.next(200); // 返回另外一個值
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

輸出結果以下:

"before"
"Hello"
42
100
200
"after"

你甚至能夠異步地返回值:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // happens asynchronously
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

輸出結果:

"before"
"Hello"
42
100
200
"after"
300

結論:

  • func.call() 意味着「同步地給我一個值」

  • observable.subscribe() 意味着「不論是同步或者異步,給我一些值」

Observable 剖析

經過使用 Rx.Observable.create 或者是建立操做符建立一個Observable; Observable 被 Observer(觀察者) 訂閱; 在執行時 向觀察者發送next / error / complete 通知;同時執行過程能夠被 終止
Observable 類型的實例具有了以上四個方面的特性,與其餘類型如:Observer 和 Subscription 緊密相關。

咱們重點關注如下四個方面:

  • 建立

  • 訂閱

  • 執行

  • 終止

建立

Rx.Observable.createObservable 構造函數的別名,接受一個參數: subscribe函數。

如下例子會建立一個Observable,每一秒鐘向其訂閱者發射一個'hi' 字符串。

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

除了使用create建立Observable,咱們一般還使用建立操做符, 如 offrominterval, 等來建立Observable。

上面例子中,subscribe函數是定義Observable最重要的部分。咱們接下來了解訂閱的含義。

訂閱

上面例子中的observable 能夠以以下方式 訂閱

observable.subscribe(x => console.log(x));

observable.subscribeObservable.create(function subscribe(observer) {...})中的subscribe 同名並不是巧合。雖然在Rx中它們不是同一個對象,可是在工程中,咱們能夠在概念上視二者爲等價物。

調用subscribe的觀察者並不會共享同一個Observable。觀察者調用observable.subscribe 時,Observable.create(function subscribe(observer) {...})中的subscribe會在調用它的觀察者做用域中執行。每一次observable.subscribe的調用,都是彼此獨立的。

訂閱Observable如同調用函數,須要提供相應的回調方法。

訂閱機制與處理事件的addEventListener / removeEventListenerAPI徹底不一樣。經過observable.subscribe,觀察者並不須要在Observable中進行註冊,Observable也不須要維護訂閱者的列表。

訂閱後便進入了Observable的執行階段,在執行階段值和事件將會被傳遞給觀察者供其消費。

執行

只有在被訂閱以後Observable纔會執行,執行的邏輯在Observable.create(function subscribe(observer) {...})中描述,執行後將會在特定時間段內,同步或者異步地成產多個數據值。

Observable在執行過程當中,能夠推送三種類型的值:

  • "Next" 通知: 實際產生的數據,包括數字、字符串、對象等

  • "Error" 通知:一個JavaScript錯誤或者異常

  • "Complete" 通知:一個不帶有值的事件

「Next」 通知是最重要和經常使用的類型:表示事件傳遞給觀察者的數據。錯誤和完成通知僅會在執行階段推送其一,並不會同時推送錯誤和完成通知。

經過所謂的「Observable語法」或者「契約」能夠最好地表達這個規則,「Observable語法」藉助於正則表達式:

next*(error|complete)?

在Observable的執行過程當中,0個或者多個「Next」通知會被推送。在錯誤或者完成通知被推送後,Observable不會再推送任何其餘通知。

下面代碼展現了Observable 在執行過程當中推送3個「Next」 通知而後結束:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observable 嚴格遵照 Observable 契約,後面值爲4的「Next」 通知永遠不會被推送:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // 因爲違法契約,4不會被推送
});

使用try/catch塊包裹 subscribe 代碼是一個很讚的想法,若是捕獲了異常,能夠推送錯誤通知:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // 捕獲異常後推送錯誤通知
  }
});

終止

Observable的執行多是無限的,做爲觀察者須要主動中斷執行:咱們須要特定的API去終止執行過程。由於特定的觀察者都有特定的執行過程,一旦觀察者得到想要的數據後就須要終止執行過程以避免帶來計算時對內存資源的浪費。

observable.subscribe被調用時,觀察者會與其執行做用域綁定,同時返回一個Subscription類型的對象:

var subscription = observable.subscribe(x => console.log(x));

Subscription對象表示執行過程,經過極簡的API,你能夠終止執行過程。詳情請閱讀Subscription 相關文檔。經過調用subscription.unsubscribe() 你能夠終止執行過程:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

在Observable被訂閱後,表明執行過程的Subscription 對象將被返回。對其調用unsubscribe()就能夠終止執行。

每個Observable都須要在 create()的建立過程當中定義終止的邏輯。在function subscribe()中返回自定義的unsubscribe就能夠實現。

下面的例子說明了如何在終止後釋放setInterval的句柄:

var observable = Rx.Observable.create(function subscribe(observer) {
  // 得到定時函數的句柄
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);
  
  // 提供終止方法釋放定時函數的句柄
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

相似於observable.subscribeObservable.create(function subscribe() {...})的關係,咱們在subscribe中返回的 unsubscribe 也與subscription.unsubscribe在概念上等價。事實上,若是咱們除去Rx的包裝,純粹的JavaScript代碼簡單清晰:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);
  
  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// 一段時間後:
unsubscribe(); // 終止

使用Observable、 Observer 和 Subscription這些概念的緣由是,咱們能夠在Observable 契約之下安全、兼容地調用操做符。

相關文章
相關標籤/搜索