函數式編程 | 打造自定義的RxJs操做符

map_operator.jpeg

最近在學習函數式編程相關的知識,這篇文章源於學習自定義RxJs操做符的一點想法,若有理解不正確的地方,還請指出。html


第一步:搭建環境

1. 安裝項目依賴
mkdir rxjs && cd rxjs
yarn init -y
yarn add rxjs rxjs-compat webpack webpack-dev-server typescript ts-loader --save
yarn add webpack-cli --save-dev
2. 設置webpack和typescript

① 建立webpack.cofig.js文件node

const path = require('path');

module.exports = {
  entry: './src/index.ts',
  devtool: 'inline-source-map',
  module: {
    rules: [
      {
        test: /\.tsx?$/,
        use: 'ts-loader',
        exclude: /node_modules/
      }
    ]
  },
  resolve: {
    extensions: [ '.ts', '.js', '.tsx' ]
  },
  output: {
    filename: 'bundle.js',
    path: path.resolve(__dirname, 'dist')
  }
};

② 建立typescript.json文件webpack

{
  "compilerOptions": {
    "outDir": "./dist/",
    "noImplicitAny": true,
    "module": "es6",
    "moduleResolution": "node",
    "sourceMap": true,
    "target": "es6",
    "typeRoots": [
      "node_modules/@types"
    ],
    "lib": [
      "es2017",
      "dom"
    ]
  }
}

③ 在package.json文件下添加啓動命令es6

"scripts": {
    "start": "webpack-dev-server --mode development"
  },
3. 完成項目設置

① 建立index.html並複製粘貼以下代碼web

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Learn RxJS with Coursetro</title>

    <style>
        body { font-family: 'Arial'; background: #ececec; }
        ul { list-style-type: none; padding: 20px; }
        li { padding: 20px; background: white; margin-bottom: 5px; }
    </style>
</head>
<body>
    <script src="/bundle.js"></script>
</body>
</html>

② 建立src文件夾,在項目下建立``index.tsajax

import * as Rx from "rxjs/Observable";

console.log(Rx);

③ 在瀏覽器中訪問 http://localhost:8080,打開控制檯便可看到 Rx 對象數據typescript

第二步:建立通用subscriber

import {from} from "rxjs";

const observable$ = from([1,2,3,4,5]);

const subscriber = {
    next: (value: any) => console.log(value),
    complete: () => console.log('has something done.'),
    error: (err: any) => console.error('something error: ', err)
};

observable$.subscribe(subscriber);

// 輸出:
// 1
// 2
// 3
// 4
// 5

第三步:繼承Subscriber並重寫_next函數

class DoubleSubscriber extends Subscriber<number> {
    protected _next(value: number): void {
        // @ts-ignore
        this.destination.next(value * 2);
    }
}

observable$.subscribe(new DoubleSubscriber(subscriber));

// 輸出:
// 2
// 4
// 6
// 8
// 10

第四步:鏈接source和subscribe,建立一個基礎 pipe 操做符

① 手動鏈接shell

const double = (source: any) => {
  const o$ = new Observable();
  o$.source = source;
  o$.operator = {
      call(sub, source) {
          // @ts-ignore
          source.subscribe(new DoubleSubscriber(sub))
      }
  };
  return o$;
};

observable$.pipe(double).subscribe(subscriber);

② 使用lift連接source和subscriber編程

const double = (source: any) => source.lift({
    call(subscriber: Subscriber<number>, source: any): any {
        source.subscribe(new DoubleSubscriber(subscriber))
    }
});

第五步:建立一個可複用的 操做符

① 使用lift建立可複用的pipe操做符json

// src/operators/multiply/index.ts
/**
 * @format
 * @Author: Alvin
 * @Date 2020-03-05
 * @Last modified by: Alvin
 * @Last modified time: 2020-03-05
 */
import {Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";

class MultiplySubscriber extends Subscriber<number> {
    constructor(subscriber: PartialObserver<any> | ((value: number) => void), number: number) {
        super(subscriber);
        // @ts-ignore
        this.number = number;
    }
    protected _next(value: number): void {
        // @ts-ignore
        this.destination.next(value * this.number);
    }
}

export const multiply = (number: number) => (source: any) => source.lift({
    call(subscriber: Subscriber<number>, source: any): any {
        source.subscribe(new MultiplySubscriber(subscriber, number))
    }
});


// src/index.ts
observable$.pipe(multiply(3)).subscribe(subscriber);
observable$.pipe(multiply(45)).subscribe(subscriber);

② 複用現有的map實現相同功能的multiply

import {map} from "rxjs/operators";

export const multiply = (number: number) => map((value: number) => value * number);

③ 自定義pipe構建自定義操做符

const pipe = (...fns: Array<Function>) => (source: any) => fns.reduce((acc: any, fn: any) => fn(acc), source)

export const multiply = (number: number) => pipe(
    map((value: number) => value * number),
    filter((value: number) => value < 100)
);

observable$.pipe(multiply(3)).subscribe(subscriber);
observable$.pipe(multiply(45)).subscribe(subscriber);

第六步:建立有RXJS特點的高階map

在前面咱們拆解了與map相似的操做符multiply的實現方式,這個操做符其實很是好理解,由於有JavaScript原生的map對應。接下來分析真正有RxJs特點的高階map —— 把Observable對象玩得更加出神入化的操做符。

① mergeMap

mergeMap marble diagram

mergeMap可以解決異步操做的問題,最典型的應該屬於AJAX請求的處理。在網頁應用當中,每點擊按鈕一次就發送一個AJAX

請求給server,同時還要根據返回的結果在ui上更新狀態。傳統的方法來解決這樣的異步操做代碼會比較繁雜。

mergeMap把用戶的點擊操做看做一個數據流。把AJAX的返回結果也看作一個數據流,爲本來繁雜的解決方式提供了一種簡單解。

fromEvent(document.querySelector("#send"), 'click').pipe(
    mergeMap(() => ajax("apiUrl"))
).subscribe((result: any) => {
    // 常規處理ajax返回的結果
})

稍微講解了merge的使用場景,接下來看看若是手動實現這樣的功能,該如何作呢?

import {fromEvent, Observable, of, Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";
import {delay, scan} from "rxjs/operators";

class MergeMapSubscriber extends Subscriber<Function> {
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const o$ = this.func(value);

        o$.subscribe({
            next: (value: any) => {
                this.destination.next(value);
            }
        })
    }
}

const mergeMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new MergeMapSubscriber(subscriber, func))
    }
});

