RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源於 Reactive Extensions,是一個基於可觀測數據流 Stream 結合觀察者模式和迭代器模式的一種異步編程的應用庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實現。javascript
Reactive Extensions(Rx)是對 LINQ 的一種擴展,他的目標是對異步的集合進行操做,也就是說,集合中的元素是異步填充的,好比說從 Web
或者雲端獲取數據而後對集合進行填充。LINQ(Language Integrated Query)語言集成查詢是一組用於 C# 和
Visual Basic 語言的擴展。它容許編寫 C# 或者 Visual Basic 代碼以操做內存數據的方式,查詢數據庫。
RxJS
的主要功能是利用響應式編程的模式來實現 JavaScript 的異步式編程(現前端主流框架 Vue React Angular 都是響應式的開發框架)。前端
RxJS
是基於觀察者模式和迭代器模式以函數式編程思惟來實現的。學習 RxJS
以前咱們須要先了解觀察者模式和迭代器模式,還要對 Stream
流的概念有所認識。下面咱們將對其逐一進行介紹,準備好了嗎?讓咱們如今就開始吧。java
觀察者模式又叫發佈訂閱模式(Publish/Subscribe
),它是一種一對多的關係,讓多個觀察者(Obesver
)同時監聽一個主題(Subject
),這個主題也就是被觀察者(Observable
),被觀察者的狀態發生變化時就會通知全部的觀察者,使得它們可以接收到更新的內容。git
觀察者模式主題和觀察者是分離的,不是主動觸發而是被動監聽。github
舉個常見的例子,例如微信公衆號關注者和微信公衆號之間的信息訂閱。當微信用戶關注微信公衆號 webinfoq
就是一個訂閱過程,webinfoq
負責發佈內容和信息,webinfoq
有內容推送時,webinfoq
的關注者就能收到最新發布的內容。這裏,關注公衆號的朋友就是觀察者的角色,公衆號webinfoq
就是被觀察者的角色。web
示例代碼:ajax
// 定義一個主題類(被觀察者/發佈者) class Subject { constructor() { this.observers = []; // 記錄訂閱者(觀察者)的集合 this.state = 0; // 發佈的初始狀態 } getState() { return this.state; } setState(state) { this.state = state; // 推送新信息 this.notify(); // 通知訂閱者有更新了 } attach(observer) { this.observers.push(observer); // 對觀察者進行登記 } notify() { // 遍歷觀察者集合,一一進行通知 this.observers.forEach(observer = { observer.update(); }) } }
// 定義一個觀察者(訂閱)類 class Observer { constructor(name, subject) { this.name = name; // name 表示觀察者的標識 this.subject = subject; // 觀察者訂閱主題 this.subject.attach(this); // 向登記處傳入觀察者實體 } update() { console.log(`${this.name} update, state: ${this.subject.getState()}`); } } // 建立一個主題 let subject = new Subject(); // 建立三個觀察者: observer$1 observer$2 observer$3 let observer$1 = new Observer("observer$1", subject); let observer$2 = new Observer("observer$2", subject); let observer$3 = new Observer("observer$3", subject); // 主題有更新 subject.setState(1); subject.setState(2); subject.setState(3); // 輸出結果 // observer$1 update, state: 1 // observer$1 update, state: 1 // observer$1 update, state: 1 // observer$2 update, state: 2 // observer$2 update, state: 2 // observer$2 update, state: 2 // observer$3 update, state: 3 // observer$3 update, state: 3 // observer$3 update, state: 3
迭代器(Iterator
)模式又叫遊標(Sursor
)模式,迭代器具備 next
方法,能夠順序訪問一個聚合對象中的各個元素,而不須要暴露該對象的內部表現。數據庫
迭代器模式能夠把迭代的過程從從業務邏輯中分離出來,迭代器將使用者和目標對象隔離開來,即便不瞭解對象的內部構造,也能夠經過迭代器提供的方法順序訪問其每一個元素。npm
瞭解更多可迭代對象:「JS篇」你不知道的 JS 知識點總結(一)編程
使用 ES5 建立一個迭代器
//建立一個迭代類,傳入目標對象 function Iterator(container) { this.list = container.list; this.index = 0; //定義私有的next方法,執行迭代 this.next = function() { if(this.hasNext()) { //判斷是否迭代完畢 return { value: this.list[this.index++], done: false } } return {value: null, done: true} } this.hasNext = function() { if(this.index >= this.list.length) { return false; } return true; } } //定義目標對象 function Container(list) { this.list = list; this.getIterator = function() { return new Iterator(this); //用戶返回一個迭代器 } } //調用 var container = new Container([1, 2, 3, 4, 5]); var iterator = container.getIterator(); iterator.next(); // {value: 1, done: false} iterator.next(); // {value: 2, done: false} iterator.next(); // {value: 3, done: false} iterator.next(); // {value: 4, done: false} iterator.next(); // {value: 5, done: false} iterator.next(); // {value: null, done: true}
使用 ES6 構造一個迭代器
class Iterator { constructor(container) { this.list = container.list; this.index = 0; } next() { if(this.hasNext()) { return { value: this.list[this.index++], done: false } } return {value: null, done: true} } hasNext() { if(this.index >= this.list.length) { return false; } return true; } } class Container { constructor(list) { this.list = list; } getIterator() { return new Iterator(this); } } let container = new Container([1, 2, 3, 4, 5]); let iterator = container.getIterator(); iterator.next(); // {value: 1, done: false} iterator.next(); // {value: 2, done: false} iterator.next(); // {value: 3, done: false} iterator.next(); // {value: 4, done: false} iterator.next(); // {value: 5, done: false} iterator.next(); // {value: null, done: true}
使用 ES6 的 Symbol.iterator
建立一個迭代器
var list = [1, 2, 3, 4, 5]; var iterator = list[Symbol.iterator](); iterator.next(); // {value: 1, done: false} iterator.next(); // {value: 2, done: false} iterator.next(); // {value: 3, done: false} iterator.next(); // {value: 4, done: false} iterator.next(); // {value: 5, done: false} iterator.next(); // {value: null, done: true}
經過上邊的示例代碼咱們能夠得知,咱們不瞭解對象的內部構造,可是能夠經過調用迭代器提供的 next() 方法就能順序訪問其每一個元素。
在這裏能夠將一系列的鼠標點擊、鍵盤點擊產生的事件和將要處理的元素集合看做一種流, 流的特色是數據源的自己是無限的,流在管道中傳輸, 而且能夠在管道的節點上進行處理, 好比篩選, 排序,聚合等。
流數據源(source
)通過數據轉換等中間操做的處理,最後由最終操做獲得前面處理的結果,每次轉換原有 Stream 對象不改變,返回一個新的 Stream 對象(能夠有屢次轉換),這就容許對其操做能夠像鏈條同樣排列,變成一個管道。
爲了對 stream 有一個更感性的認識,咱們說點擊事件能夠看做一種 stream,在介紹 RxJS 以前,咱們不妨先看一個 RxJS 官網上的例子(列舉官方的例子更能充分體現 RxJS 是基於可觀測數據流 Stream 的)。
一般,註冊一個事件偵聽器是這樣的。
document.addEventListener('click', () => console.log('Clicked!'));
使用 RxJS 能夠建立一個 observable
。
import { fromEvent } from 'rxjs'; fromEvent(document, 'click').subscribe(() => console.log('Clicked!'));
結合上邊講到流和設計模式,爲了方便咱們對 RxJS 有進一步的認識,咱們就用代碼本身來實現一個 Obserable
。
其實 Observable 實際上就是一個函數,它接收一個Observer 對象做爲參數,返回一個函數用來取消訂閱。Observer 對象能夠聲明 next、err、complete
方法來處理流的不一樣狀態。
首先咱們定義數據源 Source
// 建立數據源 class Source { constructor() { this.state = 0; this.data = setInterval(() => this.emit(this.state++), 200); } emit(state) { const limit = 10; // 定義數據上限 if (this.onData) { this.onData(state); // 產生數據 } if (state === limit) { if (this.onComplete) { this.onComplete(); // 數據終止 } this.destroy(); } } destroy() { //中止定時器,清除數據 clearInterval(this.data); } }
建立一個 Observable
// 建立 Observable class Observable { constructor() {} getStream() { return new Source(); } subscribe(observer) { // 獲取流數據源 this.stream = this.getStream(); // 轉換 this.stream.onData = (e) => observer.next(e); // 處理流數據 this.stream.onError = (err) => observer.error(err); //處理異常 this.stream.onComplete = () => observer.complete(); //處理流數據終止 // 返回一個函數 return () => { this.stream.destroy(); } } }
調用 subscribe 進行訂閱
const observable = new Observable(); //訂閱 let observer = { next(data) { console.log(data); }, error(err) { console.error(err); }, complete() { console.log('done')} } const unsubscribe = observable.subscribe(observer);
輸出結果
咱們能夠調用 unsubscribe
取消訂閱
//0.5後取消訂閱 setTimeout(unsubscribe, 500);
咱們能夠看到 Observable 做爲生產者與觀察者之間的橋樑,並返回一種方法來解除生產者與觀察者之間的聯繫,其中觀察者用於處理時間序列上數據流。
介紹完 RxJS 的一些前置知識點,下面就讓咱們一塊兒來認識下什麼是 RxJS 吧。
RxJS 中含有兩個基本概念:Observables
與 Observer
。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:
Observable 屬於全新的 push
體系,讓咱們先了解下什麼是 pull
體系和 push
體系吧。
Pull
和 Push
是兩種不一樣的協議,描述了數據生產者如何與數據消費者進行通訊。
生產者 | 消費者 | |
---|---|---|
pull | 被請求的時候產生數據 | 決定什麼時候請求數據 |
push | 按本身的節奏生產數據 | 對接收的數據進行處理 |
在 Pull
體系中,數據的消費者決定什麼時候從數據生產者那裏獲取數據,而生產者自身並不會意識到何時數據將會被髮送給消費者。
每個 JavaScript函數
都是一個 Pull
體系,函數是數據的生產者,調用函數的代碼經過 '拉出' 一個單一的返回值來消費該數據。
function add(x, y) { console.log('Hello'); return x + y; } const x = add(4, 5);
ES6介紹了 Iterator
迭代器 和 Generator
生成器,另外一種 Pull
體系,調用 iterator.next()
的代碼是消費者,可從中拉取多個值。
在 Push
體系中,數據的生產者決定什麼時候發送數據給消費者,消費者不會在接收數據以前意識到它將要接收這個數據。
Promise
是當今 JS 中最多見的 Push
體系,一個 Promise
(數據的生產者)發送一個 resolved value
(成功狀態的值)來執行一個回調(數據消費者)。可是不一樣於函數的地方的是:Promise
決定着什麼時候數據才被推送至這個回調函數。
RxJS 引入了 Observable (可觀察對象),一個全新的 Push 體系。一個可觀察對象是一個產生多值的生產者,當產生新數據的時候,會主動 "推送給" Observer (觀察者)。
Observable(可觀察對象)是基於推送(Push)運行時執行(lazy)的多值集合。
MagicQ | 單值 | 多值 |
---|---|---|
拉取(Pull) | Function | Iterator |
推送(Push) | Promise | Observable |
Observable
於 Promise
之間的差別:
使用 RxJS 咱們可使用 npm 進行安裝(更多使用方法請參考 github):
npm install rxjs
須要注意的是,不少人認爲 RxJS 中的全部操做都是異步的,但其實這個觀念是錯的。RxJS 的核心特性是它的異步處理能力,但它也是能夠用來處理同步的行爲。具體示例以下:
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); console.log('start'); observable.subscribe({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done'); } }); console.log('end');
以上代碼運行後,控制檯的輸出結果:
start 1 2 3 done end
固然咱們也能夠用它處理異步行爲:
import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { subscriber.next(1); subscriber.next(2); subscriber.next(3); setTimeout(() => { subscriber.next(4); subscriber.complete(); }, 1000); }); console.log('start'); observable.subscribe({ next(x) { console.log(x); }, error(err) { console.error(err); }, complete() { console.log('done'); } }); console.log('end');
代碼運行後的輸出結果爲:
start 1 2 3 end 4 done
RxJS 中提供了不少操做符 Operators
,下篇文章咱們將對 Operators
進行介紹,建立類操做符(Creation Operator
)用於建立 Observable 對象。
官網列舉的一些建立類操做符以下:
最後咱們簡單的來看一下,如何使用 from
建立一個 Observable(關於操做符的介紹咱們將在下篇文章進行詳細介紹,保持關注哦)。
from
:能夠把數組、Promise、以及 Iterable 轉化爲 Observable。
//將數組轉換爲 Observable: import { from } from 'rxjs'; const array = [10, 20, 30]; const result = from(array); result.subscribe(x => console.log(x)); // Logs: // 10 // 20 // 30
因爲 RxJS 涉及到的概念和知識點比較寬泛和複雜,咱們須要一步一步的去理解和掌握它,最終能夠作到知其然亦知其因此然。接下來文章中會繼續介紹 RxJS 中涉及到知識點,關注此公衆號webinfoq
不要錯過哦。