RxJS 源碼解析(五)—— Operator III

DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)

引言

在本文開始以前,先定義一些自定義術語,方便閱讀。前端

  • 頂流:調用了操做符的流。
  • 上游流:操做符的內部訂閱器所訂閱的流。
  • 下游流:由操做符的內部訂閱器管理的流。
  • 下游訂閱:訂閱了操做符生成的流的訂閱者。

在上一篇中,我描述了 OuterSubscriber 和 InnerSubscriber 的做用,並將幾個 Join Creation Operator 的源碼解析了一遍。下面,咱們將進入的是 Transformation Operator 的源碼分析。git

在知道了 OuterSubscriber 和 InnerSubscriber 是一種經過委託模式實現管理下游流訂閱的方法後,我發現其實這種實現技巧用於不少的 operators。那麼本篇及下一篇將會介紹更多這種相似的設計。github

(PS:爲了方便描述,我拿了官網的相關的圖片,時序圖的閱讀順序是從左到右。)typescript

基礎映射

map

map 是最爲基礎的映射,他將上游流的值通過運算,轉發給下游訂閱。這部分源碼不是很複雜,實際上就是作了一層轉發。segmentfault

protected _next(value: T) {
  let result: R;
  try {
    result = this.project.call(this.thisArg, value, this.count++);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.destination.next(result);
}

scan

Scan 和 的做用跟 reduce 同樣,傳入一個函數把一組數據打平,可是跟 reduce 不同的點在於,每一次結合完畢都會馬上返回結果。緩存

const clicks1 = fromEvent(document, 'click');
const ones1 = clicks.pipe(mapTo(1));
const seed1 = 0;
const count1 = ones.pipe(
  // 輸入的是返回任意值的函數
  scan((acc, one) => acc + one, seed)
);
count.subscribe(x => console.log(x));

這部分的實現一樣也不是很複雜,在拿到上游流數據後,使用 accumulator 對數據進行累加操做。微信

protected _next(value: T): void {
  // 須要判斷是否帶有初始值。
  if (!this.hasSeed) {
    this.seed = value;
    this.hasSeed = true;
    this.destination.next(value);
  } else {
      return this._tryNext(value);
    }
}

private _tryNext(value: T): void {
  const index = this.index++;
  let result: any;
  try {
      // 計算結果
    result = this.accumulator(<R>this.seed, value, index);
  } catch (err) {
    this.destination.error(err);
  }
    // 保存,以備下次使用
  this.seed = result;
  this.destination.next(result);
}

五種基礎複合映射

所謂複合映射,意思就是這些操做符接收的參數是一個帶有上游流數據做爲參數並返回 Observable 的函數,同時把其中的訂閱數據轉發給下游訂閱。前端工程師

mergeMap,switchMap,exhaustMap,concatMap,mergeScan 是五種複合映射操做符,它使得上游流的數據能夠傳遞給下游流,並交由其處理。 concatMap 和 mergeScan 是 mergeMap 的一種特殊狀況,因此咱們只須要關注剩餘的三種。數據結構

mergeMap,switchMap,exhaustMap,這三種操做符的源碼結構分爲這三個部分:併發

  • 經過 lift 操做,將原有的流映射成新的流。
  • 實現 Operator 接口,經過 call 返回一個 Subscriber。
  • 經過繼承 OuterSubscriber 實現這個 Subscriber。

其中,前兩個部分都擁有很是相似的結構,都是經過這種樣板代碼來進行編寫。

export function someMap<T, R, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
): OperatorFunction<T, ObservedValueOf<O> | R> {
  return (source: Observable<T>) => source.lift(new SomeMapOperator(project));
}

class SomeMapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => ObservableInput<R>) {
  }

  call(Subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new SomeMapSubscriber(Subscriber, this.project));
  }
}

經過 _innerSub 提供的內部註冊方法,在裏面建立 InnerSubscriber,並傳入當前的 OuterSubscriber 。

private _innerSub(input: ObservableInput<R>, value: T, index: number): void {
  const innerSubscriber = new InnerSubscriber(this, value, index);
  const destination = this.destination as Subscription;
  destination.add(innerSubscriber);
  const innerSubscription = subscribeToResult<T, R>(this, input, undefined, undefined, innerSubscriber);
  
  // 由於這裏的 input 可能不是 observable, 那麼返回的
  // 訂閱結果也可能跟 innserSubscriber 相等,因此這裏要
  // 處理一下。
  if (innerSubscription !== innerSubscriber) {
    destination.add(innerSubscription);
  }
}

