RxJS 源碼解析(二)—— Muticasted Observable

DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)進羣
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)

上一篇,咱們分析了 Oberservable 和 Subscription 的具體實現方法。這一篇,將會了解一系列不一樣的 Muticasted Observable(多播觀察源),這些 Observable 在 RxJS 中主要是以 Subject 命名,它們有如下幾種不一樣的實現:前端

  1. Subject
  2. AnonymousSubject
  3. BehaviorSubject
  4. ReplaySubject
  5. AsyncSubject

所謂 Muticasted Observable,就是這個 Observable 能夠持續的發送數據給到訂閱它的訂閱者們。git

注:文中 RxJS 所使用的源碼版本爲 6.6.0github

Subject

Subject 是最基礎的 Muticasted Observable,訂閱者對其進行訂閱後,將會拿到 Subject 以後發送的數據。可是,若是訂閱者在數據發送後再訂閱,那麼它將永遠都拿不到這條數據。用一下例子簡單說明一下:typescript

const subject = new Subject<number>();

// 訂閱以前調用是不會打印 
subject.next(1);

// 訂閱數據
const subscription = subject.subscribe((value) => {
  console.log('訂閱數據A:' + value);
});

// 訂閱後調用會打印數據。
subject.next(2);


// 打印結果 
// 訂閱數據A:2

Subject 的實現經過將觀察員們放入數組中,若是有事件即將到來,通知當前全部已經在位的觀察員們。segmentfault

class Subject<T> extends Observable<T> {
  observers: Observer<T>[] = [];
  // 省略了一些內容

  next(value?: T) {
    if (!this.isStopped) {
      ...
      const { observers } = this;
      const len = observers.length;
      const copy = observers.slice();
      for (let i = 0; i < len; i++) {
        copy[i].next(value);
      }
    }
  }
  
  // error 相似於 next
  error(err: any) {
       ...
    this.hasError = true;
    this.thrownError = err;
    this.isStopped = true;
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].error(err);
    }
    this.observers.length = 0;
  }
  
  // complete 相似於 next
  complete() {
    ...
    this.isStopped = true;
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].complete();
    }
    this.observers.length = 0;
  }
}

經過重寫了 _subscribe ,將觀察員在訂閱時保存到 observers 數組中。數組

_subscribe(subscriber: Subscriber<T>): Subscription {
   if (this.hasError) {
    subscriber.error(this.thrownError);
    return Subscription.EMPTY;
  } else if (this.isStopped) {
    subscriber.complete();
    return Subscription.EMPTY;
  } else {
    // 若是都沒有問題,在這裏將觀察員保存到 observers 數組。
    this.observers.push(subscriber);
    // 提供一個指向於當前觀察者的訂閱對象。
    return new SubjectSubscription(this, subscriber)
  }
}

Subject 經過建立一個新的指向於它的 observable,完成和 Observable 之間的轉換。微信

asObservable(): Observable<T> {
  const observable = new Observable<T>();
  (<any>observable).source = this;
  return observable;
}

AnonymousSubject

AnonymousSubject 是一個 Subject 的 wrapper,它擁有一個 名爲 destination 的 Observer 成員。 Observer 提供了三個方法接口,分別是 next,error 和 complete。前端工程師

export interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

AnonymousSubject 經過重載 Subject 的 next,error,complete 將調用轉發到 destination 。因爲其重載這三個重要的方法,其自己並不具有 Subject 所提供的功能。AnonymousSubject 重載這些方法的主要做用是爲了將調用轉發到 destination ,也就是提供了一個app

export class AnonymousSubject<T> extends Subject<T> {
  constructor(protected destination?: Observer<T>, source?: Observable<T>) {
    super();
    this.source = source;
  }

  next(value: T) {
    const { destination } = this;
    if (destination && destination.next) {
      destination.next(value);
    }
  }

  error(err: any) {
    const { destination } = this;
    if (destination && destination.error) {
      this.destination.error(err);
    }
  }

  complete() {
    const { destination } = this;
    if (destination && destination.complete) {
      this.destination.complete();
    }
  }
}

它也重載 _subscribe,那麼也就不具有 Subject 的保存訂閱者的功能了。異步

