rxjs的世界

rxjs學習了幾個月了,看了大量的東西,在理解Observable的本文借鑑的是漁夫的故事,原文,知識的主線以《深刻淺出rxjs》爲主,動圖借鑑了rxjs中文社區翻譯的文章和國外的一個動圖網站react

正文:git

在思惟的維度上加入時間考量es6

1、函數響應式編程

Rxjs使用了一種不一樣於傳統的編程模式----函數響應式編程github

1.1 函數化編程

函數化編程對函數的使用有一些特殊的要求ajax

  • 聲明式
  • 純函數
  • 數據不可變性 保持原有數據不變,讓新的數據發生變化

爲何最近函數式編程崛起數據庫

  • 從硬件發展角度,函數式編程的性能問題已經不是問題
  • 語言演進,在一個面向對象的語言中添加了函數化編程的概
  • 函數化編程可讓數據流動更明顯

1.2 響應式編程

對面數據流,能夠天然的處理編程

響應式編程裏,最有名的框架Reaactive Extension(ReactiveX 簡稱 Rx) 響應式的擴展json

Rx是一套經過可監聽流來作異步編程的API
最初由微軟提出,有各類語言的實現,等於爲那些語言新增了一些功能擴展
Rx誕生主要是爲了解決異步的問題api

1.3 函數響應式編程的優點

  • 數據流抽象了不少現實問題
  • 擅長處理異步操做
  • 把複雜問題分解成簡單問題的組合

2、基本概念

