RxJS 系列之二 - Observable 詳解

查看新版教程,請訪問前端修仙之路javascript

RxJS 系列目錄

Observable

在介紹 Observable 以前,咱們要先了解兩個設計模式:html

  • Observer Pattern - (觀察者模式)
  • Iterator Pattern - (迭代器模式)

這兩個模式是 Observable 的基礎,下面咱們先來介紹一下 Observer Pattern。前端

Observer Pattern

觀察者模式定義

觀察者模式軟件設計模式的一種。在此種模式中,一個目標對象管理全部相依於它的觀察者對象,而且在它自己的狀態改變時主動發出通知。這一般透過呼叫各觀察者所提供的方法來實現。此種模式一般被用來實時事件處理系統。 — 維基百科java

觀察者模式又叫發佈訂閱模式(Publish/Subscribe),它定義了一種一對多的關係,讓多個觀察者對象同時監聽某一個主題對象,這個主題對象的狀態發生變化時就會通知全部的觀察者對象,使得它們可以自動更新本身。react

咱們可使用平常生活中,期刊訂閱的例子來形象地解釋一下上面的概念。期刊訂閱包含兩個主要的角色:期刊出版方和訂閱者,他們之間的關係以下:git

  • 期刊出版方 - 負責期刊的出版和發行工做
  • 訂閱者 - 只需執行訂閱操做,新版的期刊發佈後,就會主動收到通知,若是取消訂閱,之後就不會再收到通知

在觀察者模式中也有兩個主要角色:Subject (主題) 和 Observer (觀察者) 。它們分別對應例子中的期刊出版方和訂閱者。接下來咱們來看張圖,從而加深對上面概念的理解。es6

observer-pattern

觀察者模式優缺點

觀察者模式的優勢:github

  • 支持簡單的廣播通訊,自動通知全部已經訂閱過的對象
  • 目標對象與觀察者之間的抽象耦合關係可以單獨擴展以及重用

觀察者模式的缺點:算法

  • 若是一個被觀察者對象有不少的直接和間接的觀察者的話,將全部的觀察者都通知到會花費不少時間
  • 若是在觀察者和觀察目標之間有循環依賴的話,觀察目標會觸發它們之間進行循環調用,可能致使系統崩潰

觀察者模式的應用

在前端領域,觀察者模式被普遍地使用。最多見的例子就是爲 DOM 對象添加事件監聽,具體示例以下:typescript

<button id="btn">確認</button>

function clickHandler(event) {
	console.log('用戶已點擊確認按鈕!');
}
document.getElementById("btn").addEventListener('click', clickHandler);
複製代碼

上面代碼中,咱們經過 addEventListener API 監聽 button 對象上的點擊事件,當用戶點擊按鈕時,會自動執行咱們的 clickHandler 函數。

觀察者模式實戰

Subject 類定義:

class Subject {
    
    constructor() {
        this.observerCollection = []; // 觀察者集合
    }
    
    registerObserver(observer) { // 註冊觀察者
        this.observerCollection.push(observer);
    }
    
    unregisterObserver(observer) { // 移除觀察者
        let index = this.observerCollection.indexOf(observer);
        if(index >= 0) this.observerCollection.splice(index, 1);
    }
    
    notifyObservers() { // 通知全部觀察者
        this.observerCollection.forEach((observer)=>observer.notify());
    }
}
複製代碼

Observer 類定義:

class Observer {
    
    constructor(name) {
        this.name = name;
    }
    
    notify() { 
        console.log(`${this.name} has been notified.`);
    }
}
複製代碼

使用示例:

let subject = new Subject(); // 建立主題對象

let observer1 = new Observer('semlinker'); // 建立觀察者A - 'semlinker'
let observer2 = new Observer('lolo'); // 建立觀察者B - 'lolo'

subject.registerObserver(observer1); // 註冊觀察者A
subject.registerObserver(observer2); // 註冊觀察者B
 
subject.notifyObservers(); // 通知觀察者