_subscribe(subscriber: Subscriber<T>): Subscription {
  const { source } = this;
  if (source) {
    return this.source.subscribe(subscriber);
  } else {
    return Subscription.EMPTY;
  }
}

經過閱讀源碼使用到 AnonymousSubject 的地方,我認爲 AnonymousSubject 主要的功能仍是爲 Subject 的 lift 方法提供一個封裝,lift 須要返回的是一個符合當前類的同構對象。

export class Subject<T> extends Observable<T> {
  lift<R>(operator: Operator<T, R>): Observable<R> {
    const subject = new AnonymousSubject(this, this);
    subject.operator = <any>operator;
    return <any>subject;
  }
}

若是直接從新構造一個 Subject 雖然符合同構,可是存儲了過多的冗餘數據,好比,訂閱的時候就會重複把訂閱者添加到 observers 中;若是直接使用 Observable ,那麼又不符合同構,由於 Observable 並不具有 next,error 和 complete 等功能,那麼這就是一種比較穩妥的作法,經過重載複寫 Subject 的一些方法,使得其既具有同構,也不會重複保存冗餘數據。

BehaviorSubject

BehaviorSubject 爲 Subject 提供了數據持久化(相對於 Subject 自己)功能,它自己存儲了已經到來的數據,能夠看看如下例子。

const subject = new BehaviorSubject<number>(0);

// 初始化後直接訂閱
const subscriptionA = subject.subscribe((value) => {
  console.log('訂閱數據A:' + value);
});

// 訂閱以前調用是不會打印 
subject.next(1);

const subscriptionB = subject.subscribe((value) => {
  console.log('訂閱數據B:' + value);
});

// 訂閱後調用會打印數據。
subject.next(2);


// 打印結果 
// 訂閱數據A:0
// 訂閱數據A:1
// 訂閱數據B:1
// 訂閱數據A:2
//

BehaviorSubject 擁有一個 _value 成員,每次調用 next 發送數據的時候,BehaviorSubject 都會將數據保存到 _value 中。

export class BehaviorSubject<T> extends Subject<T> {

  constructor(private _value: T) {
    super();
  }

  get value(): T {
    return this.getValue();
  }
    
  getValue(): T {
    if (this.hasError) {
      throw this.thrownError;
    } else if (this.closed) {
      throw new ObjectUnsubscribedError();
    } else {
      return this._value;
    }
  }
}

調用 next 的時候,會把傳入的 value 保存起來,並交由 Subject 的 next 來處理。

next(value: T): void {
  super.next(this._value = value);
}

當 BehaviorSubject 被訂閱的時候,也會把當前存儲的數據發送給訂閱者,經過重寫 _subscribe 實現這個功能。

_subscribe(subscriber: Subscriber<T>): Subscription {
  const subscription = super._subscribe(subscriber);
  // 只要訂閱器沒有關閉,那麼就將當前存儲的數據發送給訂閱者。
  if (subscription && !(<SubscriptionLike>subscription).closed) {
    subscriber.next(this._value);
  }
  return subscription;
}

AsyncSubject

AsyncSubject 並無提供相應的異步操做,而是把控制最終數據到來的權力交給調用者,訂閱者只會接收到 AsyncSubject 最終的數據。正如官方例子所展現的的,當它單獨調用 next 的時候,訂閱者並不會接收到數據,而只有當它調用 complete 的時候,訂閱者纔會接收到最終到來的消息。如下例子能夠說明 AsyncSubject 的運做方式。

const subject = new AsyncSubject<number>();


const subscriptionA = subject.subscribe((value) => {
  console.log('訂閱數據A:' + value);
});

// 此處不會觸發訂閱
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

const subscriptionB = subject.subscribe((value) => {
  console.log('訂閱數據B:' + value);
});

// 一樣,這裏不會觸發訂閱
subject.next(5);
// 可是完成方法會觸發訂閱
subject.complete();


// 打印結果 
// 訂閱數據A:5
// 訂閱數據B:5

AsyncSubject 經過保留髮送狀態和完成狀態,來達到以上目的。

