《深刻淺出RxJS》讀書筆記

rxjs的引入

// 若是以這種方式導入rxjs,那麼整個庫都會導入,咱們通常不可能在項目中運用到rxjs的全部功能
const Rx = require('rxjs');

解決這個問題,可使用深鏈deep link的方式,只導入用的上的功能ajax

import {Observable} from 'rxjs/Observable';

這樣能夠減小沒必要要的依賴,不光能夠優化打包文件的大小,還有利於代碼的穩定性npm

另外目前最新的一種解決方案就是Tree Shaking, Tree Shaking只對import語句導入產生做用,對require不起做用。由於tree shaking的工做方式是對代碼靜態分析,import只能出如今代碼的第一層,不能出如今if分支中。而require能夠出如今if分支中,參數也是能夠動態產生的字符串,因此只能動態執行時才知道require函數式如何執行的,這裏Tree Shaking就不起做用了。數組

rxjs中Tree Shaking不起做用

實際項目中,若是不會使用不少RxJS的功能,建議仍是避免導入所有RxJS的作法,使用npm導入而後經過打包工具來組合promise

Observer的完結

爲了讓Observable有機會告訴Observer已經沒有更多數據了,須要有另一種通訊機制。在Rxjs中,實現這種通訊機制的就是Observer的complete函數緩存

若是你無法預測你的程序會不會出現異常,那麼就須要使用error參數,若是不須要能夠直接給個Null做爲第二個參數異步

const theObserver = {
    next: item => console.log(item),
    null,
    complete: () => console.log('No More Data')
};

什麼時候完結這個Observable對象須要Observable主動調用complete()
在Observable發生error以後,再也不會調用後面的complete().由於在一個Observable對象中,要麼是完結狀態,要麼是出錯狀態。一旦進入出錯那麼就終結了。函數

Observable 可觀察的對象
Observer 觀察者
聯繫二者的橋樑就是subscribe
在Rxjs中,發佈者就是Observable,觀察者就是subscribe函數,這樣就能夠吧觀察者和發佈者聯繫起來工具

如何取消訂閱