subject.unregisterObserver(observer1); // 移除觀察者A

subject.notifyObservers(); // 驗證是否成功移除
複製代碼

以上代碼成功運行後控制檯的輸出結果:

semlinker has been notified. # 輸出一次
2(unknown) lolo has been notified. # 輸出兩次
複製代碼

須要注意的是,在觀察者模式中,一般狀況下調用註冊觀察者後,會返回一個函數,用於移除監聽,有興趣的讀者,能夠本身嘗試一下。(備註:在 Angular 1.x 中調用 scope.on() 方法後,就會返回一個函數,用於移除監聽)

Iterator Pattern

迭代器模式定義

迭代器(Iterator)模式,又叫作遊標(Cursor)模式。它提供一種方法順序訪問一個聚合對象中的各個元素,而又不須要暴露該對象的內部表示。迭代器模式能夠把迭代的過程從業務邏輯中分離出來,在使用迭代器模式以後,即便不關心對象的內部構造,也能夠按順序訪問其中的每一個元素。

迭代器模式的優缺點

迭代器模式的優勢:

  • 簡化了遍歷方式,對於對象集合的遍歷,仍是比較麻煩的,對於數組或者有序列表,咱們尚能夠經過遊標取得,但用戶須要在對集合瞭解的前提下,自行遍歷對象,可是對於 hash 表來講,用戶遍歷起來就比較麻煩。而引入迭代器方法後,用戶用起來就簡單的多了。
  • 封裝性良好,用戶只須要獲得迭代器就能夠遍歷,而不用去關心遍歷算法。

迭代器模式的缺點:

  • 遍歷過程是一個單向且不可逆的遍歷

ECMAScript 迭代器

在 ECMAScript 中 Iterator 最先實際上是要採用相似 Python 的 Iterator 規範,就是 Iterator 在沒有元素以後,執行 next會直接拋出錯誤;但後來通過一段時間討論後,決定採更 functional 的作法,改爲在取得最後一個元素以後執行 next 永遠都回傳 { done: true, value: undefined }

一個迭代器對象 ,知道如何每次訪問集合中的一項, 並記錄它的當前在序列中所在的位置。在 JavaScript 中迭代器是一個對象,它提供了一個 next() 方法,返回序列中的下一項。這個方法返回包含 donevalue 兩個屬性的對象。對象的取值以下:

  • 在最後一個元素前:{ done: false, value: elementValue }
  • 在最後一個元素後:{ done: true, value: undefined }

詳細信息能夠參考 - 可迭代協議和迭代器協議

ES 5 迭代器

接下來咱們來建立一個 makeIterator 函數,該函數的參數類型是數組,當調用該函數後,返回一個包含 next() 方法的 Iterator 對象, 其中 next() 方法是用來獲取容器對象中下一個元素。具體示例以下:

function makeIterator(array){
    var nextIndex = 0;
    
    return {
       next: function(){
           return nextIndex < array.length ?
               {value: array[nextIndex++], done: false} :
               {done: true};
       }
    }
}
複製代碼

一旦初始化, next() 方法能夠用來依次訪問可迭代對象中的元素:

var it = makeIterator(['yo', 'ya']);
console.log(it.next().value); // 'yo'
console.log(it.next().value); // 'ya'
console.log(it.next().done);  // true
複製代碼

ES 6 迭代器

在 ES 6 中咱們能夠經過 Symbol.iterator 來建立可迭代對象的內部迭代器,具體示例以下:

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();
複製代碼

調用 next() 方法來獲取數組中的元素:

> iter.next()
{ value: 'a', done: false }
> iter.next()
{ value: 'b', done: false }
> iter.next()
{ value: 'c', done: false }
> iter.next()
{ value: undefined, done: true }
複製代碼

ES 6 中可迭代的對象:

  • Arrays
  • Strings
  • Maps
  • Sets
  • DOM data structures (work in progress)

Observable

RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。

Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:

  • 訂閱:Observer 經過 Observable 提供的 subscribe() 方法訂閱 Observable。
  • 發佈:Observable 經過回調 next 方法向 Observer 發佈事件。