export class AsyncSubject<T> extends Subject<T> {
  private value: T = null;
  private hasNext: boolean = false;
  private hasCompleted: boolean = false;
}

AsyncSubject 的 next 不會調用 Subject 的 next,而是保存未完成狀態下最新到來的數據。

next(value: T): void {
  if (!this.hasCompleted) {
    this.value = value;
    this.hasNext = true;
  }
}

那麼 Subject 的 next 會在 AsyncSubject 的 complete 方法中調用。

complete(): void {
  this.hasCompleted = true;
  if (this.hasNext) {
    super.next(this.value);
  }
  super.complete();
}

ReplaySubject

ReplaySubject 的做用是在給定的時間內,發送全部的已經收到的緩衝區數據,當時間過時後,將銷燬以前已經收到的數據,從新收集即將到來的數據。因此在構造的時候,須要給定兩個值,一個是緩衝區的大小(bufferSize),一個是給定緩衝區存活的窗口時間(windowTime),須要注意的是 ReplaySubject 所使用的緩衝區的策略是 FIFO。

下面舉出兩個例子,能夠先感覺一下 ReplaySubject 的行爲。第一個以下:

const subject = new ReplaySubject<string>(3);

const subscriptionA = subject.subscribe((value) => {
  console.log('訂閱數據A:' + value);
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

const subscriptionB = subject.subscribe((value) => {
  console.log('訂閱數據B:' + value);
});

// 打印結果:
// 訂閱數據A: 1
// 訂閱數據A: 2
// 訂閱數據A: 3
// 訂閱數據A: 4
// 訂閱數據B:2
// 訂閱數據B:3
// 訂閱數據B:4

下面是第二個例子,這個 ReplaySubject 帶有一個窗口時間。

const subject = new ReplaySubject<string>(10, 1000);

const subscriptionA = subject.subscribe((value) => {
  console.log('訂閱數據A:' + value);
});

subject.next('number');
subject.next('string');
subject.next('object');
subject.next('boolean');

setTimeout(() => {
  subject.next('undefined');
  const subscriptionB = subject.subscribe((value) => {
    console.log('訂閱數據B:' + value);
  });
}, 2000);

// 打印結果
// 訂閱數據A:number
// 訂閱數據A:string
// 訂閱數據A:object
// 訂閱數據A:boolean
// 訂閱數據A:undefined
// 訂閱數據B:undefined

其實 ReplaySubject 跟 BehaviorSubject 很相似,可是不一樣的點在於,ReplaySubject 多了緩衝區和窗口時間,也算是擴展了 BehaviorSubject 的使用場景。

在源碼中,還有第三個參數,那就是調度器(scheduler),通常來講,使用默認調度器已經能夠覆蓋大部分需求,關於調度器的部分會在以後講到。

export class ReplaySubject<T> extends Subject<T> {
  private _events: (ReplayEvent<T> | T)[] = [];
  private _bufferSize: number;
  private _windowTime: number;
  private _infiniteTimeWindow: boolean = false;

  constructor(bufferSize: number = Number.POSITIVE_INFINITY,
              windowTime: number = Number.POSITIVE_INFINITY,
              private scheduler?: SchedulerLike) {
    super();
    this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
    this._windowTime = windowTime < 1 ? 1 : windowTime;

    if (windowTime === Number.POSITIVE_INFINITY) {
      this._infiniteTimeWindow = true;
      this.next = this.nextInfiniteTimeWindow;
    } else {
      this.next = this.nextTimeWindow;
    }
  }
}

上面的源碼中,ReplaySubject 在構造時會根據不一樣的窗口時間來設置 next 具體的運行內容,主要如下兩種方式。

  • nextInfiniteTimeWindow
  • nextTimeWindow

nextInfiniteTimeWindow

若是窗口時間是無限的,那麼就意味着緩衝區數據的約束條件只會是未來的數據。

private nextInfiniteTimeWindow(value: T): void {
  const _events = this._events;
  _events.push(value);
  // 根據數據長度和緩衝區大小,決定哪些數據留在緩衝區。
  if (_events.length > this._bufferSize) {
    _events.shift();
  }

  super.next(value);
}

nextTimeWindow

若是窗口時間是有限的,那麼緩衝區的約束條件就由兩條組成:窗口時間和未來的數據。這時,緩衝區數據就由 ReplayEvent 組成。ReplayEvent 保存了到來的數據的內容和其當前的時間戳。

class ReplayEvent<T> {
  constructor(
    readonly public time: number, 
    readonly public value: T
  ) {}
}

那麼經過 _trimBufferThenGetEvents 對緩衝區數據進行生死判斷後,再把完整的數據交由 Subject 的 next 發送出去。

private nextTimeWindow(value: T): void {
  this._events.push(new ReplayEvent(this._getNow(), value));
  this._trimBufferThenGetEvents();

  super.next(value);
}

_trimBufferThenGetEvents 這個方法是根據不一樣的 event 對象中的時間戳與當前的時間戳進行判斷,同時根據緩衝區的大小,從而獲得這個對象中的數據是否可以保留的憑證。

private _trimBufferThenGetEvents(): ReplayEvent<T>[] {
  const now = this._getNow();
  const _bufferSize = this._bufferSize;
  const _windowTime = this._windowTime;
  const _events = <ReplayEvent<T>[]>this._events;

  const eventsCount = _events.length;
  let spliceCount = 0;

  // 因爲緩衝區的是 FIFO,因此時間的排
  // 序必定是從小到大那麼,只須要找到分
  // 割點,就能決定緩衝數據的最小數據長
  // 度。
  while (spliceCount < eventsCount) {
    if ((now - _events[spliceCount].time) < _windowTime) {
      break;
    }
    spliceCount++;
  }
  
  // 緩衝區長度對切割的優先級會更高,
  // 因此若是超出了緩衝區長度,那麼切
  // 割點要由更大的一方決定。
  if (eventsCount > _bufferSize) {
    spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
  }

  if (spliceCount > 0) {
    _events.splice(0, spliceCount);
  }

  return _events;
}

訂閱過程

ReplaySubject 的訂閱過程比較特殊,由於訂閱的時候須要發送緩衝區數據,並且在不一樣時間進行訂閱也會使得緩衝區中的數據變化,因此訂閱是須要考慮的問題會比較多。那麼,抓住 _infiniteTimeWindow 這個變量來看代碼會變得很容易。

// 如下源碼省略了調度器相關的代碼
_subscribe(subscriber: Subscriber<T>): Subscription {
  const _infiniteTimeWindow = this._infiniteTimeWindow;
  // 窗口時間是無限的則不用考慮
  // 窗口時間是有限的則更新緩衝區
  const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();
  const len = _events.length;
    
  // 建立 subscription
  let subscription: Subscription;
  if (this.isStopped || this.hasError) {
    subscription = Subscription.EMPTY;
  } else {
    this.observers.push(subscriber);
    subscription = new SubjectSubscription(this, subscriber);
  }
      
  // 分類討論不一樣的約束條件
  if (_infiniteTimeWindow) {
    // 窗口時間不是無限的,緩衝區存儲直接就是數據
    for (let i = 0; i < len && !subscriber.closed; i++) {
      subscriber.next(<T>_events[i]);
    }
  } else {
    // 窗口時間不是無限的,緩衝區存儲的是 ReplayEvent
    for (let i = 0; i < len && !subscriber.closed; i++) {
      subscriber.next((<ReplayEvent<T>>_events[i]).value);
    }
  }

  if (this.hasError) {
    subscriber.error(this.thrownError);
  } else if (this.isStopped) {
    subscriber.complete();
  }

  return subscription;
}

最後

本章我主要簡單分析了 5 種主要的 Subject,這些 Subject 實現了不一樣類型的 Muticasted Observable,對 Observable 進行了擴展。

限於本人能力水平有限,若有錯誤,歡迎指出。

加入咱們

咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。

做者:zcx(公衆號:Coder寫字的地方)

原文連接:https://mp.weixin.qq.com/s/i14brW_Ok8JYGoBIcfhs5Q

往期文章推薦

《RxJS 源碼解析(一): Observable & Subscription》

《Web界面深色模式和主題化開發》

《手把手教你搭建一個灰度發佈環境》

相關文章
相關標籤/搜索