RxJS

一.簡介html

RxJS 是一個庫,它經過使用 observable 序列來編寫異步和基於事件的程序。它提供了一個核心類型 Observable,附屬類型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 啓發的操做符 (map、filter、reduce、every, 等等),這些數組操做符能夠把異步事件做爲集合來處理。react

能夠把 RxJS 當作是用來處理事件的 Lodash 。es6

二.基本概念npm

Observable (可觀察對象): 表示一個概念,這個概念是一個可調用的將來值或事件的集合。編程

Observer (觀察者): 觀察者是由 Observable 發送的值的消費者。觀察者只是一組回調函數的集合,每一個回調函數對應一種 Observable 發送的通知類型:next、error 和 complete 。下面的示例是一個典型的觀察者對象:json

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
複製代碼

要使用觀察者,須要把它提供給 Observable 的 subscribe 方法:數組

observable.subscribe(observer);異步

觀察者只是有三個回調函數的對象,每一個回調函數對應一種 Observable 發送的通知類型async

Subscription (訂閱): 表示 Observable 的執行,主要用於取消 Observable 的執行。 Subscription 是表示可清理資源的對象,一般是 Observable 的執行。Subscription 有一個重要的方法,即 unsubscribe,它不須要任何參數,只是用來清理由 Subscription 佔用的資源。在上一個版本的 RxJS 中,Subscription 叫作 "Disposable" (可清理對象)。函數式編程

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
複製代碼

// Observable 執行是經過使用觀察者調用 subscribe 方法啓動的

subscription.unsubscribe();
複製代碼

Subscription 基本上只有一個 unsubscribe() 函數,這個函數用來釋放資源或去取消 Observable 執行

Operators (操做符): 採用函數式編程風格的純函數 (pure function),使用像 map、filter、concat、flatMap 等這樣的操做符來處理集合。

操做符是 Observable 類型上的方法,好比 .map(...)、.filter(...)、.merge(...),等等。當操做符被調用時,它們不會改變已經存在的 Observable 實例。相反,它們返回一個新的 Observable ,它的 subscription 邏輯基於第一個 Observable 。

操做符是函數,它基於當前的 Observable 建立一個新的 Observable。這是一個無反作用的操做:前面的 Observable 保持不變。

操做符本質上是一個純函數 (pure function),它接收一個 Observable 做爲輸入,並生成一個新的 Observable 做爲輸出。訂閱輸出 Observable 一樣會訂閱輸入 Observable 。在下面的示例中,咱們建立一個自定義操做符函數,它將從輸入 Observable 接收的每一個值都乘以10:

function multiplyByTen(input) {

  var output = Rx.Observable.create(function subscribe(observer) {
  
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
    
  });
  
  return output;
  
}

var input = Rx.Observable.from([1, 2, 3, 4]);

var output = multiplyByTen(input);

output.subscribe(x => console.log(x));

輸出:

10
20
30
40
複製代碼

Subject (主體): 至關於 EventEmitter,而且是將值或事件多路推送給多個 Observer 的惟一方式。

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); 
複製代碼

// 你能夠提供一個 Subject 進行訂閱 執行結果:

observerA: 1

observerB: 1

observerA: 2

observerB: 2

observerA: 3

observerB: 3

使用上面的方法,咱們基本上只是經過 Subject 將單播的 Observable 執行轉換爲多播的。這也說明了 Subjects 是將任意 Observable 執行共享給多個觀察者的惟一方式。

Schedulers (調度器): 調度器控制着什麼時候啓動 subscription 和什麼時候發送通知。 讓你規定 Observable 在什麼樣的執行上下文中發送通知給它的觀察者

var observable = Rx.Observable.create(function (observer) {

        observer.next(1);
  
        observer.next(2);
  
        observer.next(3);
  
        observer.complete();
  
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');

observable.subscribe({

      next: x => console.log('got value ' + x),
      
      error: err => console.error('something wrong occurred: ' + err),
      
      complete: () => console.log('done'),
  
});
console.log('just after subscribe');
輸出結果:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
複製代碼

三. 安裝

1.經過 npm 安裝 ES6 版本 npm install rxjs 導入整個核心功能集:

import Rx from 'rxjs/Rx';

Rx.Observable.of(1,2,3)
經過打補丁的方式只導入所須要的(這對於減小 bundling 的體積是十分有用的):

import { Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';

Observable.of(1,2,3).map(x => x + '!!!'); // 等等
只導入須要的而且使用被提議的綁定操做符:

注意:這個額外的預發須要編譯器支持而且此語法可能會在沒有任何通知的狀況下徹底從 TC39 撤回!要使用的話須要你本身來承擔風險。

import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { map } from 'rxjs/operator/map';

Observable::of(1,2,3)::map(x => x + '!!!'); // 等等
複製代碼

2.經過 npm 安裝 CommonJS 版本 npm install rxjs 導入全部核心功能:

var Rx = require('rxjs/Rx');

Rx.Observable.of(1,2,3); // 等等
經過打補丁的方式只導入所須要的(這對於減小 bundling 的體積是十分有用的):

var Observable = require('rxjs/Observable').Observable;
// 使用適合的方法在 Observable 上打補丁
require('rxjs/add/observable/of');
require('rxjs/add/operator/map');

Observable.of(1,2,3).map(function (x) { return x + '!!!'; }); // 等等
導入操做符並手動地使用它們(這對於減小 bundling 的體積也十分有用):

var of = require('rxjs/observable/of').of;
var map = require('rxjs/operator/map').map;

map.call(of(1,2,3), function (x) { return x + '!!!'; });
還可使用上面的方法來構建你本身的 Observable 並將其從你本身的模塊中導出。
複製代碼

3.使用 TypeScript 的 CommonJS 模式 當使用 RxJS 時收到了像 error TS2304: Cannot find name 'Promise' 或 error TS2304: Cannot find name 'Iterable' 這樣的報錯信息,那麼你可能須要安裝額外的 typings 。

對於使用 typings 的用戶:

typings install es6-shim --ambient

若是沒有使用 typings 的話,能夠從 /es6-shim/es6-shim.d.ts 拷貝定義好的接口。

在 tsconfig.json 或 CLI 參數中添加類型定義文件。
複製代碼

4.經過 npm 全部全模塊類型 (CJS/ES6/AMD/TypeScript) 要安裝這個庫須要 npm 3及以上版本,使用下面的命令行:

npm install @reactivex/rxjs
若是你使用的仍是 npm 2的話,那麼在這個庫升級至穩定版以前,須要明確地指定庫的版本號:

npm install @reactivex/rxjs@5.0.0-beta.1
複製代碼

5.CDN 對於 CDN,可使用 unpkg 。只須要用當前的版本號來替換下面連接中的 version:

對於 RxJS 5.0.0-beta.1 到 beta.11: https://unpkg.com/@reactivex/rxjs@version/dist/global/Rx.umd.js

對於 RxJS 5.0.0-beta.12 及以上版本: https://unpkg.com/@reactivex/rxjs@version/dist/global/Rx.js
複製代碼

四.第一個示例 註冊事件監聽器的常規寫法。

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

**使用 RxJS 的話,建立一個 observable 來代替。**

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));  
複製代碼

更多用法:cn.rx.js.org/manual/tuto…

手冊:cn.rx.js.org/manual/inde…

相關文章
相關標籤/搜索