Proposal-Observable

自定義 Observable

若是你想真正瞭解 Observable,最好的方式就是本身寫一個。其實 Observable 就是一個函數,它接受一個 Observer 做爲參數而後返回另外一個函數。

它的基本特徵:

  • 是一個函數
  • 接受一個 Observer 對象 (包含 next、error、complete 方法的對象) 做爲參數
  • 返回一個 unsubscribe 函數,用於取消訂閱

它的做用:

做爲生產者與觀察者之間的橋樑,並返回一種方法來解除生產者與觀察者之間的聯繫,其中觀察者用於處理時間序列上數據流。接下來咱們來看一下 Observable 的基礎實現:

DataSource - 數據源

class DataSource {
  constructor() {
    let i = 0;
    this._id = setInterval(() => this.emit(i++), 200); // 建立定時器
  }
  
  emit(n) {
    const limit = 10;  // 設置數據上限值
    if (this.ondata) {
      this.ondata(n);
    }
    if (n === limit) {
      if (this.oncomplete) {
        this.oncomplete();
      }
      this.destroy();
    }
  }
  
  destroy() { // 清除定時器
    clearInterval(this._id);
  }
}
複製代碼

myObservable

function myObservable(observer) {
    let datasource = new DataSource(); // 建立數據源
    datasource.ondata = (e) => observer.next(e); // 處理數據流
    datasource.onerror = (err) => observer.error(err); // 處理異常
    datasource.oncomplete = () => observer.complete(); // 處理數據流終止
    return () => { // 返回一個函數用於,銷燬數據源
        datasource.destroy();
    };
}
複製代碼

使用示例:

const unsub = myObservable({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done')}
});

/** * 移除註釋,能夠測試取消訂閱 */
// setTimeout(unsub, 500); 
複製代碼

具體運行結果,能夠查看線上示例

SafeObserver - 更好的 Observer

上面的示例中,咱們使用一個包含了 next、error、complete 方法的普通 JavaScript 對象來定義觀察者。一個普通的 JavaScript 對象只是一個開始,在 RxJS 5 裏面,爲開發者提供了一些保障機制,來保證一個更安全的觀察者。如下是一些比較重要的原則:

  • 傳入的 Observer 對象能夠不實現全部規定的方法 (next、error、complete 方法)
  • complete 或者 error 觸發以後再調用 next 方法是沒用的
  • 調用 unsubscribe 方法後,任何方法都不能再被調用了
  • completeerror 觸發後,unsubscribe 也會自動調用
  • nextcompleteerror 出現異常時,unsubscribe 也會自動調用以保證資源不會浪費
  • nextcompleteerror是可選的。按需處理便可,沒必要所有處理

爲了完成上述目標,咱們得把傳入的匿名 Observer 對象封裝在一個 SafeObserver 裏以提供上述保障。SafeObserver 的具體實現以下:

class SafeObserver {
  constructor(destination) {
    this.destination = destination;
  }
  
  next(value) {
    // 還沒有取消訂閱,且包含next方法
    if (!this.isUnsubscribed && this.destination.next) {
      try {
        this.destination.next(value);
      } catch (err) {
        // 出現異常時,取消訂閱釋放資源,再拋出異常
        this.unsubscribe();
        throw err;
      }
    }
  }
  
  error(err) {
    // 還沒有取消訂閱,且包含error方法
    if (!this.isUnsubscribed && this.destination.error) {
      try {
        this.destination.error(err);
      } catch (e2) {
        // 出現異常時,取消訂閱釋放資源,再拋出異常
        this.unsubscribe();
        throw e2;
      }
      this.unsubscribe();
    }
  }

  complete() {
    // 還沒有取消訂閱,且包含complete方法
    if (!this.isUnsubscribed && this.destination.complete) {
      try {
        this.destination.complete();
      } catch (err) {
        // 出現異常時,取消訂閱釋放資源,再拋出異常
        this.unsubscribe();
        throw err;
      }
      this.unsubscribe();
    }
  }
  
