RxJS源碼解析(六)——Scheduler

image

DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站:[devui.design
]( https://devui.design/)Ng組件庫ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)

引言

在這以前,我一直都沒有講過 Scheduler 的做用,那麼本章就開始講解 Scheduler 的設計思路和基本結構。RxJS 的存在是爲了處理異步 IO,而異步 IO 所包含的一系列 API 確定也是要通過進一步的封裝才能讓 RxJS 中的異步操做使用。前端

能夠看到,它主要仍是根據 JS 的所可以提供的異步能力來設計這些基本結構。git

  • AsyncScheduler:異步調度器,使用 setInterval 實現。
  • QueueScheduler:隊列異步調度器,繼承了 AsyncScheduler,可是 QueueAction 是一種鏈式結構,使得調度以迭代器的形式進行。
  • AnimationFrameScheduler:使用 reqeustAnimationFrame 實現了幀調度器。
  • AsapScheduler:使用 Promise.resolve().then() 實現的微任務調度器。

SchedulerLike 、 Scheduler & Action

首先,SchedulerLike 提供瞭如下兩個接口。github

export interface SchedulerLike {
  // 標記當前時間
  now(): number;


  // 開啓調度的基礎接口
  schedule<T>(
    work: (this: SchedulerAction<T>, state?: T) => void,
    delay?: number,
    state?: T
  ): Subscription;
}

Scheduler 則實現了這些接口。算法

export class Scheduler implements SchedulerLike {


  // 獲取當前時間戳
  public static now: () => number = () => Date.now();


  constructor(
    private SchedulerAction: typeof Action,
    now: () => number = Scheduler.now
) {
    this.now = now;
  }


  public now: () => number;
  // 直接調用 action 的 schedule
  public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.SchedulerAction<T>(this, work).schedule(state, delay);
  }
}

Scheduler 爲後續的繼承它的調度器定義了建立方式,經過傳入一個 Action 工廠,使得內部能夠構造特定的 Action 。而 Action 繼承了 Subscription,意味着 Action 其實是一種的訂閱器。segmentfault

export class Action<T> extends Subscription {
  constructor(scheduler: Scheduler, work: (this: SchedulerAction<T>, state?: T) => void) {
    super();
  }
  // Action 開始調度
  public schedule(state?: T, delay: number = 0): Subscription {
    return this;
  }
}

上面的設計是一種名爲 Template Method 的設計模式,這種方法有效地約束了後續的不一樣的 Scheduler 的實現。設計模式

定義一個操做中的算法的骨架,而將一些步驟延遲到子類中。它使得子類能夠不改變一個算法的結構便可重定義該算法的某些特定步驟。

異步調度器

先來了解一下 Scheduler 的子類 AsyncScheduler,餘下全部的 Scheduler 都會繼承它。在這裏,先不急着進行源碼分析,咱們須要先爲了弄清楚調度器的運行原理,瞭解調度器是如何對異步 API 進行封裝的。微信

首先,調度器自己也是基於觀察者模式來進行設計,可是它又獨立於 Rxjs 的 Observable。通常來講, AsyncScheduler 是這樣調用的。前端工程師

const scheduler = AsyncScheduler(AsyncAction);
const subscription = async.schedule(function (counter) {
    console.log(counter);
    // this 綁定了 AsyncAction
    this.schedule(counter + 1, 1000);
}, 1000, 1);


// subscription.unsubscribe();

它的調用棧是這樣的。less

AsyncScheduler.schedule
AsyncAction.schedule
AsyncAction.requestAsyncId
listOnTimeout // 原生事件
processTimers // 原生事件
AsyncScheduler.flush
AsyncAction.execute
AsyncAction.\_execute
AsyncAction.work

AsyncAction.schedule

跟着調用棧分析源碼來溯源,在 AsyncScheduler 的 schedule 方法中,它先構造了 AsyncAction ,而後調用它的 schedule 。在這個方法中,其實是對 Action 的內部狀態進行更新,因此此處關注的地方就是在於 schedule 如何觸發異步 API。異步

