RxJS 源碼解析(四)—— Operator II

DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)進羣
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)

本篇是 RxJS 源碼解析的第四篇文章,使用源碼的版本是 6.6.0 。本篇文章的內容仍然可能會比較多,請耐心閱讀。爲了方便閱讀,文中的相關代碼均通過裁剪和處理。若有不妥,還請指正。前端

在本文開始以前,先定義一些自定義術語,方便閱讀。git

  • 頂流:調用了操做符的流。
  • 上游流:操做符的內部訂閱器所訂閱的流。
  • 下游流:由操做符的內部訂閱器管理的流。
  • 終結訂閱:訂閱了操做符生成的流的訂閱者。

我並不打算像上一篇那樣,抓着幾個操做符一頓輸出。從這篇開始,不管是 Join Operator、仍是 Transformation Operator,都有很大的規律性。因此我想先總結出來它們的規律,再來對 operator 進行分析。github

如何控制下游流

爲了讓操做符能夠控制下游流,RxJS 經過委託模式,讓操做符生成的了一個特定的 Subscriber,它在內部就能拿到全部傳入的下游流的訂閱。所以,在這裏先介紹兩個 Subscriber: OuterSubscriberInnerSubscribertypescript

  • OuterSubscriber :至關於委託者,提供了三個 notify 的接口—— notifyNextnotifyCompletenotifyError
  • InnerSubscriber :至關於被委託者,在它構造的時候須要傳入 OuterSubscriber ,以後觸發相對應的訂閱操做,它會去調用 OuterSubscriber 相對應的 notify 接口。

其內部實現實際上就是把 InnerSubscribernexterrorcomplete 轉發給 OuterSubscribersegmentfault

export class InnerSubscriber<T, R> extends Subscriber<R> {
  private index = 0;

  constructor(private parent: OuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) {
    super();
  }

  protected _next(value: R): void {
    this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this);
  }

  protected _error(error: any): void {
    this.parent.notifyError(error, this);
    this.unsubscribe();
  }

  protected _complete(): void {
    this.parent.notifyComplete(this);
    this.unsubscribe();
  }
}

而 OuterSubscriber 的默認實現則是將數據交由終結訂閱轉發出去。數組

export class OuterSubscriber<T, R> extends Subscriber<T> {
  notifyNext(outerValue: T, innerValue: R,
             outerIndex: number, innerIndex: number,
             innerSub: InnerSubscriber<T, R>): void {
    this.destination.next(innerValue);
  }

  notifyError(error: any, innerSub: InnerSubscriber<T, R>): void {
    this.destination.error(error);
  }

  notifyComplete(innerSub: InnerSubscriber<T, R>): void {
    this.destination.complete();
  }
}

不一樣的操做符可能會要生成不一樣的 Subscriber,而生成這些 Subscriber 都會調用 subscribeToResult。這個函數會根據傳入的 ObservableInput ,進行類型判斷,並返回一個正確處理後的訂閱。這裏爲了能夠複用,就調用了以前 from 也使用過的 subscribeTo ,在這個函數中,會處理列表、Promise、以及生成器等數據並返回一個訂閱。緩存

