RxJS基礎教程

RxJS是一個基於可觀測數據流在異步編程應用中的庫。javascript

ReactiveX is a combination of the best ideas from
the Observer pattern, the Iterator pattern, and functional programming

正如官網所說,RxJS是基於觀察者模式,迭代器模式和函數式編程。所以,首先要對這幾個模式有所理解php

觀察者模式

window.addEventListener('click', function(){
  console.log('click!');
})

JS的事件監聽就是天生的觀察者模式。給window的click事件(被觀察者)綁定了一個listener(觀察者),當事件發生,回調函數就會被觸發css

<!-- more -->html

迭代器模式

迭代器模式,提供一種方法順序訪問一個聚合對象中的各類元素,而又不暴露該對象的內部表示。前端

ES6裏的Iterator便可實現:java

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();

iter.next() // { value: 'a', done: false }
iter.next() // { value: 'b', done: false }
iter.next() // { value: 'c', done: false }
iter.next() // { value: undefined, done: true }

反覆調用迭代對象的next方法,便可順序訪問git

函數式編程

提到函數式編程,就要提到聲明式編程和命令式編程
函數式編程是聲明式編程的體現github

問題:將數組[1, 2, 3]的每一個元素乘以2,而後計算總和。ajax

命令式編程編程

const arr = [1, 2, 3];
let total = 0;

for(let i = 0; i < arr.length; i++) {
  total += arr[i] * 2;
}

聲明式編程

const arr = [1, 2, 3];
let total = arr.map(x => x * 2).reduce((total, value) => total + value)

聲明式的特色是專一於描述結果自己,不關注到底怎麼到達結果。而命令式就是真正實現結果的步驟

聲明式編程把原始數據通過一系列轉換(map, reduce),最後獲得想要的數據

如今前端流行的MVC框架(Vue,React,Angular),也都是提倡:編寫UI結構時使用聲明式編程,在編寫業務邏輯時使用命令式編程

RxJS

RxJS裏有兩個重要的概念須要咱們理解:
Observable (可觀察對象)
Observer (觀察者)

var btn = document.getElementById('btn');
var handler = function() {
  console.log('click');
}
btn.addEventListener('click', handler)

上面這個例子裏:
btn這個DOM元素的click事件就是一個Observable
handler這個函數就是一個Observer,當btn的click事件被觸發,就會調用該函數

改用RxJS編寫;

Rx.Observable.fromEvent(btn, 'click')
.subscribe(() => console.log('click'));

fromEvent把一個event轉成了一個Observable,而後它就能夠被訂閱subscribe

流stream

Observable其實就是數據流stream
是在時間流逝的過程當中產生的一系列事件。它具備時間與事件響應的概念。

咱們能夠把一切輸入都當作數據流來處理,好比說:

  • 用戶操做
  • 網絡響應
  • 定時器
  • Worker

產生新流

當產生了一個流後,咱們能夠經過操做符(Operator)對這個流進行一系列加工操做,而後產生一個新的流

Rx.Observable.fromEvent(window, 'click')
  .map(e => 1)
  .scan((total, now) => total + now)
  .subscribe(value => {
    console.log(value)
  })

map把流轉換成了一個每次產生1的新流,而後scan相似reduce,也會產生一個新流,最後這個流被訂閱。最終實現了:每次點擊累加1的效果

能夠用一個效果圖來表示該過程:
gif

合併流

也能夠對若干個數據流進行組合:

例子:咱們要實現下面這個效果:

gif

Rx.Observable.fromEvent(document.querySelector('input[name=plus]'), 'click')
  .mapTo(1)
  .merge(
    Rx.Observable.fromEvent(document.querySelector('input[name=minus]'), 'click')
      .mapTo(-1)
  )
  .scan((total, now) => total + now)
  .subscribe(value => {
    document.querySelector('#counter').innerText = value;
  })

merge能夠把兩個數據流整個在一塊兒,效果能夠參考以下:

gif

剛纔那個例子的數據流以下:

gif

以RxJS的寫法,就是把按下加1當成一個數據流,把按下減1當成一個數據流,再經過merge把兩個數據流合併,最後經過scan操做符,把新流上的數據累加,這就是咱們想要的計數器效果

扁平化流

有時候,咱們的Observable送出的是一個新的Observable:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1, 2, 3));
source.subscribe(value => {
  console.log(value)
});

這裏,console打印出來的是對象,而不是咱們想要的1,2,3,這是由於map返回的Rx.Observable.of(1, 2, 3)自己也是個Observable

用圖表示以下:

click  : ------c------------c--------

        map(e => Rx.Observable.of(1,2,3))

source : ------o------------o--------
                \            \
                 (123)|       (123)|

所以,咱們訂閱到的value值就是一個Observable對象,而不是普通數據1,2,3

我想要的其實不是Observable自己,而是屬於這個Observable裏面的那些東西,如今這個情形就是Observable裏面又有Observable,有兩層,但是我想要讓它變成一層就好,該怎麼辦呢?

這就須要把Observable扁平化

const arr = [1, [2, 3], 4];

// 扁平化後:
const flatArr = [1, 2, 3, 4];

