RxJS筆記

RxJS

《深刻淺出RxJS》讀書筆記

遺留問題

  1. Observable的HOT與COLD對應的實際場景,以及在編碼中的體現

chapter1

html部分javascript

測試你對時間的感受
  按住我一秒鐘而後鬆手
  你的時間:毫秒
  1. jquery實現html

    var time = new Date().getTime();
    $("#hold-me")
        .mousedown(function(event) {
            time = new Date().getTime();
        })
        .mouseup(function(event) {
            if (time) {
                var elapse = new Date().getTime() - time;
                $("#hold-time").text(elapse);
                // 重置time 避免直接觸發mouseup事件,例如在A處點擊而後在B處up
                time = null;
            }
        });
  2. RxJS實現java

    const holdMeButton = document.getElementById("hold-me");
                const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown");
                const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup");
                // 獲取間隔時間
                const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{
                    return mouseUpEvent.timestamp - mouseDownEvent.timestamp
                });
    
                holdTime$.subscribe(ms=>{
                    document.getElementById("hold-time").innerText = ms;
                })
    
                holdTime$.flatMap(ms=>{
                    return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms)
                })
                .map(e=>e.response)
                .subscribe(res=>{
                    document.getElementById("rank").innerText = `你超過了${res.rank}% 的用戶`
                })

chapter2

Koa2的使用

主要用來加載靜態資源,因此使用到了 koa, koa-static
const path = require("path");
const koa = require("koa");
const serve = require("koa-static");
const app = new koa();

app.use(async function (ctx,next) {
    console.log("收到請求...")
    await next()
    console.log(`"${ctx.path}"請求 已處理...`)
})

app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){
    if(err) throw err;
    console.log("程序啓動成功")
});

ObservableObserver

Observable 可被觀察的對象,Observer觀察者,Observer經過subscribe來觀察Observable對象

RxJS的數據流就是Observable對象:node

  1. 觀察者模式
  2. 迭代器模式

舉個栗子

// 使用 deep-link方式引入函數
const Observable = require("rxjs").Observable;

/*
 * 定義Observable對象的行爲,會產生數據,調用訂閱者的next方法
 * 1. 此處的Observer與訂閱者行爲 theObserver並非同一個對象,而是對theObserver的包裝
 * 2. 若是observer.error被調用,以後的complete或者next就不會被調用啦,同理,complete被調用以後,也不會
 *    再調用next或者error
 * 3. 若是error或者complete一直未調用,則observer就一直在內存中等待被調用
*/
const onSubscribe = observer =>{
    observer.next(1);
    observer.error(2);
    observer.complete(3);
}
// 產生一個Observable對象
const source$ = new Observable(onSubscribe);
// 定義觀察者的行爲 消費Observable對象產生的數據
const theObserver = {
    next:item => console.log(item),
    error:item => console.error(item),
    complete:item => console.log("已完成"),
}
// 創建Observable與Observer的關係
source$.subscribe(theObserver)

退訂subscribe

在訂閱一段事件以後observer再也不響應吐出的信息了,這時能夠退訂,可是Observeable還會一直產生數據
const Observable = require("rxjs").Observable;

const onSubscribe = observer =>{
    let n = 1;
    const handle = setInterval(()=>{
        console.log(`in onSubscribe ${n}`)
        // if(n>3){
        //     observer.complete()
        // }
        observer.next(n++);
    },1000)
    return {
        unsubscribe(){
            // clearInterval(handle)
        }
    }
}

const source$ = new Observable(onSubscribe);

const theObserver = {
    next:item => console.log(item)
}

let subscription = source$.subscribe(theObserver)

setTimeout(()=>{
      // 此處的unsubscribe也是封裝過的
    subscription.unsubscribe()
},3500)

node中執行,會一直打印 in onSubscribe *,可是source$不會再響應jquery

Chapter3 操做符基礎

const Observable = require("rxjs/Observable").Observable;
const of = require("rxjs/observable/of").of;
const map = require("rxjs/operator/map").map;
// 新建一個操做符
// 此處this是外部變量,致使此operator再也不是純函數
Observable.prototype.double = function(){
    // return this::map(x=>x*2)
    return map.call(this,x=>x*2)
}

const source$ = of(1,3,4);
const result$ = source$.double();

result$.subscribe(value=>console.log(value))

lettable/pipeable操做符

解決須要使用call或者bind改變this的操做,這樣是依賴外部環境的,不屬於純函數,也會喪失TS的類型檢查優點
  • lettableObservable對象傳遞給下文,避免使用thisajax

    const Observable = require("rxjs/Observable").Observable;
    require("rxjs/add/observable/of").of;
    require("rxjs/add/operator/map").map;
    require("rxjs/add/operator/let").let;
    
    const source$ = Observable.of(1,2,3);
    const double$ = obs$ => obs$.map(v=>v*2);
    // 接受上文,傳遞到下文
    const result$ = source$.let(double$);
    
    result$.subscribe(console.log)
