ReactiveX,又稱 Reactive Extensions(響應式擴展),其中的 X 表明各類語言。由於它實質上只是一個事件流處理庫,不須要什麼其餘依賴。javascript
講一講 ReactiveX 中的 「流」:html
個人理解是,Rx 經過 「流」 的概念,將事件串聯成一個個事件流,各個事件流之間還可進行 "並聯" 的做用。當某個流上的事件被調用時,就能夠觸發咱們設定好的監聽回調。java
那麼什麼樣的事件能夠成爲流呢?答案是任何事件。不管是異步非阻塞事件(setTimeOut、網絡請求等),仍是同步可阻塞事件(點擊事件、對迭代器的遍歷等),一切都是流。此時不得不祭出一張神棍圖:react
事件的串聯是流。比方說,用戶對一個按鈕進行了猛烈的點擊,全部的點擊事件就是一個流;再或者,併發多個網絡請求,它們也是一個流。而 Rx 的主要做用,就是爲流的處理提供了一整套的解決方案:將不一樣的流進行組合,或者監聽事件的觸發及時給予響應等等。git
RxJS 的做者曾在 The introduction to Reactive Programming you've been missing 一文中詳細講解了 RxJS 的初步使用和流的概念,咱們先僅看文中的一幅圖來理解 Rx 的概念:github
這是一個屢次點擊事件造成的事件流,在從左往右的時間線上有不少個點擊事件,而每一個點擊事件的時間間隔各不相同。經過 Rx 咱們能夠對流上的各個事件進行篩選,並獲取到在某一段時間內的連續點擊次數。api
做者本身吐槽過那篇文章實在太長,因此又有了後來的這篇 2 minute introduction to Rx 文章。在文章中有以下解釋:緩存
咱們在頁面上的點擊事件就能夠組成流。好比一個記錄每次點擊時座標的流。隨機點擊頁面屢次以後,可能會產生這樣一個流:網絡
而咱們能夠經過 RxJS 中的方法對這個流內的各個事件進行篩選,好比選出橫座標 x < 250 的點擊:併發
filter( (event) -> event.x < 250 )
也就是說,咱們能夠像對待 JavaScript 中可遍歷對象同樣,對流上的各個事件進行遍歷,選出符合條件的事件。這就是 Rx 的魅力所在。
既然 Rx 是爲了流而生的,那麼最佳運用場景固然是面對一系列較複雜的事件流時了。
好比,用戶在一個 input 內輸入文字。每次keyup
的時候就會根據輸入內容,請求 Wikipedia 的 API 進行搜索:
var input = document.getElementById('input'); // 經過 fromEvent 以及 input keyup 事件建立一個流 var dictionarySuggest = Rx.Observable.fromEvent(input, 'keyup') // 獲取到每次 keyup 時的input value,並經過 filter 保證其合法性 .map(function () { return input.value; }) .filter(function (text) { return !!text; }) .distinctUntilChanged() .debounce(250) // searchWikipedia 爲異步請求方法 .flatMapLatest(searchWikipedia) .subscribe( // onNext function (results) { list = []; list.concat(results.map(createItem)); }, // onError function (err) { logError(err); } );
咱們建立了一個流來處理從用戶keyup
,到searchWikipedia
,再處處理網絡請求結果這一系列事件,而且在其中對事件進行了篩選判斷:
filter
剔除掉不合法的值
distinctUntilChanged
當用戶按下例如 左、右 這種按鈕時,不會改變 input 的值,但也會觸發keyup
事件。這種時候就徹底沒有必要重複發送異步請求了。distinctUntilChanged
會剔除流中有着相同的值的元素
debounce
在過了一段指定的時間還沒觸發事件時才觸發下一個事件。也就是說,在打字過程當中,若是用戶在指定事件間隔(250ms)內沒有再打字,則觸發下一個事件(searchWikipedia);不然咱們認爲用戶在連續打字,因此不會頻繁的發送網絡請求
首先,它是一個flatMap
方法。它用一個指定的函數(searchWikipedia)對原始流中的每一項數據執行變換操做,並返回一個Observable
,flatMap
將全部的返回值組成一個新的流。
其次,flatMapLatest
擁有flatMap
的所有特性。但不一樣的是,在flatMapLatest
的遍歷調用過程當中,若是一個事件 A 尚未觸發完畢獲取到返回值,就觸發了下一個事件 B,則將忽略 A 返回的值。這樣,咱們就能夠避免 A 異步的返回值由於返回較慢,反而覆蓋了以後 B 異步的返回值。用圖解釋以下:
subscribe
建立對流的監聽,並提供了成功和失敗的回調
而在傳統的編寫方法裏,咱們可能會建立 input 的keyup
監聽事件,並緩存上一次的值;每次keyup
時,要判斷當前值是否合法,而且與上一次的值不同。除此之外,還要建立一個定時器,每隔一段時間就用合法的值去請求searchWikipedia
方法 --- 即使這樣,也沒法保證不在用戶連續打字時發送請求。
能夠看到,在咱們把事件串成流並進行處理以後,要比傳統的編寫方式方便不少。
假設咱們要讀取一個 4GB 的大文件,將其加密後寫入到一個新文件裏。直接將整個文件讀到內存裏再加密、寫入確定是不行的,反之,咱們依賴 RxJS 的流,建立多個讀取、加密、寫入事件,造成三個流出來:
文件讀取流:每次調用方法時異步讀取 64k 的文件
加密流:對讀取的文件進行加密
寫入流:將加密好的內容異步寫入新文件
最後對整個observable
進行監聽
var fs = require('fs'); var Rx = require('rx'); // Read/write from stream implementation function readAsync(fd, chunkSize) { /* impl */ } function appendAsync(fd, buffer) { /* impl */ } function encrypt(buffer) { /* impl */} // 打開一個 4GB 的文件,每次只讀取 64k var inFile = fs.openSync('4GBfile.txt', 'r+'); var outFile = fs.openSync('Encrypted.txt', 'w+'); readAsync(inFile, 2 << 15) .map(encrypt) .flatMap(function (data) { return appendAsync(outFile, data); }) .subscribe( // onNext function () { }, // onError function (err) { console.log('An error occurred while encrypting the file: %s', err.message); fs.closeSync(inFile); fs.closeSync(outFile); }, // onCompleted function () { console.log('Successfully encrypted the file.'); fs.closeSync(inFile); fs.closeSync(outFile); } );
由此能夠看出,在應對較複雜的事件流或者處理多個異步事件的時候,使用 RxJS 會有必定優點;但若是複雜度沒有這麼高的時候則沒有太大的使用必要。
目前爲止,本文基本介紹了 RxJS 的核心概念 --- 針對事件流的管理與掌控。
在下一篇文章裏,咱們將會利用 RxJS,完成一個簡單的 github 應用。戳:探索 RxJS - 作一個 github 小應用可查看文章,rxjs-example查看案例源碼。