RxJS 是一套處理異步編程的 API,那麼我將從異步講起。css
前端編程中的異步有:事件(event)、AJAX、動畫(animation)、定時器(timer)。html
回調地獄就是指層層嵌套的回調函數,形成代碼難以理解,而且難以協調組織複雜的操做。前端
競態條件出現的緣由是沒法保證異步操做的完成會和他們開始時的順序同樣,所以最終結果不可控。好比常見的 AutoComplete 效果,每次輸入後向後端發送請求獲取結果展現在搜索框下面,因爲網絡、後端數據查詢等緣由有可能出現最後發送的請求比以前的請求更快地完成了,這時最終展示的並非最後那個請求的結果,而這並非咱們所但願的。vue
這裏說的內存泄漏指的是單頁應用切換頁面時因爲忘記在合適的時機移除監聽事件形成的內存泄漏。react
異步帶來了狀態的改變,可能會使狀態管理變得很是複雜,尤爲是某個狀態有多個來源時,好比有些應用,一開始有一個默認值,再經過 AJAX 獲取初始狀態,存儲在 localStorage,以後經過 WebSocket 獲取更新。這時查詢狀態多是同步或者異步的,狀態的變動多是主動獲取也多是被動推送的,若是還有各類排序、篩選,狀態管理將會更加複雜。ios
JavaScript 中的 try/catch 只能捕獲同步的錯誤,異步的錯誤不易處理。git
使用 Promise 能夠減輕一些異步問題,如將回調函數變爲串行的鏈式調用,統一同步和異步代碼等,async/await 中也可使用 try/catch 來捕獲錯誤。可是對於複雜的場景,仍然難於處理。並且 Promise 還有其餘的問題,一是隻有一個結果,二是不能夠取消。github
異步編程時不只要面對這些問題,還有下面這些使用方式各異的 API:web
而若是使用 RxJS,能夠用統一的 API 來進行處理,並且藉助 RxJS 各類強大的操做符,咱們能夠更簡單地實現咱們的需求。ajax
咱們都知道 JS 是什麼,那麼什麼是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的簡稱,指的是實踐響應式編程的一套工具,Rx 官網首頁的介紹是一套經過可監聽流來作異步編程的 API(An API for asynchronous programming with observable streams)。
Rx 最先是由微軟開發的 LinQ 擴展出來的開源項目,以後由開源社區維護,有多種語言的實現,如 Java 的 RxJava,Python 的 RxPY 等,而 RxJS 就是 Rx 的 JavaScript 語言實現。
RxJS 引入了兩種重要的編程思想:函數式編程和響應式編程。
函數式編程(Functional Programming,簡稱 FP)是一種編程範式,強調使用函數來思考問題、編寫代碼。
In computer science, functional programming is a programming paradigm—a style of building the structure and elements of computer programs—that treats computation as the evaluation of mathematical functions and avoids changing-state and mutable data.
函數式編程的主要設計點在於避免使用狀態和可變的數據,即 stateless and immutable。
函數式編程對函數的使用有一些特殊要求:
聲明式的函數,讓開發者只須要表達」想要作什麼」,而不須要表達「怎麼去作」。
純函數指的是執行結果由輸入參數決定,參數相同時結果相同,不受其餘數據影響,而且不會帶來反作用(Side Effect)的函數。反作用指的是函數作了和自己運算返回值沒有關係的事情,如修改外部變量或傳入的參數對象,甚至是執行 console.log 都算是 Side Effect。前端中常見的反作用有發送 http 請求、操做 DOM、調用 alert 或者 confirm 函數等。知足純函數的特性也叫作引用透明度(Referential Transparency)。
數據不可變就是指這個數據一旦產生,它的值就永遠不會變。JavaScript 中字符串類型和數字類型就是不可改變的,而對象基本都是可變的,可能會帶來各類反作用。如今有各類庫能夠實現 Immutable 特性,如 immutable.js 和 immer.js
中文維基上說響應式編程(Reactive Programming)是一種面向數據流(stream)和變化傳播的編程範式。我的的理解是對數據流進行編程的一種編程範式,使用各類函數建立、組合、過濾數據流,而後經過監聽這個數據流來響應它的變化。響應式編程抽象出了流這個概念,提升了代碼的抽象級別,咱們不用去關心大量的實現細節,而專一於對數據流的操做。
響應式流能夠認爲是隨着時間發出的一系列元素。響應式和觀察者模式有點類似,訂閱者訂閱後,發佈者吐出數據時,訂閱者會響應式進行處理。實際上Rx 組合了觀察者模式(Observer pattern )、迭代器模式(Iterator pattern)和函數式編程。
RxJS 是上面兩種編程思想的結合,可是對於它是否是函數響應式編程(FRP)有比較大的爭議,由於它雖然既是函數式又是響應式可是不符合早期 FRP 的定義。
前端中的 DOM 事件、WebSocket 推送消息、AJAX 請求資源、動畫均可以看做是數據流。
RxJS 對數據採用「推」的方式,當一個數據產生時,會將其推送給對應的處理函數,這個處理函數不用關心數據時同步產生仍是異步產生的,所以處理異步將會變得很是簡單。
RxJS 中不少操做符,每一個操做符都提供了一個小功能,學習 RxJS 最重要的就是學習如何組合操做符來解決複雜問題。
RxJS 倉庫如今移到了 ReactiveX 組織下,最新的大版本爲 6,與以前的版本相比有許多破壞性變動,請注意。
RxJS 的 import 路徑有如下 5 種:
建立 Observable 的方法、types、schedulers 和一些工具方法
import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';
操做符 operators
import { map, filter, scan } from 'rxjs/operators';
webSocket
import { webSocket } from 'rxjs/webSocket';
ajax
import { ajax } from 'rxjs/ajax';
測試
import { TestScheduler } from 'rxjs/testing';
本文全部 demo 均在 v6.2.1 中測試過
import { fromEvent } from 'rxjs'; import { take } from 'rxjs/operators'; const eleBtn = document.querySelector('#btn') const click$ = fromEvent(eleBtn, 'click') click$.pipe(take(1)) .subscribe(e => { console.log('只可點擊一次') eleBtn.setAttribute('disabled', '') })
這裏演示了 RxJS 的大概用法,經過 fromEvent 將點擊事件轉換爲 RxJS 的 Observable (響應式數據流),take(1) 表示只操做一次,觀察者經過訂閱(subscribe)來響應變化。具體 API 的使用會在後面講到。
表明流的變量用 $ 符號結尾,是 RxJS 中的一種慣例。
RxJS 有一個核心和三個重點,一個核心是 Observable 再加上相關的 Operators,三個重點分別是 Observer、Subject、Schedulers。
我的認爲在文檔中說的 Observable 更確切的說法是 Observable Stream,也就是 Rx 的響應式數據流。
在 RxJS 中 Observable 是可被觀察者,觀察者則是 Observer,它們經過 Observable 的 subscribe 方法進行關聯。
前面提到了 RxJS 結合了觀察者模式和迭代器模式。
對於觀察者模式,咱們其實比較熟悉了,好比各類 DOM 事件的監聽,也是觀察者模式的一種實踐。核心就是發佈者發佈事件,觀察者選擇時機去訂閱(subscribe)事件。
在 ES6 中,Array、String 等可遍歷的數據結構原生部署了迭代器(Iterator )接口。
const numbers = [1, 2, 3] const iterator = numbers[Symbol.iterator]() iterator.next() // {value: 1, done: false} iterator.next() // {value: 2, done: false} iterator.next() // {value: 3, done: false} iterator.next() // {value: undefined, done: true}
觀察者模式和迭代器模式的相同之處是二者都是漸進式使用數據的,只不過從數據使用者的角度來講,觀察者模式數據是推送(push)過來的,而迭代器模式是本身去拉取(pull)的。Rx 中的數據是 Observable 推送的,觀察者不須要主動去拉取。
Observable 與 Array 至關相似,均可以看做是 Collection,只不過 Observable 是 a collection of items over time,是隨時間發出的一序列元素,因此下面咱們會看到 Observable 的一些操做符與 Array 的方法極其類似。
要建立一個 Observable,只要給 new Observable 傳遞一個接收 observer 參數的回調函數,在這個函數中去定義如何發送數據。
import { Observable } from 'rxjs'; const source$ = new Observable(observer => { observer.next(1) observer.next(2) observer.next(3) }) const observer = { next : item => console.log(item) } console.log('start') source$.subscribe(observer) console.log('end')
上面的代碼經過 new Observable 建立了一個 Observable,調用它的 subscribe 方法進行訂閱,執行結果爲依次輸出 'start',1,2,3,'end'。
下面咱們再看一個異步的例子:
import { Observable } from 'rxjs'; const source$ = new Observable(observer => { let number = 1 setInterval(() => { observer.next(number++) }, 1000) }) const observer = { next : item => console.log(item) } console.log('start') source$.subscribe(observer) console.log('end')
先輸出 ’start' 、'end',而後每隔 1000 ms 輸出一個遞增的數字。
經過這兩個小例子,咱們知道 RxJS 既能處理同步的行爲,也能處理異步的。
觀察者 Observer 是一個有三個方法的對象:
error:當 Observable 內部發生錯誤時被調用,以後不會調用 complete,next 方法無效
const source$ = new Observable(observer => { observer.next(1) observer.next(2) observer.complete() observer.next(3) }) const observer = { next: item => console.log(item), complete: () => console.log('complete') } source$.subscribe(observer)
上面的代碼會輸出 1,2,'complete',而不會輸出 3。
const source$ = new Observable(observer => { try { observer.next(1) observer.next(2) throw new Error('there is an exception') observer.complete() } catch (e) { observer.error(e) } }) const observer = { next: item => console.log(item), error: e => console.log(e), complete: () => console.log('complete') } source$.subscribe(observer)
注意 error 以後不會再調用 complete。
Observer 還有簡單形式,即不用構建一個對象,而是直接把函數做爲 subscribe 方法的參數。
source$.subscribe( item => console.log(item), e => console.log(e), () => console.log('complete') )
參數依次爲 next 、error、complete,後面兩個參數能夠省略。
咱們傳給 new Observable 的回調函數若是沒有訂閱是不會執行的,訂閱一個 Observable 就像是執行一個函數,和下面的函數相似。這和咱們常見的那種內部保存有觀察者列表的觀察者模式是不一樣的,Observable 內部沒有這個觀察者列表。
function subscribe (observer) { let number = 1 setInterval(() => { observer.next(number++) }, 1000) } subscribe({ next: item => console.log(item), error: e => console.log(e), complete: () => console.log('complete') })
觀察者想退訂,只要調用訂閱返回的對象的 unsubscribe 方法,這樣觀察者就不再會接受到 Observable 的信息了。
const source$ = new Observable(observer => { let number = 1 setInterval(() => { observer.next(number++) }, 1000) }) const observer = { next : item => console.log(item) } const subscription = source$.subscribe(observer) setTimeout(() => { subscription.unsubscribe() }, 5000)
在 RxJS 中,操做符是用來處理數據流的。咱們每每須要對數據流作一系列處理,才交給 Observer,這時一個操做符就像一個管道同樣,數據進入管道,完成處理,流出管道。
import { interval } from 'rxjs'; import { map } from 'rxjs/operators' const source$ = interval(1000).pipe( map(x => x * x) ) source$.subscribe(x => console.log(x))
interval 操做符創造了一個數據流,interval(1000) 會產生一個每隔 1000 ms 就發出一個從 0 開始遞增的數據。map 操做符和數組的 map 方法相似,能夠對數據流進行處理。具體見演示地址。
這個 map 和數組的 map 方法會產生新的數組相似,它會產生新的 Observable。每個操做符都會產生一個新的 Observable,不會對上游的 Observable 作任何修改,這徹底符合函數式編程「數據不可變」的要求。
上面的 pipe 方法就是數據管道,會對數據流進行處理,上面的例子只有一個 map 操做符進行處理,能夠添加更多的操做符做爲參數。
彈珠圖(Marble diagrams)就是用圖例形象地表示 Observable 和各類操做符的一種方法。
用 - 表示一小段時間,X 表明有錯誤發生, | 表示結束,() 表示同步發生。
上面的例子能夠以下表示:
source: -----0-----1-----2-----3--... map(x => x * x) newest: -----0-----1-----4-----9--...
具體關於彈珠圖的使用能夠查看這個網站http://rxmarbles.com/。
建立 Observable 的這些方法就是用來建立 Observable 數據流的,注意和操做符不一樣,它們是從 rxjs 中導入的,而不是 rxjs/operators 。
以前咱們寫的這種形式:
const source$ = new Observable(observer => { observer.next(1) observer.next(2) observer.next(3) observer.complete() })
使用 of 方法將會很是簡潔:
import {of} from 'rxjs' const source$ = of(1, 2, 3)
上面的代碼用 from 則是這樣:
import {from} from 'rxjs' const source$ = from([1, 2, 3])
from 能夠將可遍歷的對象(iterable)轉化爲一個 Observable,字符串也部署有 iterator 接口,因此也支持。
from 還能夠根據 promise 建立一個 Observable。咱們用 fetch 或者 axios 等類庫發送的請求都是一個 promise 對象,咱們可使用 from 將其處理爲一個 Observable 對象。
用 DOM 事件建立 Observable,第一個參數爲 DOM 對象,第二個參數爲事件名稱。具體示例見前面 RxJS 入門章節的一個簡單例子。
將添加事件處理器、刪除事件處理器的 API 轉化爲 Observable。
function addClickHandler (handler) { document.addEventListener('click', handler) } function removeClickHandler (handler) { document.removeEventListener('click', handler) } fromEventPattern( addClickHandler, removeClickHandler ).subscribe(x => console.log(x))
也能夠是咱們本身實現的和事件相似,擁有註冊監聽和移除監聽的 API。
import { fromEventPattern } from 'rxjs' class EventEmitter { constructor () { this.handlers = {} } on (eventName, handler) { if (!this.handlers[eventName]) { this.handlers[eventName] = [] } if(typeof handler === 'function') { this.handlers[eventName].push(handler) } else { throw new Error('handler 不是函數!!!') } } off (eventName, handler) { this.handlers[eventName].splice(this.handlers[eventName].indexOf(handler), 1) } emit (eventName, ...args) { this.handlers[eventName].forEach(handler => { handler(...args) }) } } const event = new EventEmitter() const subscription = fromEventPattern( event.on.bind(event, 'say'), event.off.bind(event, 'say') ).subscribe(x => console.log(x)) let timer = (() => { let number = 1 return setInterval(() => { if (number === 5) { clearInterval(timer) timer = null } event.emit('say', number++) }, 1000) })() setTimeout(() => { subscription.unsubscribe() }, 3000)
interval 和 JS 中的 setInterval 相似,參數爲間隔時間,下面的代碼每隔 1000 ms 會發出一個遞增的整數。
interval(1000).subscribe(console.log) // 0 // 1 // 2 // ...
timer 則能夠接收兩個參數,第一個參數爲發出第一個值須要等待的時間,第二個參數爲以後的間隔時間。第一個參數能夠是數字,也能夠是一個 Date 對象,第二個參數可省。
操做符 of 產生較少的數據時能夠直接寫如 of(1, 2, 3),可是若是是 100 個呢?這時咱們可使用 range 操做符。
range(1, 100) // 產生 1 到 100 的正整數
empty 是建立一個當即完結的 Observable,throwError 是建立一個拋出錯誤的 Observable,never 則是建立一個什麼也不作的 Observable(不完結、不吐出數據、不拋出錯誤)。這三個操做符單獨用時沒有什麼意義,主要用來與其餘操做符進行組合。目前官方不推薦使用 empty 和 never 方法,而是推薦使用常量 EMPTY 和 NEVER(注意不是方法,已是一個 Observable 對象了)。
defer 建立的 Observable 只有在訂閱時纔會去建立咱們真正想要操做的 Observable。defer 延遲了建立 Observable,而又有一個 Observable 方便咱們去訂閱,這樣也就推遲了佔用資源。
defer(() => ajax(ajaxUrl))
只有訂閱了纔會去發送 ajax 請求。
操做符其實看做是處理數據流的管道,每一個操做符實現了針對某個小的具體應用問題的功能,RxJS 編程最大的難點其實就是如何去組合這些操做符從而解決咱們的問題。
在 RxJS 中,有各類各樣的操做符,有轉化類、過濾類、合併類、多播類、錯誤處理類、輔助工具類等等。通常不須要本身去實現操做符,可是咱們須要知道操做符是一個函數,實現的時候必須考慮如下功能:
以前版本的 RxJS 各類操做符都掛載到了全局 Observable 對象上,能夠這樣鏈式調用:
source$.filter(x => x % 2 === 0).map(x => x * 2)
如今須要這樣使用:
import {filter, map} from 'rxjs/operators' source$.pipe( filter(x => x % 2 === 0), map(x => x * 2) )
其實也很好理解,pipe 就是管道的意思,數據流經過操做符處理,流出而後交給下一個操做符。
map、filter 和數組的 map、filter 方法相似,scan 則是和 reduce 方法相似,mapTo 是將全部發出的數據映射到一個給定的值。
import {mapTo} from 'rxjs/operators' fromEvent(document, 'click').pipe( mapTo('Hi') ).subscribe(x => console.log(x))
每次點擊頁面時都會輸出 Hi。
skipLast 是從數據流中忽略最後發出的若干數據
import { interval } from 'rxjs'; import { take } from 'rxjs/operators'; interval(1000).pipe( take(3) ).subscribe( x => console.log(x), null, () => console.log('complete') ) // 0 // 1 // 2 // 'complete'
使用了 take(3),表示只取 3 個數據,Observable 就進入完結狀態。
import { interval, fromEvent } from 'rxjs' import { takeUntil } from 'rxjs/operators' interval(1000).pipe( takeUntil(fromEvent(document.querySelector('#btn'), 'click')) ).subscribe( x => { document.querySelector('#time').textContent = x + 1 }, null, () => console.log('complete') )
這裏有一個 interval 建立的數據流一直在發出數據,直到當用戶點擊按鈕時中止計時,見演示。
合併類操做符用來將多個數據流合併。
1)concat、merge
concat、merge 都是用來把多個 Observable 合併成一個,可是 concat 要等上一個 Observable 對象 complete 以後纔會去訂閱第二個 Observable 對象獲取數據並把數據傳給下游,而 merge 時同時處理多個 Observable。使用方式以下:
import { interval } from 'rxjs' import { merge, take } from 'rxjs/operators' interval(500).pipe( take(3), merge(interval(300).pipe(take(6))) ).subscribe(x => console.log(x))
能夠點此去比對效果,concat 的結果應該比較好理解,merge 藉助彈珠圖也比較好理解,它是在時間上對數據進行了合併。
source : ----0----1----2| source2: --0--1--2--3--4--5| merge() example: --0-01--21-3--(24)--5|
merge 的邏輯相似 OR,常常用來多個按鈕有部分相同行爲時的處理。
注意最新的官方文檔和RxJS v5.x 到 6 的更新指南中指出不推薦使用 merge、concat、combineLatest、race、zip 這些操做符方法,而是推薦使用對應的靜態方法。
將上面的 merge 改爲從 rxjs 中導入,使用方式變成了合併多個 Observable,而不是一個 Observable 與其餘 Observable 合併。
import { interval,merge } from 'rxjs' import { take } from 'rxjs/operators' merge( interval(500).pipe(take(3)), interval(300).pipe(take(6)) ).subscribe(x => console.log(x))
2)concatAll、mergeAll、switchAll
用來將高階的 Observable 對象壓平成一階的 Observable,和 loadash 中壓平數組的 flatten 方法相似。concatAll 會對內部的 Observable 對象作 concat 操做,和 concat 操做符相似,若是前一個內部 Observable 沒有完結,那麼 concatAll 不會訂閱下一個內部 Observable,mergeAll 則是同時處理。switchAll 比較特殊一些,它老是切換到最新的內部 Observable 對象獲取數據。上游高階 Observable 產生一個新的內部 Observable 時,switchAll 就會當即訂閱最新的內部 Observable,退訂以前的,這也就是 ‘switch’ 的含義。
import { interval } from 'rxjs'; import { map, switchAll, take } from 'rxjs/operators'; interval(1500).pipe( take(2), map(x => interval(1000).pipe( map(y => x + ':' + y), take(2)) ), switchAll() ).subscribe(console.log) // 0:0 // 1:0 // 1:1
內部第一個 Observable 對象的第二個數據還沒來得及發出,第二個 Observable 對象就產生了。
3)concatMap、mergeMap、switchMap
從上面的例子咱們也能夠看到高階 Observable 經常是由 map 操做符將每一個數據映射爲 Observable 產生的,而咱們訂閱的時候須要將其壓平爲一階 Observable,而就是要先使用 map 操做符再使用 concatAll 或 mergeAll 或 switchAll 這些操做符中的一個。RxJS 中提供了對應的更簡潔的 API。使用的效果能夠用下面的公式表示:
concatMap = map + concatAll mergeMap = map + mergeAll switchMap = map + switchAll
4)zip、combineLatest、withLatestFrom
zip 有拉鍊的意思,這個操做符和拉鍊的類似之處在於數據必定是一一對應的。
import { interval } from 'rxjs'; import { zip, take } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) source$.pipe( zip(newest$, (x, y) => x + y) ).subscribe(x => console.log(x)) // 0 // 2 // 4
zip 是內部的 Observable 都發出相同順序的數據後才交給下游處理,最後一個參數是可選的 resultSelector 參數,這個函數用來處理操做符的結果。上面的示例運行過程以下:
上面若是沒有傳遞最後一個參數 resultSelector 函數,將會依次輸出數組 [0, 0]、[1, 1]、[2, 2]。在更新指南中,官方指出不推薦使用 resultSelector 參數,將會在 v7 中移除。加上以前提到的推薦使用靜態方法,這個示例應該改爲這樣:
import { interval, zip } from 'rxjs'; import { take, map } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) const add = (x, y) => x + y zip(source$, newest$).pipe( map(x => add(...x)) ).subscribe(x => console.log(x))
使用 zip 當有數據流吐出數據很快,而有數據流發出值很慢時,要當心數據積壓的問題。這時快的數據流已經發出了不少數據,因爲對應的數據還沒發出,RxJS 只能保存數據,快的數據流不斷地發出數據,積壓的數據愈來愈多,消耗的內存也會愈來愈大。
combineLatest 與 zip 不一樣,只要其餘的 Observable 已經發出過值就行,顧名思義,就是與其餘 Observable 最近發出的值結合。
import { interval, combineLatest } from 'rxjs'; import { take } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) combineLatest(source$, newest$).subscribe(x => console.log(x)) // [0, 0] // [0, 1] // [0, 2] // [1, 2] // [1, 3] // [2, 3] // [2, 4] // [2, 5]
withLatestFrom 沒有靜態方法,只有操做符方法,前面的方法全部 Observable 地位是平等的,而這個方法是使用這個操做符的 Observable 起到了主導做用,即只有它發出值纔會進行合併產生數據發出給下游。
import { interval } from 'rxjs'; import { take, withLatestFrom } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) source$.pipe( withLatestFrom(newest$) ).subscribe(x => console.log(x)) // [0, 0] // [1, 2] // [2, 4]
5)startWith、forkJoin、race
startWith 是在 Observable 的一開始加入初始數據,同步當即發送,經常使用來提供初始狀態。
import { fromEvent, from } from 'rxjs'; import { startWith, switchMap } from 'rxjs/operators'; const source$ = fromEvent(document.querySelector('#btn'), 'click') let number = 0 const fakeRequest = x => { return new Promise((resolve, reject) => { setTimeout(() => { resolve(number++) }, 1000) }) } source$.pipe( startWith('initData'), switchMap(x => from(fakeRequest(x))) ).subscribe(x => document.querySelector('#number').textContent = x)
這裏經過 startWith 操做符獲取了頁面的初始數據,以後經過點擊按鈕獲取更新數據。
forkJoin 只有靜態方法形式,相似 Promise.all ,它會等內部全部 Observable 都完結以後,將全部 Observable 對象最後發出來的最後一個數據合併成 Observable。
race 操做符產生的 Observable 會徹底鏡像最早吐出數據的 Observable。
const obs1 = interval(1000).pipe(mapTo('fast one')); const obs2 = interval(3000).pipe(mapTo('medium one')); const obs3 = interval(5000).pipe(mapTo('slow one')); race(obs3, obs1, obs2) .subscribe( winner => console.log(winner) ); // result: // a series of 'fast one'
本文中的例子基原本自30 天精通 RxJS,使用 RxJS v6 版本進行重寫。
頁面上有一個 p 標籤存放一個狀態,初始爲 0,有兩個按鈕,一個按鈕點擊後這個狀態增長 1,另外一個按鈕點擊後這個狀態減小 1。
<button id="addButton">Add</button> <button id="minusButton">Minus</button> <p id="state"></p>
這兩個按鈕的點擊事件咱們均可以創建響應式數據流,可使用 mapTo(1) 和 mapTo(-1) 分別表示點擊後增長 1 和減小 1。咱們可使用 EMPTY 建立一個空的數據流來表示這個狀態,用 startWith 設定初始值。而後 merge 這兩個點擊的數據流,可是這還有一個問題,點擊事件的數據流須要與表示狀態的數據流進行邏輯計算,發出最終的狀態,咱們才能去訂閱這個最終的數據流來更改頁面的顯示。而這種累計計算的方法,能夠用 scan 操做符來實現。最終實現以下:
import { fromEvent, EMPTY, merge } from 'rxjs' import { mapTo, startWith, scan } from 'rxjs/operators' const addButton = document.getElementById('addButton') const minusButton = document.getElementById('minusButton') const state = document.getElementById('state') const addClick$ = fromEvent(addButton, 'click').pipe(mapTo(1)) const minusClick$ = fromEvent(minusButton, 'click').pipe(mapTo(-1)) merge( EMPTY.pipe(startWith(0)), addClick$, minusClick$) .pipe( scan((origin, next) => origin + next) ).subscribe(item => { state.textContent = item })
頁面上有一個 id 爲 drag 的 div:
<div id="drag"></div>
頁面 css:
html, body { height: 100%; background-color: tomato; position: relative; } #drag { position: absolute; width: 100px; height: 100px; background-color: #fff; cursor: all-scroll; }
要實現的功能以下:
實現思路:
咱們可使用 fromEvent 去轉化 DOM 事件
const mouseDown$ = fromEvent(eleDrag, 'mousedown') const mouseMove$ = fromEvent(eleBody, 'mousemove') const mouseUp$ = fromEvent(eleBody, 'mouseup')
對於鼠標按下這個數據流,每次鼠標按下事件發生時都轉成鼠標移動的數據流
mouseDown$.pipe( map(mouseDownEvent => mouseMove$) )
鼠標鬆開時,結束監聽鼠標移動,咱們能夠用 takeUntil 表示這個邏輯
mouseDown$.pipe( map(mouseDownEvent => mouseMove$.pipe( takeUntil(mouseUp$) )) )
上面的 map 操做符內將每次 mousedown 映射爲一個 Observable,造成了高階 Observable,咱們須要用 concatlAll 壓平,map 和 concatAll 連用,能夠用更簡潔的 concatMap
mouseDown$.pipe( concatMap(mouseDownEvent => mouseMove$.pipe( takeUntil(mouseUp$) )) )
訂閱這個 mousemove 數據流更新 div 位置。咱們能夠獲取 mousemove event 中的 clientX 和 clientY,減去初始鼠標按下時鼠標相對 div 元素的值來獲得最終 div 的絕對位置的 left 和 top。也可使用 withLatestFrom 操做符,見 demo。
mouseDown$.pipe( concatMap(mouseDownEvent => mouseMove$.pipe( map(mouseMoveEvent => ({ left: mouseMoveEvent.clientX - mouseDownEvent.offsetX, top: mouseMoveEvent.clientY - mouseDownEvent.offsetY })), takeUntil(mouseUp$) )) ).subscribe(position => { eleDrag.style.left = position.left + 'px' eleDrag.style.top = position.top + 'px' })
這裏是一個更復雜一些的例子,當頁面滑動到視頻出頁面時視頻 fixed 定位,這是能夠拖拽移動視頻位置。經過 getValidValue 對視頻拖拽的位置進行了一個限制。
把上游的多個數據緩存起來,當時機合適時再把匯聚的數據傳給下游。
1)buffer、bufferTime、bufferCount、bufferWhen、bufferToggle
對於 buffer 這一組操做符,數據匯聚的形式就是數組。
buffer 接收一個 Observable 做爲 notifier,當 notifier 發出數據時,將 緩存的數據傳給下游。
interval(300).pipe( take(30), buffer(interval(1000)) ).subscribe( x => console.log(x) ) // [0, 1, 2] // [3, 4, 5] // [6, 7, 8] // [9, 10, 11, 12]
bufferTime 是用時間來控制時機,上面能夠改爲 bufferTime(1000)
bufferCount 是用數量來控制時機,如 3 個一組,bufferCount(3)
bufferWhen 接收一個叫作 closeSelector 的參數,它應該返回一個 Observable。經過這個 Observable 來控制緩存。這個函數沒有參數。下面的方法等價於前面的 buffer:
interval(300).pipe( take(30), bufferWhen(() => { return interval(1000) }) ).subscribe( x => console.log(x) )
bufferToggle 和 buffer 的不一樣是能夠不斷地控制緩存窗口的開和關,一個參數是一個 Observable,稱爲 opening,第二個參數是稱爲 closeSelector 的一個函數。這個函數的參數是 opening 產生的數據。前一個參數用來控制緩存的開始時間,後一個控制緩存的結束。與 bufferWhen 相比,它的 closeSelector 能夠接收參數,控制性更強。
咱們可使用 buffer 來作事件的過濾,下面的代碼只有 500ms 內連續點擊兩次以上纔會輸出 ‘success’ 。
fromEvent(document.querySelector('#btn'), 'click').pipe( bufferTime(500), filter(arr => arr.length >= 2) ).subscribe( x => console.log('success') )
2)window、windowTime、windowCount、windowWhen、windowToggle
與前面的 buffer 相似,不過 window 緩存數據匯聚的形式是 Observable,所以造成了高階 Observable。
相似 lodash 的 debounce 和 throttle,用來下降事件的觸發頻率。
咱們作搜索時,經常要對輸入進行 debounce 來減小請求頻率。
fromEvent(document.querySelector('#searchInput'), 'input').pipe( debounceTime(300), map(e => e.target.value) ).subscribe( input => document.querySelector('#text').textContent = input // 發送請求 )
distinct 操做符能夠用來去重,將上游重複的數據過濾掉。
of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe( zip(interval(1000)), map(arr => arr[0]), distinct() ).subscribe(x => console.log(x))
上面的代碼只會輸出 1, 2, 3, 4
distinct 操做符還能夠接收一個 keySelector 的函數做爲參數,這是官網的一個 typescript 的例子:
interface Person { age: number, name: string } of<Person>( { age: 4, name: 'Foo' }, { age: 7, name: 'Bar' }, { age: 5, name: 'Foo' }, ).pipe( distinct((p: Person) => p.name), ).subscribe(x => console.log(x)) // { age: 4, name: 'Foo' } // { age: 7, name: 'Bar' }
distinctUntilChanged 也是過濾重複數據,可是隻會與上一次發出的元素比較。這個操做符比 distinct 更經常使用。distinct 要與以前發出的不重複的值進行比較,所以要在內部存儲這些值,要當心內存泄漏,而 distinctUntilChanged 只用保存上一個的值。
用來延遲上游 Observable 數據的發出。
delay 能夠接受一個數字(單位默認爲 ms)或者 date 對象做爲延遲控制。
const clicks = fromEvent(document, 'click') const delayedClicks = clicks.pipe(delay(1000)) // 全部點擊事件延遲 1 秒 delayedClicks.subscribe(x => console.log(x))
咱們前面介紹過 bufferWhen,dalayWhen 也帶有 when,在 RxJS 中,這種操做符它接收的參數都是 Observable Factory,即一個返回 Observable 對象的回調函數,用這個 Observable 來進行控制。
每一個 click 都延遲 0 至 5 秒之間的任意一個時間:
const clicks = fromEvent(document, 'click') const delayedClicks = clicks.pipe( delayWhen(event => interval(Math.random() * 5000)), ) delayedClicks.subscribe(x => console.log(x))
異常處理的難點:
對錯誤處理的處理能夠分爲兩類,即恢復(recover)和重試(retry)。
恢復是雖然發生了錯誤可是讓程序繼續運行下去。重試,是認爲這個錯誤是臨時的,重試嘗試發生錯誤的操做。實際中每每配合使用,由於通常重試是由次數限制的,當嘗試超過這個限制時,咱們應該使用恢復的方法讓程序繼續下去。
1)catchError
catchError 用來在管道中捕獲上游傳遞過來的錯誤。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), catchError(err => of(8)) ).subscribe(x => console.log(x)) // 0 // 1 // 2 // 3 // 8
catchError 中的回調函數返回了一個 Observable,當捕獲到上游的錯誤時,調用這個函數,返回的 Observable 中發出的數據會傳遞給下游。所以上面當 x 爲4 時發生了錯誤,會用 8 來替換。
catchError 中的回調函數除了接收錯誤對象爲參數外,還有第二個參數 caught$ 表示上游的 Observable 對象。若是回調函數返回這個 Observable 對象,就會進行重試。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), catchError((err, caught$) => caught$), take(20) ).subscribe(x => console.log(x))
這個代碼會依次輸出 5 次 0, 1, 2, 3。
2)retry
retry 能夠接收一個整數做爲參數,表示重試次數,若是是負數或者沒有傳參,會無限次重試。重試實際上就是退訂再從新訂閱。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), retry(5) // 重試 5 次 ).subscribe(x => console.log(x))
在實際開發中,若是是代碼緣由形成的錯誤,重試沒有意義,若是是由於外部資源致使的異常錯誤適合重試,如用戶網絡或者服務器偶爾不穩定的時候。
3)retryWhen
和前面帶 when 的操做符同樣,retryWhen 操做符接收一個返回 Observable 的回調函數,用這個 Observable 來控制重試的節奏。當這個 Observable 發出一個數據時就會進行一次重試,它完結時 retryWhen 返回的 Observable 也當即完結。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), retryWhen(err$ => err$.pipe( delay(1000), take(5)) ) // 延遲 1 秒後重試,重試 5 次 ).subscribe(x => console.log(x))
retryWhen 的可定製性很是高,不只能夠實現延遲定製,還能夠實現 retry 的控制重試次數。在實踐中,這種重試頻率固定的方法還不夠好,若是以前的重試失敗,以後重試成功的概率也不高。Angular 官網介紹了一個 Exponential backoff 的方法。將每次重試的延遲時間控制爲指數級增加。
import { pipe, range, timer, zip } from 'rxjs'; import { ajax } from 'rxjs/ajax'; import { retryWhen, map, mergeMap } from 'rxjs/operators'; function backoff(maxTries, ms) { return pipe( retryWhen(attempts => range(1, maxTries) .pipe( zip(attempts, (i) => i), map(i => i * i), mergeMap(i => timer(i * ms)) ) ) ); } ajax('/api/endpoint') .pipe(backoff(3, 250)) .subscribe(data => handleData(data)); function handleData(data) { // ... }
4)finalize
返回上游數據流的鏡像 Observable,當上遊的 Observable 完結或出錯時調用傳給它的函數,不影響數據流。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), finalize(() => console.log('finally')) ).subscribe(x => console.log('a'))
咱們可使用 tap 操做符來進行調試。
攔截源 Observable 的每一次發送,執行一個函數,返回源 Observable 的鏡像 Observable。
這個 API 有助於咱們對 Observable 的值進行驗證(debug)和執行一個會帶來反作用的函數,而不會影響源 Observable。如咱們用鼠標進行 canvas 繪圖,鼠標按下是開始畫圖,鼠標鬆開即中止。咱們須要在 mousedown 的時候進行 moveTo,不然此次畫的會和上次畫的連在一塊兒。咱們應該把這個會帶來反作用過程放在 tap 操做符的函數中,這樣纔不會影響原來的數據流。
tap 操做符和訂閱並不相同,tap 返回的 Observable 若是沒有被訂閱,tap 中產生反作用的函數並不會執行。
1) repeat
repeat 用來重複上游 Observable
2)pluck 相似 lodash 的方法 pluck,提取對象的嵌套屬性的值。
const click$ = fromEvent(document, 'click') const tagName$ = click$.pipe(pluck('target', 'tagName')) tagName$.subscribe(x => console.log(x))
等價於:
click$.pipe(map(e => e.target.tagName))
3)toArray
將發出的數據匯聚爲數組
interval(1000).pipe( take(3), toArray() ).subscribe(x => console.log(x)) // [0, 1, 2]
4)partition
將上游的 Observable 分爲兩個,一個 Observable 的數據是符合斷定的數據,另外一個時不符合斷定的數據。
const part$ = interval(1000).pipe( take(6), partition(x => x % 2 === 0) ) part$[0].subscribe(x => console.log(x)) // 0, 2, 4 part$[1].subscribe(x => console.log(x)) // 1, 3, 5
5) 更多操做符
RxJS 中的操做符很是多,這裏只介紹了一部分,更多請查看官網 API。
有一個用於搜索的 input,當輸入時自動發送 ajax,並在下方顯示結果列表,而後能夠選擇結果,這就是咱們常見的 AutoComplete 效果。要實現這個效果有不少細節要考慮,如防止 race condition 和優化請求次數。
<div class="autocomplete"> <input class="input" type="search" id="search" autocomplete="off"> <ul id="suggest-list" class="suggest"></ul> </div>
先獲取兩個 DOM 元素:
const input = document.querySelector('#search'); const suggestList = document.querySelector('#suggest-list');
咱們先將輸入框的 input 的事件轉化爲 Observable。
const input$ = fromEvent(input, 'input');
而後咱們根據輸入的值去發送 ajax 請求,因爲咱們是要獲取最新的值而丟棄以前 ajax 返回的值,咱們應該使用 switchMap 操做符。經過使用這個操做符,咱們解決了 race condition 問題。
input$.pipe( switchMap(e => from(getSuggestList(e.target.value))) )
getSuggestList 是一個發送 ajax 請求的方法,返回 promise,咱們使用 from 來將其轉化爲 Observable。
爲了優化請求,首先 e.target.value 是空字符串時不該該發送請求,而後可使用 debounceTime 減小觸發頻率,也可使用 distinctUntilChanged 操做符來表示只有與上次不一樣時纔去發送請求。咱們還能夠在 API 失敗時重試 3 次。
input$.pipe( filter(e => e.target.value.length > 1), debounceTime(300), distinctUntilChanged(), switchMap( e => from(getSuggestList(e.target.value)).pipe(retry(3)) ) )
而後咱們去訂閱渲染就能夠了。
對於結果列表上的點擊事件,比較簡單,具體見demo。
Observable 的操做符和數組的方法有類似之處,可是也有很大的不一樣,體如今如下兩點:
延遲運算,咱們以前有講到過,就是隻有訂閱後纔會開始對元素進行運算。
由於 Observable 是時間上的集合,操做符不是像數組方法那樣運算完全部元素再返回交給下一個方法,而是一個元素一直運算到底,就像管道中的水流同樣,先發出的數據先通過操做符的運算。
前面的例子都是隻有一個訂閱者的狀況,實際上固然能夠有多個訂閱者,這就是多播(multicast),即一個數據流的內容被多個 Observable 訂閱。
先思考一下下面的例子結果是什麼?
const source$ = interval(1000).pipe( take(3) ) source$.subscribe(x => console.log('Observer 1: ' + x)) setTimeout(() => { source$.subscribe(x => console.log('Observer 2: ' + x)) }, 1000)
你可能會覺得 Observer 2 一秒後才訂閱,錯過了數據 0,所以只會輸出 1 和 2,但實際上會先輸出 0。爲何如此呢?這就涉及到對已錯過數據的兩種處理策略。
第一種策略相似於直播,第二種和點播類似。使用第一種策略的 Observable 叫作 Cold Observable,由於每次都要從新生產數據,是 「冷」的,須要從新發動。第二種,由於一直在生產數據,只要使用後面的數據就能夠了,因此叫 Hot Observable。
RxJS 中如 interval、range 這些方法產生的 Observable 都是 Cold Observable,產生 Hot Observable 的是由 Promise、Event 這些轉化而來的 Observable,它們的數據源都在外部,和 Observer 無關。
前面咱們提到 Observable 都是 lazy evaluation 的,數據管道內的邏輯只有訂閱後纔會執行,可是 Cold Observable 相對更 lazy 一些。Cold Observable 若是沒有訂閱者連數據都不會產生,對於 Hot Observable,數據仍會產生,可是不會進入管道處理。
Hot Observable 是多播,對於 Cold Observable,每次訂閱都從新生產了一份數據流,因此不是多播。下面的例子更加明顯,兩個訂閱者有很大的機率會接收到不一樣的數據。
const source$ = interval(1000).pipe( map(x => Math.floor(Math.random() * 10)), take(3) ) source$.subscribe(x => console.log('Observer 1: ' + x)) setTimeout(() => { source$.subscribe(x => console.log('Observer 2: ' + x)) }, 1000)
若是想要實現多播,就要使用 RxJS 中 Subject。
爲了防止每次訂閱都從新生產一份數據流,咱們可使用中間人,讓這個中間人去訂閱源數據流,觀察者都去訂閱這個中間人。這個中間人能去訂閱數據流,因此是個 Observer,又能被觀察者訂閱,因此也是 Observable。咱們能夠本身實現一個這樣的中間人:
const subject = { observers: [], subscribe: function (observer) { this.observers.push(observer) }, next: function (value) { this.observers.forEach(o => o.next(value)) }, error: function (error) { this.observers.forEach(o => o.error(error)) }, complete: function () { this.observers.forEach(o => o.complete()) } }
這個 subject 擁有 Observer 的 next、error、complete 方法,每次被觀察者訂閱時都會在內部保存這個觀察者。當接收到源數據流的數據時,會把數據發送給每個觀察者。
const source$ = interval(1000).pipe( map(x => Math.floor(Math.random() * 10)), take(3) ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source$.subscribe(subject) subject.subscribe(observerA) setTimeout(() => { subject.subscribe(observerB) }, 1000)
這時咱們發現兩個觀察者接收到的是同一份數據,ObserverB 因爲延遲一秒訂閱,因此少接收到一個數據。將咱們本身實現的 subject 換成 RxJS 中的 Subject,效果相同:
import { Subject } from 'rxjs' const subject = new Subject()
從上面能夠看到,Subject 和 Observable 有一個很大的不一樣:它內部保存有一個觀察者列表。
前面的 subject 是在源數據流發出值時調用 next 方法,向訂閱的觀察者發送這個值,咱們也能夠手動調用 subject 的next 方法送出值:
const observerA = { next: x => console.log('Observer A: ' + x) } const observerB = { next: x => console.log('Observer B: ' + x) } const subject = new Subject() subject.subscribe(observerA) setTimeout(() => { subject.subscribe(observerB) }, 500) subject.next(1) setTimeout(() => { subject.next(2) }, 1000)
總結一下,Subject 既是 Observable 又是 Observer,它會對內部的 observers 清單進行組播(multicast)。
在 RxJS 5 中,若是 Subject 的某個下游數據流產生了錯誤異常,而又沒有被 Observer 處理,那這個 Subject 的其餘 Observer 都會失敗。可是在 RxJS 6 中不會如此。
在 v6 的這個例子 中,ObserverA 沒有對錯誤進行處理,可是並不影響 ObserverB,而在 v5 這個demo中由於 ObserverA 沒有對錯誤進行處理,使得 ObserverB 終止了。很明顯 v6 的這種處理更符合直覺。
1)BehaviorSubject
BehaviorSubject 須要在實例化時給定一個初始值,若是沒有默認是 undefined,每次訂閱時都會發出最新的狀態,即便已經錯過數據的發送時間。
const observerA = { next: x => console.log('Observer A: ' + x) } const observerB = { next: x => console.log('Observer B: ' + x) } const subject = new BehaviorSubject(0) subject.subscribe(observerA) // Observer A: 0 subject.next(1) // Observer A: 1 subject.next(2) // Observer A: 2 subject.next(3) // Observer A: 3 setTimeout(() => { subject.subscribe(observerB) // Observer B: 3 }, 500)
observerB 已經錯過流數據的發送時間,可是訂閱時也能獲取到最新數據 3。
BehaviorSubject 有點相似於狀態,一開始能夠提供初始狀態,以後訂閱均可以獲取最新的狀態。
2)ReplaySubject
ReplaySubject 表示重放,在新的觀察者訂閱時從新發送原來的數據,能夠經過參數指定重放最後幾個數據。
const observerA = { next: x => console.log('Observer A: ' + x) } const observerB = { next: x => console.log('Observer B: ' + x) } const subject = new ReplaySubject(2) // 重放最後兩個 subject.subscribe(observerA) subject.next(1) // Observer A: 1 subject.next(2) // Observer A: 2 subject.next(3) // Observer A: 3 subject.complete() setTimeout(() => { subject.subscribe(observerB) // Observer B: 2 // Observer B: 3 }, 500)
這裏咱們能夠看到,即便 subject 完結後再去訂閱依然能夠重放最後兩個數據。
ReplaySubject(1) 和前面的 BehaviorSubject 是不同的,首前後者能夠提供默認數據,而前者不行,其次前者在 subject 終結後再去訂閱依然能夠獲得最近發出的數據然後者不行。
3)AsyncSubject
AsyncSubject 有點相似 operator last,會在 subject 完結後送出最後一個值。
const subject = new AsyncSubject() subject.subscribe(observerA) subject.next(1) subject.next(2) subject.next(3) subject.complete() // Observer A: 3 setTimeout(() => { subject.subscribe(observerB) // Observer B: 3 }, 500)
observerA 即便早就訂閱了,可是並不會響應前面的 next,完結後才接收到最後一個值 3。
前面咱們寫的 Subject 須要去訂閱源數據流和被觀察者訂閱,寫起來比較繁瑣,咱們能夠藉助操做符來實現。
1)multicast
使用方式以下,接收一個 subject 或者 subject factory。這個操做符返回了一個 connectable 的 Observable。等到執行 connect() 纔會用真的 subject 訂閱 source,並開始發送數據,若是沒有 connect,Observable 是不會執行的。
const source = interval(1000).pipe( map(x => Math.floor(Math.random() * 10)), take(3), multicast(new Subject) ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source.subscribe(observerA) // subject.subscribe(observerA) source.connect() // source.subscribe(subject) setTimeout(() => { source.subscribe(observerB) // subject.subscribe(observerB) }, 1000)
2)refCount
上面使用了 multicast,可是仍是有些麻煩,還須要去手動 connect。這時咱們能夠再搭配 refCount 操做符建立只要有訂閱就會自動 connect 的 Observable。只須要去掉 connect 方法調用,在 multicast 後面再加一個 refCount 操做符。
multicast(new Subject), refCount()
refCount 其實就是自動計數的意思,當 Observer 數量大於 1 時,subject 訂閱上游數據流,減小爲 0 時退訂上游數據流。
3)multicast selector 參數
multicast 第一個參數除了是一個 subject,還能夠是一個 subject factory,即返回 subject 的函數。這時使用了不一樣的中間人,每一個觀察者訂閱時都從新生產數據,適用於退訂了上游以後再次訂閱的場景。
multicast 還能夠接收可選的第二個參數,稱爲 selector 參數。它可使用上游數據流任意屢次,而不會重複訂閱上游的數據。當使用了這個參數時,multicast 不會返回 connectable Observable,而是這個參數(回調函數)返回的 Observable。selecetor 回調函數有一個參數,一般叫作 shared,即 multicast 第一個參數所表明的 subject 對象。
const selector = shared => { return shared.pipe(concat(of('done'))) } const source = interval(1000).pipe( take(3), multicast(new Subject, selector) ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source.subscribe(observerA) setTimeout(() => { source.subscribe(observerB) }, 5000) // Observer A: 0 // Observer A: 1 // Observer A: 2 // Observer A: done // Observer A completed // Observer B: done // Observer B: completed
observerB 訂閱時會調用 selector 函數,subject 即shared 已經完結,可是 concat 依然會在這個 Observable 後面加上 'done'。
能夠利用 selector 處理 「三角關係」的數據流,若有一個 tick$ 數據流,對其進行 delay(500) 操做後的下游 delayTick$, 一個由它們合併獲得的 mergeTick$,這時就造成了三角關係。delayTick$ 和 mergeTick$ 都訂閱了 tick$。
const tick$ = interval(1000).pipe( take(1), tap(x => console.log('source: ' + x)) ) const delayTick$ = tick$.pipe( delay(500) ) const mergeTick$ = merge(tick$, delayTick$).subscribe(x => console.log('observer: ' + x)) // source: 0 // observer: 0 // source: 0 // observer: 0
從上面的結果咱們能夠驗證,tick$ 被訂閱了兩次。
咱們可使用 selector 函數來使其只訂閱一次,將上面的過程移到 selector 函數內便可。
const source$ = interval(1000).pipe( take(1), tap(x => console.log('source: ' + x)) ) const result$ = source$.pipe( multicast(new Subject(), shared => { const tick$ = shared const delayTick$ = tick$.pipe(delay(500)) const mergeTick$ = merge(tick$, delayTick$) return mergeTick$ }) ) result$.subscribe(x => console.log('observer: ' + x))
這時只會輸出一次 'source: 0'。
4)publish
publish 是 multicast 的一種簡寫方式,效果等同於以下:
function publish (selector) { if (selector) { return multicast(() => new Subject(), selector) } else { return multicast(new Subject()) } }
有上一節說到的 selector 函數時,等價於:
multicast(() => new Subject(), selector)
沒有時,等價於:
multicast(new Subject())
5)share
share 是 multicast 和 refCount 的簡寫,share() 等同於在 pipe 中先調用了 multicast(() => new Subject()),再調用了 refCount()。
const source = interval(1000).pipe( take(3), share() ) const observerA = { next: x => console.log('Observer A: ' + x), error: null, complete: () => console.log('Observer A completed') } const observerB = { next: x => console.log('Observer B: ' + x), error: null, complete: () => console.log('Observer B completed') } source.subscribe(observerA) setTimeout(() => { source.subscribe(observerB) }, 5000) // Observer A: 0 // Observer A: 1 // Observer A: 2 // Observer A completed // Observer B: 0 // Observer B: 1 // Observer B: 2 // Observer B completed
因爲 share 是調用了 subject 工廠函數,而不是一個 subject 對象,所以 observerB 訂閱時能夠從新獲取數據。
6)publishLast、publishBehavior、publishReplay
同前面的 publish,只不過使用的不是普通 Subject,而是對應的 AsyncSubject、BehaviorSubject、ReplaySubject。
Scheduler(調度器)用於控制數據流中數據的推送節奏。
import { range, asapScheduler } from 'rxjs' const source$ = range(1, 3, asapScheduler) console.log('before subscribe') source$.subscribe(x => console.log(x)) console.log('subscribed')
上面的代碼,若是去掉 asapScheduler 參數,由於 range 是同步的,會先輸出 1, 2, 3,再輸出 'subscribed',可是加了之後就變成 先輸出 'subscribed',改變了原來數據產生的方式。asap 是 as soon as possible 的縮寫,同步任務完成後就會立刻執行。
Scheduler 擁有一個虛擬時鐘,如 interval 建立的數據流每隔一段時間要發出數據,由 Scheduler 提供時間來判斷是否到了發送數據的時間。
asap 會盡可能使用 micro task,而 async 會使用 macro task。
一些建立數據流的方法能夠提供 Scheduler 參數,合併類操做符如 merge 也能夠,在建立數據流後咱們也可使用操做符,使得產生的下游 Observable 推送數據的節奏由指定的 Scheduler 來控制。這個操做符就是 observeOn。
const tick$ = interval(10) // Intervals are scheduled with async scheduler by default... tick$.pipe( observeOn(animationFrameScheduler) // but we will observe on animationFrame scheduler to ensure smooth animation. ) .subscribe(val => { someDiv.style.height = val + 'px' })
原本每 10 ms 就會發送一個數據,修改 Scheduler 爲 animationFrame 後只有瀏覽器重繪纔會發送數據更新樣式。
咱們還能夠經過操做符 subscribeOn 控制訂閱的時機。
const source$ = new Observable(observer => { console.log('on subscribe') observer.next(1) observer.next(2) observer.next(3) return () => { console.log('on unsubscribe') } }) const tweaked$ = source$.pipe(subscribeOn(asapScheduler)) console.log('before subscribe') tweaked$.subscribe(x => console.log(x)) console.log('subscribed') // before subscribe // subscribed // on subscribe // 1 // 2 // 3
經過 subscribeOn(asapScheduler),咱們把訂閱時間推遲到儘快執行。
RxJS 中有一個 用於測試的 TestScheduler,RxJS 的測試你們能夠查看程墨的《深刻淺出 RxJS》或者其餘資料。
import { TestScheduler } from 'rxjs/testing'
Angular 自身引用了 RxJS,如 http 和 animation 都使用了 Observable,狀態管理可使用 ngrx。
Vue 官方有與 RxJS 集成的 vue-rx。
React 能夠經過 Subject 創建橋樑,Redux 也有與 RxJS 結合的中間件 Redux-Observable。
interval(10000).pipe( switchMap(() => from(axios.get(url))), catchError(err => EMPTY) ).subscribe(data => render(data))
上面的代碼,每隔 10s 去發送一個請求,當某個請求返回出錯時,返回空的 Observable 而不渲染數據。這樣處理貌似正確,可是實際上某個請求出錯時,整個 Observable 終結了,所以輪詢就結束了。爲了保持輪詢,咱們須要進行隔離,把錯誤處理移到 switchMap 內部進行處理。
interval(10000).pipe( switchMap(() => from(axios.get(url)).pipe( catchError(err => EMPTY) )) ).subscribe(data => render(data))
若是沒有及時退訂可能會引起內存泄露,咱們須要經過退訂去釋放資源。
1)命令式管理
const subscription = source$.subscribe(observer) // later... subscription.unsubscribe()
上面的管理方式,數量不多時還好,若是數量較多,將會顯得十分笨拙。
2) 聲明式管理
const kill1 = fromEvent(button, 'click') const kill2 = getStreamOfRouteChanges() const kill3 = new Subject() const merged$ = mege( source1.pipe(takeUntil(kill1)), source2.pipe(takeUntil(kill2)), source3.pipe(takeUntil(kill3)) ) const sub = merged$.subscribe(observer) // later... sub.unsubscribe() // 或者發出任意結束的事件 kill3.next(true)
經過 takeUntil、map 或者其餘操做符組合進行管理。這樣更不容易漏掉某個退訂,訂閱也減小了。
3)讓框架或者某些類庫去處理
好比 Angular 中的 async pipe,當 unmount 時會自動退訂,也不用寫訂閱。
不要過分使用 Rx,它比較適合如下場景:
簡單的應用並不須要 RxJS。
能夠看看徐飛的相關思考:流動的數據——使用 RxJS 構造複雜單頁應用的數據邏輯
Async Iterator 提案已經進入了 ES2018,能夠認爲是 iterator 的異步版本。在 Symbol 上部署了 asyncIterator 的接口,不過它的 next 方法返回的是 { value, done } 對象的 Promise 版本。可使用 for-await-of 進行迭代:
for await (const line of readLines(filePath)) { console.log(line) }
使用 Async Iterator 咱們能夠很容易實現相似 RxJS 操做符的功能:
const map = async function*(fn) { for await(const value of this) yield fn(value) }
其餘如 fromEvent 等也比較容易實現。Async Iterator 擴展庫 axax 的一個例子:
import { fromEvent } from "axax/es5/fromEvent"; const clicks = fromEvent(document, 'click'); for await (const click of clicks) { console.log('a button was clicked'); }
下面是 Benjamin Gruenbaum 用 Async Iterator 實現 AutoComplete 的一個例子:
let tooSoon = false, last; for await (const {target: {value}} of fromEvent(el, "keyup")) { if(!value || tooSoon) continue; if(value === last) continue; last = value; yield await fetch("/autocomplete/" + value); // misses `last` tooSoon = true; delay(500).then(() => tooSoon = false); }
Async Iterator 相比 RxJS,沒有那麼多概念,上手快,也比較容易擴展實現那些操做符。
從數據消費者的角度上看,RxJS 是 push stream,由生產者把數據推送過來,Async Iterator 是 pull stream,是本身去拉取數據。
博客:30 天精通 RxJS
書:深刻淺出RxJS