const Observable = Rx.Observable;
const onSubscribe = observer => {
    let number = 1;
    const handle = setInterval(() => {
        observer.next(number++);
    }, 1000);
    return {
        unsubscribe: () => {
            clearInterval(handle);
        }
    };
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe(item => console.log(item));
setTimeout(() => {
    subscription.unsubscribe();
}, 3500);

Observable產生的事件,只有Observer經過subscribe訂閱以後纔會收到,在unsubscribe以後就再也不收到優化

Hot Observable和Cold Observable

若是一個Observable對象同時有多個Observer訂閱,若是A在B以前訂閱,那麼B該不應訂閱到錯過的那些數據流。
若是錯過就錯過了那麼這樣的Observable成爲Hot,可是若是B仍然從頭開始訂閱這個Observable那麼這樣的成爲Coldui

若是每次訂閱的時候, 已經有⼀個熱的「⽣產者」準備好了, 那就是Hot Observable, 相反,若是每次訂閱都要產⽣⼀個新的⽣產者, 新的⽣產者就像汽車引擎⼀樣剛啓動時確定是冷的, 因此叫Cold Observable

複雜的問題能夠被分解爲三個小問題

  • 如何產生事件
  • 如何響應事件
  • 什麼樣的發佈者關聯什麼樣的觀察者,也就是什麼時候調用subscribe

Observable產生的事件,只有Observer經過subscribe訂閱以後纔會收到,在unsubscribe以後就不會收到
Observable.create()用來建立一個Observable對象
在RXJS中,和數組的map同樣,做爲操做符的map也接受一個函數參數,不一樣之處是,對於數組的map,是把每一個數組元素都映射爲一個新的值,組合成一個新的數組

操做符分類

  • 建立類(creation)
  • 轉化類(transformation)
  • 過濾類(filtering)
  • 合併類(combination)
  • 多播類(multicasting)
  • 錯誤處理類(error Handling)
  • 輔助⼯具類(utility)
  • 條件分⽀類(conditional&boolean)
  • 數學和合計類(mathmatical&aggregate)

靜態操做符的導入路徑rxjs/add/observable/
實例操做符的導入路徑rxjs/add/operator/
在鏈式調用中,靜態操做符只能出如今首位,實例操做符則能夠出如今任何位置。

Tree Shaking幫不上Rxjs什麼忙,由於Tree Shaking只能作靜態代碼檢查,並非程序運行時去檢測一個函數是否真的被調用、只有一個函數在任何代碼中都沒有引用過,纔會認爲這個函數不會被引用。可是RxJS任何一個操做符都是掛在Observable類或者Observable.prototype上的, 賦值給Observable或者Observable.prototype上某個屬性在Tree Shaking看來就是被引⽤, 因此, 所
有的操做符, 無論真實運⾏時是否被調⽤, 都會被Tree Shaking認爲是會⽤到的代碼, 也就不會當作死代碼刪除。

退訂資源的基本原則:當再也不須要某個Observable對象獲取數據的時候,就要退訂這個Observable對象
在對上游的數據處理中,利用try...catch...的組合捕獲project調用的可能的錯誤,若是真的有錯誤,那就調用下游的error函數

const sub = this.subscribe({
    next: value => {
        try{
            observer.next(project(value))
        }catch(err) {
            observer.error(err);
        }
    },
    error: err => observer.error(err),
    complete: () => observer.complete()
});

關聯Observable

  1. 給Observable打補丁

這種方式比較簡單,能夠直接綁定在prototype上,若是是靜態屬性直接綁定在類上面

  1. 使用bind綁定特定的Observable對象
// 好比咱們本身建立了一個map方法
function map(project) {
    return new Observable(observer => {
        const sub = this.subscribe({
            next: value => observer.next(project(value)),
            error: err => observer.next(error),
            complete: () => observer.complete()
        });
        return {
            unsubscribe: () => {
                sub.unsubscribe();
            }
        };
    });
}
// 這個時候咱們就能夠主動使用bind改變this的指向
const result$ = map.bind(source$)(x => x * 2);
// 或者直接使用call
const result$ = map.call(source$, x => x * 2);
  1. 使用lift

lift是Observable的實例函數,它會返回一個新的Observable對象,經過傳遞給lift的函數參數能夠賦予這個新的Observable對象特殊的功能

function map(project) {
    return this.lift(function(source$) {
        return source$.subscribe({
            next: value => {
                try{
                    this.next(project(value));
                }catch(err) {
                    this.error(err);
                }
            },
            error: err => this.error(error),
            complete: () => this.complete()
        });
    });
}
Observable.prototype.map = map;

create

Observable.create() 其實就是簡單的調用了Observable的構造函數

Observable.create = function(subscribe) {
    return new Observable(subscribe);
}

of

range

range(1, 10) 從1開始吐出10個數據
range(1.5, 3) 從1.5開始吐出3個數據,每次加1

generate

generate相似一個for循環,設定一個初始值,每次遞增這個值,知道知足某個條件爲止
使用generate實現range功能

const range = (start, count) => {
    const max = start + count;
    return Observable.generate(
        start,
        value => value < max,
        value => value + 1,
        value => value
    );
};

全部可以使用for循環完成的操做,均可以使用generate來實現

repeat 重複數據的數據流

const source$ = Observable.of(1,2,3);
const repeated$ = source$.repeat(10);
// 將source$中的數據流重複10遍

empty()

產生一個直接完結的Observable對象

throw()

產生的Observable對象什麼都不作,直接拋出錯誤

never()

產生的Observable對象什麼也不作,既不吐出數據,也不產生錯誤

interval()

接受一個數值類型的參數,表明產生數據的間隔毫秒數

timer()

第一個參數能夠是一個數值,表示多少毫秒以後吐出第一個數值0
若是存在第二個參數,那就會產生一個持續吐出數據的Observable對象,第二個參數就是時間間隔

// 2s後。每隔1s產生一個數值,該數值從0開始遞增
const source$ = Observable.timer(2000, 1000);

from()

能夠將一切轉化爲Observable

fromPromise()

能夠將Promise對象轉化爲Observable對象,Promise若是成功則調用正常的成功回調,若是失敗則調用失敗的回調

fromEvent()

將DOM事件轉化爲Observable對象中的數據

// 將點擊事件轉化爲Observable
const source$ = Observble.fromEvent(document.querySelector('#id'), 'click');

ajax()

用來將ajax的返回轉化爲Observable對象

repeatWhen()

接受一個函數做爲參數,這個函數在上游第一次產生異常是被調用,這個函數應該返回一個Observable對象

const notifier = () => {
    return Observable.interval(1000);
};
const source$ = Observable.of(1,2,3);
const repeat$ = source$.repeatWhen(notifier);

defer()

當defer產生的Observable對象唄訂閱的時候,defer的函數參數就會被調用,逾期這個函數返回另一個Observable

const observableFactory = () => Observable.of(1,2,3);
const source$ = Observable.defer(observableFacatory);

合併類操做符

很多合併類操做符都有兩種形式,既提供靜態操做符,又提供實例操做符。

concat

concat能夠將多個Observable的數據內容一次合併

const source1$ = Observable.of(1,2,3);
const source2$ = Observable.of(4,5,6);
const concated$ = source1$.concat(source2$);
// 或者靜態操做符
const concated$ = Observable.concat(source1$, source2$);

concat開始從下一個Observable抽取數據是發生在前一個Observable對象完結以後,因此參與到這個concat之中的Observable對象應該都能完結。若是一個Observable對象不完結,那排在後面的Observable對象永遠沒有上場的機會

// source1$不完結,永遠輪不到source2$上場
const source1$ = Observable.interval(1000);
const source2$ = Observable.of(1);
const concated$ = source1$.concat(source2$);

merge

先到先得快速經過
merge一樣支持靜態和實例形式的操做符

const Observable = Rx.Observable;
const source1$ = Observable.timer(0, 1000).map(x => x + 'A');
const source2$ = Observable.timer(500, 1000).map(x => x + 'B');
const merged$ = Observable.merge(source1$, source2$);
merged$.subscribe(console.log, null, () => console.log('complete'));

merge第一時間會subscribe上游全部的Observable,而後纔去先到先得的策略,任何一個Observable只要有數據下來,就會傳給下游的Observable對象
image

merge的第一個Observable若是產生的是同步數據流,那會等第一個同步數據流產生完畢以後,再回合併下一個Observable對象,所以merge的主要適用場景仍然是異步數據流。一個比較經常使用的場景就是用於合併DOM事件

merge還有一個可選的參數concurrent,用於指定同時合併的Observable對象的個數

const source1$ = Observable.timer(0, 1000).map(x => x+'A');
const source2$ = Observable.timer(500, 1000).map(x => x+'B');
const source3$ = Observable.timer(1000, 1000).map(x => x+'C');
const merged$ = source1$.merge(source2$, source3$, 2);
merged$.subscribe(console.log, null, () => console.log('complete'));
// 0A 0B 1A 1B 2A 2B...

這裏就限定了優先合併2個Observable對象。而第一二個又不會完結,因此source3$沒有出頭之日。

zip

zip將上游的兩個Obserable合併,而且將他們中的數據一一對應。

// 基本用法
const source1$ = Observable.of(1,2,3);
const source2$ = Observable.of(4,5,6);
const zipped$ = Observable.zip(source1$, source2$);
zipped$.subscribe(console.log, null, () => console.log('completed'));
// [1,4] [2,5] [3,6] completed

當使用zip的時候,它會馬上訂閱上游Observable,而後開始合併數據。對於zip而言上游任何一個Observable完結,zip只要給這個完結的Observable對象吐出全部的數據找到配對的數據,那麼zip就會給下游一個complete信號

const source1$ = Observable.interval(1000);
const source2$ = Observable.of('a', 'b', 'c');
// [0, 'a'] [1, 'b'] [2, 'c'] complete

可是這裏也會有一個問題,若是某個上游的source1$吐出的數據很快,可是source$2吐出的數據慢,那麼zip就不得不先存儲source1$的數據

若是使用zip組合超過兩個Observable對象,遊戲規則依然同樣,組合而成的Observable吐出的數據依然是數組

combineLatest

合併最後一個數據,從全部輸入Observable對象中那最後一個產生的數據(最新數據),而後把這些數據組合起來傳給下游。

const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const result$ = source1$.combineLatest(source2$);

image

image

咱們也能夠自由的定製下游數據

const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$, project);

