從觀察者模式到迭代器模式系統講解 RxJS Observable(一)

RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions,是一個基於可觀測數據流 Stream 結合觀察者模式和迭代器模式的一種異步編程的應用庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實現。javascript

Reactive Extensions(Rx)是對 LINQ 的一種擴展,他的目標是對異步的集合進行操做,也就是說,集合中的元素是異步填充的,好比說從 Web
或者雲端獲取數據而後對集合進行填充。LINQ(Language Integrated Query)語言集成查詢是一組用於 C# 和
Visual Basic 語言的擴展。它容許編寫 C# 或者 Visual Basic 代碼以操做內存數據的方式,查詢數據庫。

RxJS 的主要功能是利用響應式編程的模式來實現 JavaScript 的異步式編程(現前端主流框架 Vue React Angular 都是響應式的開發框架)。前端

RxJS 是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。學習 RxJS 以前咱們須要先了解觀察者模式和迭代器模式,還要對 Stream 流的概念有所認識。下面咱們將對其逐一進行介紹,準備好了嗎?讓咱們如今就開始吧。java

RxJS 前置知識點

觀察者模式

觀察者模式又叫發佈訂閱模式(Publish/Subscribe),它是一種一對多的關係,讓多個觀察者(Obesver)同時監聽一個主題(Subject),這個主題也就是被觀察者(Observable),被觀察者的狀態發生變化時就會通知全部的觀察者,使得它們可以接收到更新的內容。git

觀察者模式主題和觀察者是分離的,不是主動觸發而是被動監聽。github

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

舉個常見的例子,例如微信公衆號關注者和微信公衆號之間的信息訂閱。當微信用戶關注微信公衆號 webinfoq就是一個訂閱過程,webinfoq負責發佈內容和信息,webinfoq有內容推送時,webinfoq的關注者就能收到最新發布的內容。這裏,關注公衆號的朋友就是觀察者的角色,公衆號webinfoq就是被觀察者的角色。web

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

示例代碼:ajax

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

// 定義一個主題類(被觀察者/發佈者)
class Subject {
  constructor() {
    this.observers = [];   // 記錄訂閱者(觀察者)的集合
    this.state = 0;        // 發佈的初始狀態
  }
  getState() {
    return this.state;
  }
  setState(state) {        
    this.state = state;    // 推送新信息
    this.notify();         // 通知訂閱者有更新了
  }
  attach(observer) {
    this.observers.push(observer);   // 對觀察者進行登記
  }
  notify() {
    // 遍歷觀察者集合,一一進行通知
    this.observers.forEach(observer = {
      observer.update();   
    })
  }
}
// 定義一個觀察者(訂閱)類
class Observer {
  constructor(name, subject) {  
    this.name = name;   // name 表示觀察者的標識
    this.subject = subject;  // 觀察者訂閱主題
    this.subject.attach(this);  // 向登記處傳入觀察者實體
  }
  update() {
    console.log(`${this.name} update, state: ${this.subject.getState()}`);
  }
}

// 建立一個主題
let subject = new Subject();

// 建立三個觀察者: observer$1 observer$2 observer$3
let observer$1 = new Observer("observer$1", subject);
let observer$2 = new Observer("observer$2", subject);
let observer$3 = new Observer("observer$3", subject);

// 主題有更新
subject.setState(1);
subject.setState(2);
subject.setState(3);

// 輸出結果
// observer$1 update, state: 1
// observer$1 update, state: 1
// observer$1 update, state: 1
// observer$2 update, state: 2
// observer$2 update, state: 2
// observer$2 update, state: 2
// observer$3 update, state: 3
// observer$3 update, state: 3
// observer$3 update, state: 3

迭代器模式

迭代器(Iterator)模式又叫遊標(Sursor)模式,迭代器具備 next 方法,能夠順序訪問一個聚合對象中的各個元素,而不須要暴露該對象的內部表現。數據庫

迭代器模式能夠把迭代的過程從從業務邏輯中分離出來,迭代器將使用者和目標對象隔離開來,即便不瞭解對象的內部構造,也能夠經過迭代器提供的方法順序訪問其每一個元素。npm

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

瞭解更多可迭代對象:「JS篇」你不知道的 JS 知識點總結(一)編程

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

使用 ES5 建立一個迭代器

//建立一個迭代類,傳入目標對象
function Iterator(container) {
  this.list = container.list;
  this.index = 0;

  //定義私有的next方法,執行迭代
  this.next = function() {
    if(this.hasNext()) { //判斷是否迭代完畢
      return {
        value: this.list[this.index++],
        done: false
      }
    }
    return {value: null, done: true}
  }
  this.hasNext = function() {
    if(this.index >= this.list.length) {
      return false;
    }
    return true;
  }
}