export function subscribeToResult<T, R>(
  outerSubscriber: OuterSubscriber<T, R>,
  result: any,
  outerValue?: T,
  outerIndex: number = 0,
  innerSubscriber: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription | undefined {
  if (innerSubscriber.closed) {
    return undefined;
  }
  if (result instanceof Observable) {
    return result.subscribe(innerSubscriber);
  }
  return subscribeTo(result)(innerSubscriber) as Subscription;
}

經過這種設計,使得生成的 Subscriber 擁有控制下游流狀態的能力。這種能力可使得數據裝箱和拆箱都放在同一個 Subscriber 中,同時這樣作也把反作用集中在一個訂閱器中處理,使得操做符在表現上像純函數同樣。微信

下面以及下一篇的內容中,會出現大量 subscribeToResult ,咱們只須要知道,這個函數將訂閱的數據或信息轉發到了 OuterSubscriber 的相關接口中,它的功能再也不贅述。前端工程師

最後,咱們仍是要回歸到 operators 的源碼分析上。由於總體規律和設計已經瞭解完畢,那麼分析每個 operator 的時候,也能經過這些規律來理解某一部分的 operators 爲何要這樣設計。併發

在這裏,咱們繼續沿着上一篇的內容,先分析 Join Creation Operators。

race

所謂 race,意味着全部的流都在進行一場賽跑,跑贏的流能夠留下並繼續發送數據,沒跑贏的只能取消訂閱。

const first = interval(1000).pipe(take(1), mapTo('first'));
const second = interval(2000).pipe(take(1), mapTo('second'));

const race$ = race(first, second);

race$.subscribe((v) => console.log(v));

// 打印結果
// first

race 經過 fromArray 的方式,將輸入的 Observable 交由內部訂閱器來處理。

export function race<T>(...observables: ObservableInput<any>[]): Observable<T> {
  return fromArray(observables).lift(new RaceOperator<T>());
}

RaceSubscriber

RaceSubscriber 保存了這麼幾個狀態。

private hasFirst: boolean = false;
private observables: Observable<any>[] = [];
private subscriptions: Subscription[] = [];

訂閱後上遊流輸出 Observable 會由 observables 緩存起來,然後在上游流輸出完成時,對他們進行訂閱,並保存訂閱對象。

protected _complete() {
  const observables = this.observables;
  const len = observables.length;

  if (len === 0) {
    this.destination.complete();
  } else {
    for (let i = 0; i < len && !this.hasFirst; i++) {
      let observable = observables[i];
      let subscription = subscribeToResult(this, observable, observable as any, i);

      if (this.subscriptions) {
        this.subscriptions.push(subscription);
      }
      this.add(subscription);
    }
    this.observables = null;
  }
}

notify

在 notifyNext 中,RaceSubscriber 能夠獲取下游流的訂閱數據。並對 hasFirst 進行判斷。若是該數據是第一個到達,更新 hasFirst 狀態,並將其他下游流的訂閱取消,這樣作的目的是爲了只讓這個下游流的數據發送給終結訂閱。

notifyNext(
  outerValue: T, innerValue: T,
  outerIndex: number, innerIndex: number,
  innerSub: InnerSubscriber<T, T>
): void {
  if (!this.hasFirst) {
    // 更新狀態
    this.hasFirst = true;

    // 
    for (let i = 0; i < this.subscriptions.length; i++) {
      if (i !== outerIndex) {
        let subscription = this.subscriptions[i];

        subscription.unsubscribe();
        this.remove(subscription);
      }
    }

    this.subscriptions = null;
  }

  this.destination.next(innerValue);
}

zip

zip 是這樣的一種操做符,它如下游流中數據量最少的流爲基準,按照前後順序與其他的下游流結合成新的流。

let age$ = of<number>(27, 25, 29, 30, 35, 40);
let name$ = of<string>('Foo', 'Bar', 'Beer');
let isDev$ = of<boolean>(true, true);

zip(age$, name$, isDev$).pipe(
  map(([age, name, isDev]) => ({ age, name, isDev })),
)
.subscribe(x => console.log(x));

// outputs
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }

zip 也同樣,經過 fromArray 的方式,將輸入內容交由內部訂閱器處理。

export function zip<O extends ObservableInput<any>, R>(
  ...observables: O[]
): Observable<ObservedValueOf<O>[]|R> {
  // 經過 fromArray 將傳入的參數以流的形式進入到訂閱中
  return fromArray(observables, undefined).lift(new ZipOperator());
}

ZipSubscriber

訂閱開始,生成 ZipSubscriber,調用 _next。根據輸入流的類型,將其傳入到不一樣的迭代器中,輸入的流的數據類型包含了如下幾種:

  1. 數組
  2. 生成器 或 迭代器
  3. Observable