concatAll這個操做符就能夠把Observable扁平化

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1, 2, 3));
var example = source.concatAll();

example.subscribe(value => {
  console.log(value)
})
click  : ------c------------c--------

        map(e => Rx.Observable.of(1,2,3))

source : ------o------------o--------
                \            \
                 (123)|       (123)|

                   concatAll()

example: ------(123)--------(123)------------

flatMap操做符也能夠實現一樣的做用,就是寫法有些不一樣:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.flatMap(e => Rx.Observable.of(1, 2, 3));
source.subscribe(value => {
  console.log(value)
})
click  : ------c------------c--------

        flatMap(e => Rx.Observable.of(1,2,3))

source: ------(123)--------(123)------------

簡單拖拽實例

學完前面幾個操做符,咱們就能夠寫一個簡單的實例了

拖拽的原理是:

  • 監聽拖拽元素的mousedown
  • 監聽body的mousemove
  • 監聽body的mouseup
<style type="text/css">
html, body {
  height: 100%;
  background-color: tomato;
  position: relative;
}

#drag {
  position: absolute;
  display: inline-block;
  width: 100px;
  height: 100px;
  background-color: #fff;
  cursor: all-scroll;
}
</style>
<div id="drag"></div>
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');

首先給出3個Observable,分別表明3種事件,咱們但願mousedown的時候監聽mousemove,而後mouseup時中止監聽,因而RxJS能夠這麼寫:

const source = mouseDown
.map(event => mouseMove.takeUntil(mouseUp))

takeUntil操做符能夠在某個條件符合時,發送complete事件

source: -------e--------------e-----
                \              \
                  --m-m-m-m|     -m--m-m--m-m|

從圖上能夠看出,咱們還須要把source扁平化,才能獲取所需數據。

完整代碼:

const dragDOM = document.getElementById('drag');
const body = document.body;

const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');

mouseDown
    .flatMap(event => mouseMove.takeUntil(mouseUp))
    .map(event => ({ x: event.clientX, y: event.clientY }))
    .subscribe(pos => {
        dragDOM.style.left = pos.x + 'px';
        dragDOM.style.top = pos.y + 'px';
    })

Observable Observer

前面的例子,咱們都在討論fromEvent轉換的Observable,其實還有不少種方法產生一個Observable,其中create也是一種常見的方法,能夠用來建立自定義的Observable

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

控制檯執行的結果:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Observable 執行能夠傳遞三種類型的值:

"Next" 通知: 發送一個值,好比數字、字符串、對象,等等。
"Error" 通知: 發送一個 JavaScript 錯誤 或 異常。
"Complete" 通知: 再也不發送任何值。
"Next" 通知是最重要,也是最多見的類型:它們表示傳遞給觀察者的實際數據。"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,而且只會執行其中的一個。

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // 若是捕獲到異常會發送一個錯誤
  }
});

Observer觀察者只是一組回調函數的集合,每一個回調函數對應一種 Observable 發送的通知類型:next、error 和 complete 。

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

Observer和Observable是經過subscribe方法創建聯繫的

observable.subscribe(observer);

unsubscribe

observer訂閱了Observable以後,還能夠取消訂閱

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// 稍後:
subscription.unsubscribe();

unsubscribe陷阱:

let stream$ = new Rx.Observable.create((observer) => {
  let i = 0;
  let id = setInterval(() => {
    console.log('setInterval');
    observer.next(i++);
  },1000)
})

let subscription = stream$.subscribe((value) => {
  console.log('Value', value)
});

setTimeout(() => {
  subscription.unsubscribe();
}, 3000)

3秒後雖然取消了訂閱,可是開啓的setInterval定時器並不會自動清理,咱們須要本身返回一個清理函數

let stream$ = new Rx.Observable.create((observer) => {
  let i = 0;
  let id = setInterval(() => {
    observer.next(i++);
  },1000)

  // 返回了一個清理函數
  return function(){
    clearInterval( id );
  }
})

let subscription = stream$.subscribe((value) => {
  console.log('Value', value)
});

setTimeout(() => {
  subscription.unsubscribe() // 在這咱們調用了清理函數

}, 3000)

Ajax異步操做

<input type="text">
function sendRequest(search) {
  return Rx.Observable.ajax.getJSON(`http://deepred5.com/cors.php?search=${search}`)
    .map(response => response)
}

Rx.Observable.fromEvent(document.querySelector('input'), 'keyup')
  .map(e => e.target.value)
  .flatMap(search => sendRequest(search))
  .subscribe(value => {
    console.log(value)
  })

用戶每次在input框每次進行輸入,均會觸發ajax請求,而且每一個ajax返回的值都會被打印一遍

如今須要實現這樣一個功能:
但願用戶在300ms之內中止輸入,才發送請求(防抖),而且console打印出來的值只要最近的一個ajax返回的

Rx.Observable.fromEvent(document.querySelector('input'), 'keyup')
  .debounceTime(300)
  .map(e => e.target.value)
  .switchMap(search => sendRequest(search))
  .subscribe(value => {
    console.log(value)
  })