observable  可觀察量;感受到的事物   [əb'zɜːvəbl]
observer    n. 觀察者 [əb'zɜːvə]
subsecribe    vi. 訂閱   [səb'skraɪb]
subscription   n. 捐獻;訂閱;訂金;簽署    [səb'skrɪpʃ(ə)n]

Observable 就是「能夠被觀察的對象」既「可被觀察者」
Obserer 就是「觀察者」
subscribe 就是二者之間的橋樑數組

2.1 河流的故事

  1. (1)Rx.Observable 是一條河流。
  2. (2)source 做爲一條在河流中捕魚船上的竹筒。魚(data)能夠一個一個的鑽到竹筒中(source)
var source = Rx.Observable.create(subscriber)
  1. (3) subscriber 是位捕魚的漁人,是位好心人,主要任務是把捕獲的魚(data)扔向岸邊的饑民
var subscriber = function(observer) {
    var fishes = fetch('http://www.oa.com/api'); // 捕獲到魚
    observer.next(fishes.fish1); // 把捕獲的第一條魚扔向岸邊的饑民
    observer.next(fishes.fish2); // 把捕獲的第二條魚扔向岸邊的饑民
}
  1. (4)observer 做爲岸邊上饑民。由於來自天南地北,方言不一樣,因此描述本身在獲取到魚後的吃法表述時語法不一樣,但其實實質都是同樣的,有魚了(value=> {})怎麼辦,沒魚了(error => {})怎麼辦,當天的魚扔完了(complete => {})怎麼辦。

方式一:

observer = (
    value => { console.log(value); },
    error => { console.log('Error: ', error); },
    () => { console.log('complete') }
)
source.subscribe(observer)

方式二:

observer = function(value) {
    console.log(value);
}
source.subscribe(observer); 
// 這根捕魚的竹筒不少饑民都翹首以待(subscribe),因此竹筒(source)會被新來的饑民訂閱(subscribe).
固然,饑民不訂閱天然漁人就不會把竹筒(source)中捕獲的魚扔給他。

方式三:

observer = {
    next: function(value) {
        console.log(value);
    },
    error: function(error) {
        console.log('Error: ', error)
    },
    complete: function() {
        console.log('complete')
    }
}
source.subscribe(observer);

//subscribe 河流source知道河流的兩邊有哪些百姓須要救濟,
因此會幫助他subscribe漁人扔出的魚,這樣他就會收到魚了
source.subscribe(observer);
  1. (5)subscription 爲哪一個饑民訂閱了哪一個竹筒的清單。能夠從清單上劃去,那麼這個饑民就再不會受到漁人扔出的魚了
subscription = source.subscribe(observer1);
subscription.unsubscribe(); // 從清單上劃去饑民observer1的訂閱信息,由於observer1已經不是饑民了,不須要救濟了。

五個角色連接起來就是rxjs的實現過程

var 漁人 = function (饑民) {
    var fishes = fetch('server/api'); // 捕獲到必定數量的魚
    饑民.next(fishes.fish1); // 接下來把魚1扔給饑民
    饑民.next(fishes.fish1); // 接下來把魚1扔給饑民
} 
var 饑民1 = { // 饑民要想好不一樣種狀況下的應對方法,不能在沒有捕到魚的時候就餓死。
    next:function (fish) {
        // 有魚扔過來了,把fish煮了吃掉。
    },
    error: function(error) {
       // 捕獲的魚有毒,不能吃,因此要想其餘辦法填飽肚子,能夠選擇吃野菜什麼的,
    },
    complete: function() {
        // 當天的魚扔完了,那麼能夠回家了
    }
}
var 竹筒 = 河流.create(漁人); // 河流中來了一名漁人,那麼他必定會在河流中放下捕魚的竹筒。
清單 = 竹筒.subscribe(饑民1) // 竹筒被饑民1關注後,就能夠收到漁人扔出的魚了。
setTimeout(() => {
        清單.unsubscribe();  // 一年後,饑民擺脫困境,再也不須要救濟,就退訂這個竹筒了。把機會讓給別人。
}, 1年);

對應到真正的rxjs語法,咱們再來一遍

var subscriber = function(observer) { // 建立了一位漁人
    observer.next('fish1');
    observer.next('fish2');
    observer.complete();
}
var observer1 = { // 來了一位饑民1
    next: function(value) {
        console.log(`我接到魚${value}啦,不會捱餓咯`);
    },
    error: function(error) {
        console.log(`哎,捕到的魚由於${error}緣由不能吃`)
    },
    complete: function() {
        console.log('今天的魚發完了')
    }
}
var source = Rx.Observable.create(subscriber); // 河流中來了一名漁人,他在河流中放下捕魚的竹筒。
subscription = source.subscribe(observer1); // 竹筒被饑民1關注後,饑民1能夠收到漁人扔出的魚了。
setTimeout(()=> {
    subscription.unsubscribe(); // 3秒後饑民退訂了竹筒,給其餘饑民機會。
}, 3000);
打印出的結果以下:
// "我接到魚fish1嘮"
// "我接到魚fish2嘮"
// "今天的魚發完了"

下面是對捕魚的三個階段所碰到問題的解決方案

  • (1) 竹筒中如何才能產生魚
  • (2) 竹筒中有魚了,怎麼向外取
  • (3) 取出來後,魚被扔向岸邊的過程當中發生了什麼。

因此操做符的使用也是有前後順序的。

2.2 術語解讀

觀察者模式

  • 如何生成事件,這是發佈者的責任,在Rxjs中是Observalbe對象的工做
  • 如何響應事件,這是觀察者的責任,在Rxjs中由subscribe的參數決定
  • 什麼樣的發佈者關聯什麼樣的觀察者,也就是什麼時候調用subscribe

迭代器模式

2.3 Hot Observable 和 Cold Observable

在一個Observable對象有兩個Obserer對象來訂閱,並且這兩個並非同時訂閱的。
就會形成兩個狀況:

  • 錯過就錯過了 ----Hot Observable
  • 不能錯過,錯過的也得補發 ----Cold Observable

2.4 操做符

在現實的複雜問題,並不會創造一個數據流以後就直接經過subscribe街上一個Observer,每每須要對那個數據流作一系列的處理,而後才交給Observer。數據從管道的一段流入,途徑管道各個環節,當數據到達Observer的時候,已經被管道操做過。

就像上面故事同樣,拿到魚,以後可一作成魚湯,而後加上米飯,最後在給飢餓的人。

在Rxjs中有一系列用於產生Observable函數,那些函數有的憑空創造Observable對象,有的根據外部數據源產生Observable對象,更多的是根據其餘的Observable中的數據來產生新的Obsercable對象,也就是把上游數據轉化爲下游的數據。那些函數統稱爲操做符。

2.5 彈珠圖

彈珠圖,能夠形象且具體的方式來描述數據流

那個網址能夠生成:https://rxviz.com/

3、創造數據流

3.1 建立同步數據流

  1. create: 直接建立

  2. of: 列舉數據

當魚是現成的,可是是散裝的時候,好比昨天還存了幾條在船上,用of裝到竹筒中

var source$ = Observable.of(1, 2, 3);
  1. range: 指定範圍
var source$ = Observable.range(1, 100);
  1. genetate: 循環建立
const source$ = Observable.genetate(
    value => vlalue < 10,
    value => value + 2,
    value => value * value
)
  1. repeat: 重複數據的數據流
    是一個實例操做符
const source$ = Observable.of(1, 2, 3);
const reapeated$ = source$.repeat(10);
=>1,2,3,1,2,3 ... 重複了1,2,3 10次
  1. empty: 直接完結的Observable

  2. never: 一直待着,不作任何事

  3. throw:拋出錯誤

var source = Rx.Observable.empty(); // 一條魚都沒有捕捉到的狀況,
直接觸發observer中complete的執行
結果爲 // complete!
var source = Rx.Observable.never();  // 漁人累了,無論是捕到魚仍是捕
不到魚都沒有力氣向岸邊上的饑民發出告知了。
結果爲 // complete永遠都不會觸發
var source = Rx.Observable.throw('ill'); // 當漁人生病了,或者要去會個老朋友,會向岸邊的饑民(observer)用竹筒吶喊一聲告知,
這樣饑民就想別的辦法(觸發error方法)解決當天的食物問題。

3.2 建立異步數據流

  1. 定時產生interval和timer

對應js裏面的setInterval和setTimeout

const source$ = Observable.interval(1000); // 從0開始沒隔1秒輸出一個0,1,2

const source$ = Observable.timer(1000); // 1s以後產生0,而後結束
const source$ = Observable.timer(2000, 1000);// 2秒以後0, 而後沒隔1s產生1,2,3
  1. form: 可把一切轉化爲Observable,把數組、字符串、promise、Observable

  2. fromEvent(document.body,’click’); 轉化dom事件

  3. formEventPattern 自定義的事件

  4. ajax

Rx.Observable.fromEvent(document.querySelector('#getStart'), 'click')
    .subscribe(
        v => { 
            Rx.Observable.ajax('https://api.github.com/repos/ReactiveX/rxjs', {
                responseType: 'json'
            }).subscribe(value => {
                const startCount = value.response.stargazers_count;
                document.querySelector('#text').innerText = startCount;
            })
        }
    );
  1. repeatWhen: 過一段時間在repeat

  2. defer: 只有在訂閱的時候還會執行,節省內存

const observableFactory = () => Observable.ajax(ajaxUrl);
const source$ = Observable.defer(observableFactory)

4、合併數據流

4.1 基本的合併

  1. concat: 收尾相連
  • 相似數組的concat方法,可是隻有第一個結束還會取第二個,若是第一個是無限的就到不了第二個
  • 支持靜態和動態兩種方式

它按順序訂閱每一個輸入流併發出其中全部的值,同一時間只會存在一個訂閱。只有當前輸入流完成的狀況下才會去訂閱下一個輸入流並將其值傳遞給結果流。

當全部輸入流完成時,結果流就會完成,如何任意輸入流報錯,那麼結果流就會報錯。若是某個輸入流沒有完成的話,那麼結果流便不會完成,這意味着某些流永遠都不會被訂閱。

若是值發出的順序很重要,而且你想要傳給操做符的第一個輸入流先發出值的話,那麼請使用 concat 操做符。舉個例子,有兩個流,一個從緩存中獲取值,一個從遠程服務器獲取值。若是你想要將二者組合起來並確保緩存中的值先發出的話,就可使用 concat 。

  1. merge:先到先得快速經過

跟cancat不一樣的地方是merge會同時訂閱全部的上游Observable,而後對上游數據採起先到先得的策略。

image

  1. zip: 拉鍊式組合
    zip須要配對出現
const source1$ = Rx.Observable.of([1, 2, 3]);
const source2$ = Rx.Observable.of(['a', 'b', 'c']);

const zipped$ = Rx.Observable.zip(source1$, source2$);

zipped$.subscribe(
    console.log,
    null,
    () => console.log('complete')
)

image

  1. combineLatest: 合併最後一個數據

當任何一個上游Observable產生數據時,從全部輸入Observable對象中拿最後一次產生的數據(最新數據),而後把那些數據組合起來傳給下游。

const source1$ = Rx.Observable.timer(500, 1000);
const source2$ = Rx.Observable.timer(1000, 1000);
const result$ = source1$.combineLatest(source2$);

result$.subscribe(
    console.log,
    null,
    () => console.log('complete')
)

image

const a = stream('a', 200, 3, 'partial');
const b = stream('b', 500, 3, 'partial');
combineLatest(a, b).subscribe(fullObserver('latest'));
  1. withLatestFrom

withLatestFrom的功能相似於combineLatest,可是給下游推送數據只能由一個上游Observable對象驅動

concat/merge/zip/combineLatest都是支持靜態操做符合實例操做符兩種方式,並且做爲輸入的Observable對象地位都是對等的;可是withLatestFrom只有實例操做符,並且全部輸入Observable的地位不一樣,調用withLastestFrom的那個Observable對象起到主導數據生產節奏的做用,做爲參數的Observable對象只能貢獻數據,不能控制生產數據的時機。

當有一個主線流,同時還須要其餘流的最新值時,可使用此操做符。��withLatestFrom 與 combineLatest 有些相似,不一樣之處在於 combineLatest 是當任意輸入流發出值時,結果流都發出新的值,而 withLatestFrom 是隻有當主線流發出值時,結果流才發出新的值。

如同 combineLatest ,withLatestFrom 會一直等待每一個輸入流都至少發出一個值,當主線流完成時,結果流有可能在完成時從未發出過值。若是主線流不完成的話,那麼結果流永遠不會完成,若是任意輸入流報錯的話,結果流也將報錯。

在下面的動圖中,能夠看到 withLatestFrom 操做符組合了兩個流 A 和 B ,B 是主線流。每次 B 發出新的值時,結果流都會使用 A 中的最新值來發出組合值:

image

const a = stream('a', 3000, 3, 'partial');
const b = stream('b', 500, 3, 'partial');

b.pipe(withLatestFrom(a)).subscribe(fullObserver('latest'));
const { timer } = Rx;
const { map, withLatestFrom } = RxOperators;
// 沒隔2秒生產一個數據,經過map的映射,實踐產生的數據0、100、200
const source$ = timer(0, 2000).pipe(map(x => 100 * x));
// 從500毫秒,沒隔1秒生產一個從0開始的遞增數字序列
const secondSource$ = timer(500, 1000);

source$.pipe(
  withLatestFrom(secondSource$, (a, b) => a+b)
);

沒個2秒鐘輸出一行
101
203
305
407
時間 source$ secondSource$ 輸出
0 0
500 0
1500 1
2000 100 1 101
2500 2
3500 3
4000 200 3 203
  1. race 勝者通吃

race(): Observable
它自己並對流進行任何組合,而是選擇第一個產生值的流。一旦第一個流發出值後,其餘的流就會被取消訂閱,徹底忽略掉。

當被選中的流完成時,結果流也隨之完成,若是被選中的流報錯,那麼結果流也將報錯。一樣,若是被選中的流不完成,那麼結果流便永遠不會完成。

若是有多個提供值的流時此操做符會很是有用,舉個例子,某個產品的服務器遍及世界各地,但因爲網絡條件,延遲是不可預測的,而且差別巨大。使用 race 的話,能夠向多個數據源發送一樣的請求,隨後消費首個響應請求的結果

使用首先發出值的 observable

const $example = Rx.Observable.race(
    Rx.Observable.interval(1500),
    Rx.Observable.interval(2000),
    Rx.Observable.interval(1200),
    Rx.Observable.interval(1000).mapTo('1s won!')
)

const subscribe = $example.subscribe(val => console.log(val))
// 輸出: "1s won!"..."1s won!"...etc
  1. startWith

函數簽名: startWith(an: Values): Observable 發出給定的第一個值

在開頭補充一些數據

// 每1秒發出值
const source = Rx.Observable.interval(1000);
// 以 -3, -2, -1 開始
const example = source.startWith(-3, -2, -1);
// 輸出: -3, -2, -1, 0, 1, 2....
const subscribe = example.subscribe(val => console.log(val));
  1. forkJoin Rx 版的 Promise.all()

forkJoin(...args, selector : function): Observable 當全部 observables 完成時,發出每一個 observable 的最新值

只有當全部的Observable對象都完結,確認不會有新的數據產生的時候,forkJoin就會把全部輸入Observable對象產生的最後一個數據合併成給下游惟一的數據
它可能與 Promise.all 的使用方式相似。

// RxJS v6+
import { mergeMap } from 'rxjs/operators';
import { forkJoin, of } from 'rxjs';

const myPromise = val =>
  new Promise(resolve =>
    setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  );

const source = of([1, 2, 3, 4, 5]);
// 發出數組的所有5個結果
const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
/*
  輸出:
  [
   "Promise Resolved: 1",
   "Promise Resolved: 2",
   "Promise Resolved: 3",
   "Promise Resolved: 4",
   "Promise Resolved: 5"
  ]
*/
const subscribe = example.subscribe(val => console.log(val));

image

const a = stream('a', 200, 3, 'partial');
const b = stream('b', 500, 3, 'partial');

forkJoin(a, b).subscribe(fullObserver('forkJoin'));

9.pairwise

// RxJS v6+
import { pairwise, take } from 'rxjs/operators';
import { interval } from 'rxjs';

// 返回: [0,1], [1,2], [2,3], [3,4], [4,5]
interval(1000)
  .pipe(
    pairwise(),
    take(5)
  )
  .subscribe(console.log);

將前一個值和當前值做爲數組發出

4.2 高階Observable 高級數據流

高階Observable就是生產的數據依然是Observable的Observable,以前介紹的Observable就是一階高階組件

const { interval } = Rx;
const { map, take  } = RxOperators;

interval(1000)
  .pipe(
        take(2),
        map(x => interval(1500).pipe(map(y => x + ":" + y),take(2))))

image

一、操做高階Observable的合併類操做符

處理高階Observable的合併操做符,就是在原來操做符後面添加All

  • .concatAll
  • .mergeAll
  • .combineALL

二、concatAll

此操做符將合併全部內部流發出的值,合併方式就如同 concat 操做符,是按順序鏈接。

在下面的動圖中,能夠看到高階流 H ,它會生成兩個內部流 A 和 B 。 concatAll 操做符首先從流 A 中取值,而後再從流 B 中取值,並將全部值傳遞到結果流中
image

const a = stream(‘a’, 200, 3);
const b = stream(‘b’, 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(concatAll()).subscribe(fullObserver(‘concatAll’));

concatAll首先會訂閱上游產生的第一個內部Observable對象
抽取其中的數據,而後,只有當第一個Observable對象完結的時候,纔會去訂閱第二個內部Obserbale對象。後面的Obsevable對象時懶執行的。

可是如何concatAll消耗內部Observable的速度永遠追不上產生內部Observable對象的速度。如何一直那樣就會形成內存積壓,就是內存泄漏

三、mergeAll

合併全部內部流發出的值,合併方式就如同 merge 操做符,是併發的。

mergeAll只要發現上游產生一個內部Observable就會馬上訂閱,並從中抽取

在下面的動圖中,能夠看到高階流 H ,它會生成兩個內部流 A 和 B 。 mergeAll 操做符將合併這兩個流中的值,每當發出值時值便會傳遞到結果流中。

image

const a = stream(‘a’, 200, 3);
const b = stream(‘b’, 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(mergeAll()).subscribe(fullObserver(‘mergeAll’));

四、combineAll

當源 observable 完成時,對收集的 observables 使用 combineLatest

// RxJS v6+
import { take, map, combineAll } from 'rxjs/operators';
import { interval } from 'rxjs';

// 每秒發出值,並只取前2個
const source = interval(1000).pipe(take(2));
// 將 source 發出的每一個值映射成取前5個值的 interval observable
const example = source.pipe(
  map(val => interval(1000).pipe(map(i => `Result (${val}): ${i}`), take(5)))
);
/*
  soure 中的2個值會被映射成2個(內部的) interval observables,
  這2個內部 observables 每秒使用 combineLatest 策略來 combineAll,
  每當任意一個內部 observable 發出值,就會發出每一個內部 observable 的最新值。
*/
const combined = example.pipe(combineAll());
/*
  輸出:
  ["Result (0): 0", "Result (1): 0"]
  ["Result (0): 1", "Result (1): 0"]
  ["Result (0): 1", "Result (1): 1"]
  ["Result (0): 2", "Result (1): 1"]
  ["Result (0): 2", "Result (1): 2"]
  ["Result (0): 3", "Result (1): 2"]
  ["Result (0): 3", "Result (1): 3"]
  ["Result (0): 4", "Result (1): 3"]
  ["Result (0): 4", "Result (1): 4"]
*/
const subscribe = combined.subscribe(val => console.log(val));

4.3 進化的高階Observable處理

針對上游數據可能產生的積壓狀況,不少場景並不須要無損的數據流鏈接,能夠捨棄一些數據,至於怎麼捨棄,就涉及另外兩個合併類操做符,分別是switch和exhaust,這兩個操做符是concatAll的進化版

五、SwitchAll 切換輸入Obserable

有時候從全部內部流中接收值並不是是咱們想要的效果。在某些場景下,咱們可能只對最新的內部流中的值感興趣。一個比較好的例子就是搜索。當用戶輸入關鍵字時,就向服務器發送請求,由於請求是異步的,因此返回的請求結果是一個 observable 。在請求結果返回以前,若是用戶更新了搜索框中的關鍵字會發生什麼狀況?第二個請求將會發出,如今已經有兩個請求發送給服務器了。可是,第一次搜索的結果用戶已經再也不關心了。更有甚者,若是第一次的搜索結果要是晚於第二次的搜索結果的話 (譯者注: 好比服務器是分佈式的,兩次請求請求的不是同一個節點),那麼用戶看到的結果將是第一次的,這會讓用戶感到困擾。咱們不想讓這種事情發生,這也正是 switchAll 操做符的用武之地。它只會訂閱最新的內部流並忽略(譯者注: 忽略 = 取消訂閱)前一個內部流。

每當SwitchAll的上游高階Observable產生一個內部Observable對象,SwitchAll都會馬上訂閱最新的內部Observable對象上,如歌已經訂閱了以前非內部Observable對象,就會退訂那個過期的內部Observable對象,這種 用上新的,捨棄舊的,就是切換。

能夠看到高階流 H ,它會生成兩個內部流 A 和 B 。switchAll 操做符首先從流 A 中取值,當發出流 B 的時候,會取消對流 A 的訂閱,而後從流 B 中取值,並將值傳遞到結果流中。

image

const a = stream(‘a’, 200, 3);
const b = stream(‘b’, 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(switchAll()).subscribe(fullObserver(‘switchAll’));
  1. exhaust 含義耗盡

在耗盡當前內部Observable的數據以前不會切換到下一個內部Observable對象,
前一個還沒完結,新的有產生了,switch是選擇新的,exhaust是選擇舊的。

5、輔助類操做

5.1 數學類操做符

  • count: 統計數據個數
  • max: 最大值
    max
  • min: 最小值
  • reduce: 很數組同樣
const source$ = Rx.Observable.range(1, 100).reduce((acc, current) => acc + current, 0).subscribe(
    v => {
        console.log('Value', v)
    },
    e => {
        console.log('Error', e)
    },
    () => {
        console.log('Completed')
    }
);

Value 5050
Completed

reduce

5.2 條件布爾類操做符

  • every 所有條件都正確,返回true
  • find 返回知足條件的值
  • findIndex 返回知足條件的值的索引
  • isEmpty 用於檢查一個上游Observable對象是否是空的
  • defaultEnpty 判斷上游是不是空,能夠穿默認值,若是是空,返回默認值
const findIndex$ = Rx.Observable.of(3,1,4,1,5,9).findIndex(x => x % 2 === 0);

6、過濾數據流

過濾類操做符最基本的功能就是對一個給定的數據流中每一個數據判斷是否知足某個條件,若是知足條件就能夠傳遞給下游,不然就拋棄。

  1. filter: 過濾

filter

  1. first: 返回第一個元素

Take1vsFirst

  1. last: 返回最後一個元素

takeLast-take

  1. take: 拿 操做符

take只支持一個參數count,也就是限定拿上游的Observable的數據數量。

  1. takeLast 拿 最後的幾個

takeLast

  1. takeWhile 接受一個斷定函數做爲參數,斷定函數有兩個參數,分別表明上游的數據和對應的序號,takeWhile
const source$ = Rx.Observable.range(1, 100).takeWhile(
    value => value % 2 === 0
).subscribe(
    v => {
        console.log('Value', v)
    },
    e => {
        console.log('Error', e)
    },
    () => {
        console.log('Completed')
    }
);
1 4 9 16 Completed

takeWhile

  1. takeUnit
    能夠用Observable對象來控制另外一個Observable對象的數據產生。

takeUnit的神奇特色就是其參數是另外一個Observable對象notifier,由那個notifier來控制何時結束從上游Oservable拿數據。

const source$ = Rx.Observable.interval(1000);
const notifier$ = Rx.Observable.timer(2500);
const takeUnit$ = source$.takeUnit(notifier$);
  1. skip 跳過

跳過前n個以後全拿

const source$ = Observable.interval(1000);
const skip$ = source$.skip(3);

在等待了3秒以後,skip$會吐出三、四、5...每隔一秒吐出一個遞增的證整數

  1. skipWhile和skipUnit 和take反向

takeWhile1-skipWhile1.jpg

6.2 回壓控制

  1. debounceTime

捨棄掉在兩次輸出之間小於指定時間的發出值,諸如預先知道用戶的輸入頻率的場景下很受歡迎

const input = document.getElementById('example');
// 對於每次鍵盤敲擊,都將映射成當前輸入值
const example = Rx.Observable.fromEvent(input,'keyup').map( 
    i => i.currentTarget.value
);
// 在兩次鍵盤敲擊之間等待0.5秒方纔發出當前值,
// 並丟棄這0.5秒內的全部其餘值
const debouncedInput = example.debounceTime(500);
// 輸出值
const subscribe = debouncedInput.subscribe(val =>{  
    console.log(`Debounced Input: ${val}`);
});
  1. throttleTime

當指定的持續時間通過後發出最新值

每5秒接收最新值:

// 每1秒發出值
const source = Rx.Observable.interval(1000);
/*節流5秒節流結束前發出的最後一個值將從源 observable 中發出*/
const example = source.throttleTime(5000);
// 輸出: 0...6...12
const subscribe = example.subscribe(val => console.log(val));
  1. debounce

根據一個選擇器函數,捨棄掉在兩次輸出之間小於指定時間的發出值,儘管沒有debounceTime使用普遍,但當 debounce 的頻率是變量時,debounce是很重要的

// 發出四個字符串
const example = Rx.Observable.of('WAIT','ONE','SECOND','Last will display');
/*只有在最後一次發送後再通過一秒鐘,纔會發出值,並拋棄在此以前的全部其餘值*/
const debouncedExample = example.debounce(()=> Rx.Observable.timer(1000));
/*在這個示例中,全部的值都將被忽略,除了最後一個輸出: 'Last will display'*

const subscribe = debouncedExample.subscribe(val => console.log(val));

4.throttle

僅當由提供的函數所肯定的持續時間已通過去時才發出值

// 每1秒發出值
const source = Rx.Observable.interval(1000);
// 節流2秒後才發出最新值
const example = source.throttle(val => Rx.Observable.interval(2000));
// 輸出: 0...3...6...9
berconst subscribe = example.subscribe(val => console.log(val));

throttleTime

  1. sample和sampleTime

當提供的 observable 發出時從源 observable中取樣

// 每1秒發出值
const source = Rx.Observable.interval(1000);
// 每2秒對源 observable 最新發出的值進行取樣
const example = source.sample(Rx.Observable.interval(2000));
// 輸出: 2..4..6..8..
const subscribe = example.subscribe(val => console.log(val));
  1. auditTime 和 audio

throttle把第一個暑假傳給下游,audio是把最後一個暑假傳給下游

6.3 其餘過濾方式

  1. ignoreElements
    忽略全部通知,除了 complete 和 error
  2. elementAt
    把上游數據當數組,只獲取指定下標的那一個數據
  3. single
    發出經過表達式的單一項
// 發出 (1,2,3,4,5)
const source = Rx.Observable.from([1,2,3,4,5]);
// 發出匹配斷言函數的一項
const example = source.single(val => val ===4);
// 輸出: 4
const subscribe = example.subscribe(val => console.log(val));

7、異常錯誤處理

try/catch只能在同步代碼中使用

  1. catch

優雅地處理 observable 序列中的錯誤

捕獲 observable 中的錯誤

//emit error
const source = Rx.Observable.throw('This is an error!');
//gracefully handle error, returning observable with error message
const example = source.catch(val => Rx.Observable.of(`I caught: ${val}`));
//output: 'I caught: This is an error'
const subscribe = example.subscribe(val => console.log(val));

捕獲拒絕的 promise

//create promise that immediately rejects
const myBadPromise = () => new Promise((resolve, reject) => reject('Rejected!'));
//emit single value after 1 second
const source = Rx.Observable.timer(1000);
//catch rejected promise, returning observable containing error message
const example = source.flatMap(() => Rx.Observable
                                       .fromPromise(myBadPromise())
                                       .catch(error => Rx.Observable.of(`Bad Promise: ${error}`))
                                    );
//output: 'Bad Promise: Rejected'
const subscribe = example.subscribe(val => console.log(val));
  1. retry

若是發生錯誤,以指定次數重試 observable序列

出錯的話能夠重試2次

//emit value every 1s
const source = Rx.Observable.interval(1000);
const example = source
  .flatMap(val => {
    //throw error for demonstration
    if(val > 5){
      return Rx.Observable.throw('Error!');
    }
    return Rx.Observable.of(val);
  })
  //retry 2 times on error
  .retry(2);
/*
  output: 
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  "Error!: Retried 2 times then quit!"
*/
const subscribe = example
  .subscribe({
     next: val => console.log(val),
     error: val => console.log(`${val}: Retried 2 times then quit!`)
});
  1. retryWhen

當發生錯誤時,基於自定義的標準來重試observable 序列

//emit value every 1s
const source = Rx.Observable.interval(1000);
const example = source
  .map(val => {
    if(val > 5){
     //error will be picked up by retryWhen
     throw val;
    }
    return val;
  })
  .retryWhen(errors => errors
               //log error message
               .do(val => console.log(`Value ${val} was too high!`))
               //restart in 5 seconds
               .delayWhen(val => Rx.Observable.timer(val * 1000))
            );
/*
  output: 
  0
  1
  2
  3
  4
  5
  "Value 6 was too high!"
  --Wait 5 seconds then repeat
*/
const subscribe = example.subscribe(console.log);
  1. finally
    報不報錯,都會執行

8、轉化數據流

8.1 映射數據

  1. map 對源 observable 的每一個值應用投射函數
  2. mapTo 將每一個發出值映射成常量
// 每2秒發出值
const source = interval(2000);
// 將全部發出值映射成同一個值concatMapTo
const example = source.pipe(mapTo('HELLO WORLD!'));
// 輸出: 'HELLO WORLD!'...'HELLO WORLD!'...'HELLO WORLD!'...
const subscribe = example.subscribe(val => console.log(val));
  1. pluck 選擇屬性來發出
const source = from([{ name: 'Joe', age: 30 }, { name: 'Sarah', age: 35 }]);
// 提取 name 屬性
const example = source.pipe(pluck('name'));
// 輸出: "Joe", "Sarah"
const subscribe = example.subscribe(val => console.log(val));

8.2 緩存窗口: 無損回壓控制

  1. windowTime和bufferTime

根據時間來緩存上游的數據,基本用法就是一個參數來指定產生緩衝窗口的間隔

const source$ = Rx.Observable.timer(0, 100);
const result$v= source$.windowTime(400);

windowTime的參數是400,也就會把時間劃分爲連續的400毫秒長度區塊,在每一個時間區塊中,上游傳下來的數據不會直接送給下游.

bufferTime產生的是普通的Observable對象,其中數據是數組的形式,bufferTime會把時間區塊內的數據緩衝,在時間區塊結束的時候把全部的緩存數據放在一個數組裏傳給下游。
image
還可使用第二參數,等於指定每一個時間區塊開始的時間間隔

  1. windowCount和bufferCount
    根據個數來界定區間的windowCount和bufferCount
const source$ = Observable.timer(0, 100);
const soutce$ = Observable.windowCount(4);

效果是同樣的

如何第二個參數比第一個參數大,就會丟棄一些數據

  1. windowWhen和bufferWhen
    用Observable對象來控制Observable對象的生成。when就是接受一個函數做爲參數,那個參數爲closingSelector,返回一個Observable對象,用於控制上游的數據分割,每當返回的Observable對象產生數據或者完結時,windowWhen就認爲是一個緩衝區塊的結束,從新開啓一個緩衝窗口。
const source$ = Observable.timer(0,100);
const closingSelector = () => {
    return Observable.timer(400);
}
const result$ = source$.windowWhen(closingSelector);

不經常使用

  1. windowToggle和bufferToggle
    togggle的含義就是兩個狀態至今的切換,windoeToggle和bufferToggle也是利用Observable來控制緩衝窗口的開和關。

須要兩個參數,opening$是一個Observable對象,每當opening$產生一個數據,表明一個緩衝窗口的開始,同時,第二個參數closingSelector也會被調用,用來得到緩衝窗口結束的通知。

const source$ = Observable.timer(0, 100);
const openings$ = Observable.timer(0, 400);

const closingSelector = value => {
    return value % 2 === 0 ? Observable.timer(200): Observable.timer(100);
}
const result$ = source$.windowToggle(openings$, closingSelector);

image

  1. window和buffer
    只支持一個參數,每當notifer$產生一個數據。及時前一個緩存窗口的結束,也是後一個緩存的開始
const source$ = Observable.timer(0, 100);
const notifer$ = Observable.timer(400, 400);
const result$ = source$.window(notifer$);

image

8.3 高階的map

  1. concatMap
  2. mergeMap
  3. switchMap
  4. exhaustMap
  • concatMap = map + concatAll
  • mergeMap = map + mergeAll
  • switchMap = map + switch
  • exhaustMap = map + exhaust

全部xxxxMap名稱模式的操做符,都是一個map加上一個「砸平」操做的組合。

有趣的事是映射操做符 concatMap、 mergeMap 和 switchMap 的使用頻率要遠遠高於它們所對應的處理高階 observable 的操做符 concatAll、 mergeAll 和 switchAll 。可是,若是你細想一下,它們幾乎是同樣的。全部 *Map 的操做符實際上都是經過兩個步驟來生成高階 observables 的,先映射成高階 observables ,再經過相對應的組合邏輯來處理高階 observables 所生成的內部流。

咱們先來看下以前的 meregeAll 操做符的代碼示例:

const a = stream('a', 200, 3);
const b = stream('b', 200, 3);
const h = interval(100).pipe(take(2), map(i => [a, b][i]));
h.pipe(mergeAll()).subscribe(fullObserver('mergeAll'));

map 操做符生成了高階 observables ,而後 mergeAll 操做符將這些內部流的值進行合併,使用 mergeMap 能夠輕鬆替換掉 map 和 mergeAll ,就像這樣:

const a = stream('a', 200, 3);
const b = stream('b', 200, 3);
const h = interval(100).pipe(take(2), mergeMap(i => [a, b][i]));

h.subscribe(fullObserver('mergeMap'));

兩段代碼的結果是徹底相同的

  1. concatMapTo
  2. mergeMapTo
  3. switchMap
  4. expand 相似mergeMap可是會返回給本身,會形成數據爆炸增加

8.4 數據分組

數據分組和合並是相反的,數據分組是把一個數據流拆分爲多個數據流

  1. groupBy

輸出是一個高階Obserable對象,每一個內部Obserable對象包含上游產生的知足某個條件的數據
能夠當作是一個分發器,對於上游推送下來的任何數據,檢查這個數據的key值,若是這個key值是第一次出現,就產生一個新的內部Observable對象,同時這個數據就是內部Observable對象的第一個數據;如何key治已經出現過,就直接把那個數據塞給對應的內部Observable對象。

const intervalStream$ = Observable.interval(1000);
const groupByStream$ = intervalStreanm$.groupBy(
    x => x % 2
)
  1. partition

接受一個斷定函數做爲參數,對上游的每一個數據進行斷定,知足條件的放一個Obserable對象,不知足的放一個,一份爲二

8.5 累計數據

  1. scan
const source$ = Observable.interval(100);
const result$ = source$.scan((accumulation,value) => {
    return accumulation + value
})

image

  1. mergeScan
    跟上面的相似可是返回的是一個observable對象,可是不怎麼用

9、多播

多波就是一個observable能夠有多個subscribe者

9.1 Hot和Cold數據流差別

// 冷的
var cold = new Observable((observer) => {
  var producer = new Producer();
  // observer 會監聽 producer
});
// 熱的
var producer = new Producer();
var hot = new Observable((observer) => {
  // observer 會監聽 producer
});

經過上面的代碼發現,冷的 Observables 在內部建立生產者,熱的 Observables 在外部建立生產者,由於cold在內部建立,因此屢次訂閱就會每次都從新建立,而hot在外部,屢次訂閱都是公用一個。因此能夠產生多播。在rxjs中能夠直接產生Hot Observable:.formPromise .fromEvent .fromEventPattern 那些操做符數據庫都是來自外部,真正的數據源和有沒有Observver沒有任何關係。真正的多播,一定是不論有多少Pbservable來subscribe,推給Observer的都是同樣的數據源,知足那種條件的,就是Hot Observable,由於Hot Observable 中的內容建立和訂閱者無關。

9.2 Subject

如何把cold變成hot的就須要subject

image

  • Observer - 擁有 next、error 和 complete 方法。
  • Observable - 擁有 Observable 的全部操做符,而且你能夠訂閱它。
    Subject 能夠在源 Observable 和多個觀察者之間充當橋樑或代理,使得多個觀察者能夠共享同一個 Observable 執行。
const interval$ = Rx.Observable.interval(1000);
const subject = new Rx.Subject();
interval$.subscribe(subject);
subject.subscribe(val => console.log(`First observer ${val}`));

setTimeout(() => {
  subject.subscribe(val => console.log(`Second observer ${val}`))
}, 2000);

9.2 makeHot操做符

subject並非一個操做符,能夠本身創造一個:

Rx.Observable.prototype.makeHot = function () {
  const cold$ = this;
  const subject = new Rx.Subject();
  cold$.subscribe(subject);
  return subject;
}

const makeTick$ = Rx.Observable.interval(1000).take(3).makeHot();
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 2000)

First observer 0
First observer 1
Second observer 1
First observer 2
Second observer 2

可是上面的代碼有一個漏洞,返回的結果,能夠直接調用next,error等方法,從而影響上游的數據。

Rx.Observable.prototype.makeHot = function () {
  const cold$ = this;
  const subject = new Rx.Subject();
  cold$.subscribe(subject);
  return Rx.Oservable.create((observable) => Rx.Subject.subscribe(observer));
}

可是Subject是不能重複使用的。同時若是上游有多少數據,使用合併操做符進行合併,在傳給下游。

9.3 基礎的多播

  • multicast
  • share
  • publish

multicast是多播操做符的老大,是最底層的實現,因此不怎麼用。

const hotSource$ = coldSource$.multicast(new Subject());

返回的是一個Observable對象,是Observable子類ConnecttableObservable的實例對象。

ConnecttableObservable就是「能夠被鏈接的」Observable,那中Observable對象包含一個connect函數,那個函數的做用是觸發multicast用Subject對象去訂閱上游的Observable,換句話,就是若是不調用connect函數,那個ConnecttableObservable對象就不會從上游Observable哪裏得到任何數據。

const makeTick$ = Rx.Observable.interval(1000).take(3).multicast(new Rx.Subject());
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 2000)

上面不會運行,加上connect()還會運行

const makeTick$ = Rx.Observable.interval(1000).take(3).multicast(new Rx.Subject());
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 2000)

makeTick$.connect()

connect是用來控制多播的時機的,可是手動會比較麻煩,因此,ConnecttableObservable實現可refConunt函數

9.4 refConunt

添加還取消經過個數來本身識別

const makeTick$ = Rx.Observable.interval(1000).take(3).multicast(new Rx.Subject()).refCount();
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 2000)

9.5 工廠函數

可是若是後面的弄成5000,第一個已經完結,第二個就不會再被訂閱

須要傳入的工廠函數

const subjectFactory = () => {
  console.log('enter subjectFactory');
  return new Rx.Subject();
}

const makeTick$ = Rx.Observable.interval(1000)
    .take(3)
    .multicast(subjectFactory)
    .refCount()
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 5000)

enter subjectFactory
First observer 0
First observer 1
First observer 2
enter subjectFactory
Second observer 0
Second observer 1
Second observer 2

es6的寫法

const subjectFactory = () => new Subject();

9.6 祕密參數selector

當使用了第二參數是,就不會返回ConnecttableObservable,而是使用selecter參數。換一句話說,只要指定了selector參數,就指定了multicast返回的Observable對象的生成方法。詳解...

9.6 publish

publish的實現

function publish(selector) {
    if (selector) {
        return this.multicast(() => new Subject(), selector);
    } else {
        return this.multicast(new Subject();
    }
}

使用

const makeTick$ = Rx.Observable.interval(1000)
    .take(3)
    .publish()
    .refCount()
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 5000)

First observer 0
First observer 1
First observer 2

9.7 share

function shareSubjectFactory() {
    return new Subject();
}

function share() {
    return multicast.call(this, shareSubjectFactory).refCount()
}

簡化

Observable.prototype.share = function share() {
    this.multicast(() => new Subject()).refCount();
}

使用

const makeTick$ = Rx.Observable.interval(1000)
    .take(3)
  .share()
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 5000)

First observer 0
First observer 1
First observer 2

Second observer 0
Second observer 1
Second observer 2

9.7高階多播功能

  • publishLast
  • publishReplay
  • publishBehavior
    對應着三個Subject子類
  • AsyncSubject
  • ReplaySubject
  • BehaviorSubject
publishLast和AsyncSubject
function publishLast() {
    return multicast.call(this=, new AsyncSubject())
}

AsyncSubject不會吧上游的Cold Observable的全部數據都轉手給下游,它只會記錄最後一個數據,當上遊Cold Observable完結的時候,才把最後一個數據傳遞給Observer。同時是可重用的,不論下發添加的是什麼數據,返回都是同樣的最後一個數據。

const makeTick$ = Rx.Observable.interval(1000)
    .take(3)
  .publishLast()
  .refCount()
  
makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 5000)