//定義目標對象
function Container(list) {
  this.list = list;
  this.getIterator = function() {
    return new Iterator(this); //用戶返回一個迭代器
  }
}

//調用
var container = new Container([1, 2, 3, 4, 5]);
var iterator = container.getIterator();
iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

使用 ES6 構造一個迭代器

class Iterator {
  constructor(container) {
    this.list = container.list;
    this.index = 0;
  }
  next() {
    if(this.hasNext()) {
      return {
        value: this.list[this.index++],
        done: false
      }
    }
    return {value: null, done: true}
  }
  hasNext() {
    if(this.index >= this.list.length) {
      return false;
    }
    return true;
  }
}

class Container {
  constructor(list) {
    this.list = list;
  }
  getIterator() {
    return new Iterator(this);
  }
}

let container = new Container([1, 2, 3, 4, 5]);
let iterator = container.getIterator();

iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

使用 ES6 的 Symbol.iterator 建立一個迭代器

var list = [1, 2, 3, 4, 5];
var iterator = list[Symbol.iterator]();

iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

經過上邊的示例代碼咱們能夠得知,咱們不瞭解對象的內部構造,可是能夠經過調用迭代器提供的 next() 方法就能順序訪問其每一個元素。

Stream 流

在這裏能夠將一系列的鼠標點擊、鍵盤點擊產生的事件和將要處理的元素集合看做一種流, 流的特色是數據源的自己是無限的,流在管道中傳輸, 而且能夠在管道的節點上進行處理, 好比篩選, 排序,聚合等。

流數據源(source)通過數據轉換等中間操做的處理,最後由最終操做獲得前面處理的結果,每次轉換原有 Stream 對象不改變,返回一個新的 Stream 對象(能夠有屢次轉換),這就容許對其操做能夠像鏈條同樣排列,變成一個管道。

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

爲了對 stream 有一個更感性的認識,咱們說點擊事件能夠看做一種 stream,在介紹 RxJS 以前,咱們不妨先看一個 RxJS 官網上的例子(列舉官方的例子更能充分體現 RxJS 是基於可觀測數據流 Stream 的)。

一般,註冊一個事件偵聽器是這樣的。

document.addEventListener('click', () => console.log('Clicked!'));

使用 RxJS 能夠建立一個 observable

import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));

自定義源建立一個 Observable

結合上邊講到流和設計模式,爲了方便咱們對 RxJS 有進一步的認識,咱們就用代碼本身來實現一個 Obserable

其實 Observable 實際上就是一個函數,它接收一個Observer 對象做爲參數,返回一個函數用來取消訂閱。Observer 對象能夠聲明 next、err、complete 方法來處理流的不一樣狀態。

首先咱們定義數據源 Source

// 建立數據源
class Source {
  constructor() {
    this.state = 0;
    this.data = setInterval(() => this.emit(this.state++), 200);
  }
  
  emit(state) {
    const limit = 10;  // 定義數據上限
    if (this.onData) {
      this.onData(state);  // 產生數據
    }
    if (state === limit) {
      if (this.onComplete) {
        this.onComplete();  // 數據終止
      }
      this.destroy();
    }
  }
  
  destroy() {  //中止定時器,清除數據
    clearInterval(this.data);
  }
}

建立一個 Observable

// 建立 Observable
class Observable {
  constructor() {}
  getStream() {  
    return new Source();
  }
  subscribe(observer) {
    // 獲取流數據源
    this.stream = this.getStream();  
    
    // 轉換
    this.stream.onData = (e) => observer.next(e); // 處理流數據
    this.stream.onError = (err) => observer.error(err);  //處理異常
    this.stream.onComplete = () => observer.complete();  //處理流數據終止
    
    // 返回一個函數
    return () => {
      this.stream.destroy();        
    }
  }
}

調用 subscribe 進行訂閱

const observable = new Observable();
//訂閱
let observer = {
  next(data) { console.log(data); },
  error(err) { console.error(err); },
  complete() { console.log('done')}
}
const unsubscribe = observable.subscribe(observer);

輸出結果

640?wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1

咱們能夠調用 unsubscribe 取消訂閱

//0.5後取消訂閱
setTimeout(unsubscribe, 500);

咱們能夠看到 Observable 做爲生產者與觀察者之間的橋樑,並返回一種方法來解除生產者與觀察者之間的聯繫,其中觀察者用於處理時間序列上數據流。