最終,交由 subscribeToResult 建立一個內部訂閱來管理下游流。

mergeMap

mergeMap 提供的是一種合併操做,經過在內部維護了多個下游流的訂閱,使得上游流能夠將數據下發給多個下游流。它提供了一個併發數限制的參數,主要用於控制下游流併發的數量。

export function mergeMap<T, R, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
  concurrent: number = Number.POSITIVE_INFINITY
): OperatorFunction<T, ObservedValueOf<O> | R> {
  return (source: Observable<T>) => source.lift(new MergeMapOperator(project, concurrent));
}

下面,咱們關注的點將轉移到 MergeMapSubscriber 。首先看看它的數據結構。

export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
  // 標記是否已經完成
  private hasCompleted: boolean = false;
  // 上流 observable 的數據緩存 
  private buffer: T[] = [];
  // 當前正在開啓的流的數量
  private active: number = 0;
  // 數據的索引
  protected index: number = 0;

  constructor(
    // 外部傳入的訂閱者
    destination: Subscriber<R>,
    // 須要合併的 Observable 的工廠
    private project: (value: T, index: number) => ObservableInput<R>,
    // 併發數量
    private concurrent: number = Number.POSITIVE_INFINITY,
  ) {
    super(destination);
  }
  ...
}

Subscriber

MergeMapSubscriber 的 _next 調用的時候,會比較 active (下游流的數量) 與 concurrent (最大併發數)的大小,active 小於 concurrent 則調用 _tryNext,不然將已經到來的數據放入緩衝區中,可是你知道的, JavaScript 並無真正的併發,這就是一個異步隊列。而每一次進行 _tryNext,都會經過 project 來建立一個下游流,同時讓更新 active,將下游流傳入並觸發 _innerSub。

protected _next(value: T): void {
  if (this.active < this.concurrent) {
    this._tryNext(value);
  } else {
    this.buffer.push(value);
  }
}

protected _tryNext(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    result = this.project(value, index);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.active++;
  // 
  this._innerSub(result, value, index);
}

在上游流完成時,會觸發 _complete。

protected _complete(): void {
  this.hasCompleted = true;
  if (this.active === 0 && this.buffer.length === 0) {
    this.destination.complete();
  }
  this.unsubscribe();
}

若是全部的下游流都已經完成,且緩衝區中沒有數據,則通知下游訂閱數據已經輸出完畢。

notify

notifyNext 就是單純的將結果傳遞給下游訂閱,而 notifyComplete 則有意思多了。

經過 notifyComplete ,能夠得知哪些流已經完成任務而且關閉。若是 buffer 中存在數據,那麼將數據交由 _next 發送出去並建立新的下游流。過這種遞歸操做,能夠將全部 buffer 中的數據都發送出去。最後判斷上游流和下游流是否是都已經結束了,若是已經結束了,則通知下游訂閱數據已經輸出完畢。

notifyNext(
  outerValue: T, innerValue: R,
  outerIndex: number, innerIndex: number,
  innerSub: InnerSubscriber<T, R>
): void {
  this.destination.next(innerValue);
}

notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  this.remove(innerSub);
  this.active--;
  if (buffer.length > 0) {
    this._next(buffer.shift());
  } else if (this.active === 0 && this.hasCompleted) {
    this.destination.complete();
  }
}

switchMap

switchMap 提供的是一個上游流爲主的映射操做,當上遊流的訂閱數據到來的時候,舊的下游流會被取消訂閱,而後從新訂閱一組新的下游流。

export function switchMap<T, R, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O
): OperatorFunction<T, ObservedValueOf<O>|R> {
  return (source: Observable<T>) => source.lift(new SwitchMapOperator(project));
}

Subscriber

innerSubscription 保存了當前下游流的訂閱,因此這個操做符只會維護一個下游流的訂閱。

private index: number = 0;
private innerSubscription: Subscription;

當進行 next 操做的時候,會先建立新的下游流,若是舊的下游流存在,那麼會被取消訂閱。

protected _next(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    // 上游流的數據到來了,建立新的下游流。
    result = this.project(value, index);
  } catch (error) {
    this.destination.error(error);
    return;
  }

  // 舊的下游流取消訂閱
  const innerSubscription = this.innerSubscription;
  if (innerSubscription) {
    innerSubscription.unsubscribe();
  }

  this._innerSub(result, value, index);
}

