RxJS 源碼解析(三)—— Operator I

DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)進羣
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)

本篇是 RxJS 源碼解析的第三篇文章,使用源碼的版本是 6.6.0 。本篇文章的內容可能會比較多,請耐心閱讀。爲了方便閱讀,文中的相關代碼均通過裁剪和處理。若有不妥,還請指正。前端

RxJS 中,Creation Operator 主要分爲如下兩類:git

  • 執行通常建立操做的 Normal Creation Operator
  • 執行復雜的建立操做的 Join Creation Operator

pipe 中使用的 operator ,我稱之爲 Pipe Operator ,它主要分爲如下幾類:github

  • 用於數據映射的 Transformation Operators
  • 過濾用的 Filtering Operators
  • 將當前的 Observable 多播化的 Multicasting Operators
  • 處理錯誤的 Error Handling Operators
  • 工具操做函數 Utility Operators
  • Conditional and Boolean Operators
  • Mathematical and Aggregate Operators

限於篇幅本篇將先介紹 Normal Creation Operator ,它的主要做用是幫助開發者快速建立 Observabletypescript

of , empty & throwError

ofemptythrowError ,首先講這三個 operator 的重要緣由是,它提供了一系列基礎的操做:next、complete、以及 error。json

const observableA = of(1);
const observableB = empty();
const observableC = throwError(Error('test'));

observableA.subscribe({
    next: (v) => console.log('A: ' + v),
    complete: () => console.log('A: complete');
    error: (e) => console.log('A: error is ' + e);
});

observableB.subscribe({
    next: (v) => console.log('B: ' + v),
    complete: () => console.log('B: complete');
    error: (e) => console.log('B: error is ' + e);
});


observableC.subscribe({
    next: (v) => console.log('C: ' + v),
    complete: () => console.log('C: complete'),
    error: (e) => console.log(`C: error is (${e}).`),
});


// 打印結果
// A: 1
// A: complete
// B: complete
// C: error is Error: test

of source code

它的構建方式以下,其中,調度器是最後一個參數。segmentfault

export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> {
  let scheduler = args[args.length - 1] as SchedulerLike;
  if (isScheduler(scheduler)) {
    args.pop();
    return scheduleArray(args as T[], scheduler);
  } else {
    return fromArray(args as T[]);
  }
}

of 由兩個函數 fromArrayscheduleArrayfromArray 是一個簡單循環的函數,它將數據循環發送給 Observable 的訂閱者。數組

export function fromArray<T>(input: ArrayLike<T>) {
  return new Observable<T>((subscriber: Subscriber<T>) => {
      // 循環獲取數據
      for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
        subscriber.next(array[i]);
      }
          subscriber.complete();
    });
}

empty source code

這部分的代碼很簡單,scheduler 部分能夠忽略。實際上就是在 Observable 中調用 subscriber.complete()promise

export function empty(scheduler?: SchedulerLike) {
  if (scheduler) {
    return new Observable<never>(subscriber => scheduler.schedule(
      () => subscriber.complete()
    ));
  } else {
    return new Observable<never>(subscriber => subscriber.complete());
  }
}

throwError source code

throwErrorempty 的實現是一致的,只不過 complete 換成了 error 。瀏覽器

export function throwError(error: any, scheduler?: SchedulerLike): Observable<never> {
  if (!scheduler) {
    return new Observable(subscriber => subscriber.error(error));
  } else {
    return new Observable(subscriber => scheduler.schedule(
      dispatch, 
      0, 
      { error, subscriber }
    ));
  }
}

// 如下是 調度器中想要執行的狀態。
interface DispatchArg {
  error: any;
  subscriber: Subscriber<any>;
}
// 最終執行的是 subcriber 的 error 方法。
function dispatch({ error, subscriber }: DispatchArg) {
  subscriber.error(error);
}

iif & defer

iifdefer 的表現是一致的。微信

  • defer 的主要做用是延後了具體 Observable 的生成,是一個 Lazy Observable Factory
  • iif 則是縮小了 defer 的表達範圍,主要做用是加強了Rx 的命令式的語義。
let test = false;
const observableA = iif(
  () => test,
  of('1'),
  of('2'),
);
const observableB = defer(function() {
    return test ? of('1') : of('2');
});