class AsyncAction<T> extends Action<T> {
  constructor(
    protected scheduler: AsyncScheduler,
    protected work: (this: SchedulerAction<T>, state?: T) => void
  ) {
    super(scheduler, work);
  }


  public schedule(state?: T, delay: number = 0): Subscription {
    if (this.closed) {
      return this;
    }
    this.state = state;
    const id = this.id;
    const scheduler = this.scheduler;
    // 須要對相應的異步 API 進行取消操做
    if (id != null) {
      this.id = this.recycleAsyncId(scheduler, id, delay);
    }
    this.pending = true;
    this.delay = delay;
    // 從新配置異步 API
    this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);


    return this;
  }
}

能夠看到,從 scheduler 傳入的回調函數最終會被 Action 持有,因此調用棧最終執行的 work 實際上就是回調函數。

AsyncAction.requestAsyncId

requestAsyncId 是調用異步 API 的方法,這個方法在 AsyncAction 最終觸發了 setInterval 這一異步 API。那麼實際上,根據 Template Method 的設計,全部繼承 AsyncAction 的 Action 都會經過這個方法實現相對應的異步 API 。

至於 AsyncAction 爲何會使用 setInterval 而不是 setTimeout,源代碼裏是這樣說明的。

Actions only execute once by default, unless rescheduled from within the scheduled callback. This allows us to implement single and repeat actions via the same code path, without adding API surface area, as well as mimic traditional recursion but across asynchronous boundaries. However, JS runtimes and timers distinguish between intervals achieved by serial setTimeout calls vs. a single setInterval call. An interval of serial setTimeout calls can be individufenally delayed, which delays scheduling the next setTimeout, and so on. setInterval attempts to guarantee the interval callback will be invoked more precisely to the interval period, regardless of load. Therefore, we use setInterval to schedule single and repeat actions. If the action reschedules itself with the same delay, the interval is not canceled. If the action doesn't reschedule, or reschedules with a different delay, the interval will be canceled after scheduled callback execution.

對於某一個 Action 來講,除非它在調度的回調中被從新調度,那麼它默認只會執行一次。這樣的方式可使得咱們經過統一的代碼實現調度單一或重複的 Actions,而無需添加 API,而且能夠模仿傳統遞歸來擴展異步。然而, JS 的運行時或者計時器分別經過串行的 setTimout 或者是單個 setInterval 來獲取調用的定時器。串行的 setTimout 定時器能夠單獨延遲,這樣作會延遲c下一個 setTimout 的調度,以此類推。而 setInterval 則無論程序運行的負載如何,它老是嘗試去確保每一次定時器的回調更加精準的安排到合適的間隔時間。所以,咱們使用 setInterval 來安排單一或重複的 Actions,若是 action 以相同的時延調度自己,那麼當前定時器不會被取消。若是 action 只沒有從新調度或者以不一樣的時延從新調度,則安排的回調執行後,改定時器會被取消。

class AsyncAction<T> extends Action<T> {
  protected requestAsyncId(
    scheduler: AsyncScheduler,
    id?: any,
    delay: number = 0
  ): any {
    // 綁定 scheduler,而且把當前的 AsyncAction 看成參數傳入。
    return setInterval(scheduler.flush.bind(scheduler, this), delay);
  }
}

AsyncScheduler.flush

因此,在 AsyncScheduler 中,新增的 flush 方法其實是爲 setInterval 服務的,它做爲異步 API 的回調函數,主要步驟以下。

  • 若是存在運行中的 Action ,它會保存所用調用它的 Action。
  • 若是不存在運行中的 Action,它會執行全部調用隊列中的 Action.execute
  • 處理 Action.execute 的運行錯誤。
export class AsyncScheduler extends Scheduler {


