通俗的方式理解RxJS

通俗的方式理解Rx.js

序言

今早看民工叔的文章的時候, 發現對Rxjs所知甚少, 因而去官方看了下教程, 整理出一些東西, 寫成此文。
Rxjs聽說會在2017年流行起來, 由於其處理異步邏輯,數據流, 事件很是擅長。 可是其學習曲線相比Promise, EventEmitter陡峭了很多。 並且民工叔也說:"因爲RxJS的抽象程度很高,因此,能夠用很簡短代碼表達很複雜的含義,這對開發人員的要求也會比較高,須要有比較強的概括能力。" 本文就Rx.js的幾個核心概念作出闡述。 儘量以通俗易懂的方式解釋這些概念。要是本文有誤或不完善的地方,歡迎指出。html

Observable究竟是什麼

先上代碼:前端

let foo = Rx.Observable.create(observer => {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(x => console.log(x));
foo.subscribe(y => console.log(y));

輸出react

"Hello"
42
"Hello"
42

這裏能夠把foo想象成一個函數,這意味着你每次調用foo都會致使傳入Rx.Observable.create裏的回調函數從新執行一次, 調用的方式爲foo.subscribe(callback), 至關於foo()。 接收函數返回值的方式也從var a = foo()改成經過傳入回調函數的方式獲取。第三行的observer.next表示返回一個值, 你能夠調用屢次,每次調用observer.next後, 會先將next裏的值返回給foo.subcribe裏的回調函數, 執行完後再返回。observer.complete, observer.error來控制流程。 具體看代碼:git

var observable = Rx.Observable.create(observer => {
  try {
    observer.next(1);
    console.log('hello');
    observer.next(2);
    observer.next(3);
    observer.complete();
    observer.next(4);
  } catch (err) {
    observer.error(err); 
  }
});

let = subcription = observable.subscribe(value => {
  console.log(value)
})

運行結果:github

1
hello
2
3

如上的第一個回調函數裏的結構是推薦的結構。 當observable的執行出現異常的時候,經過observer.error將錯誤返回, 然而observable.subscribe的回調函數沒法接收到.由於observer.complete已經調用, 所以observer.next(4)的返回是無效的. Observable不是能夠返回多個值的Promise 雖然得到Promise的值的方式也是經過then函數這種相似的方式, 可是new Promise(callback)裏的callback回調永遠只會執行一次!由於Promise的狀態是不可逆的數組

可使用其餘方式建立Observable, 看代碼:promise

var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));

當用戶對document產生一個click行爲的時候, 就會打印事件對象到控制檯上。app

Observer是什麼

先看代碼:dom

let foo = Rx.Observable.create(observer => {
  console.log('Hello');
  observer.next(42);
});

let observer = x => console.log(x);
foo.subscribe(observer);

代碼中的第二個變量就是observer. 沒錯, observer就是當Observable"返回"值的時候接受那個值的函數!第一行中的observer其實就是經過foo.subscribe傳入的callback. 只不過稍加封裝了。 怎麼封裝的? 看代碼:異步

let foo = Rx.Observable.create(observer => {
  try {
    console.log('Hello');
    observer.next(42);
    observer.complete();
    observer.next(10);
  } catch(e) { observer.error(e) }
  
});

let observer = {
  next(value) { console.log(value) },
  complete() { console.log('completed'),
  error(err) { console.error(err) }
}
foo.subscribe(observer);

你看到observer被定義成了一個對象, 其實這纔是完整的observer. 傳入一個callback到observable.subcribe至關於傳入了{ next: callback }

Subcription裏的陷阱

Subscription是什麼, 先上代碼:

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));

setTimeout(() => {
  subscription.unsubscribe();
}, 3100)

運行結果:

0
1
2

Rx.Observable.interval能夠返回一個可以發射(返回)0, 1, 2, 3..., n數字的Observable, 返回的時間間隔這裏是1000ms。 第二行中的變量就是subscription。 subscription有一個unsubscribe方法, 這個方法可讓subscription訂閱的observable發射的數據被observer忽略掉.通俗點說就是取消訂閱。

unsubscribe存在一個陷阱。 先看代碼:

var foo = Rx.Observable.create((observer) => {
  var i = 0
  setInterval(() => {
    observer.next(i++)
    console.log('hello')
  }, 1000)
})

const subcription = foo.subscribe((i) => console.log(i))
subcription.unsubscribe()

