最近在學習函數式編程相關的知識,這篇文章源於學習自定義RxJs操做符的一點想法,若有理解不正確的地方,還請指出。html
mkdir rxjs && cd rxjs yarn init -y yarn add rxjs rxjs-compat webpack webpack-dev-server typescript ts-loader --save yarn add webpack-cli --save-dev
① 建立webpack.cofig.js文件node
const path = require('path'); module.exports = { entry: './src/index.ts', devtool: 'inline-source-map', module: { rules: [ { test: /\.tsx?$/, use: 'ts-loader', exclude: /node_modules/ } ] }, resolve: { extensions: [ '.ts', '.js', '.tsx' ] }, output: { filename: 'bundle.js', path: path.resolve(__dirname, 'dist') } };
② 建立typescript.json文件webpack
{ "compilerOptions": { "outDir": "./dist/", "noImplicitAny": true, "module": "es6", "moduleResolution": "node", "sourceMap": true, "target": "es6", "typeRoots": [ "node_modules/@types" ], "lib": [ "es2017", "dom" ] } }
③ 在package.json文件下添加啓動命令es6
"scripts": { "start": "webpack-dev-server --mode development" },
① 建立index.html並複製粘貼以下代碼web
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta http-equiv="X-UA-Compatible" content="ie=edge"> <title>Learn RxJS with Coursetro</title> <style> body { font-family: 'Arial'; background: #ececec; } ul { list-style-type: none; padding: 20px; } li { padding: 20px; background: white; margin-bottom: 5px; } </style> </head> <body> <script src="/bundle.js"></script> </body> </html>
② 建立src
文件夾,在項目下建立``index.tsajax
import * as Rx from "rxjs/Observable"; console.log(Rx);
③ 在瀏覽器中訪問 http://localhost:8080,打開控制檯便可看到 Rx 對象數據typescript
import {from} from "rxjs"; const observable$ = from([1,2,3,4,5]); const subscriber = { next: (value: any) => console.log(value), complete: () => console.log('has something done.'), error: (err: any) => console.error('something error: ', err) }; observable$.subscribe(subscriber); // 輸出: // 1 // 2 // 3 // 4 // 5
Subscriber
並重寫_next
函數class DoubleSubscriber extends Subscriber<number> { protected _next(value: number): void { // @ts-ignore this.destination.next(value * 2); } } observable$.subscribe(new DoubleSubscriber(subscriber)); // 輸出: // 2 // 4 // 6 // 8 // 10
① 手動鏈接shell
const double = (source: any) => { const o$ = new Observable(); o$.source = source; o$.operator = { call(sub, source) { // @ts-ignore source.subscribe(new DoubleSubscriber(sub)) } }; return o$; }; observable$.pipe(double).subscribe(subscriber);
② 使用lift
連接source和subscriber編程
const double = (source: any) => source.lift({ call(subscriber: Subscriber<number>, source: any): any { source.subscribe(new DoubleSubscriber(subscriber)) } });
① 使用lift建立可複用的pipe操做符json
// src/operators/multiply/index.ts /** * @format * @Author: Alvin * @Date 2020-03-05 * @Last modified by: Alvin * @Last modified time: 2020-03-05 */ import {Subscriber} from "rxjs"; import {PartialObserver} from "rxjs/src/internal/types"; class MultiplySubscriber extends Subscriber<number> { constructor(subscriber: PartialObserver<any> | ((value: number) => void), number: number) { super(subscriber); // @ts-ignore this.number = number; } protected _next(value: number): void { // @ts-ignore this.destination.next(value * this.number); } } export const multiply = (number: number) => (source: any) => source.lift({ call(subscriber: Subscriber<number>, source: any): any { source.subscribe(new MultiplySubscriber(subscriber, number)) } }); // src/index.ts observable$.pipe(multiply(3)).subscribe(subscriber); observable$.pipe(multiply(45)).subscribe(subscriber);
② 複用現有的map實現相同功能的multiply
import {map} from "rxjs/operators"; export const multiply = (number: number) => map((value: number) => value * number);
③ 自定義pipe
構建自定義操做符
const pipe = (...fns: Array<Function>) => (source: any) => fns.reduce((acc: any, fn: any) => fn(acc), source) export const multiply = (number: number) => pipe( map((value: number) => value * number), filter((value: number) => value < 100) ); observable$.pipe(multiply(3)).subscribe(subscriber); observable$.pipe(multiply(45)).subscribe(subscriber);
在前面咱們拆解了與map相似的操做符multiply
的實現方式,這個操做符其實很是好理解,由於有JavaScript原生的map對應。接下來分析真正有RxJs特點的高階map —— 把Observable對象玩得更加出神入化的操做符。
① mergeMap
mergeMap可以解決異步操做的問題,最典型的應該屬於AJAX請求的處理。在網頁應用當中,每點擊按鈕一次就發送一個AJAX
請求給server,同時還要根據返回的結果在ui上更新狀態。傳統的方法來解決這樣的異步操做代碼會比較繁雜。
mergeMap把用戶的點擊操做看做一個數據流。把AJAX的返回結果也看作一個數據流,爲本來繁雜的解決方式提供了一種簡單解。
fromEvent(document.querySelector("#send"), 'click').pipe( mergeMap(() => ajax("apiUrl")) ).subscribe((result: any) => { // 常規處理ajax返回的結果 })
稍微講解了merge的使用場景,接下來看看若是手動實現這樣的功能,該如何作呢?
import {fromEvent, Observable, of, Subscriber} from "rxjs"; import {PartialObserver} from "rxjs/src/internal/types"; import {delay, scan} from "rxjs/operators"; class MergeMapSubscriber extends Subscriber<Function> { constructor( destination: PartialObserver<any> | ((value: any) => void), private readonly func: Function ) { super(destination); this.func = func; } protected _next(value: any): void { const o$ = this.func(value); o$.subscribe({ next: (value: any) => { this.destination.next(value); } }) } } const mergeMap = (func: any) => (source: Observable<any>) => source.lift({ call(subscriber: Subscriber<any>, source: any): any { source.subscribe(new MergeMapSubscriber(subscriber, func)) } }); const observable$ = fromEvent(document, 'click').pipe( scan(i => i + 1, 0), mergeMap((value: any) => of(value).pipe(delay(500))) ); const subscriber = { next: (value: any) => console.log("subscriber function output ::: ", value), complete: () => console.log('has something done.'), error: (err: any) => console.error('something error: ', err) }; observable$.subscribe(subscriber);
② switchMap
上面介紹了一個適合於AJAX的mergeMap,但mergeMap會存在一個問題,每個上游數據都將會引起調用Ajax並且會將每個Ajax結果傳遞給下游。這樣的處理方式彷佛並非適合全部的場景。好比對股票等網頁系統等對數據顯示實時性要求比較高的狀況下相對來講,若是沒有處理好將會出現重大經濟損失。而switchMap正是解決了mergeMap這樣弊端的操做符。
import {fromEvent, Observable, of, Subscriber} from "rxjs"; import {PartialObserver} from "rxjs/src/internal/types"; import {delay, scan} from "rxjs/operators"; class SwitchMapSubscriber extends Subscriber<Function> { private innerSubscription: any; constructor( destination: PartialObserver<any> | ((value: any) => void), private readonly func: Function ) { super(destination); this.func = func; } protected _next(value: any): void { const o$ = this.func(value); // 保證獲取到最新的數據流 if(this.innerSubscription) { this.innerSubscription.unsubscribe(); } this.innerSubscription = o$.subscribe({ next: (value: any) => { this.destination.next(value); } }) } } const switchMap = (func: any) => (source: Observable<any>) => source.lift({ call(subscriber: Subscriber<any>, source: any): any { source.subscribe(new SwitchMapSubscriber(subscriber, func)) } }); export const observable$ = fromEvent(document, 'click').pipe( scan(i => i + 1, 0), switchMap((value: any) => of(value).pipe(delay(500))) );
③ concatMap
concatMap的模式與switchMap基本類似,差異在於獲取最新的數據流後concatMap將其順序推動數據流,而switchMap則將本來的數據流取消訂閱,轉而訂閱最新的數據流。
class ConcatMapSubscriber extends Subscriber<Function> { private innerSubscription: any; private buffer: Array<any> = []; constructor( destination: PartialObserver<any> | ((value: any) => void), private readonly func: Function ) { super(destination); this.func = func; } protected _next(value: any): void { const {isStopped} = this.innerSubscription || {isStopped: true}; if(!isStopped) { this.buffer = [...this.buffer, value]; } else { const o$ = this.func(value); this.innerSubscription = o$.subscribe({ next: (value: any) => { this.destination.next(value); }, complete: () => { if(this.buffer.length) { const [first, ...rest] = this.buffer; this.buffer = rest; this._next(first); } } }); // 順序推動數據流 this.add(this.innerSubscription); } } } const concatMap = (func: any) => (source: Observable<any>) => source.lift({ call(subscriber: Subscriber<any>, source: any): any { source.subscribe(new ConcatMapSubscriber(subscriber, func)) } }); export const observable$ = fromEvent(document, 'click').pipe( scan(i => i + 1, 0), concatMap((value: any) => of(value).pipe(delay(500))), takeUntil(fromEvent(document, 'keydown')) );