debounceTime表示通過n毫秒後,沒有流入新值,那麼纔將值轉入下一個環節
switchMap能取消上一個已無用的請求,只保留最後的請求結果流,這樣就確保處理展現的是最後的搜索的結果

能夠看到,RxJS對異步的處理是很是優秀的,對異步的結果能進行各類複雜的處理和篩選。

React + Redux 的異步解決方案:redux-observable

Redux的action都是同步的,因此默認狀況下也只能處理同步數據流。

爲了生成異步action,處理異步數據流,有許多不一樣的解決方案,例如 redux-thunk、redux-promise、redux-saga 等等。

redux-thunk舉例:

調用一個異步API,首先要先定義三個同步action構造函數,分別表示

  • 請求開始
  • 請求成功
  • 請求失敗

而後再定義一個異步action構造函數,該函數再也不是返回普通的對象,而是返回一個函數,在這個函數裏,進行ajax異步操做,而後根據返回的成功和失敗,分別調用前面定義的同步action

actions.js

export const FETCH_STARTED = 'WEATHER/FETCH_STARTED';
export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS';
export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE';

// 普通action構造函數,返回普通對象
export const fetchWeatherStarted = () => ({
  type: FETCH_STARTED
});

export const fetchWeatherSuccess = (result) => ({
  type: FETCH_SUCCESS,
  result
})

export const fetchWeatherFailure = (error) => ({
  type: FETCH_FAILURE,
  error
})

// 異步action構造函數,返回一個函數
export const fetchWeather = (cityCode) => {
  return (dispatch) => {
    const apiUrl = `/data/cityinfo/${cityCode}.html`;

    dispatch(fetchWeatherStarted())

    return fetch(apiUrl).then((response) => {
      if (response.status !== 200) {
        throw new Error('Fail to get response with status ' + response.status);
      }

      response.json().then((responseJson) => {
        dispatch(fetchWeatherSuccess(responseJson.weatherinfo));
      }).catch((error) => {
        dispatch(fetchWeatherFailure(error));
      });
    }).catch((error) => {
      dispatch(fetchWeatherFailure(error));
    })
  };
}

如今若是想要異步請求,只要:

// fetchWeather是個異步action構造函數
dispatch(fetchWeather('23333'));

咱們再來看看redux-observable:

調用一個異步API,再也不須要定義一個異步action構造函數,全部的action構造函數都只是返回普通的對象

那麼ajax請求在哪裏發送?

答案是在Epic進行異步操做

Epic是redux-observable的核心原語。
它是一個函數,接收 actions 流做爲參數而且返回 actions 流。 Actions 入, actions 出.
export const FETCH_STARTED = 'WEATHER/FETCH_STARTED';
export const FETCH_SUCCESS = 'WEATHER/FETCH_SUCCESS';
export const FETCH_FAILURE = 'WEATHER/FETCH_FAILURE';

export const fetchWeather = cityCode => ({ type: FETCH_STARTED, cityCode });
export const fetchWeatherSuccess = result => (
  { type: FETCH_SUCCESS, result };
);
export const fetchWeatherFailure = (error) => (
  {
    type: FETCH_FAILURE,
    error
  }
)

export const fetchWeatherEpic = action$ =>
  action$.ofType(FETCH_STARTED)
    .mergeMap(action =>
      ajax.getJSON(`/data/cityinfo/${action.cityCode}.html`)
        .map(response => fetchWeatherSuccess(response.weatherinfo))
        // 這個處理異常的action必須使用Observable.of方法轉爲一個observable
        .catch(error => Observable.of(fetchWeatherFailure(error)))
    );

如今若是想要異步請求,只要:

// fetchWeather只是個普通的action構造函數
dispatch(fetchWeather('23333'));

相較於thunk中間件,使用redux-observable來處理異步action,有如下優勢:

  • 不須要修改action構造函數,返回的仍然是普通對象
  • epics中間件會將action封裝成Observable對象,可使用RxJs的相應api來控制異步流程,它就像一個擁有許多高級功能的Promise,如今咱們在Redux中也能夠獲得它的好處。

總結

原生JS傳統解決異步的方式:callback、Generator、Promise、async/await

RxJS解決的是數據流的問題,它可讓批量數據處理起來更方便

能夠想象的一些使用場景:

  • 多個服務端實時消息流,經過RxJS進行高階處理,最後到 view 層就是很清晰的一個Observable,可是view層自己處理用戶事件依然能夠沿用原有的範式。
  • 爬蟲抓取,每次對一個網站的前5頁作平行請求,每一個請求若是失敗就重試,重試3次以後再放棄。

能夠看出,這種須要對流進行復雜操做的場景更加適合RxJS

公司內部目前的大部分系統,前端就可能不太適合用RxJS,由於大部分是後臺CRUD系統,總體性、實時性的要求都不高,而且也沒有特別複雜的數據流操做

咱們推薦在適合RxJS的地方用RxJS,可是不強求RxJS for everything。RxJS給了咱們另外一種思考和解決問題的方式,但這不必定是必要的

參考

構建流式應用—RxJS詳解

但願是最淺顯易懂的RxJS教學

RxJS入門指引和初步應用

30天精通RxJS系列

相關文章
相關標籤/搜索