運行結果:

hello
hello
hello
......
hello

unsubscribe只會讓observer忽略掉observable發射的數據,可是setInterval依然會繼續執行。 這看起來彷佛是一個愚蠢的設計。 因此不建議這樣寫。

Subject

Subject是一種可以發射數據給多個observer的Observable, 這讓Subject看起來就好像是EventEmitter。 先上代碼:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

運行結果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

與Observable不一樣的是, Subject發射數據給多個observer。 其次, 定義subject的時候並無傳入callback, 這是由於subject自帶next, complete, error等方法。從而能夠發射數據給observer。 這和EventEmitter很相似。observer並不知道他subscribe的是Obervable仍是Subject。 對observer來講是透明的。 並且Subject還有各類派生, 好比說:

BehaviorSubject 可以保留最近的數據,使得當有subscribe的時候,立馬發射出去。看代碼:

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

運行結果:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject 可以保留最近的一些數據, 使得當有subscribe的時候,將這些數據發射出去。看代碼:

var subject = new Rx.ReplaySubject(3); 

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

輸出結果:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

第一行的聲明表示ReplaySubject最大可以記錄的數據的數量是3。

AsyncSubject 只會發射結束前的一個數據。 看代碼:

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

輸出結果:

observerA: 5
observerB: 5

既然subject有next, error, complete三種方法, 那subject就能夠做爲observer! 看代碼:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject);

輸出結果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

也就是說, observable.subscribe能夠傳入一個subject來訂閱其消息。 這就好像是Rxjs中的一顆語法糖, Rxjs有專門的實現。

Multicasted Observables 是一種藉助Subject來將數據發射給多個observer的Observable。 看代碼:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

multicasted.connect();

Rx.Observable.from可以逐一發射數組中的元素, 在multicasted.connect()調用以前的任何subscribe都不會致使source發射數據。multicasted.connect()至關於以前的observable.subscribe(subject)。所以不能將multicasted.connect()寫在subscribe的前面。由於這會致使在執行multicasted.connect()的時候source發射數據, 可是subject又沒保存數據, 致使兩個subscribe沒法接收到任何數據。

最好是第一個subscribe的時候可以獲得當前已有的數據, 最後一個unsubscribe的時候就中止Observable的執行, 至關於Observable發射的數據都被忽略。

refCount就是可以返回這樣的Observable的方法

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

輸出結果:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

What's Operators?

Observable上有不少方法, 好比說map, filter, merge等等。 他們基於調用它們的observable,返回一個全新的observable。 並且他們都是純方法。 operators分爲兩種, instance operators 和 static operators。 instance operators是存在於observable實例上的方法, 也就是實例方法; static operators是存在於Observable這個類型上的方法, 也就是靜態方法。Rxjs擁有不少強大的operators

本身實現一個operators:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

輸出結果:

10
20
30
40

Rx.js實踐

import React from 'react';
import ReactDOM from 'react-dom';
import Rx from 'rx';

class Main extends React.Component {
  constructor (props) {
    super(props);
    this.state = {count: 0};
  }

  // Click events are now observables! No more proactive approach.
  componentDidMount () {
    const plusBtn = document.getElementById('plus');
    const minusBtn = document.getElementById('minus');

    const plus$ = Rx.Observable.fromEvent(plusBtn, 'click').map(e => 1);
    const minus$ = Rx.Observable.fromEvent(minusBtn, 'click').map(e => -1);

    Rx.Observable.merge(plus$, minus$).scan((acc, n) => acc + n)
      .subscribe(value => this.setState({count: value}));
  }

  render () {
    return (
        <div>
          <button id="plus">+</button>
          <button id="minus">-</button>
          <div>count: {this.state.count}</div>
        </div>
    );
  }
}

ReactDOM.render(<Main/>, document.getElementById('app'));

merge用於合併兩個observable產生一個新的observable。 scan相似於Array中的reduce。 這個例子實現了點擊plus的時候+1, 點擊minus的時候-1。

Rx.js適用的場景

  • 多個複雜的異步或事件組合在一塊兒。
  • 處理多個數據序列

假如沒有被複雜的異步,事件, 數據序列困擾, 若是promise已經足夠的話, 就不必適用Rx.js。

Summary

  • Observable, Observer, Subscription, Subscrib, Subject概念。
  • RxJS適用於解決複雜的異步,事件問題。

文章參考

相關文章
相關標籤/搜索