protected _next(value: any) {
  const iterators = this.iterators;
  if (isArray(value)) {
    iterators.push(new StaticArrayIterator(value));
  } else if (typeof value[Symbol_iterator] === 'function') {
    iterators.push(new StaticIterator(value[Symbol_iterator]()));
  } else {
    iterators.push(new ZipBufferIterator(this.destination, this, value));
  }
}

相較於 靜態數據而言,Observable 纔是咱們關注的重點。在前面已經講過 OuterSubscriber 的做用,我在這裏再也不贅述。 ZipBufferIterator 經過繼承 OuterSubscriber,並實現了相應的操做,而後維護了這些 Observable 並進行訂閱。

在 zip 中,上游流爲 fromArray 生成的 Observable。當它完成時,會把 next 中存儲的迭代器進行循環調用。在 next 的時候咱們能夠看到,會生成與 ObservableInput 相對應的內容 ,的內部若是實現了訂閱功能,那麼就訂閱這些迭代器,不然,直接按照靜態處理。

protected _complete() {
  const iterators = this.iterators;
  const len = iterators.length;

  this.unsubscribe();
  
  if (len === 0) {
    this.destination.complete();
    return;
  }
  
  this.active = len;
  for (let i = 0; i < len; i++) {
    let iterator: ZipBufferIterator<any, any> = <any>iterators[i];
    if (iterator.stillUnsubscribed) {
      const destination = this.destination as Subscription;
      // 持有並管理該迭代器的訂閱
      destination.add(iterator.subscribe(iterator, i));
    } else {
      // 不是 Observable
      this.active--;
    }
  }
}

ZipBufferIterator 繼承了 OuterSubscriber ,那麼它確定也是經過內部維護一個 InnerSubscriber 來將下游流中的數據轉發出去。

class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAheadIterator<T> {
  ...
  subscribe(value: any, index: number) {
    const subscriber = new InnerSubscriber(this, index as any, undefined);
    return subscribeToResult<any, any>(this, this.observable, undefined, undefined, subscriber);
  }
  ...
}

notify

ZipBufferIterator 其內部維護了 InnerSubscriber ,那麼意味着數據會由發送到 notifyNext 中,這裏使用了一個數組將數據緩存起來。

notifyNext(outerValue: T, innerValue: any,
            outerIndex: number, innerIndex: number,
            innerSub: InnerSubscriber<T, R>): void {
  this.buffer.push(innerValue);
  this.parent.checkIterators();
}

然後,會調用 ZipSubscriber.checkIterators, 這個方法決定了終結訂閱的數據來源,同時也給出了終結訂閱完成所須要的條件。

checkIterators() {
  const iterators = this.iterators;
  const len = iterators.length;
  const destination = this.destination;


  // 是否是全部的迭代器都存在數據。
  for (let i = 0; i < len; i++) {
    let iterator = iterators[i];
    if (typeof iterator.hasValue === 'function' && !iterator.hasValue()) {
      return;
    }
  }

  let shouldComplete = false;
  // 終結訂閱最終拿到的數據
  const args: any[] = [];
 
  for (let i = 0; i < len; i++) {
    let iterator = iterators[i];
    let result = iterator.next();

    // 判斷迭代器是否已經完成數據輸出
    if (iterator.hasCompleted()) {
      shouldComplete = true;
    }

    // 若是結果已經到了末尾,意味着最短的數據已經輸出完畢。
    // 有可能數據沒到末尾,可是該迭代器已經結束。
    if (result.done) {
      destination.complete();
      return;
    }
    
        // 收集全部迭代器中的數據。
    args.push(result.value);
  }

  // 發送給終結訂閱
  destination.next(args);

  // 
  if (shouldComplete) {
    destination.complete();
  }
}

當某一個下游流完成的時候,緩衝區的存在與否會決定終結訂閱的是否完成。