多重依賴的問題:

const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x + 'a');
const source2$ = original$.map(x => x + 'b');
const result$ = source1$.combineLatest(source2$);

withLatestFrom

功能相似於combineLatest,可是給下游推送數據只能由一個

const source1$ = Observable.timer(0, 2000).map(x => 1000 * x);
const source2$ = Observable.timer(500, 1000);
const result$ = source1$.withLatestFrom(source2$, (a,b) => a + b);
// 101 203 305 407...

race

第一個吐出數據的Observable對象就是勝者,race產生的Observable就會徹底採用Observable對象的數據,其他的輸入Observable對象則會被退訂而拋棄。

const source1$ = Observable.timer(0, 2000).map(x => x + 'a');
const source2$ = Observable.timer(500, 2000).map(y => y + 'b');
const winner$ = source1$.race(source2$);
winner$.subscribe(console.log);
// 1a 2a 3a...

startWith

讓一個Observable對象在被訂閱的時候,老是先吐出指定的若干數據

const origin$ = Observable.timer(0, 1000);
const result$ = origin$.startWith('start');
// start
// 0
// 1

startWith的操做符就是爲了知足鏈式調用的需求

original$.map(x => x * 2).startWith('start').map(x => x + 'ok');

forkJoin

只有當全部的Observable對象都完結,肯定不會有新的數據產生的時候,forkJoin就會把全部輸入的Observable對象產生的最後一個數據合併成給下游惟一的數據