iif Source Code

看到 iif 的源碼的那一刻我震驚了,什麼叫大道至簡(戰術後仰)。

export function iif<T = never, F = never>(
  condition: () => boolean,
  trueResult: SubscribableOrPromise<T> = EMPTY,
  falseResult: SubscribableOrPromise<F> = EMPTY
): Observable<T|F> {
  // 直接調用了 defer
  return defer(() => condition() ? trueResult : falseResult);
}

defer Source Code

defer 原理上比較簡單:在構造 Observable 的時候,在傳入的訂閱函數中返回一個 Subscription。那麼在這個傳入的訂閱函數中,defer 的過程分爲如下三步:

  • 調用工廠,獲取輸入數據。
  • 調用 from 將數據轉換成一個 observable
  • 返回這個 observable 的訂閱。
export function defer<R extends ObservableInput<any> | void>(observableFactory: () => R): Observable<ObservedValueOf<R>> {
  return new Observable<ObservedValueOf<R>>(subscriber => {
    let input: R | void;
    try {
      // 調用工廠函數,獲取輸入的數據。
      input = observableFactory();
    } catch (err) {
      subscriber.error(err);
      return undefined;
    }
    // 經過 from 將 input 轉換爲 observable。
    const source = input ? from(input as ObservableInput<ObservedValueOf<R>>) : empty();
    
    // 返回一個訂閱器到外部。
    return source.subscribe(subscriber);
  });
}

其中的 ObservedValueOf 是這樣定義的,使用了 ts 的 infer 來推導出 ObservableInput<T>T 的具體類型。

export type ObservedValueOf<OV> = OV extends ObservableInput<infer T> ? T : never;

from

from 提供了一種映射的功能,能夠將傳入的數據映射成 Observables 。它能夠接受如下參數:

  • 原生數組 和 Iterable<T>
  • dom 迭代器
  • Promise<T>
  • Observable<T>

稍微的修剪了一下,源碼以下:

export function from<T>(input: ObservableInput<T>): Observable<T> {
 return new Observable<T>(subscribeTo(input));
}

它直接建立一個新的 Observable,而且調用了 subscribeTo ,根據輸入類型,對輸入進行不一樣的處理。

  • 若是輸入是 Observable,調用 subscribeToObservable
  • 若是輸入是原生數組,調用 subscribeToArray
  • 若是輸入是 Promise,調用 subscribeToPromise
  • 若是輸入是生成器,調用 subscribeToIterable

subscribeToArray

若是輸入是原生數組或者是實現了數組功能的數據結構,那麼直接調用 subscriber.next 把全部數據依次發送給訂閱者。

export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => {
  for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) {
    subscriber.next(array[i]);
  }
  subscriber.complete();
};

subscribeToObservable

若是輸入是 Obervable,那麼要經過一個特定的 Symbol 取出 Observable,而後再訂閱它。

(基於 Symbol 的特性,當前不少項目都會使用一個固定的 Symbol 對特定數據取值,來驗證這個數據是否是符合類型)

export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>) => {
  const obs = obj[Symbol_observable]();
  if (typeof obs.subscribe !== 'function') {
    throw new TypeError('Provided object does not correctly implement Symbol.observable');
  } else {
    return obs.subscribe(subscriber);
  }
};

subscribeToPromise

若是輸入是一個 Promise,那麼經過 then 獲取到 Promise 的內容,並將內容發送給訂閱者。

export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => {
  promise.then(
    (value) => {
      if (!subscriber.closed) {
        subscriber.next(value);
        subscriber.complete();
      }
    },
    (err: any) => subscriber.error(err)
  );
  return subscriber;
};

subscribeToIterable

生成器跟數組的方式相似,也是經過循環的方式將數據發送給訂閱者。

export const subscribeToIterable = <T>(iterable: Iterable<T>) => (subscriber: Subscriber<T>) => {
  const iterator = (iterable as any)[Symbol_iterator]();

  do {
    let item: IteratorResult<T>;
    try {
      item = iterator.next();
    } catch (err) {
      subscriber.error(err);
      return subscriber;
    }
    if (item.done) {
      subscriber.complete();
      break;
    }
    subscriber.next(item.value);
    if (subscriber.closed) {
      break;
    }
  } while (true);

  return subscriber;
};