該 Subscriber 重寫了_complete 。這裏意味着上游流已經輸出完畢,那麼若是下游訂閱

protected _complete(): void {
  const {innerSubscription} = this;
  if (!innerSubscription || innerSubscription.closed) {
    super._complete();
    return;
  }
  this.unsubscribe();
}

notify

跟以前同樣, notifyNext 依舊是將下游流中的數據轉發出去。主要關注點仍是在於 notifyComplete。由於 innerSubscription 被置爲空了,因此調用 this._complete 無心義,不會觸發到其父類函數。

notifyComplete(innerSub: Subscription): void {
  const destination = this.destination as Subscription;
  destination.remove(innerSub);
  this.innerSubscription = null;
  if (this.isStopped) {
    super._complete();
  }
}

若是當前的下游流已經完成了,那麼就要將它從下游訂閱(destination)中移除,若是上游流已經中止(error 或者 complete 被調用,或者被取消訂閱),那麼還得調用 super._complete 表示已經完成。

exhaustMap

switchMap 相反, exhaustMap 提供了一種如下游流爲主的映射操做。若是下游流已經開啓,那麼上游流以後到來的訂閱數據都將會被拋棄,直到該下游流完成訂閱。下游流完成訂閱後,上游流的數據纔會繼續跟新的下游流結合,並造成新的訂閱。

export function exhaustMap<T, R, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
): OperatorFunction<T, ObservedValueOf<O>|R> {
  return (source: Observable<T>) => source.lift(new ExhaustMapOperator(project));
}

Subscriber

exhaustMap 的實現很簡單,經過維護 hasSubscription 這樣一個內部狀態,標記下游流是否被訂閱了。 hasCompleted 則是上游流完成狀況的標記。

private hasSubscription = false;
private hasCompleted = false;

訂閱會調用 _next,標記下游流是否已經開啓(訂閱是否已經存在),若是未開啓,則構建新的下游流,並標記 hasSubscriptiontrue

protected _next(value: T): void {
  if (!this.hasSubscription) {
      let result: ObservableInput<R>;
      const index = this.index++;
      try {
        result = this.project(value, index);
      } catch (err) {
        this.destination.error(err);
        return;
      }
      // 標記爲 true
      this.hasSubscription = true;
      this._innerSub(result, value, index);
  }
}

上游流和下游流的數據都已經輸出完畢了,那麼把完成信號傳遞給下游訂閱。

protected _complete(): void {
  this.hasCompleted = true;
  if (!this.hasSubscription) {
    this.destination.complete();
  }
  this.unsubscribe();
}

notify

若是下游流的數據輸出完畢,那麼就應該要將 hasSubscription 標記爲 false

notifyComplete(innerSub: Subscription): void {
  const destination = this.destination as Subscription;
  destination.remove(innerSub);

  // 標記爲 false
  this.hasSubscription = false;

  // 此處判斷上游流是否已經完成
  if (this.hasCompleted) {
    this.destination.complete();
  }
}

concatMap

concatMap 是 mergeMap 的一種特殊形式。

export function concatMap<T, R, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
): OperatorFunction<T, ObservedValueOf<O>|R> {
  return mergeMap(project, 1);
}

mergeScan

mergeScan 的源碼跟 mergeMap 相似。只不過就是把傳入的函數替換了一下,而且在內部緩存了上一個結合後的值。

const clicks2 = fromEvent(document, 'click');
const ones2 = click$.pipe(mapTo(1));
const seed2 = 0;
const count2 = one$.pipe(
  // 輸入一個 Observable 工廠
  mergeScan((acc, one) => of(acc + one), seed),
);

concat & merge

上一篇中,關於 concat 和 merge 兩個相關的 operators 並無講到,由於這它們其實最終都是調用 mergeMap。

小結

經過這三個不一樣的映射操做符,使得上游流能夠經過必定的方式跟下游流結合。那麼,結合一張圖,能夠看看相關操做符的關係。

ob_map_rlt

對這些操做符分一下類。

  • 屬於 Transformation Operators 的有:concatMap, concatMapTo, mergeMap, mergeMapTo, switchMap,switchMapTo,exhaustMap,exhaustMapTo。
  • 屬於 Join Creation Operators 的有: merge, concat。
  • 屬於 Join Operators 的有: mergeAll, concatAll, switchAll, startWith,endWith。

零散的高階操做符

expand