RxJS(Reactive Extensions for JavaScript)

介紹完 RxJS 的一些前置知識點,下面就讓咱們一塊兒來認識下什麼是 RxJS 吧。

RxJS 中含有兩個基本概念:ObservablesObserver。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。

Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:

  • 訂閱:Observer 經過 Observable 提供的 subscribe() 方法訂閱 Observable。
  • 發佈:Observable 經過回調 next 方法向 Observer 發佈事件。

Observable 屬於全新的 push 體系,讓咱們先了解下什麼是 pull 體系和 push 體系吧。

Pull vs Push

PullPush 是兩種不一樣的協議,描述了數據生產者如何與數據消費者進行通訊。

生產者 消費者
pull 被請求的時候產生數據 決定什麼時候請求數據
push 按本身的節奏生產數據 對接收的數據進行處理

Pull 體系中,數據的消費者決定什麼時候從數據生產者那裏獲取數據,而生產者自身並不會意識到何時數據將會被髮送給消費者。

每個 JavaScript函數都是一個 Pull 體系,函數是數據的生產者,調用函數的代碼經過 '拉出' 一個單一的返回值來消費該數據。

function add(x, y) {
  console.log('Hello');
  return x + y;
}
const x = add(4, 5);

ES6介紹了 Iterator 迭代器 和 Generator 生成器,另外一種 Pull 體系,調用 iterator.next() 的代碼是消費者,可從中拉取多個值。

Push 體系中,數據的生產者決定什麼時候發送數據給消費者,消費者不會在接收數據以前意識到它將要接收這個數據。

Promise 是當今 JS 中最多見的 Push 體系,一個 Promise (數據的生產者)發送一個 resolved value (成功狀態的值)來執行一個回調(數據消費者)。可是不一樣於函數的地方的是:Promise 決定着什麼時候數據才被推送至這個回調函數。

RxJS 引入了 Observable (可觀察對象),一個全新的 Push 體系。一個可觀察對象是一個產生多值的生產者,當產生新數據的時候,會主動 "推送給" Observer (觀察者)。

Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。

MagicQ 單值 多值
拉取(Pull) Function Iterator
推送(Push) Promise Observable

ObservablePromise 之間的差別:

  • Promise:只能返回單個值,不可取消,要麼 resolve 要麼 reject 而且只響應一次。
  • Observable:隨着時間的推移發出多個值,能夠調用 unsubscribe() 取消訂閱,支持 map、filter、reduce 等操做符,延遲執行,當訂閱的時候纔會開始執行,能夠響應屢次。

RxJS 之 Observable

使用 RxJS 咱們可使用 npm 進行安裝(更多使用方法請參考 github):

npm install rxjs

須要注意的是,不少人認爲 RxJS 中的全部操做都是異步的,但其實這個觀念是錯的。RxJS 的核心特性是它的異步處理能力,但它也是能夠用來處理同步的行爲。具體示例以下:

import { Observable } from 'rxjs';
 
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
 
console.log('start');
observable.subscribe({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done'); }
});
console.log('end');

以上代碼運行後,控制檯的輸出結果:

start
1
2
3
done
end

固然咱們也能夠用它處理異步行爲:

import { Observable } from 'rxjs';
 
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
 
console.log('start');
observable.subscribe({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done'); }
});
console.log('end');

代碼運行後的輸出結果爲:

start
1
2
3
end
4
done

RxJS 使用建立類操做符建立 Observable

RxJS 中提供了不少操做符 Operators,下篇文章咱們將對 Operators 進行介紹,建立類操做符(Creation Operator)用於建立 Observable 對象。

官網列舉的一些建立類操做符以下:

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

最後咱們簡單的來看一下,如何使用 from 建立一個 Observable(關於操做符的介紹咱們將在下篇文章進行詳細介紹,保持關注哦)。

from:能夠把數組、Promise、以及 Iterable 轉化爲 Observable。

//將數組轉換爲 Observable:

import { from } from 'rxjs';

const array = [10, 20, 30];
const result = from(array);

result.subscribe(x => console.log(x));

// Logs:
// 10
// 20
// 30

因爲 RxJS 涉及到的概念和知識點比較寬泛和複雜,咱們須要一步一步的去理解和掌握它,最終能夠作到知其然亦知其因此然。接下來文章中會繼續介紹 RxJS 中涉及到知識點,關注此公衆號webinfoq不要錯過哦。

相關文章
相關標籤/搜索