  unsubscribe() { // 用於取消訂閱
    this.isUnsubscribed = true;
    if (this.unsub) {
      this.unsub();
    }
  }
}
複製代碼

myObservable - 使用 SafeObserver

function myObservable(observer) {
  const safeObserver = new SafeObserver(observer); // 建立SafeObserver對象
  const datasource = new DataSource(); // 建立數據源
  datasource.ondata = (e) => safeObserver.next(e);
  datasource.onerror = (err) => safeObserver.error(err);
  datasource.oncomplete = () => safeObserver.complete();

  safeObserver.unsub = () => { // 爲SafeObserver對象添加unsub方法
    datasource.destroy();
  };
  // 綁定this上下文,並返回unsubscribe方法
  return safeObserver.unsubscribe.bind(safeObserver); 
}
複製代碼

使用示例:

const unsub = myObservable({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done')}
});
複製代碼

具體運行結果,能夠查看線上示例

Operators - 也是函數

Operator 是一個函數,它接收一個 Observable 對象,而後返回一個新的 Observable 對象。當咱們訂閱新返回的 Observable 對象時,它內部會自動訂閱前一個 Observable 對象。接下來咱們來實現經常使用的 map 操做符:

Observable 實現:

class Observable {
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }
  
  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    safeObserver.unsub = this._subscribe(safeObserver);
    return safeObserver.unsubscribe.bind(safeObserver);
  }
}
複製代碼

map 操做符實現:

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return source.subscribe(mapObserver);
  });
}
複製代碼

具體運行結果,能夠查看線上示例

改進 Observable - 支持 Operator 鏈式調用

若是把 Operator 都寫成如上那種獨立的函數,咱們鏈式代碼會逐漸變醜:

map(map(myObservable, (x) => x + 1), (x) => x + 2);
複製代碼

對於上面的代碼,想象一下有 五、6 個嵌套着的 Operator,再加上更多、更復雜的參數,基本上就無法兒看了。

你也能夠試下 Texas Toland 提議的簡單版管道實現,合併壓縮一個數組的Operator並生成一個最終的Observable,不過這意味着要寫更復雜的 Operator,上代碼:JSBin。其實寫完後你會發現,代碼也不怎麼漂亮:

pipe(myObservable, map(x => x + 1), map(x => x + 2));
複製代碼

理想狀況下,咱們想將代碼用更天然的方式鏈起來:

myObservable.map(x => x + 1).map(x => x + 2);
複製代碼

幸運的是,咱們已經有了這樣一個 Observable 類,咱們能夠基於 prototype 在不增長複雜度的狀況下支持多 Operators 的鏈式結構,下面咱們採用prototype方式再次實現一下 Observable

