探索 RxJS - Core Concept

Steam in ReactiveX

圖片描述

ReactiveX,又稱 Reactive Extensions(響應式擴展),其中的 X 表明各類語言。由於它實質上只是一個事件流處理庫,不須要什麼其餘依賴。javascript


講一講 ReactiveX 中的 「流」:html

個人理解是,Rx 經過 「流」 的概念,將事件串聯成一個個事件流,各個事件流之間還可進行 "並聯" 的做用。當某個流上的事件被調用時,就能夠觸發咱們設定好的監聽回調。java

那麼什麼樣的事件能夠成爲流呢?答案是任何事件。不管是異步非阻塞事件(setTimeOut、網絡請求等),仍是同步可阻塞事件(點擊事件、對迭代器的遍歷等),一切都是流。此時不得不祭出一張神棍圖:react

圖片描述

事件的串聯是流。比方說,用戶對一個按鈕進行了猛烈的點擊,全部的點擊事件就是一個流;再或者,併發多個網絡請求,它們也是一個流。而 Rx 的主要做用,就是爲流的處理提供了一整套的解決方案:將不一樣的流進行組合,或者監聽事件的觸發及時給予響應等等。git

Quick Intro

RxJS 的做者曾在 The introduction to Reactive Programming you've been missing 一文中詳細講解了 RxJS 的初步使用和流的概念,咱們先僅看文中的一幅圖來理解 Rx 的概念:github

圖片描述

這是一個屢次點擊事件造成的事件流,在從左往右的時間線上有不少個點擊事件,而每一個點擊事件的時間間隔各不相同。經過 Rx 咱們能夠對流上的各個事件進行篩選,並獲取到在某一段時間內的連續點擊次數。api

做者本身吐槽過那篇文章實在太長,因此又有了後來的這篇 2 minute introduction to Rx 文章。在文章中有以下解釋:緩存

咱們在頁面上的點擊事件就能夠組成流。好比一個記錄每次點擊時座標的流。隨機點擊頁面屢次以後,可能會產生這樣一個流:網絡

圖片描述

而咱們能夠經過 RxJS 中的方法對這個流內的各個事件進行篩選,好比選出橫座標 x < 250 的點擊:併發

filter( (event) -> event.x < 250 )

圖片描述

也就是說,咱們能夠像對待 JavaScript 中可遍歷對象同樣,對流上的各個事件進行遍歷,選出符合條件的事件。這就是 Rx 的魅力所在。

主要運用場景

既然 Rx 是爲了流而生的,那麼最佳運用場景固然是面對一系列較複雜的事件流時了。

含有異步請求和事件觸發的混合流

好比,用戶在一個 input 內輸入文字。每次keyup的時候就會根據輸入內容,請求 Wikipedia 的 API 進行搜索:

var input = document.getElementById('input');
// 經過 fromEvent 以及 input keyup 事件建立一個流
var dictionarySuggest = Rx.Observable.fromEvent(input, 'keyup')
  // 獲取到每次 keyup 時的input value,並經過 filter 保證其合法性
  .map(function () { return input.value; })
  .filter(function (text) { return !!text; })
  .distinctUntilChanged()
  .debounce(250)
  // searchWikipedia 爲異步請求方法
  .flatMapLatest(searchWikipedia)
  .subscribe(
      // onNext
    function (results) {
      list = [];
      list.concat(results.map(createItem));
    },
    // onError
    function (err) {
      logError(err);
    }
  );

咱們建立了一個流來處理從用戶keyup,到searchWikipedia,再處處理網絡請求結果這一系列事件,而且在其中對事件進行了篩選判斷:

  • filter 剔除掉不合法的值

  • distinctUntilChanged 當用戶按下例如 左、右 這種按鈕時,不會改變 input 的值,但也會觸發keyup事件。這種時候就徹底沒有必要重複發送異步請求了。distinctUntilChanged會剔除流中有着相同的值的元素

  • debounce 在過了一段指定的時間還沒觸發事件時才觸發下一個事件。也就是說,在打字過程當中,若是用戶在指定事件間隔(250ms)內沒有再打字,則觸發下一個事件(searchWikipedia);不然咱們認爲用戶在連續打字,因此不會頻繁的發送網絡請求

  • flatMapLatest

    • 首先,它是一個flatMap方法。它用一個指定的函數(searchWikipedia)對原始流中的每一項數據執行變換操做,並返回一個ObservableflatMap將全部的返回值組成一個新的流。

    • 其次,flatMapLatest擁有flatMap的所有特性。但不一樣的是,在flatMapLatest的遍歷調用過程當中,若是一個事件 A 尚未觸發完畢獲取到返回值,就觸發了下一個事件 B,則將忽略 A 返回的值。這樣,咱們就能夠避免 A 異步的返回值由於返回較慢,反而覆蓋了以後 B 異步的返回值。用圖解釋以下:

圖片描述

  • subscribe 建立對流的監聽,並提供了成功和失敗的回調

而在傳統的編寫方法裏,咱們可能會建立 input 的keyup監聽事件,並緩存上一次的值;每次keyup時,要判斷當前值是否合法,而且與上一次的值不同。除此之外,還要建立一個定時器,每隔一段時間就用合法的值去請求searchWikipedia方法 --- 即使這樣,也沒法保證不在用戶連續打字時發送請求。

能夠看到,在咱們把事件串成流並進行處理以後,要比傳統的編寫方式方便不少。

處理一系列的異步請求隊列

假設咱們要讀取一個 4GB 的大文件,將其加密後寫入到一個新文件裏。直接將整個文件讀到內存裏再加密、寫入確定是不行的,反之,咱們依賴 RxJS 的流,建立多個讀取、加密、寫入事件,造成三個流出來:

  • 文件讀取流:每次調用方法時異步讀取 64k 的文件

  • 加密流:對讀取的文件進行加密

  • 寫入流:將加密好的內容異步寫入新文件

  • 最後對整個observable進行監聽

var fs = require('fs');
var Rx = require('rx');

// Read/write from stream implementation
function readAsync(fd, chunkSize) { /* impl */ }
function appendAsync(fd, buffer) { /* impl */ }
function encrypt(buffer) { /* impl */}

// 打開一個 4GB 的文件,每次只讀取 64k
var inFile = fs.openSync('4GBfile.txt', 'r+');
var outFile = fs.openSync('Encrypted.txt', 'w+');

readAsync(inFile, 2 << 15)
  .map(encrypt)
  .flatMap(function (data) {
    return appendAsync(outFile, data);
  })
  .subscribe(
      // onNext
    function () { },
    // onError
    function (err) {
      console.log('An error occurred while encrypting the file: %s', err.message);
      fs.closeSync(inFile);
      fs.closeSync(outFile);
    },
    // onCompleted
    function () {
      console.log('Successfully encrypted the file.');
      fs.closeSync(inFile);
      fs.closeSync(outFile);
    }
  );

由此能夠看出,在應對較複雜的事件流或者處理多個異步事件的時候,使用 RxJS 會有必定優點;但若是複雜度沒有這麼高的時候則沒有太大的使用必要。

目前爲止,本文基本介紹了 RxJS 的核心概念 --- 針對事件流的管理與掌控。

在下一篇文章裏,咱們將會利用 RxJS,完成一個簡單的 github 應用。戳:探索 RxJS - 作一個 github 小應用可查看文章,rxjs-example查看案例源碼。

相關文章
相關標籤/搜索