notifyComplete() {
  if (this.buffer.length > 0) {
    this.isComplete = true;
    this.parent.notifyInactive();
  } else {
    this.destination.complete();
  }
}

若是緩衝區存在數據,那麼還得去調用 ZipSubscriber.notifyInactive ,將信息返回給 ZipSubscriber。到了這一步,意味着某一個下游流已經徹底發送完數據了,那麼還得更新 active 的記錄。若是 active 最終爲 0 ,那麼將通知終結訂閱這個流已經完成了。

notifyInactive() {
  this.active--;
  if (this.active === 0) {
    this.destination.complete();
  }
}

CombineLatest

跟 zip 不同,在 CombineLatest 中,每個下游流的新數據都會和其他下游流的當前的數據相結合,從而造成新的數據並重新的流中轉發出去。

export function combineLatest<O extends ObservableInput<any>, R>(
  ...observables: O[]
): Observable<R> {
  return fromArray(observables).lift(new CombineLatestOperator<ObservedValueOf<O>, R>());
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
  constructor() {}

  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new CombineLatestSubscriber());
  }
}

起始狀態跟 zip 同樣,也是經過 fromArray 將 ObservableInput 做爲上游流的數據輸入到 CombineLatestSubscriber 中。把目光鎖定這個 Subscriber,深刻了解一下它的心路歷程。

CombineLatestSubscriber

當數據到來的時候,CombineLatestSubscriber 把下游流集體緩存到一個 observables 數組中。

protected _next(observable: any) {
  this.values.push(NONE);
  this.observables.push(observable);
}

當下遊流緩存完畢的時候,上游流也輸出完畢,那麼便會調用 complete。 在這裏,complete 作的事情僅僅是將全部的下游流進行訂閱,並記錄這些流的訂閱狀態。

protected _complete() {
  const observables = this.observables;
  const len = observables.length;
  if (len === 0) {
    this.destination.complete();
  } else {
    this.active = len;
    this.toRespond = len;
    for (let i = 0; i < len; i++) {
      const observable = observables[i];
      const innerSub = new InnerSubscriber(this, observable, i);
      this.add(subscribeToResult(this, observable, undefined, undefined, innerSub));
    }
  }
}

在訂閱完畢全部的下游流後,它們的數據全都會流到 notify 中。

notify

CombineLatestSubscriber 每接收到一個下游流的數據,都會觸發 notifyNexttoRespond 記錄的是剩餘未收到數據的下游流的數量, 當全部下游流都有數據的時候,那麼纔會開始結合。

values 經過初始化的索引緩存了每個下游流當前的數據,當任意一個下游流的數據到來後,都將會更新 values 中對應索引中的緩存數據。

notifyNext(outerValue: T, innerValue: R,
            outerIndex: number, innerIndex: number,
            innerSub: InnerSubscriber<T, R>): void {
  const values = this.values;
  const oldVal = values[outerIndex];

  let toRespond = 0;
  if (this.toRespond) {
    // 若是這個數據爲NONE,那麼則表明當前的
    // 下游流是首次發送數據,則 toRespond 
    // 要減一。
    if (oldVal === NONE) {
      this.toRespond -= 1;
    }
    toRespond = this.toRespond;
  }

  values[outerIndex] = innerValue;

  if (toRespond === 0) {
    this.destination.next(values.slice());
  }
}

以上即是 combineLastest 的核心設計。

至於 notifyComplete ,則是處理了當前正在運行的下游流和終結訂閱的關係。當 active 減小到零的時候,意味着須要通知終結訂閱全部數據已經輸出完畢了。

notifyComplete(unused: Subscriber<R>): void {
  this.active -= 1;
  if (this.active === 0) {
    this.destination.complete();
  }
}

forkJoin

相較於 combineLatest ,forkJoin 是一種更爲激進的實現。爲何說它激進,由於它判斷合併的條件,從下游流有數據輸出變成了下游流完成數據輸出。它的實現很簡單,只須要計算每一個結束輸出數據的下游流的數量 completed,經過比較 completed 和下游流總數,就能判斷何時結束。須要注意的一點,若是全部流都輸出了數據,那麼 forkJoin 才能把數據發送。