Observable.prototype.map = function (project) {
    return new Observable((observer) => {
        const mapObserver = {
            next: (x) => observer.next(project(x)),
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return this.subscribe(mapObserver);
    });
};
複製代碼

如今咱們終於有了一個還不錯的實現。這樣實現還有其餘好處,例如:能夠寫子類繼承 Observable 類,而後在子類中重寫某些內容以優化程序。

接下來咱們來總結一下該部分的內容:Observable 就是函數,它接受 Observer 做爲參數,又返回一個函數。若是你也寫了一個函數,接收一個 Observer 做爲參數,又返回一個函數,那麼,它是異步的、仍是同步的 ?其實都不是,它就只是一個函數。任何函數的行爲都依賴於它的具體實現,因此當你處理一個 Observable 時,就把它當成一個普通函數,裏面沒有什麼黑魔法。當你要構建 Operator 鏈時,你須要作的其實就是生成一個函數將一堆 Observers 連接在一塊兒,而後讓真正的數據依次穿過它們。

Rx.Observable.create

var observable = Rx.Observable
	.create(function(observer) {
		observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext
		observer.next('Lolo');
	});
	
// 訂閱這個 Observable 
observable.subscribe(function(value) {
	console.log(value);
});
複製代碼

以上代碼運行後,控制檯會依次輸出 'Semlinker' 和 'Lolo' 兩個字符串。

須要注意的是,不少人認爲 RxJS 中的全部操做都是異步的,但其實這個觀念是錯的。RxJS 的核心特性是它的異步處理能力,但它也是能夠用來處理同步的行爲。具體示例以下:

var observable = Rx.Observable
	.create(function(observer) {
		observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext
		observer.next('Lolo');
	});
	
console.log('start');
observable.subscribe(function(value) {
	console.log(value);
});
console.log('end');
複製代碼

以上代碼運行後,控制檯的輸出結果:

start
Semlinker
Lolo
end
複製代碼

固然咱們也能夠用它處理異步行爲:

var observable = Rx.Observable
	.create(function(observer) {
		observer.next('Semlinker'); // RxJS 4.x 之前的版本用 onNext
		observer.next('Lolo');
		
		setTimeout(() => {
			observer.next('RxJS Observable');
		}, 300);
	})
	
console.log('start');
observable.subscribe(function(value) {
	console.log(value);
});
console.log('end');
複製代碼

以上代碼運行後,控制檯的輸出結果:

start
Semlinker
Lolo
end
RxJS Observable
複製代碼

從以上例子中,咱們能夠得出一個結論 - Observable 能夠應用於同步和異步的場合。

Observable - Creation Operator

RxJS 中提供了不少操做符,用於建立 Observable 對象,經常使用的操做符以下:

  • create
  • of
  • from
  • fromEvent
  • fromPromise
  • empty
  • never
  • throw
  • interval
  • timer

上面的例子中,咱們已經使用過了 create 操做符,接下來咱們來看一下 of 操做符:

of

var source = Rx.Observable.of('Semlinker', 'Lolo');

source.subscribe({
    next: function(value) {
        console.log(value);
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error);
    }
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

Semlinker
Lolo
complete!
複製代碼

from

var arr = [1, 2, 3];
var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路"

source.subscribe({
    next: function(value) {
        console.log(value);
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error);
    }
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

1
2
3
complete!
複製代碼

fromEvent

Rx.Observable.fromEvent(document.querySelector('button'), 'click');
複製代碼

fromPromise

var source = Rx.Observable
  .fromPromise(new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Hello RxJS!');
    },3000)
}));
  
