《深刻淺出RxJS》讀書筆記
html部分javascript
測試你對時間的感受 按住我一秒鐘而後鬆手 你的時間:毫秒
jquery
實現html
var time = new Date().getTime(); $("#hold-me") .mousedown(function(event) { time = new Date().getTime(); }) .mouseup(function(event) { if (time) { var elapse = new Date().getTime() - time; $("#hold-time").text(elapse); // 重置time 避免直接觸發mouseup事件,例如在A處點擊而後在B處up time = null; } });
RxJS
實現java
const holdMeButton = document.getElementById("hold-me"); const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown"); const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup"); // 獲取間隔時間 const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{ return mouseUpEvent.timestamp - mouseDownEvent.timestamp }); holdTime$.subscribe(ms=>{ document.getElementById("hold-time").innerText = ms; }) holdTime$.flatMap(ms=>{ return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms) }) .map(e=>e.response) .subscribe(res=>{ document.getElementById("rank").innerText = `你超過了${res.rank}% 的用戶` })
主要用來加載靜態資源,因此使用到了koa
,koa-static
const path = require("path"); const koa = require("koa"); const serve = require("koa-static"); const app = new koa(); app.use(async function (ctx,next) { console.log("收到請求...") await next() console.log(`"${ctx.path}"請求 已處理...`) }) app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){ if(err) throw err; console.log("程序啓動成功") });
Observable
和 Observer
Observable 可被觀察的對象,Observer觀察者,Observer經過subscribe來觀察Observable對象
RxJS的數據流就是Observable對象:node
// 使用 deep-link方式引入函數 const Observable = require("rxjs").Observable; /* * 定義Observable對象的行爲,會產生數據,調用訂閱者的next方法 * 1. 此處的Observer與訂閱者行爲 theObserver並非同一個對象,而是對theObserver的包裝 * 2. 若是observer.error被調用,以後的complete或者next就不會被調用啦,同理,complete被調用以後,也不會 * 再調用next或者error * 3. 若是error或者complete一直未調用,則observer就一直在內存中等待被調用 */ const onSubscribe = observer =>{ observer.next(1); observer.error(2); observer.complete(3); } // 產生一個Observable對象 const source$ = new Observable(onSubscribe); // 定義觀察者的行爲 消費Observable對象產生的數據 const theObserver = { next:item => console.log(item), error:item => console.error(item), complete:item => console.log("已完成"), } // 創建Observable與Observer的關係 source$.subscribe(theObserver)
subscribe
在訂閱一段事件以後observer再也不響應吐出的信息了,這時能夠退訂,可是Observeable還會一直產生數據
const Observable = require("rxjs").Observable; const onSubscribe = observer =>{ let n = 1; const handle = setInterval(()=>{ console.log(`in onSubscribe ${n}`) // if(n>3){ // observer.complete() // } observer.next(n++); },1000) return { unsubscribe(){ // clearInterval(handle) } } } const source$ = new Observable(onSubscribe); const theObserver = { next:item => console.log(item) } let subscription = source$.subscribe(theObserver) setTimeout(()=>{ // 此處的unsubscribe也是封裝過的 subscription.unsubscribe() },3500)
在node
中執行,會一直打印 in onSubscribe *
,可是source$不會再響應jquery
const Observable = require("rxjs/Observable").Observable; const of = require("rxjs/observable/of").of; const map = require("rxjs/operator/map").map; // 新建一個操做符 // 此處this是外部變量,致使此operator再也不是純函數 Observable.prototype.double = function(){ // return this::map(x=>x*2) return map.call(this,x=>x*2) } const source$ = of(1,3,4); const result$ = source$.double(); result$.subscribe(value=>console.log(value))
解決須要使用call或者bind改變this的操做,這樣是依賴外部環境的,不屬於純函數,也會喪失TS的類型檢查優點
lettable
將Observable
對象傳遞給下文,避免使用this
ajax
const Observable = require("rxjs/Observable").Observable; require("rxjs/add/observable/of").of; require("rxjs/add/operator/map").map; require("rxjs/add/operator/let").let; const source$ = Observable.of(1,2,3); const double$ = obs$ => obs$.map(v=>v*2); // 接受上文,傳遞到下文 const result$ = source$.let(double$); result$.subscribe(console.log)
不引入`map`補丁,開發**lettable**寫法的操做符
// ES5實現
function map(project){promise
return function(obj$){ // 經過上面的Observable生成一個新Observable return new Observable(observer=>{ return obj$.subscribe({ next:value=>observer.next(project(value)), error:err=>observer.error(err), complete:()=>observer.complete() }) }) }
}
// 添加操做符
var result$ = source$.let(map(x => x * 3));app
// ES6實現
const map6 = fn => obj$ =>koa
new Observable(observer => obj$.subscribe({ next: value => observer.next(fn(value)), error: err => observer.error(err), complete: () => observer.complete() }) );
// 添加操做符
var result$ = source$.let(map6(x => x * 4));異步
`pipeable`是`lettable`的別稱,方便對於`lattable`的理解,V6以上才支持 ## Chapter4 建立數據流 > 大多數的操做符是靜態操做符 ### 基礎操做符 1. `create`簡單的返回一個Observable對象
Observable.create = function(subscribe){ return new Observable(subscribe) } ```
of
列舉數據
import {Observable} from "rxjs/Observable"; import "rxjs/add/observable/of" // 依次吐出數據,一次性emit const source$ = Observable.of(1,2,3); // 訂閱 // 第一個參數是next,第二個參數是error回調,第三個參數是complete回調 source$.subscribe(console.log,null,()=>{console.log("Complete")})
range
產生指定範圍的數據
const sourc$ = Observable.range(/*初始值*/1,/*個數*/100); // 每次只能步進 1
generate
循環建立
至關於for循環
const source$ = Observable.generate( // 初始值 2, // 判斷條件 value=> value < 10, // 步進 value=> value+0.5, // 函數體,產生的結果 value=> value*value )
使用generate
代替range
const range = function(min,count){ const max = min + count; return Observable.generate(min,v=>vv+1,v=>v*v) }
repeat
重複數據的數據流
實例操做符,經過
import 'rxjs/add/operator/repeat'
引入
- V4版本中repeat是靜態屬性,這樣在使用
Observable.repeat(1,2)
重複1兩次,這樣數據就夠靈活- V5版本中改成實例屬性以後,Observable.of(1,2,4).repeat(2),將產生的1,2,3重複兩次,功能更增強大
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeat"); const source$ = Observable.create(observer => { setTimeout(() => { observer.next(1); }, 1000); setTimeout(() => { observer.next(2); }, 2000); setTimeout(() => { observer.next(3); }, 3000); setTimeout(() => { observer.complete(1); }, 4000); return { unsubscribe(){ console.log("on Unsubscribe") } } }); const repeat$ = source$.repeat(2) repeat$.subscribe(console.log,null,()=>{console.log("Complete")}) // 1 // 2 // 3 // on Unsubscribe // 1 // 2 // 3 // Complete // on Unsubscribe
若是沒有observer.complete()
repeat不會被調用
repeat以complete爲契機會再次執行數據源,若是上游一直沒有complete下游就不會執行
repeat
的存在,第一次數據源執行完(以complete爲契機)後並不會執行observer的complete回調empty
,throw
,never
interval
和timer
interval
相似於setInterval
require('rxjs/add/observable/interval') // 每隔1000ms產生一個數據,初始值爲0,步進爲1 Observable.interval(1000)'
timer
是setTimeout的超集
// 1000ms後開始產生數據,以後每隔1000ms產生一個數據,功能至關於interval Observable.timer(1000,1000) // 指定日期 Observable.time(new Date(new Date().getTime() + 12000))
from
把一切轉化爲Observable
- 將全部的Iterable的對象都轉化爲Observable對象
- 能夠將Promise對象轉化爲Observable對象,功能與fromPromise相同
fromPromise
異步處理的對接
const Observable = require("rxjs").Observable; require("rxjs/add/observable/fromPromise"); const promise = Promise.resolve(123); Observable.fromPromise(promise).subscribe(console.log, null, () => console.log("Complete") ); //123 //Complete const promise1 = Promise.reject("error"); Observable.from( console.log, err => console.log("catch", err), () => console.log("Complete!") ); // 未捕獲的Promise錯誤 // (node:765) UnhandledPromiseRejectionWarning: error // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). ( // rejection id: 1) // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
fromEvent
鏈接DOM與RxJS的橋樑
const event$ = Observable.fromEvent(document.getElementById("btn"),"click"); event$.subscribe(event=>{ // Do something })
在NodeJs中能夠與EventEmitter
交互
const Observable = require("rxjs").Observable; const EventEmitter = require("events"); require("rxjs/add/observable/fromEvent") const emitter = new EventEmitter(); const source$ = Observable.fromEvent(emitter,"msg"); source$.subscribe(console.log,null,()=>console.log("Complete")) emitter.emit("msg",1) // 1 emitter.emit("msg","haha") // haha emitter.emit("a-msg","haha") // emitter.emit("msg",'nihao') // nihao
fromEvent
是Hot Observable,也就是數據的產生和訂閱無關,對於fromEvent
來講,數據源是外部產生的,不受RxJS
控制,這是Hot Observable
對象的特色
fromEventPattern
針對不規範的事件源
規範的事件源:DOM事件,EventEmitter事件
ajax
見最上面的例子repeatWhen
例如 在上游事件結束以後的一段時間再從新訂閱
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeatWhen") const notifier = ()=>{ return Observable.interval(1000); } const source$ = Observable.of(1,2,3); // const source$ = Observable.create(observer=>{ // observer.next(111); // return { // unsubscribe(){ // console.log("on Unsubscribe") // } // } // }); const repeat$ = source$.repeatWhen(notifier); repeat$.subscribe(console.log,null,()=>console.log("Complete")) // 每隔一秒產生一次 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // ^C
defer
延遲建立Observable
針對Observable佔用內存比較大的狀況,懶加載
const Observable = require("rxjs").Observable; require("rxjs/add/observable/defer"); require("rxjs/add/observable/of"); const observableFactory = ()=>Observable.of(1,2,3); const source$ = Observable.defer(observableFactory)
功能需求 操做符 把多個數據流以首尾相連的方式合併 concat
,concatAll
把多個數據流以先到先得的方式合併 merge
,mergeAll
把多個數據流中的數據以一一對應的方式合併 zip
和zipAll
持續合併多個數據流中最新產生的數據 combineLatest
,combineAll
,withLatestFrom
從多個數據流中選取第一個產生內容的數據流 race
在數據流前面添加一個指定數據 startWith
只獲取多個數據流最後產生的數據 forkJoin
從高階數據流中切換數據源 switch
,exhaust
concat
- 實例方法
- 靜態方法,若是兩個數據沒有前後關係,推薦使用此方法
實例方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/operator/concat") const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6); source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))
靜態方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/observable/concat") const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6); Observable .concat(source$1,source$2) .subscribe(console.log,null,()=>console.log("Complete"))
`concat`在將上一個數據源傳遞下去的時候會調用上一個`Observable`的`unsubscribe`,若是上一個`Observable`一直爲完結,後續的都不會被調用 ```javascript const source$1 = Observable.internal(1000); const source$2 = Observable.of(1); const concated$ = Observable.concat(source$1,source$2); // 此時 source$2永遠不會被調用 ``` 在此推測:`rxjs/add/operator/*`下的屬性都是實例屬性,`rxjs/add/observable/*`下的屬性都是實例屬性
merge
先到先得
merge用在同步數據的狀況下和concat表現只,不建議使用
const Observable = require("rxjs").Observable; require("rxjs/add/operator/merge"); require("rxjs/add/operator/map"); require("rxjs/add/observable/timer"); const source$1 = Observable.timer(0, 1000).map(x => x + "A"); const source$2 = Observable.timer(500, 1000).map(x => x + "B"); const source$3 = Observable.timer(1000, 1000).map(x => x + "C"); // 此時 source$1與source$2永遠不會中止,因此 source$1 .merge(source$2, source$3, /*此參數限制了合併的Observable的個數*/ 2) .subscribe(console.log, null, () => console.log("Complete")); // 0A // 0B // 1A // 1B // 2A // 2B // 3A // 3B // 4A // 4B // ^C