First observer 2
Second observer 2

1號Observer在4秒的時候得到publishLast所產生的Observable吐出的第一個也是最後一個數據2

2號Observer在5秒是添加,它會馬上得到第一個也是最後一個數據2

publishReplay和AsyncSubject

重播

實現

function publishReplay(
    bufferSize = Number.POSITIVE_INFINITY,
    windowTime = Number.POSITIVE_INFINITY
) {
    return multicast.call(this, new ReplaySubject(bufferSize, windowTime))
}

兩個參數表明緩存區的大小,通常只會使用第一參數,指定緩存的個數,如何不指定,就是上游來的,多少下游就緩存多少。容易內存溢出。

const makeTick$ = Rx.Observable.interval(1000)
  .take(3)
  .do(x => console.log('source', x))
    .publishReplay()
    .refCount()

makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 5000)

source 0
First observer 0
source 1
First observer 1
source 2
First observer 2

Second observer 0
Second observer 1
Second observer 2

2號依然得到了數據,可是沒有從新subscribe上游的,而是publishReplay緩存了,而後回放的。

可是要注意須要給publishReplay一個合理的參數,限制緩存的大小。

publishBehavior和 BehaviorSubject

行爲,就是添加一個默認的行爲,上游尚未吐出數據時,就會馬上得到一個默認數據。