不引入`map`補丁,開發**lettable**寫法的操做符

// ES5實現
function map(project){promise

return function(obj$){
  // 經過上面的Observable生成一個新Observable
  return new Observable(observer=>{
    return obj$.subscribe({
      next:value=>observer.next(project(value)),
      error:err=>observer.error(err),
      complete:()=>observer.complete()
    })
  })
}

}
// 添加操做符
var result$ = source$.let(map(x => x * 3));app

// ES6實現
const map6 = fn => obj$ =>koa

new Observable(observer =>
      obj$.subscribe({
          next: value => observer.next(fn(value)),
          error: err => observer.error(err),
          complete: () => observer.complete()
      })
  );

// 添加操做符
var result$ = source$.let(map6(x => x * 4));異步

`pipeable`是`lettable`的別稱,方便對於`lattable`的理解,V6以上才支持

## Chapter4 建立數據流

> 大多數的操做符是靜態操做符

### 基礎操做符

1.  `create`簡單的返回一個Observable對象
Observable.create = function(subscribe){
  return new Observable(subscribe)
}
```
  1. of列舉數據

    import {Observable} from "rxjs/Observable";
    import "rxjs/add/observable/of"
    // 依次吐出數據,一次性emit
    const source$ = Observable.of(1,2,3);
    // 訂閱
    // 第一個參數是next,第二個參數是error回調,第三個參數是complete回調
    source$.subscribe(console.log,null,()=>{console.log("Complete")})
  2. range產生指定範圍的數據

    const sourc$ = Observable.range(/*初始值*/1,/*個數*/100);
    // 每次只能步進 1
  3. generate循環建立

    至關於for循環
    const source$ = Observable.generate(
      // 初始值
      2,
      // 判斷條件
      value=> value < 10,
      // 步進
      value=> value+0.5,
      // 函數體,產生的結果
      value=> value*value
    )

    使用generate代替range

    const range = function(min,count){
      const max = min + count;
      return Observable.generate(min,v=>vv+1,v=>v*v)
    }
  4. repeat重複數據的數據流

    實例操做符,經過import 'rxjs/add/operator/repeat'引入

    1. V4版本中repeat是靜態屬性,這樣在使用Observable.repeat(1,2)重複1兩次,這樣數據就夠靈活
    2. V5版本中改成實例屬性以後,Observable.of(1,2,4).repeat(2),將產生的1,2,3重複兩次,功能更增強大
    const Observable = require("rxjs").Observable;
    require("rxjs/add/operator/repeat");
    
    const source$ = Observable.create(observer => {
        setTimeout(() => {
            observer.next(1);
        }, 1000);
        setTimeout(() => {
            observer.next(2);
        }, 2000);
        setTimeout(() => {
            observer.next(3);
        }, 3000);
        setTimeout(() => {
            observer.complete(1);
        }, 4000);
        return {
            unsubscribe(){
                console.log("on Unsubscribe")
            }
        }
    });
    
    const repeat$ = source$.repeat(2)
    
    repeat$.subscribe(console.log,null,()=>{console.log("Complete")})
    
    // 1
    // 2
    // 3
    // on Unsubscribe
    // 1
    // 2
    // 3
    // Complete
    // on Unsubscribe
    • 若是沒有observer.complete()repeat不會被調用

      repeat以complete爲契機會再次執行數據源,若是上游一直沒有complete下游就不會執行
    • 由於repeat的存在,第一次數據源執行完(以complete爲契機)後並不會執行observer的complete回調
  5. empty,throw,never

建立異步數據的Observable對象

  1. intervaltimer

    interval相似於setInterval

    require('rxjs/add/observable/interval')
    // 每隔1000ms產生一個數據,初始值爲0,步進爲1
    Observable.interval(1000)'

    timer 是setTimeout的超集

    // 1000ms後開始產生數據,以後每隔1000ms產生一個數據,功能至關於interval
    Observable.timer(1000,1000)
    // 指定日期
    Observable.time(new Date(new Date().getTime() + 12000))
  2. from 把一切轉化爲Observable

    1. 將全部的Iterable的對象都轉化爲Observable對象
    2. 能夠將Promise對象轉化爲Observable對象,功能與fromPromise相同
  3. fromPromise異步處理的對接

    const Observable = require("rxjs").Observable;
    require("rxjs/add/observable/fromPromise");
    
    const promise = Promise.resolve(123);
    Observable.fromPromise(promise).subscribe(console.log, null, () =>
        console.log("Complete")
    );
    //123
    //Complete
    const promise1 = Promise.reject("error");
    Observable.from(
        console.log,
        err => console.log("catch", err),
        () => console.log("Complete!")
    );
    // 未捕獲的Promise錯誤
    // (node:765) UnhandledPromiseRejectionWarning: error
    // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing
    // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (
    // rejection id: 1)
    // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
  4. fromEvent鏈接DOM與RxJS的橋樑

    const event$ = Observable.fromEvent(document.getElementById("btn"),"click");
    event$.subscribe(event=>{
      // Do something
    })

    在NodeJs中能夠與EventEmitter交互

    const Observable = require("rxjs").Observable;
    const EventEmitter = require("events");
    require("rxjs/add/observable/fromEvent")
    
    const emitter = new EventEmitter();
    
    const source$ = Observable.fromEvent(emitter,"msg");
    source$.subscribe(console.log,null,()=>console.log("Complete"))
    
    emitter.emit("msg",1)
    // 1
    emitter.emit("msg","haha")
    // haha
    emitter.emit("a-msg","haha")
    //
    emitter.emit("msg",'nihao')
    // nihao

    fromEventHot Observable,也就是數據的產生和訂閱無關,對於fromEvent來講,數據源是外部產生的,不受RxJS控制,這是Hot Observable對象的特色

  5. fromEventPattern針對不規範的事件源

    規範的事件源:DOM事件,EventEmitter事件
  6. ajax見最上面的例子
  7. repeatWhen

    例如 在上游事件結束以後的一段時間再從新訂閱
    const Observable = require("rxjs").Observable;
    require("rxjs/add/operator/repeatWhen")
    
    const notifier = ()=>{
        return Observable.interval(1000);
    }
    
    const source$ = Observable.of(1,2,3);
    // const source$ = Observable.create(observer=>{
    //     observer.next(111);
    //     return {
    //         unsubscribe(){
    //             console.log("on Unsubscribe")
    //         }
    //     }
    // });
    const repeat$ = source$.repeatWhen(notifier);
    
    repeat$.subscribe(console.log,null,()=>console.log("Complete"))
    // 每隔一秒產生一次
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // 1
    // 2
    // 3
    // ^C
  8. defer延遲建立Observable

    針對Observable佔用內存比較大的狀況,懶加載
    const Observable = require("rxjs").Observable;
    require("rxjs/add/observable/defer");
    require("rxjs/add/observable/of");
    
    const observableFactory = ()=>Observable.of(1,2,3);
    const source$ = Observable.defer(observableFactory)

合併數據流

功能需求 操做符
把多個數據流以首尾相連的方式合併 concat,concatAll
把多個數據流以先到先得的方式合併 merge,mergeAll
把多個數據流中的數據以一一對應的方式合併 zipzipAll
持續合併多個數據流中最新產生的數據 combineLatest,combineAll,withLatestFrom
從多個數據流中選取第一個產生內容的數據流 race
在數據流前面添加一個指定數據 startWith
只獲取多個數據流最後產生的數據 forkJoin
高階數據流中切換數據源 switch,exhaust
  1. concat

    1. 實例方法
    2. 靜態方法,若是兩個數據沒有前後關係,推薦使用此方法
    • 實例方法

      const Observable = require("rxjs").Observable;
      require("rxjs/add/operator/of")
      require("rxjs/add/operator/concat")
      
      const source$1 = Observable.of(1,2,3);
      const source$2 = Observable.of(4,5,6);
      
      source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))
    • 靜態方法

      const Observable = require("rxjs").Observable;
      require("rxjs/add/operator/of")
      require("rxjs/add/observable/concat")
      
      const source$1 = Observable.of(1,2,3);
      const source$2 = Observable.of(4,5,6);
      
      Observable
        .concat(source$1,source$2)
        .subscribe(console.log,null,()=>console.log("Complete"))
`concat`在將上一個數據源傳遞下去的時候會調用上一個`Observable`的`unsubscribe`,若是上一個`Observable`一直爲完結,後續的都不會被調用

```javascript
const source$1 = Observable.internal(1000);
const source$2 = Observable.of(1);
const concated$ = Observable.concat(source$1,source$2);
// 此時 source$2永遠不會被調用
```

在此推測:`rxjs/add/operator/*`下的屬性都是實例屬性,`rxjs/add/observable/*`下的屬性都是實例屬性
  1. merge先到先得

    merge用在同步數據的狀況下和concat表現只,不建議使用
    const Observable = require("rxjs").Observable;
    require("rxjs/add/operator/merge");
    require("rxjs/add/operator/map");
    require("rxjs/add/observable/timer");
    
    const source$1 = Observable.timer(0, 1000).map(x => x + "A");
    const source$2 = Observable.timer(500, 1000).map(x => x + "B");
    const source$3 = Observable.timer(1000, 1000).map(x => x + "C");
    
    // 此時 source$1與source$2永遠不會中止,因此
    source$1
        .merge(source$2, source$3, /*此參數限制了合併的Observable的個數*/ 2)
        .subscribe(console.log, null, () => console.log("Complete"));
    
    // 0A
    // 0B
    // 1A
    // 1B
    // 2A
    // 2B
    // 3A
    // 3B
    // 4A
    // 4B
    // ^C
相關文章
相關標籤/搜索