const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    mergeMap((value: any) => of(value).pipe(delay(500)))
);

const subscriber = {
    next: (value: any) => console.log("subscriber function output ::: ", value),
    complete: () => console.log('has something done.'),
    error: (err: any) => console.error('something error: ', err)
};

observable$.subscribe(subscriber);

② switchMap

switchMap marble diagram

上面介紹了一個適合於AJAX的mergeMap,但mergeMap會存在一個問題,每個上游數據都將會引起調用Ajax並且會將每個Ajax結果傳遞給下游。這樣的處理方式彷佛並非適合全部的場景。好比對股票等網頁系統等對數據顯示實時性要求比較高的狀況下相對來講,若是沒有處理好將會出現重大經濟損失。而switchMap正是解決了mergeMap這樣弊端的操做符。

import {fromEvent, Observable, of, Subscriber} from "rxjs";
import {PartialObserver} from "rxjs/src/internal/types";
import {delay, scan} from "rxjs/operators";

class SwitchMapSubscriber extends Subscriber<Function> {
    private innerSubscription: any;
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const o$ = this.func(value);

          // 保證獲取到最新的數據流
        if(this.innerSubscription) {
            this.innerSubscription.unsubscribe();
        }

        this.innerSubscription = o$.subscribe({
            next: (value: any) => {
                this.destination.next(value);
            }
        })
    }
}

const switchMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new SwitchMapSubscriber(subscriber, func))
    }
});

export const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    switchMap((value: any) => of(value).pipe(delay(500)))
);

③ concatMap

concatMap marble diagram

concatMap的模式與switchMap基本類似,差異在於獲取最新的數據流後concatMap將其順序推動數據流,而switchMap則將本來的數據流取消訂閱,轉而訂閱最新的數據流。

class ConcatMapSubscriber extends Subscriber<Function> {
    private innerSubscription: any;
    private buffer: Array<any> = [];
    constructor(
        destination: PartialObserver<any> | ((value: any) => void),
        private readonly func: Function
    ) {
        super(destination);
        this.func = func;
    }

    protected _next(value: any): void {
        const {isStopped} = this.innerSubscription || {isStopped: true};

        if(!isStopped) {
            this.buffer = [...this.buffer, value];
        } else {
            const o$ = this.func(value);
            this.innerSubscription = o$.subscribe({
                next: (value: any) => {
                    this.destination.next(value);
                },
                complete: () => {
                    if(this.buffer.length) {
                        const [first, ...rest] = this.buffer;
                        this.buffer = rest;
                        this._next(first);
                    }
                }
            });
                        
              // 順序推動數據流
            this.add(this.innerSubscription);
        }
    }
}

const concatMap = (func: any) => (source: Observable<any>) => source.lift({
    call(subscriber: Subscriber<any>, source: any): any {
        source.subscribe(new ConcatMapSubscriber(subscriber, func))
    }
});

export const observable$ = fromEvent(document, 'click').pipe(
    scan(i => i + 1, 0),
    concatMap((value: any) => of(value).pipe(delay(500))),
    takeUntil(fromEvent(document, 'keydown'))
);
相關文章
相關標籤/搜索