function publishBehavior(value) {
    return multicast.call(this, new BehaviorSubject(value));
}

使用

const makeTick$ = Rx.Observable.interval(1000)
  .take(3)
  .do(x => console.log('source', x))
    .publishBehavior(-1)
    .refCount()

makeTick$.subscribe(val => console.log(`First observer ${val}`))

setTimeout(() => {
    makeTick$.subscribe(val => console.log(`Second observer ${val}`))
}, 5000)

First observer -1

source 0
First observer 0
source 1
First observer 1
source 2
First observer 2

10、scheduler

Scheduler能夠做爲創造類和合並類操做符的函數使用,此外,rxjs還提供了observeOn和subsribeOn兩個操做符,用於在數據管道任何位置插入給定Scheduler。

Scheduler能夠翻譯成「調度器」,用於控制Rxjs數據流中數據信息的推送節奏。
在Rxjs 中,提供了下列Scheduler實例

  • undefined/null 就是不指定,表明同步執行
  • asap 儘快執行的Scheduler
  • async 利用setInterval實現的Scheduler,基於時間吐出數據的場景
  • queue 利用隊列實現的Scheduler,用於迭代一個大的集合的場景
  • animationFrame 用於動畫場景的scheduler

Rxjs默認選擇Scheduler的原則是:儘可能減小併發運行。因此,對於range,就比選擇undefined了;對於很大的數據,就選擇queue;對於時間相關的操做符好比interval,就選擇async