const source1$ = Observable.interval(1000).map(x => x + 'a').take(1);
const source2$ = Observable.interval(1000).map(x => x + 'b').take(3);
const concated$ = Observable.forkJoin(source1$, source2$);
concated$.subscribe(console.log);
// ["0a", "2b"]

高階Observable

所謂高階Observable,指的就是產生數據依然是Observable的Observable

// 高階Observable示例
const ho$ = Observable.interval(1000).take(2)
    .map(x => Observable.interval(1500).map(y => x + ':' + y));

concatAll

會對其內部的Observable對象作concat操做

const ho$ = Observable.interval(1000)
    .take(2)
    .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const concated$ = ho$.concatAll();
// 0:0 0:1 1:0 1:1

concatAll首先會訂閱上游產生的第一個內部的Observable對象,抽取其中的數據,而後只有當第一個Observable完結的時候纔回去訂閱第二個Observable。這樣很容易產生數據積壓

mergeAll

和concatAll()功能相似,可是隻要上游產生了數據,mergeAll就會當即訂閱

switch

switch的含義就是切換,老是切換到最新的內部Observable對象獲取數據。每當switch的上游高階Observable產生一個內部Observable對象,witch都會⽴刻訂閱最新的內部Observable對象上, 若是已經訂閱了以前的內部Observable對象, 就會退訂那個過期的內部Observable對象, 這個「⽤上新的, 捨棄舊的」動做, 就是切換。

const ho$ = Observable.interval(1000)
    .take(2)
    .map(x => Observable.interval(1500).map(y => x+':'+y).take(2));
const result$ = ho$.switch();

exhaust

exhaust在耗盡當前內部Observable數據以前不會切換到下一個內部Observable對象。和switch同樣,exhaust產生的Observable對象完結前提是最新的內部Observable對象完結並且上游高階Observable對象完結

count

統計上游Observable對象吐出全部數據的個數

const source$ = Observable.of(1,2,3).concat(Observable.of(4,5,6));
const count$ = source$.count(); // 6

max, min

取的最小值和最大值

reduce

規約統計

const source$ = Observable.range(1, 100);
const reduced$ = source$.reduce((acc, current) => acc + current, 0);
// 參數基本和js中的一致