generate

generate 可讓你用一種相似 for 循環的方式得到數據流。不過,我目前尚未遇到過很是須要這種方式生成流的方式,若是你遇到這種狀況,歡迎交流。通常來講,我習慣於這樣調用它。

const observable = generate({
    initialState: 1,
    condition: x => x < 5,
    iterate: x => x + 1,
});

observable.subscribe((value) => {
    console.log(value);
});

// 打印結果
// 1
// 2
// 3
// 4

原來的源碼包含了較多的參數判斷,把內部邏輯梳理一下,實際上就是分爲三個大步驟:

  • 判斷結束條件, 若是爲假表明已經結束,則應該完成訂閱,不然進行下一步。
  • 發送數據訂閱給到訂閱者。
  • 調用迭代方法,生成下一組數據,重複第一步。
export function generate<S>(options: GenerateOptions<S>): Observable<S> {
  const initialState = options.initialState;
  const condition = options.condition;
  const iterate = options.iterate;

  // 返回 Observable
  return new Observable<S>(subscriber => {
    let state = initialState;
    try {
      while (true) {
        // 判斷結束條件
        if (condition && !condition(state)) {
          subscriber.complete();
          break;
        }
        // 發送數據給訂閱者
        subscriber.next(state);

        // 調用迭代,獲取下一組數據
        state = iterate(state);
        
        if (subscriber.closed) {
          break;
        }
      }
    } catch (err) {
      subscriber.error(err);
    }

    return undefined;
  });
}

其中 GenerateOptions 包含了三個成員,initialStatecondition 以及 iterate

export interface GenerateOptions<S> {
  // 初始狀態
  initialState: S;
  
  // 結束條件
  condition?: (x: S) => boolean;

  // 迭代方式
  iterate: (x: S) => S;
}

range

range 能夠建立一個給定範圍的數字流。這個主要就是提供了一個簡單的語義化函數,主要就是經過循環給訂閱者喂數據。

export function range(start: number = 0, count?: number): Observable<number> {
  return new Observable<number>(subscriber => {
    if (count === undefined) {
      count = start;
      start = 0;
    }

    for (let index = 0; index < count; ++index) {
      subscriber.next(start + index);
      if (subscriber.closed) {
        break;
      }
    }

    return undefined;
  });
}

fromEvent & fromEventPattern

fromEvent

fromEvent 是的 Observable 能夠封裝一系列的系統事件。既能夠接受 NodeJS EventEmitter,也能夠接受 DOM EventTargetJQuery-like event target, NodeList 或者 HTMLCollection 等瀏覽器對象。

const clicksA = fromEvent(document, 'click');
const clicksB = fromEvent($(document), 'click');

clicksA.subscribe(x => console.log('A: ', x));
clicksB.subscribe(x => console.log('B: ', x));

// 每當點擊一下頁面,都會打印出 event 。

它的實現很簡單,根據 target 的對象類型調用其對應的事件監聽函數,而後經過 subscriber 調用 next 獲取到訂閱的輸出。爲了方便閱讀,我稍微的改了一下,讓 fromEvent 只支持 DOM EventTarget。

export interface HasEventTargetAddRemove<E> {
  addEventListener(type: string, listener: ((evt: E) => void) | null, options?: boolean | AddEventListenerOptions): void;
  removeEventListener(type: string, listener?: ((evt: E) => void) | null, options?: EventListenerOptions | boolean): void;
}

// 一個只支持 DOM EventTarget 的 fromEvent
export function fromEvent<T>(
  target: HasEventTargetAddRemove<T>,
  eventName: string,
  options?: EventListenerOptions,
): Observable<T> {

  return new Observable<T>(subscriber => {
    // 處理結果
    const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);
    
    // 調用 addEventListener,並讓其在 handler 中處理。
    target.addEventListener(eventName, handler, options);

    // 取消訂閱的時候,直接調用 removeEventListener 對 dom 取消訂閱。
    // 返回的是一個函數,這個函數負責了取消訂閱的時,所作的內容。
    return () => {
      target.removeEventListener(eventName, handler, options);
    };
  });
}