實現原理

  • asap會盡可能是使用Micro Task,而async利用的是Macro Task
  • queue,如何調用它的schedule函數式參數delay是0,就是同步的方式,如何delay參數大於0,那queue表現其實就和async同樣
console.log('beforr schedule');
Rx.Scheduler.async.schedule(() => console.log('async'))
Rx.Scheduler.asap.schedule(() => console.log('asap'))
Rx.Scheduler.queue.schedule(() => console.log('queue'))
console.log('after schedule');

beforr schedule
queue
after schedule
asap
async

支持Scheduler的操做符能夠分爲兩類

第一類就是普通的建立或者組合Observable對象的操做符,是一個可選參數,沒有rxjs回提供一個默認的。

第二類就是存在的惟一功能就是應用Scheduler,因此Scheduler實例必要要有參數的,就兩個:observeOn和subscribeOn

支持scheduler的建立操做符有

  • bindCallback
  • bindNodeCallback
  • empty
  • from
  • formPromise
  • interval
  • of
  • range
  • throw
  • timer

合併操做符

  • concat
  • merge

observeOn

const source$ = Rx.Observable.range(1, 3);
const asapSource$ = source$.observeOn(Rx.Scheduler.asap);

console.log('before subscribe');

asapSource$.subscribe(
  value => console.log('data', value),
  error => console.log('error', error),
  () => console.log('complete')
);