expand 將傳入的 Observable 工廠進行遞歸操做。與上面的複合映射相似,expand 也是一種複合映射,只不過,他會不斷的去複合下游流的數據,也就是相似上圖的模式。

Subscriber

爲了實現相對應的功能,expand 定義瞭如下數據結構。

export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
  // 當前索引
  private index: number = 0;
  // 已啓動的下游流的數量
  private active: number = 0;
  // 上游流是否已經完成
  private hasCompleted: boolean = false;
  // 對於索引的緩存數據
  private buffer: any[];
  // 下游流工廠
  private project: (value: T, index: number) => ObservableInput<R>,
    // 併發數  
     private concurrent: number;
}

上游流數據到來的時候,跟 mergeMap 比較相似,也會比較 active 和 concurrent,若是 active 大於 concurrent ,那麼便會用buffer緩存上游流的數據,若是 active 小於 concurrent ,那麼直接發送數據給到下游訂閱,並訂閱一個新的下游流。須要注意的一點,爲了防止爆棧,expand 在這裏加了一個判斷條件,在 notify 中,將利用這一條件,來結束遞歸。

protected _next(value: any): void {
  const destination = this.destination;

  if (destination.closed) {
    this._complete();
    return;
  }

  const index = this.index++;
  if (this.active < this.concurrent) {
    destination.next(value);
    try {
      const { project } = this;
      const result = project(value, index);
      this.subscribeToProjection(result, value, index);
    } catch (e) {
      destination.error(e);
    }
  } else {
    this.buffer.push(value);
  }
}
// 訂閱新的下游流
private subscribeToProjection(result: any, value: T, index: number): void {
  this.active++;
  const destination = this.destination as Subscription;
  destination.add(subscribeToResult<T, R>(this, result, value, index));
}

當上遊流完成時,須要標記 hasComplete 爲 true。這一步是結束遞歸的重要標誌。

protected _complete(): void {
  this.hasCompleted = true;
  if (this.hasCompleted && this.active === 0) {
    this.destination.complete();
  }
    this.unsubscribe();
}

notify

那麼 expand 是怎麼構成遞歸的呢,當下遊流有數據到來的時候,他會直接調用 _next。最終造成了 _next -> subscribeToProjection -> next -> notifyNext -> _next 這樣的一條遞歸鏈。

notifyNext(
  outerValue: T, 
  innerValue: R,
    outerIndex: number, 
  innerIndex: number,
    innerSub: InnerSubscriber<T, R>
): void {
    this._next(innerValue);
}

下游流完成時,須要根據 hasCompleted 和 buffer 的狀態來決定是否結束遞歸。在這裏,也造成了一條這樣的遞歸鏈: _next -> subscribeToProjection -> next -> notifyComplete -> _next

notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  const destination = this.destination as Subscription;
  destination.remove(innerSub);
  this.active--;
  if (buffer && buffer.length > 0) {
    this._next(buffer.shift());
  }
  if (this.hasCompleted && this.active === 0) {
    this.destination.complete();
  }
}

exhaust

exhaust 是一種打平操做,它的源碼並無調用 exhaustMap。它的實現思路很簡單,經過判斷當前是否存在前一個下游流訂閱(hasSubscription),來決定當前到來的下游流是否開啓。

private hasCompleted: boolean = false;
private hasSubscription: boolean = false;


protected _next(value: T): void {
  // 若是存在訂閱,那麼拋棄這個值
  if (!this.hasSubscription) {
    this.hasSubscription = true;
    this.add(subscribeToResult(this, value));
  }
}

protected _complete(): void {
  this.hasCompleted = true;
  if (!this.hasSubscription) {
    this.destination.complete();
  }
}

notifyComplete(innerSub: Subscription): void {
  this.remove(innerSub);
  this.hasSubscription = false;
  if (this.hasCompleted) {
    this.destination.complete();
  }
}

總結

本篇主要的內容集中在分析操做符是如何進行數據的映射,那麼下一篇將講解的是 buffer 和 window 相關的緩存操做符是如何運行和實現的。

加入咱們

咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。

做者:zcx(公衆號:Coder寫字的地方)

原文連接:https://mp.weixin.qq.com/s/lrawMOuHNj6GyQJMqK1Now


往期文章推薦

《好用到飛起!VSCode插件DevUIHelper設計開發全攻略(三)》

《Web界面深色模式和主題化開發》

《手把手教你搭建一個灰度發佈環境》

相關文章
相關標籤/搜索