  public flush(action: AsyncAction<any>): void {


    const {actions} = this;


    if (this.active) {
      // 使用了一個隊列保存全部輸入的 Actions
      actions.push(action);
      return;
    }


    let error: any;
    this.active = true;
    // 默認 action 也是隊列中的一員
    // 將全部隊列中的 Action 進行調用。
    do {
      if (error = action.execute(action.state, action.delay)) {
        break;
      }
    } while (action = actions.shift());


    this.active = false;


    // 出現錯誤時,取消全部未運行 action 的訂閱
    if (error) {
      // 注意,此處不會重複取消訂閱,由於執行錯誤的Action會先退出隊列,再執行循環。
      while (action = actions.shift()) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

AsyncAction.execute

上述的 flush 調用了 action 的 execute 方法。該方法也是經過處理 action 的內部狀態來得到執行結果,其中會調用 _execute 這一內部方法,這個內部方法主要做用是調用 AsyncAction.work ,並處理它出現的異常。

class AsyncAction<T> extends Action<T> {
  public execute(state: T, delay: number): any {
    if (this.closed) {
      return new Error('executing a cancelled action');
    }
    this.pending = false;
    // 獲取異常錯誤
    const error = this.\_execute(state, delay);
    if (error) {
      return error;
    } else if (this.pending === false && this.id != null) {
      this.id = this.recycleAsyncId(this.scheduler, this.id, null);
    }
  }


  protected \_execute(state: T, delay: number): any {
    let errored: boolean = false;
    let errorValue: any = undefined;
    try {
      // work
      this.work(state);
    } catch (e) {
      errored = true;
      errorValue = !!e && e || new Error(e);
    }
    if (errored) {
      this.unsubscribe();
      return errorValue;
    }
  }
}

AsyncAction.recycleAsyncId

在分析到 Action.schedule 的時候,引用了源碼內部的註釋,其中有一句話很重要,那就是 「若是 action 以相同的時延調度自己,那麼當前定時器不會被取消」,因此 recycleAsyncId 這個方法是須要處理這種狀況。

class AsyncAction<T> extends Action<T> {
  protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: number = 0): any {
    // this.delay === delay 處理了這種狀況。
    if (delay !== null && this.delay === delay && this.pending === false) {
      return id;
    }
    // 取消當前的定時器
    clearInterval(id);
    return undefined;
  }
}

運用 Template Method

AsyncScheduler 能夠說已經把全部的地基都打好了,它能夠直接拿來用,也能夠繼承並重寫一些相關的接口把相應的異步 API 進行替換。

隊列調度器

隊列調度器根據調用者傳入的時延來決定使用同步方式的調度仍是 setInterval 方式的調度。

QueueScheduler 單純繼承了 AsyncScheduler,其主要實如今 QueueAction 中,經過重寫 scheduleexecute 以及 requestAsyncId 等方法來實現這種功能。

export class QueueAction<T> extends AsyncAction<T> {
  public schedule(state?: T, delay: number = 0): Subscription {
    // delay > 0 ,執行異步調度
    if (delay > 0) {
      return super.schedule(state, delay);
    }
    this.delay = delay;
    this.state = state;
    // 不然直接執行同步調度
    this.scheduler.flush(this);
    return this;
  }


  public execute(state: T, delay: number): any {
    // 根據傳入的 delay 判斷是否直接執行 work (同步執行)
    return (delay > 0 || this.closed) ?
      super.execute(state, delay) :
      this.\_execute(state, delay) ;
  }


  protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any {
    // 根據傳入的 delay 以及自己的 delay 來決定是否使用異步
    if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
      return super.requestAsyncId(scheduler, id, delay);
    }
    // delay 爲 0,直接同步調度
    return scheduler.flush(this);
  }
}

幀調度器 與 微任務調度器

幀調度器根據調用者傳入的時延來決定使用 requestAnimationFrame 仍是 setInterval ,微任務調度器則是根據時延來決定使用 Promise.reslove().then() 仍是 setInterval

二者的調用相似,以致於能夠結合起來分析。

Action

它們的 action 方法均重寫了requestAsyncIdrecycleAsyncId, 主要仍是爲了處理不一樣異步 API 。

protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  if (delay !== null && delay > 0) {
    return super.requestAsyncId(scheduler, id, delay);
  }
  // 把當前action 加入到 actions 隊列末端
  scheduler.actions.push(this);