console.log('after subscribe');

before subscribe
after subscribe
data 1
data 2
data 3
complete

subscribeOn 用來調節訂閱 用法相似以上

11、工具

操做符函數的實現,每一個操做符都是一個函數,無論實現什麼功能,都必須考慮下面那些功能要點:

  • 返回一個全新的observable對象
  • 對上游和下游的訂閱及退訂處理
  • 處理異常狀況
  • 及時釋放資源
// 返回一個全新的Obervable對象
function map(project) {
  return new Observable(observe => {
    this.subscribe({
      next: value => observer.next(project(value)),
      error: err => observer.error(error),
      complete: () => observer.complete()
    })
  })
}

// 訂閱和退訂處理
function map(project) {
    return new Observable(observe => {
        const sub = this.subscribe({
            next: value => observer.next(project(value)),
            error: err => observer.error(error),
            complete: () => observer.complete(),
    })
    return {
      unsubscribe: () => {
        sub.unsubscribe();
      }
    }
    })
}

// 處理異常狀況
function map(project) {
    return new Observable(observe => {
        const sub = this.subscribe({
            next: value => {
        try {
          observer.next(project(value))
        } catch (error) {
          observer.error(error)
        }
      },
            error: err => observer.error(error),
            complete: () => observer.complete(),
        })
        return {
            unsubscribe: () => {
                sub.unsubscribe()
            },
        }
    })
}

// 寫完如何關聯

// 給Observable打補丁
Observable.prototype.map = map;

// 使用bind綁定特定Observable對象

const result$ = map.bind(source$)(x => x * 2);

// 使用lift

function map(project) {
    return this.lift(function(source$){
        return source$.subscribe({
            next: value => {
                try {
                    observer.next(project(value))
                } catch (error) {
                    observer.error(error)
                }
            },
            error: err => observer.error(error),
            complete: () => observer.complete(),
        })
    })
}
Observable.prototype.map = map;

被迫更名的函數

do => tap
catch => catchError
switch => switchAll
finally => finalize

也可使用新的

const result$ = source$
    |> filter(x => x % 2 === 0)
    |> map(x => x * 2)

let 在5.5以後使用pipe

12、調試和測試

  • do
  • 調試日誌
Observable.prototype.dubug = function(fn){
    if (global.debug) {
        return this.do(fn);
    } else {
        return this;
    }
}
  • 數據流程流
  • 彈珠圖
  • rxjs-spy
相關文章
相關標籤/搜索