上面的代碼能夠分解成這三個步驟:

  • 在閉包中建立一個 handler 函數,handler 函數最終會調用 subscriber.next
  • target 添加指定事件監聽。
  • subscriber 添加一個銷燬 target 事件監聽的邏輯。

對於其餘的事件監聽,再也不贅述,流程徹底是同樣。

fromEventPattern

fromEventPattern 則是對 fromEvent 的泛化。

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}
 
function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}
 
const clicks = fromEventPattern(
  addClickHandler,
  removeClickHandler
);
clicks.subscribe(x => console.log(x));
 

// 點擊的時候,就會輸出點擊事件。

它的源碼的與 fromEvent 相似。

export type NodeEventHandler = (...args: any[]) => void;

export function fromEventPattern<T>(
  addHandler: (handler: NodeEventHandler) => any,
  removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
): Observable<T | T[]> {

  return new Observable<T | T[]>(subscriber => {
    const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e);

    // 有一點不一樣的地方在於,獲取了返回值 addHandler 的返回值
    let retValue: any;
    try {
      retValue = addHandler(handler);
    } catch (err) {
      subscriber.error(err);
      return undefined;
    }
    
    if (!isFunction(removeHandler)) {
      return undefined;
    }
    
    // 而後在這裏傳入 removeHandler 中
    return () => removeHandler(handler, retValue);
  });
}

bindCallback, bindNodeCallback

它們都是一種特殊的 Operator ,思路應該是源於 Function.bind ,提供一種轉換操做,將帶有回調的函數轉換成 Observable Factory

function setTimeoutWithCallback(callback: () => void) {
  setTimeout(() => {
    callback();
  }, 2000);
}

const obfactory = bindCallback(setTimeoutWithCallback);

const ob1 = obfactory();
const ob2 = obfactory();
const now = Date.now();

ob1.subscribe(() => {
  console.log('ob1' + (Date.now() - now));
});

setTimeout(() => {
  ob1.subscribe(() => {
    console.log('ob1 later: ' + (Date.now() - now));
  });


  ob2.subscribe(() => {
    console.log('ob2: ' + (Date.now() - now));
  });
}, 3000);

// 打印結果:
// ob1: 2001
// ob1 later: 3004
// ob2: 5008

如下是 bindNodeCallback 的例子。

/* 
    file: ~/desktop/test.json
    
    { "name": "Hello World" }
 */

import * as fs from 'fs';

const readerFactory = bindNodeCallback(fs.readFile);

const reader$ = readerFactory('./src/person.json');

reader$.subscribe({
    next: (value) => console.log(value.toString()),
    error: (err) => console.log(err),
    complete: () => console.log('complete')
});

// 若是沒有錯誤,打印結果以下:
// { name: 'Hello World' }
// complete

// 若是有錯誤,打印結果以下:
// [Error: ENOENT: no such file or directory, open './src/person.json'] {
//     errno: -2,
//     code: 'ENOENT',
//     syscall: 'open',
//     path: './src/person'
// }

bindCallback 和 bindNodeCallback 的源碼很是相似。

export function bindCallback<T>(
  callbackFunc: Function
): (...args: any[]) => Observable<T> {
  return function (this: any, ...args: any[]): Observable<T> {
    const context = this;
    // 
    let subject: AsyncSubject<T>;

    return new Observable<T>(subscriber => {
      if (!subject) {
           subject = new AsyncSubject<T>();
        const handler = (...innerArgs: any[]) => {
          subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);
          subject.complete();
        };
        
        try {
          callbackFunc.apply(context, [...args, handler]);
        } catch (err) {
          subject.error(err);
        }
      }
      return subject.subscribe(subscriber);
    });
  };
}

bindCallback 和 bindNodeCallback 的源碼惟一不一樣的地方就是在於 handler 這個函數處理的內容不一樣,bindNodeCallback 傳入的函數的回調,第一個參數爲是錯誤信息。

const handler = (...innerArgs: any[]) => {
    const err = innerArgs.shift();
  // 若是第一個參數存在,說明有問題。
    if (err) {
      subject.error(err);
      return;
    }
  subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs);
  subject.complete();
};