find和findIndex

在某些狀況下,咱們但願能夠將find和findIndex結合在一塊兒,咱們能夠這樣作

const source$ = Observable.of(3,1,4,1,5,9);
const isEven = x => x % 2 === 0;
const find$ = source$.find(isEven);
const findIndex$ = source$.findIndex(isEven);
const zipped$ =find$.zip(findIndex$);

defaultIfEmpty

defaultIfEmpty()除了檢測上游Observable對象是否爲空以外,還要接受一個默認值做爲參數,若是上游Observable對象是空的,那就把默認值吐出來

const new$ = source$.defaultIfEmpty('this is default');

filter

過濾

first

若是first不接受參數,那麼就是獲取的上游的第一個數據
若是first接受函數做爲參數,那麼就會獲取上游數據中知足函數條件的第一個數據

last

工做方式與first恰好相反,從上游數據的末尾開始尋找符合條件的元素

takeWhile

接受一個斷定函數做爲參數

const source$ = Observable.range(1, 100);
const takeWhile$ = source$.takeWhile(
    value => value % 2 === 0
);

takeUtil

takeUtil是一個里程碑式的過濾類操做符,由於takeUtil讓咱們能夠用Observable對象來控制另外一個Observable對象的數據產生

在RxJS中,建立類操做符是數據流的源頭,其他全部操做符最重要的三類就是合併類、過濾類和轉化類。

map

map用來改變數據流中的數據,具備一一對應的映射功能

const source$ = Rx.Observable.of(1,2,3);
// 注意這裏只能使用普通函數,箭頭函數中的this是綁定在執行環境上的,沒法獲取context中的值
const mapFunc = function(value, index) {
    return `${value} ${this.separator} ${index}`;
}
const context = {separator: ':'};
const result$ = source$.map(mapFunc, context);

result$.subscribe(
    console.log,
    null,
    () => console.log('complete')
);

mapTo

不管上游產生什麼數據,傳給下游的都是一樣的數據

// 將result$中的數據都映射成A
const result$ = source$.mapTo('A');

pluck

pluck就是把上游數據中特定字段的值出來

const source$ = Rx.Observable.of(
    {name: 'RxJS', version: 'v4'},
    {name: 'React', version: 'v15'},
    {name: 'React', version: 'v16'},
    {name: 'RxJS', version: 'v5'}
);
const result$ = source$.pluck('name');
result$.subscribe(
    console.log,
    null,
    () => console.log('complete')
);
// RxJS
// React
// React
// RxJS
// complete

上面的代碼中,pluck方法將對象中的鍵對應的值獲取出來

獲取DOM事件中的值

const click$ = Rx.Observable.fromEvent(document, 'click');
const result$ = click$.pluck('target', 'tagName');
// HTML

將上游數據放在數組中傳給下游操做符

bufferTime

用一個參數來指定產生緩衝窗口的間隔

const source$ = Rx.Observable.timer(0, 100);
// 參數400,就會把時間劃分爲連續的400毫秒長度區塊,上游傳來的數據不會直接傳給下游,而是在該時間區塊的開始就新建一個數組對象推送給下游
const result$ = source$.bufferTime(400);

若是上游在短期內產生了大量的數據,那bufferTime就會有很大的內存壓力,爲了防止出現這種狀況,bufferTime還支持第三個可選參數,用於指定每一個時間區間內緩存的最多數據個數

const result$ = source$.bufferTime(400, 200, 2);

bufferCount

根據個數來界定

bufferWhen

接受一個函數做爲參數,這個參數名爲closingSelector

bufferToggle

buffer

將上游數據放在Observable中傳給下游的操做符

windowTime

用一個參數來指定產生緩衝窗口的間隔

windowCount

windowToggle

window

高階map

全部的高階map的操做符都有一個函數參數project,可是和普通map不一樣,普通map只是把一個數據映射成另一個數據,高階map的函數參數project把一個數據映射成一個Observable對象

const project = (value, index) => {
    return Observable.interval(100).take(5);
}
相關文章
相關標籤/搜索