初學rxjs,本着一個新手的角度完成一個小demo,相信過程當中會有不少你們也遇到過的問題,同時整個過程不斷髮散,講解一些rxjs的核心知識點和API,但願這篇文章能給學習rxjs的同窗們一些啓發。javascript
項目地址html
折線圖有12個點(按時間分佈),每隔2秒(爲了演示方便)刷新出一個點。java
先簡單點想,react
須要一個集中存儲狀態
的地方,這裏的狀態其實就是圖表對應的數據,這個地方每通過一個時間間隔就向服務器請求一次數據,它須要存儲最近12個點對應的數據ios
把這種想法往rxjs上靠。首先咱們先寫個最基本的可觀察對象fetchData$
git
新建src/app.tses6
import {Observable, Observer} from 'rxjs'
import {Mock} from './mock'
const print = x => console.log('x: ', x)
const intervalEmit$ = Observable.interval(2000)
const fetchData$ = Observable.fromPromise(Mock.fetch())
intervalEmit$.subscribe(print)
fetchData$.subscribe(print)複製代碼
新建src/mock.tsgithub
import axios from 'axios'
export class Mock {
static fetch():Promise<Number> {
// base : 20
return axios.get('https://zan.wilddogio.com/age.json')
.then(res => Number(res.data) + Mock.randomAge(10))
}
// random 1 ~ x
static randomAge(x) {
return Math.floor(1 + Math.random() * x)
}
}複製代碼
很簡單一個是每兩秒produce一個遞增值,一個是請求回來一個promiseable值並produce
如今咱們作個組合,也就是每隔兩秒請求回來一個promiseable值並produce,咱們修改app.tsajax
const intervalEmit$ = Observable.interval(2000)
// 第一種
const app$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
// 第二種,將switchMap拆開
const fetchData$ = intervalEmit$.map(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetchData$.switch()
// 第三種,使用defer工廠建立Observable
const deferPromise$ = Observable.defer(function () {
return Observable.fromPromise(Mock.fetch())
})
const app$ = intervalEmit$.switchMap(e => deferPromise$)
app$.subscribe(print)複製代碼
先說第三種,它相對單純:),咱們先看下defer
的定義,Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer. 意思也比較好理解,defer接受一個產生observable的函數,當defer所建立的observable被訂閱時就經過該函數建立一個observable對象。express
第一種和第二種放在一塊說,map就不用說了,就是將一個observable通過一個函數
轉換造成另外一個observable,和Array.prototyp.map很像,可是你能夠把它理解成一個時間點上的值或者對一個值的一對一變換。重點說下switch,一樣咱們先看下定義,Converts a higher-order Observable into a first-order Observable by subscribing to only the most recently emitted of those inner Observables. 解釋一下,經過訂閱的方式將一個高階observable轉換爲一個低階observable,同時僅產生一個低階最近產生的值。
首先要先清楚什麼叫高階,
var fn = function(a, b) {return a + b}複製代碼
經過typeof fn能夠看到fn的類型是function
,繼續
var fn1 = fn(1,2)複製代碼
經過typeof fn1能夠看到fn1的類型是number
,OK,它已經不是函數了,那麼如何讓fn1繼續是函數呢,咱們改寫一下
var fn = function(a) {return function(b) {return a + b}}複製代碼
若是此次你還想獲得1+2=3,那麼你須要fn(1)(2)才能獲得,也就是說咱們想獲得最終的結果調用了一次以上的函數,好的這就叫作高階,超過一次就是高階,這和數學裏的高階導數相似的。好了咱們回到switch的主題。
var ob$ // 一個可觀察對象
var higher$ = ob$.實例operator(靜態operator)複製代碼
這裏有一個實例operator
,它就是一個轉換器,它將一個源observable做爲一個模版轉變爲另一個observable,並且源observable是不被改變的,而靜態operator
就像一個observable製造器同樣,一啓動(subscribe)就開始生產。所以
var higher$ = ob$.實例operator(靜態operator)
這裏獲得的higher$就是一個高階observable
了,由於當你訂閱它時,它不像靜態operator產生數據,而是產生observable,因此就像你執行fn(1)產生的是一個新的函數而不是值同樣。下面是個小栗子,能夠看到打印出的是observable。
var print = x=>console.log('x: ', x)
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
higherOrder.subscribe(print)
// x: IntervalObservable {_isScalar: false, period: 1000, scheduler: AsyncScheduler}複製代碼
所以咱們須要switch將high$轉換成低階observable,
var lower$ = higher$.switch()
這樣當咱們訂閱lower$的時候,將會獲得靜態operator
所產生的值,看官方栗子,
var print = x=>console.log('x: ', x)
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var lowerOrder = higherOrder.switch()
lowerOrder.subscribe(print)
//== 第一次點擊 ==
// x: 0
// x: 1
//== 第二次點擊 ==
// x: 0複製代碼
能夠看到,如今打印出的是值了,並且當咱們再次點擊時,Rx.Observable.interval(1000)被從新執行了,這也正是Flattens an Observable-of-Observables by dropping the previous inner Observable once a new one appears.的含義,當外層observable產生值時,它會觸發丟棄最近一次被訂閱的內層observable。咱們知道promise對象一旦建立,它處於pending狀態,最終變爲onFulfille或者onRejected狀態,所以它是不能被取消的。而經過rxjs能夠達到目的,看一個栗子。咱們用express作一個restFul服務器,
app.js
var express = require('express');
var app = express()
app.use(express.static('blog'));
app.get('/delay', function(req, res) {
setTimeout(function(){
res.send('hello world')
},3000)
})
var server = app.listen(3000, function () {
var host = server.address().address
var port = server.address().port
console.log('app listening at http://%s:%s', host, port)
})複製代碼
當服務器接收到http://localhost:3000/delay請求時,延遲三秒發送響應。再看客戶端代碼
最近被取消.html
<script> window.onload = function () { var print = x=>console.log('x: ', x) var ajax$ = Rx.Observable.fromPromise($.ajax('/delay')) var click$ = Rx.Observable.fromEvent(document, 'click') var higher$ = click$.map(e=>Rx.Observable.fromPromise($.ajax('/delay'))) var app$ = higher$.switch() app$.subscribe(print) //當我在三秒內瘋狂點擊5次,其實只返回一次數據,也就是說前四次被unsubscribe了 } </script>複製代碼
此時我在頁面瘋狂點擊五次(三秒以內),你會看到發出了五次請求,可是最終缺只打印出一條hello world,是的前四次都被unsubscribe
了也就是官網中多說的drop,這就達到了撤銷promise的效果。
咱們繼續,如今咱們實現了每兩秒發送一個請求,接下來咱們實現數據的存儲
首先咱們要先存儲夠24個點,以後每來一個點丟棄一個最舊的點。咱們小時候都聽過磁帶,錄音機有倒帶的功能(不是周杰倫給蔡依林寫的那首),所以磁帶存儲了整個過程,你能夠回退到以前播放的任意一個時間點從新播放,其實咱們的一次次請求就像在播放磁帶,咱們想獲取到以前點的最好辦法就是能夠存儲它們,磁帶也有存儲大小,那麼咱們也不可能無限存儲,因此咱們就暫存最近24次記錄。下面rxjs的倒帶replay登場。
在rxjs的api文檔中搜索replay能夠看到兩個東東ReplaySubject
和publishReplay
,前者是一個Subject類,後者是一個Observable實例operator,他們之間有沒有什麼關聯,咱們仍是先來看看他倆該怎麼用吧,先說和Observable更關係更緊密的publishReplay。
public publishReplay(bufferSize: , windowTime: , scheduler: *): ConnectableObservable
export function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler));
}複製代碼
原來publishReplay的三個參數都是爲ReplaySubject實例化服務的,那麼對於參數咱們先按下不談,看看這個multicast
,這個this表明Observable實例,那麼在咱們看看這個operator以前,咱們先說下單播
和多播
,這對咱們理解該operator頗有幫助。
雖然到目前爲止咱們尚未講Subject,可是先白話一下單播Observable和多播Subject,單播很高冷(cold)很專一(獨立),她從不主動聯繫別人,只有在別人關注她後,纔會和這我的侃侃而談。再來一我的關注她,和她交流中感覺不到還有別人的存在。而Subject就很熱情(hot)喜歡分享(不獨立)。不論什麼時候關注她,她都樂於將經驗與人分享。下面看兩個小栗子。
Obserable單播
const printA = (val) => console.log('observerA :' + val)
const printB = (val) => console.log('observerB :' + val)
var clicks = Rx.Observable.fromEvent(document, 'click');
var ones = clicks.mapTo(1);
var seed = 0;
var count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(printA);
setTimeout(function() {
console.log('another subscribe.')
ones.scan((acc, one) => acc + one, seed).subscribe(printB)
}, 3000)複製代碼
從圖中能夠看到,3秒之後observerB依然從1開始打印,同時也能夠看出只有別人訂閱她的時候,她纔會和別人溝通。
從這個圖能夠更直觀的看出,當咱們訂閱藍色scan轉換後的observable和紅色scan轉換後的observable時,其實走的是兩個獨立的分支,每次訂閱也都是經過fromEvent建立了一個新的observable,其實observable就是一個函數,當收到訂閱時,就執行函數,在函數中經過訂閱者留下的通知方式通知到訂閱者。再來看Subject多播。
Subject多播
var subject = new Rx.Subject()
subject.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
subject.subscribe(printB)
}, 3000)
Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0)
.do(num => subject.next(num))
.subscribe()複製代碼
從圖中看到雖然observerB3秒後姍姍來到,可是依然分享到了observerA的努力成果,從3開始打印。同時看到subject是主動告知訂閱者,so hot~
能夠看出Subject和Observable的區別,三秒後的訂閱並無建立一個新的分支,也就是沒有新的observable實例以及後續的一些列變換。
這裏咱們簡單講解了Observable的冷、單播和獨立性以及Subject的熱、多播和共享性。那麼咱們回來,繼續說multicast,接受一個Subject實例做爲參數,咱們有理由相信,這個operator是observable實例經過subject實例被賦予了多播的特性。咱們看一個multicast的小栗子。
var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0)
var subject = new Rx.Subject
subject.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
subject.subscribe(printB)
}, 3000)
var app$ = clickAddOne$.multicast(subject)
app$.subscribe()複製代碼
這段代碼運行起來除了another subscribe.,不論你如何點擊都不會打印其餘信息。看來這個app$不是單純的observable實例,咱們看下rxjs官網對於multicast的描述:
意思大概是,返回值是一個ConnectableObservable實例
,該實例能夠產生數據共享給潛在的訂閱者(即Subject實例上的訂閱者),咱們修改一下代碼。
// app$.subscribe()
app$.connect()複製代碼
從圖中咱們看到了和上面Subject多播一致的結果。這裏咱們看到了一個陌生的方法connect
,ConnectableObservable
繼承自Observable
,同時具備一個connect
方法和一個refCount
方法。connect方法決定什麼時候訂閱生效,同時返回一個方法以決定什麼時候取消全部訂閱。
var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0).do(x=>console.log('do: ' + x))
var subject = new Rx.Subject
subject.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
subject.subscribe(printB)
}, 3000)
var app$ = clickAddOne$.multicast(subject)
var connector = app$.connect()
setTimeout(function() {
connector.unsubscribe()
}, 6000)複製代碼
6秒事後,點擊不會產生任何打印信息。這裏顯示調用connect和返回實例上的unsubscribe顯得太命令式了,這裏咱們還可使用refCount使得這個過程的關注點放在observer的訂閱和取消上。改寫下上面的例子
var clickAddOne$ = Rx.Observable.fromEvent(document, 'click').mapTo(1).scan((acc, one) => acc + one, 0).do(x=>console.log('do: ' + x))
var subject = new Rx.Subject
var app$ = clickAddOne$.multicast(subject).refCount()
app$.subscribe(printA)
setTimeout(function() {
console.log('another subscribe.')
app$.subscribe(printB)
}, 3000)複製代碼
這更加Observable,同時咱們也達到了Observable多播化的目的,破費!
兜了一大圈回到publishReplay,再看下面的源碼就更清楚了許多
export function publishReplay(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
return multicast.call(this, new ReplaySubject(bufferSize, windowTime, scheduler));
}複製代碼
publishReplay自己就是observable.multicast(new ReplaySubject)的語法糖,那麼咱們就來看下ReplaySubject是個啥。先上一個小栗子
const printA = (val) => console.log('observerA :' + val)
const printB = (val) => console.log('observerB :' + val)
var subject = new Rx.ReplaySubject(3);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.subscribe({
next: (v) => console.log('observerC: ' + v)
});複製代碼
能夠看出後兩次subscribe,就打印出了前三次
可觀察對象產生的值,這有點像Observable訂閱,但又不會建立新的Observable實例,這種帶有從新發送之前數據的能力就是ReplaySubject了,所以下面兩端代碼是所實現的功能是同樣的
var app$ = Rx.Observable.interval(1000).multicast(new Rx.ReplaySubject(3)).refCount()
app$.subscribe(printA)
setTimeout(function () {
app$.subscribe(printB)
}, 3000)複製代碼
var app$ = Rx.Observable.interval(1000).publishReplay(3).refCount()
app$.subscribe(printA)
setTimeout(function () {
app$.subscribe(printB)
}, 3000)複製代碼
通過一個個引伸咱們掌握了很多rxjs的核心知識點和api使用,那麼回到demo上,咱們已經完成了每兩秒完成一次rest請求,下面咱們先完成這樣一個任務,當咱們緩存到第23個點時,後面每新增一個點打印update畫圖
。聯繫以前的內容,首先咱們要有一個buffersize爲24的ReplaySubject實例。每次訂閱都會產生以前24個值,可是這裏會有個問題須要經過訂閱來獲取舊的值,訂閱完之後其實這個訂閱就沒有意義了,Replay功能的基礎其實就是buffer能力,但Subject提供的這種Replay能力倒是cold、lazy的,咱們更但願這種replay能力能夠更hot,當到達一個bufferSize,就自動把這個bufferSize的數據produce出來,這有點像interval,通過一個時間間隔就produce一個數據,那麼有沒有相似intervalBuffer這種的靜態operator呢:),咱們先來搜搜和buffer有關的API。
一看這個bufferCount好像挺適合咱們的,估計是buffer了count個數據後,就會產生count個buffer數據。仍是看個小栗子
var source$ = Rx.Observable.interval(1000)
var buffer$ = source$.bufferCount(10)
buffer$.subscribe(x => console.log(x))複製代碼
從圖中能夠看到每隔10秒打印出了一組長度爲10的數字,這顯然不是咱們想要的,咱們但願每秒打印出一組數字,且丟棄最舊的一個數字,看下bufferCount的函數簽名,
public bufferCount(bufferSize: number, startBufferEvery: number): Observable
bufferCount還接受第二個參數,該參數表明了表明了計算bufferSize的起始位置,第一次達到bufferSize就produce,而從第二次起bufferSize從上一次buffer數據的startBufferEvery開始計算,也就是說當第一次produce後,bufferCount爲bufferSize-startBufferEvery,也就是還須要緩存startBufferEvery個纔會produce下一個buffer。改造下上一個栗子。
var source$ = Rx.Observable.interval(1000)
var buffer$ = source$.bufferCount(10, 1)
buffer$.subscribe(x => console.log(x))複製代碼
能夠看到達到了咱們預期。如今咱們完成子任務2,這裏爲了演示方便緩存5個點。
const print = x => console.log('x: ', x)
const intervalEmit$ = Observable.interval(2000)
const fetch$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetch$.bufferCount(5, 1).do('update畫圖')
app$.subscribe(print)複製代碼
OK!
下面咱們完成畫圖功能。
const line = new LineChart(document.getElementById('showAge') as HTMLDivElement)
line.setOptions({
title: {
left: 'center',
text: '動態數據(年齡)'
},
xAxis: {
type: 'time',
splitLine: {
show: false
}
},
yAxis: {
type: 'value',
boundaryGap: [0, '100%'],
splitLine: {
show: false
}
},
series: [{
type: 'line',
data: []
}]
})
line.showLoading()
const now = new Date().getTime()
const span = 2 * 1000
const bufferSize = 12
let counter = 0
const intervalEmit$ = Observable.interval(span)
const fetch$ = intervalEmit$.switchMap(e => Observable.fromPromise(Mock.fetch()))
const app$ = fetch$.bufferCount(bufferSize, 1).map(
buffer => {
counter === 0 && line.hideLoading()
const points = buffer.map((b, index) => {
const point = []
point[0] = now + index * span + span * counter
point[1] = b
return point
})
counter++
return points
}
).do(data => {
debugger;
line.setOptions({
series: [{
data
}]
})
})
app$.subscribe()複製代碼
效果以下
一個簡單的實時監控折線圖的demo就完成了,因爲本人也是初學rxjs,一些知識點不免會有疏漏,但也儘可能作到不誤導,相信你們仍是會有些收穫的。