  if (!scheduler.scheduled) {
      // AsapAction 的狀況
      const scheduled = Immediate.setImmediate(scheduler.flush.bind(scheduler, null));


      // AnimationFrameAction 的狀況
      const scheduled = requestAnimationFrame(scheduler.flush.bind(scheduler, null));


      scheduler.scheduled = scheduled;
  }
  return scheduler.scheduled;
}


protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
  if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
    return super.recycleAsyncId(scheduler, id, delay);
  }
  if (scheduler.actions.length === 0) {
    // AsapAction
    Immediate.clearImmediate(id);
    // AnimationFrameAction
    cancelAnimationFrame(id);


    scheduler.scheduled = undefined;
  }
  return undefined;
}

Scheduler

它們的 flush,跟 AsyncScheduler 的 flush 實現思路差很少,依舊是輪詢 actions 隊列調用 action.execute ,只是它們的 flush 須要去處理額外的如下細節。

  • action 傳入可能爲空。
  • 處理 actions 的狀態。
  • 清空 scheduled,使得 scheduler 可以進行下一次調度。
// export class AnimationFrameScheduler extends AsyncScheduler {
export class AsapScheduler extends AsyncScheduler {
  public flush(action?: AsyncAction<any>): void {
    this.active = true;
    this.scheduled = undefined;


    const {actions} = this;
    let error: any;
    let index: number = -1;
    // 此處順序不能打亂,由於這樣
    action = action || actions.shift()!;
    let count: number = actions.length;


    do {
      if (error = action.execute(action.state, action.delay)) {
        break;
      }
    } while (++index < count && (action = actions.shift()));


    this.active = false;


    if (error) {
      while (++index < count && (action = actions.shift())) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

Immediate

這裏頗有意思的一點, AsapScheduler 並無直接經過 Promise.reslove().then() 來實現。而是把它封裝成 Immediate,造成 setImmediateclearImmediate 兩個 API ,這樣就使得微任務的調用其餘的定時 API 無異。

內部實現是經過一個 Map 保存標記當前的是第幾個微任務,這裏並不直接保存 Promise,由於 Promise 執行完畢後就自行釋放了,因此它須要的只是一個標記。

let nextHandle = 1;
const RESOLVED = (() => Promise.resolve())();
const activeHandles: { \[key: number\]: any } = {};


function findAndClearHandle(handle: number): boolean {
  if (handle in activeHandles) {
    delete activeHandles\[handle\];
    return true;
  }
  return false;
}


export const Immediate = {
  setImmediate(cb: () => void): number {
    const handle = nextHandle++;
    activeHandles\[handle\] = true;
    RESOLVED.then(() => findAndClearHandle(handle) && cb());
    return handle;
  },


  clearImmediate(handle: number): void {
    findAndClearHandle(handle);
  },
};

總結

本篇分析了 RxJS 的調度器相關的一系列內容,經過封裝 JS 異步 API ,調度器實現相對應的異步功能,加強了 RxJS 對異步 IO 的掌控。

加入咱們

咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com

做者:zcx(公衆號:Coder寫字的地方)

原文連接:https://mp.weixin.qq.com/s/vG0aaQmDy7Cqfv0CwJ_d0Q

往期文章推薦

《RxJS 源碼解析(五)—— Operator III》

《Web界面深色模式和主題化開發》

《手把手教你搭建一個灰度發佈環境》

相關文章
相關標籤/搜索