源碼中比較有趣的地方在於,建立的時候,返回的工廠函數包含了一個 AsyncSubject。這個 AsyncSubject 保存了已經到來數據,能夠看看例子中,ob1 被訂閱了2次,第二次訂閱後其實是馬上就能拿到返回值;而 ob2 仍要執行一次 setTimeoutWithCallback。這種設計與這個 bind 的語義相吻合。

interval & timer

上面的 operators 中,我已經把 scheduler 相關的內容進行了裁剪,基本上與 scheduler 無關。而 intervaltimer 都必須經過 scheduler 來相應的定時操做,因此這部分放到了最後。它們是用於建立定時數據源的 operators

  • interval: 傳入的參數表示每隔指定毫秒發送一條數據。
  • timer:傳入的第一個參數是指第一條發送數據的時間間隔,第二個參數是指後續數據發送的間隔。
const observableA = interval(1000).pipe(take(2));

const observableB = timer(500, 1000).pipe(take(3));

console.log('hello');

observableA.subscribe(value => {
  console.log('A: ' + value);
});

observableB.subscribe(value => {
  console.log('B: ' + value);
});

// 打印結果
// hello
// B: 0
// A: 0
// B: 1
// A: 1
// B: 2

intervaltimer 都使用了一個默認的異步調度器,這個異步調度器主要是經過 setInterval 來實現相應的功能,實際上 Rx 把異步調度器經過 intervaltimer 轉化成 Observable 的形式提供到給用戶。

timer Source Code

timer 的實現以下圖所示。它首先建立了一個 Observable ,而後在訂閱函數中,返回調度器的訂閱。在這裏, schedulerschedule 函數返回了一個 Subscription

export function timer(
    dueTime: number | Date = 0, 
  period: number,
  scheduler: SchedulerLike = async
): Observable<number> {

  return new Observable(subscriber => {
    let due = 0;
    // 判斷是否是 Date 類型
    if (dueTime instanceof Date) {
      due = +dueTime - scheduler.now();
    }
    // 判斷是否是 number 類型
    if (isNumeric(dueTime)) {
      due = dueTime as number;
    }
    
    // 此處調用跟 interval 相似。
    return scheduler.schedule(dispatch, due, {
      index: 0, period, subscriber
    });
  });
}

dispatch 其實是一個遞歸函數,這個函數綁定了 SchedulerAction ,經過傳入訂閱者,使得 Action 內部的 setInterval 可以一直調用 subscriber.next

interface TimerState {
  index: number;
  period: number;
  subscriber: Subscriber<number>;
}

function dispatch(this: SchedulerAction<TimerState>, state: TimerState) {
  const { index, period, subscriber } = state;
  subscriber.next(index);

  if (subscriber.closed) {
    return;
  } else if (period === -1) {
    return subscriber.complete();
  }

  state.index = index + 1;
  this.schedule(state, period);
}

interval Source Code

如下是 interval 的源碼。

export function interval(period = 0): Observable<number> {
  
  if (!isNumeric(period) || period < 0) {
    period = 0;
  }
  
  const scheduler = async;
  
  return new Observable<number>(subscriber => {
    // 訂閱器接收 scheduler 的訂閱結果。
    subscriber.add(
      scheduler.schedule(dispatch, period, { subscriber, counter: 0, period })
    );
    return subscriber;
  });
}

仔細的分析上面的代碼,我發現 interval 的實現實際上就是 timer 的一個約束版本,它能夠改寫成這樣。

export function interval(
    period = 0,
  scheduler: SchedulerLike = async,
): Observable<number> {
  
  if (!isNumeric(period) || period < 0) {
    period = 0;
  }
  
  return timer(period, period, sch);
}

本篇小結

總體而言,這部分的源碼並無寫得很繞,刪去了 scheduler 相關的內容後,邏輯馬上就變得清晰了起來。同時,從源碼的風格上能夠看到它們由不一樣的人來編寫。

最後,限於本人的水平,若是文章中有錯誤的地方,歡迎指正。

加入咱們

咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。

做者:zcx(公衆號:Coder寫字的地方)

原文連接:https://mp.weixin.qq.com/s/6fVoI_JtSXu6YfZur1TDNw

往期文章推薦

《RxJS 源碼解析(二)—— Muticasted Observable》

《Web界面深色模式和主題化開發》

《手把手教你搭建一個灰度發佈環境》

相關文章
相關標籤/搜索