source.subscribe({
    next: function(value) {
    	console.log(value);
    },
    complete: function() {
    	console.log('complete!');
    },
    error: function(error) {
    	console.log(error);
    }
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

Hello RxJS!
complete!
複製代碼

empty

var source = Rx.Observable.empty();

source.subscribe({
    next: function(value) {
        console.log(value);
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error);
    }
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

complete!
複製代碼

empty 操做符返回一個空的 Observable 對象,若是咱們訂閱該對象,它會當即返回 complete 信息。

never

var source = Rx.Observable.never();

source.subscribe({
    next: function(value) {
        console.log(value);
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error);
    }
});
複製代碼

never 操做符會返回一個無窮的 Observable,當咱們訂閱它後,什麼事情都不會發生,它是一個一直存在卻什麼都不作的 Observable 對象。

throw

var source = Rx.Observable.throw('Oop!');

source.subscribe({
	next: function(value) {
		console.log(value);
	},
	complete: function() {
		console.log('complete!');
	},
	error: function(error) {
    	console.log('Throw Error: ' + error);
	}
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

Throw Error: Oop!
複製代碼

throw 操做如,只作一件事就是拋出異常。

interval

var source = Rx.Observable.interval(1000);

source.subscribe({
	next: function(value) {
		console.log(value);
	},
	complete: function() {
		console.log('complete!');
	},
	error: function(error) {
    	console.log('Throw Error: ' + error);
	}
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

0
1
2
...
複製代碼

interval 操做符支持一個數值類型的參數,用於表示定時的間隔。上面代碼表示每隔 1s,會輸出一個遞增的值,初始值從 0 開始。

timer

var source = Rx.Observable.timer(1000, 5000);

source.subscribe({
	next: function(value) {
		console.log(value);
	},
	complete: function() {
		console.log('complete!');
	},
	error: function(error) {
    	console.log('Throw Error: ' + error);
	}
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

0 # 1s後
1 # 5s後
2 # 5s後
...
複製代碼

timer 操做符支持兩個參數,第一個參數用於設定發送第一個值需等待的時間,第二個參數表示第一次發送後,發送其它值的間隔時間。此外,timer 操做符也能夠只傳遞一個參數,具體以下:

var source = Rx.Observable.timer(1000);

source.subscribe({
	next: function(value) {
		console.log(value);
	},
	complete: function() {
		console.log('complete!');
	},
	error: function(error) {
    	console.log('Throw Error: ' + error);
	}
});
複製代碼

以上代碼運行後,控制檯的輸出結果:

0
complete!
複製代碼

Subscription

有些時候對於一些 Observable 對象 (如經過 interval、timer 操做符建立的對象),當咱們不須要的時候,要釋放相關的資源,以免資源浪費。針對這種狀況,咱們能夠調用 Subscription 對象的 unsubscribe 方法來釋放資源。具體示例以下:

var source = Rx.Observable.timer(1000, 1000);

// 取得subscription對象
var subscription = source.subscribe({
	next: function(value) {
		console.log(value);
	},
	complete: function() {
		console.log('complete!');
	},
	error: function(error) {
    	console.log('Throw Error: ' + error);
	}
});

setTimeout(() => {
    subscription.unsubscribe();
}, 5000);
複製代碼

RxJS - Observer

Observer (觀察者) 是一個包含三個方法的對象,每當 Observable 觸發事件時,便會自動調用觀察者的對應方法。

Observer 接口定義

interface Observer<T> {
  closed?: boolean; // 標識是否已經取消對Observable對象的訂閱
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}
複製代碼

Observer 中的三個方法的做用:

  • next - 每當 Observable 發送新值的時候,next 方法會被調用
  • error - 當 Observable 內發生錯誤時,error 方法就會被調用
  • complete - 當 Observable 數據終止後,complete 方法會被調用。在調用 complete 方法以後,next 方法就不會再次被調用

接下來咱們來看個具體示例:

var observable = Rx.Observable
	.create(function(observer) {
			observer.next('Semlinker');
			observer.next('Lolo');
			observer.complete();
			observer.next('not work');
	});
	
// 建立一個觀察者
var observer = {
	next: function(value) {
		console.log(value);
	},
	error: function(error) {
		console.log(error);
	},
	complete: function() {
		console.log('complete');
	}
}

// 訂閱已建立的observable對象
observable.subscribe(observer);
複製代碼

以上代碼運行後,控制檯的輸出結果:

Semlinker
Lolo
complete
複製代碼

上面的例子中,咱們能夠看出,complete 方法執行後,next 就會失效,因此不會輸出 not work

另外觀察者能夠不用同時包含 next、complete、error 三種方法,它能夠只包含一個 next 方法,具體以下:

var observer = {
	next: function(value) {
		console.log(value);
	}
};
複製代碼

有時候 Observable 多是一個無限的序列,例如 click 事件,對於這種場景,complete 方法就永遠不會被調用。

咱們也能夠在調用 Observable 對象的 subscribe 方法時,依次傳入 next、error、complete 三個函數,來建立觀察者:

observable.subscribe(
    value => { console.log(value); },
    error => { console.log('Error: ', error); },
    () => { console.log('complete'); }
);
複製代碼

Pull vs Push

Pull 和 Push 是數據生產者和數據的消費者兩種不一樣的交流方式。

什麼是Pull?

在 "拉" 體系中,數據的消費者決定什麼時候從數據生產者那裏獲取數據,而生產者自身並不會意識到何時數據將會被髮送給消費者。

每個 JavaScript 函數都是一個 "拉" 體系,函數是數據的生產者,調用函數的代碼經過 ''拉出" 一個單一的返回值來消費該數據。

const add = (a, b) => a + b;
let sum = add(3, 4);
複製代碼

ES6介紹了 iterator迭代器Generator生成器 —另外一種 "拉" 體系,調用 iterator.next() 的代碼是消費者,可從中拉取多個值

什麼是Push?

在 "推" 體系中,數據的生產者決定什麼時候發送數據給消費者,消費者不會在接收數據以前意識到它將要接收這個數據。

Promise(承諾) 是當今 JS 中最多見的 "推" 體系,一個Promise (數據的生產者)發送一個 resolved value (成功狀態的值)來執行一個回調(數據消費者),可是不一樣於函數的地方的是:Promise 決定着什麼時候數據才被推送至這個回調函數。

RxJS 引入了 Observables (可觀察對象),一個全新的 "推" 體系。一個可觀察對象是一個產生多值的生產者,當產生新數據的時候,會主動 "推送給" Observer (觀察者)。

生產者 消費者
pull拉 被請求的時候產生數據 決定什麼時候請求數據
push推 按本身的節奏生產數據 對接收的數據進行處理

接下來咱們來看張圖,從而加深對上面概念的理解:

pull-vs-push

Observable vs Promise

Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。下方表格對Observable進行了定位。

MagicQ 單值 多值
拉取(Pull) 函數 遍歷器
推送(Push) Promise Observable
  • Promise
    • 返回單個值
    • 不可取消的
  • Observable
    • 隨着時間的推移發出多個值
    • 能夠取消的
    • 支持 map、filter、reduce 等操做符
    • 延遲執行,當訂閱的時候纔會開始執行

延遲計算 & 漸進式取值

延遲計算

全部的 Observable 對象必定會等到訂閱後,纔開始執行,若是沒有訂閱就不會執行。

var source = Rx.Observable.from([1,2,3,4,5]);
var example = source.map(x => x + 1);
複製代碼

上面的示例中,由於 example 對象還未被訂閱,因此不會進行運算。這跟數組不同,具體以下:

var source = [1,2,3,4,5];
var example = source.map(x => x + 1); 
複製代碼

以上代碼運行後,example 中就包含已運算後的值。

漸進式取值

數組中的操做符如:filter、map 每次都會完整執行並返回一個新的數組,纔會繼續下一步運算。具體示例以下:

var source = [1,2,3,4,5];
var example = source
				.filter(x => x % 2 === 0) // [2, 4]
              	.map(x => x + 1) // [3, 5]
複製代碼

關於數組中的 mapfilter 的詳細信息,能夠參考 - RxJS Functional Programming

爲了更好地理解數組操做符的運算過程,咱們能夠參考下圖:

雖然 Observable 運算符每次都會返回一個新的 Observable 對象,但每一個元素都是漸進式獲取的,且每一個元素都會通過操做符鏈的運算後才輸出,而不會像數組那樣,每一個階段都得完整運算。具體示例以下:

var source = Rx.Observable.from([1,2,3,4,5]);
var example = source
              .filter(x => x % 2 === 0)
              .map(x => x + 1)

example.subscribe(console.log);
複製代碼

以上代碼的執行過程以下:

  • source 發出 1,執行 filter 過濾操做,返回 false,該值被過濾掉
  • source 發出 2,執行 filter 過濾操做,返回 true,該值被保留,接着執行 map 操做,值被處理成 3,最後經過 console.log 輸出
  • source 發出 3,執行 filter 過濾操做,返回 false,該值被過濾掉
  • source 發出 4,執行 filter 過濾操做,返回 true,該值被保留,接着執行 map 操做,值被處理成 5,最後經過 console.log 輸出
  • source 發出 5,執行 filter 過濾操做,返回 false,該值被過濾掉

爲了更好地理解 Observable 操做符的運算過程,咱們能夠參考下圖:

學習資源

參考資源

相關文章
相關標籤/搜索