function forkJoinInternal(sources: ObservableInput<any>[], keys: string[] | null): Observable<any> {
  return new Observable(subscriber => {
    const len = sources.length;
    if (len === 0) {
      subscriber.complete();
      return;
    }
    
    const values = new Array(len);
    let completed = 0;
    let emitted = 0;
    // 循環訂閱全部的下游流
    for (let i = 0; i < len; i++) {
      // 將輸入轉換成 Observable 
      const source = from(sources[i]);
      let hasValue = false;
      
      subscriber.add(source.subscribe({
        next: value => {
          if (!hasValue) {
            hasValue = true;
            emitted++;
          }
          // 記錄當前訂閱的值
          values[i] = value;
        },
        error: err => subscriber.error(err),
        // 處理完成時所須要作的工做
        complete: () => {
          // 更新下游流訂閱完成數
          completed++;
          
          // 判斷是否全部的下游流訂閱都已經完成
          if (completed === len || !hasValue) {
            if (emitted === len) {
              // 若是所有的下游流都發送了數據,
              // 那麼終結訂閱將收到全部的下游流
              // 的數據。
              subscriber.next(values);
            }
            subscriber.complete();
          }
        }
      }));
    }
  });
}

merge & concat

merge 經過調用 mergeMap 來建立合併流,concat 也是經過 mergeMap 來建立相同的合併流。這一部分會在下一章講到。它們兩個惟一不一樣的點就是在於併發的數量上。merge能夠併發訂閱多個下游流,而 concat 同一時間只能訂閱一個下游流。

merge 源碼

type Any = ObservableInput<any>;

export function merge<T, R>(...observables: Array<ObservableInput<any> | number>): Observable<R> {
  let concurrent = Number.POSITIVE_INFINITY;
  let last: any = observables[observables.length - 1];
  if (typeof last === 'number') {
    concurrent = <number>observables.pop();
  }
  return mergeMap<Any, Any>(x => x, concurrent)(fromArray<any>(observables));
}

concat 源碼

export function concat1<O extends ObservableInput<any>, R>(...observables: Array<O>): Observable<R> {
  return mergeMap<O, O>(x => x, 1)(of(...observables));
}

partition

partion 是一種分割操做,經過傳入一個判斷函數,使得輸出的流一分爲二。它經過 filter 來實現,將兩個不一樣的流分離。其中須要注意的是,第二個 filter 中,傳入的是一個求反操做。

export function partition<T>(
  predicate: (value: T, index: number) => boolean,
  thisArg?: any
): UnaryFunction<Observable<T>, [Observable<T>, Observable<T>]> {
  return (source: Observable<T>) => [
    filter(predicate, thisArg)(source),
    
    // 此處傳入的是一個 not,他把整個 predicate 封裝。
    filter(not(predicate, thisArg) as any)(source)
  ] as [Observable<T>, Observable<T>];
}

本篇小結

總結一下,本章首先給出一些 operators 的綜合規律,而後再對 Join Creation Operators 進行分析,下面分別用一句話將它們總結一下,就結束本篇的內容。

  • zip 是以數據量最少的下游流爲基準合成的流。
  • combineLatest 是以數據量最多的下游流爲基準合成的流。
  • forkJoin 如下游流完成的狀況做爲基準合成的流。
  • merge & concat 下一章再講。
  • partion 將輸入流一分爲二,造成兩個流。

加入咱們

咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。

做者:zcx(公衆號:Coder寫字的地方)

原文連接:https://mp.weixin.qq.com/s/1b141waT_tAxZR-PZC79kg

往期文章推薦

《RxJS 源碼解析(三)—— Operator I》

《Web界面深色模式和主題化開發》

《手把手教你搭建一個灰度發佈環境》

相關文章
相關標籤/搜索