DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站:[devui.design
]( https://devui.design/)Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)
在這以前,我一直都沒有講過 Scheduler 的做用,那麼本章就開始講解 Scheduler 的設計思路和基本結構。RxJS 的存在是爲了處理異步 IO,而異步 IO 所包含的一系列 API 確定也是要通過進一步的封裝才能讓 RxJS 中的異步操做使用。前端
能夠看到,它主要仍是根據 JS 的所可以提供的異步能力來設計這些基本結構。git
setInterval
實現。AsyncScheduler
,可是 QueueAction
是一種鏈式結構,使得調度以迭代器的形式進行。reqeustAnimationFrame
實現了幀調度器。Promise.resolve().then()
實現的微任務調度器。首先,SchedulerLike 提供瞭如下兩個接口。github
export interface SchedulerLike { // 標記當前時間 now(): number; // 開啓調度的基礎接口 schedule<T>( work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T ): Subscription; }
Scheduler 則實現了這些接口。算法
export class Scheduler implements SchedulerLike { // 獲取當前時間戳 public static now: () => number = () => Date.now(); constructor( private SchedulerAction: typeof Action, now: () => number = Scheduler.now ) { this.now = now; } public now: () => number; // 直接調用 action 的 schedule public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription { return new this.SchedulerAction<T>(this, work).schedule(state, delay); } }
Scheduler 爲後續的繼承它的調度器定義了建立方式,經過傳入一個 Action 工廠,使得內部能夠構造特定的 Action 。而 Action 繼承了 Subscription,意味着 Action 其實是一種的訂閱器。segmentfault
export class Action<T> extends Subscription { constructor(scheduler: Scheduler, work: (this: SchedulerAction<T>, state?: T) => void) { super(); } // Action 開始調度 public schedule(state?: T, delay: number = 0): Subscription { return this; } }
上面的設計是一種名爲 Template Method 的設計模式,這種方法有效地約束了後續的不一樣的 Scheduler 的實現。設計模式
定義一個操做中的算法的骨架,而將一些步驟延遲到子類中。它使得子類能夠不改變一個算法的結構便可重定義該算法的某些特定步驟。
先來了解一下 Scheduler 的子類 AsyncScheduler,餘下全部的 Scheduler 都會繼承它。在這裏,先不急着進行源碼分析,咱們須要先爲了弄清楚調度器的運行原理,瞭解調度器是如何對異步 API 進行封裝的。微信
首先,調度器自己也是基於觀察者模式來進行設計,可是它又獨立於 Rxjs 的 Observable。通常來講, AsyncScheduler 是這樣調用的。前端工程師
const scheduler = AsyncScheduler(AsyncAction); const subscription = async.schedule(function (counter) { console.log(counter); // this 綁定了 AsyncAction this.schedule(counter + 1, 1000); }, 1000, 1); // subscription.unsubscribe();
它的調用棧是這樣的。less
AsyncScheduler.schedule AsyncAction.schedule AsyncAction.requestAsyncId listOnTimeout // 原生事件 processTimers // 原生事件 AsyncScheduler.flush AsyncAction.execute AsyncAction.\_execute AsyncAction.work
跟着調用棧分析源碼來溯源,在 AsyncScheduler 的 schedule
方法中,它先構造了 AsyncAction ,而後調用它的 schedule
。在這個方法中,其實是對 Action 的內部狀態進行更新,因此此處關注的地方就是在於 schedule
如何觸發異步 API。異步
class AsyncAction<T> extends Action<T> { constructor( protected scheduler: AsyncScheduler, protected work: (this: SchedulerAction<T>, state?: T) => void ) { super(scheduler, work); } public schedule(state?: T, delay: number = 0): Subscription { if (this.closed) { return this; } this.state = state; const id = this.id; const scheduler = this.scheduler; // 須要對相應的異步 API 進行取消操做 if (id != null) { this.id = this.recycleAsyncId(scheduler, id, delay); } this.pending = true; this.delay = delay; // 從新配置異步 API this.id = this.id || this.requestAsyncId(scheduler, this.id, delay); return this; } }
能夠看到,從 scheduler 傳入的回調函數最終會被 Action 持有,因此調用棧最終執行的 work
實際上就是回調函數。
requestAsyncId
是調用異步 API 的方法,這個方法在 AsyncAction 最終觸發了 setInterval
這一異步 API。那麼實際上,根據 Template Method 的設計,全部繼承 AsyncAction 的 Action 都會經過這個方法實現相對應的異步 API 。
至於 AsyncAction 爲何會使用 setInterval
而不是 setTimeout
,源代碼裏是這樣說明的。
Actions only execute once by default, unless rescheduled from within the scheduled callback. This allows us to implement single and repeat actions via the same code path, without adding API surface area, as well as mimic traditional recursion but across asynchronous boundaries. However, JS runtimes and timers distinguish between intervals achieved by serialsetTimeout
calls vs. a singlesetInterval
call. An interval of serialsetTimeout
calls can be individufenally delayed, which delays scheduling the nextsetTimeout
, and so on.setInterval
attempts to guarantee the interval callback will be invoked more precisely to the interval period, regardless of load. Therefore, we usesetInterval
to schedule single and repeat actions. If the action reschedules itself with the same delay, the interval is not canceled. If the action doesn't reschedule, or reschedules with a different delay, the interval will be canceled after scheduled callback execution.對於某一個 Action 來講,除非它在調度的回調中被從新調度,那麼它默認只會執行一次。這樣的方式可使得咱們經過統一的代碼實現調度單一或重複的 Actions,而無需添加 API,而且能夠模仿傳統遞歸來擴展異步。然而, JS 的運行時或者計時器分別經過串行的
setTimout
或者是單個setInterval
來獲取調用的定時器。串行的setTimout
定時器能夠單獨延遲,這樣作會延遲c下一個setTimout
的調度,以此類推。而setInterval
則無論程序運行的負載如何,它老是嘗試去確保每一次定時器的回調更加精準的安排到合適的間隔時間。所以,咱們使用setInterval
來安排單一或重複的 Actions,若是 action 以相同的時延調度自己,那麼當前定時器不會被取消。若是 action 只沒有從新調度或者以不一樣的時延從新調度,則安排的回調執行後,改定時器會被取消。
class AsyncAction<T> extends Action<T> { protected requestAsyncId( scheduler: AsyncScheduler, id?: any, delay: number = 0 ): any { // 綁定 scheduler,而且把當前的 AsyncAction 看成參數傳入。 return setInterval(scheduler.flush.bind(scheduler, this), delay); } }
因此,在 AsyncScheduler 中,新增的 flush
方法其實是爲 setInterval 服務的,它做爲異步 API 的回調函數,主要步驟以下。
export class AsyncScheduler extends Scheduler { public flush(action: AsyncAction<any>): void { const {actions} = this; if (this.active) { // 使用了一個隊列保存全部輸入的 Actions actions.push(action); return; } let error: any; this.active = true; // 默認 action 也是隊列中的一員 // 將全部隊列中的 Action 進行調用。 do { if (error = action.execute(action.state, action.delay)) { break; } } while (action = actions.shift()); this.active = false; // 出現錯誤時,取消全部未運行 action 的訂閱 if (error) { // 注意,此處不會重複取消訂閱,由於執行錯誤的Action會先退出隊列,再執行循環。 while (action = actions.shift()) { action.unsubscribe(); } throw error; } } }
上述的 flush 調用了 action 的 execute 方法。該方法也是經過處理 action 的內部狀態來得到執行結果,其中會調用 _execute 這一內部方法,這個內部方法主要做用是調用 AsyncAction.work ,並處理它出現的異常。
class AsyncAction<T> extends Action<T> { public execute(state: T, delay: number): any { if (this.closed) { return new Error('executing a cancelled action'); } this.pending = false; // 獲取異常錯誤 const error = this.\_execute(state, delay); if (error) { return error; } else if (this.pending === false && this.id != null) { this.id = this.recycleAsyncId(this.scheduler, this.id, null); } } protected \_execute(state: T, delay: number): any { let errored: boolean = false; let errorValue: any = undefined; try { // work this.work(state); } catch (e) { errored = true; errorValue = !!e && e || new Error(e); } if (errored) { this.unsubscribe(); return errorValue; } } }
在分析到 Action.schedule 的時候,引用了源碼內部的註釋,其中有一句話很重要,那就是 「若是 action 以相同的時延調度自己,那麼當前定時器不會被取消」,因此 recycleAsyncId
這個方法是須要處理這種狀況。
class AsyncAction<T> extends Action<T> { protected recycleAsyncId(scheduler: AsyncScheduler, id: any, delay: number = 0): any { // this.delay === delay 處理了這種狀況。 if (delay !== null && this.delay === delay && this.pending === false) { return id; } // 取消當前的定時器 clearInterval(id); return undefined; } }
AsyncScheduler 能夠說已經把全部的地基都打好了,它能夠直接拿來用,也能夠繼承並重寫一些相關的接口把相應的異步 API 進行替換。
隊列調度器根據調用者傳入的時延來決定使用同步方式的調度仍是 setInterval
方式的調度。
QueueScheduler 單純繼承了 AsyncScheduler,其主要實如今 QueueAction 中,經過重寫 schedule
、 execute
以及 requestAsyncId
等方法來實現這種功能。
export class QueueAction<T> extends AsyncAction<T> { public schedule(state?: T, delay: number = 0): Subscription { // delay > 0 ,執行異步調度 if (delay > 0) { return super.schedule(state, delay); } this.delay = delay; this.state = state; // 不然直接執行同步調度 this.scheduler.flush(this); return this; } public execute(state: T, delay: number): any { // 根據傳入的 delay 判斷是否直接執行 work (同步執行) return (delay > 0 || this.closed) ? super.execute(state, delay) : this.\_execute(state, delay) ; } protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any { // 根據傳入的 delay 以及自己的 delay 來決定是否使用異步 if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) { return super.requestAsyncId(scheduler, id, delay); } // delay 爲 0,直接同步調度 return scheduler.flush(this); } }
幀調度器根據調用者傳入的時延來決定使用 requestAnimationFrame
仍是 setInterval
,微任務調度器則是根據時延來決定使用 Promise.reslove().then()
仍是 setInterval
。
二者的調用相似,以致於能夠結合起來分析。
它們的 action 方法均重寫了requestAsyncId
和 recycleAsyncId
, 主要仍是爲了處理不一樣異步 API 。
protected requestAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any { if (delay !== null && delay > 0) { return super.requestAsyncId(scheduler, id, delay); } // 把當前action 加入到 actions 隊列末端 scheduler.actions.push(this); if (!scheduler.scheduled) { // AsapAction 的狀況 const scheduled = Immediate.setImmediate(scheduler.flush.bind(scheduler, null)); // AnimationFrameAction 的狀況 const scheduled = requestAnimationFrame(scheduler.flush.bind(scheduler, null)); scheduler.scheduled = scheduled; } return scheduler.scheduled; } protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any { if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) { return super.recycleAsyncId(scheduler, id, delay); } if (scheduler.actions.length === 0) { // AsapAction Immediate.clearImmediate(id); // AnimationFrameAction cancelAnimationFrame(id); scheduler.scheduled = undefined; } return undefined; }
它們的 flush,跟 AsyncScheduler 的 flush 實現思路差很少,依舊是輪詢 actions 隊列調用 action.execute ,只是它們的 flush 須要去處理額外的如下細節。
// export class AnimationFrameScheduler extends AsyncScheduler { export class AsapScheduler extends AsyncScheduler { public flush(action?: AsyncAction<any>): void { this.active = true; this.scheduled = undefined; const {actions} = this; let error: any; let index: number = -1; // 此處順序不能打亂,由於這樣 action = action || actions.shift()!; let count: number = actions.length; do { if (error = action.execute(action.state, action.delay)) { break; } } while (++index < count && (action = actions.shift())); this.active = false; if (error) { while (++index < count && (action = actions.shift())) { action.unsubscribe(); } throw error; } } }
這裏頗有意思的一點, AsapScheduler 並無直接經過 Promise.reslove().then()
來實現。而是把它封裝成 Immediate
,造成 setImmediate
和 clearImmediate
兩個 API ,這樣就使得微任務的調用其餘的定時 API 無異。
內部實現是經過一個 Map 保存標記當前的是第幾個微任務,這裏並不直接保存 Promise,由於 Promise 執行完畢後就自行釋放了,因此它須要的只是一個標記。
let nextHandle = 1; const RESOLVED = (() => Promise.resolve())(); const activeHandles: { \[key: number\]: any } = {}; function findAndClearHandle(handle: number): boolean { if (handle in activeHandles) { delete activeHandles\[handle\]; return true; } return false; } export const Immediate = { setImmediate(cb: () => void): number { const handle = nextHandle++; activeHandles\[handle\] = true; RESOLVED.then(() => findAndClearHandle(handle) && cb()); return handle; }, clearImmediate(handle: number): void { findAndClearHandle(handle); }, };
本篇分析了 RxJS 的調度器相關的一系列內容,經過封裝 JS 異步 API ,調度器實現相對應的異步功能,加強了 RxJS 對異步 IO 的掌控。
咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。
做者:zcx(公衆號:Coder寫字的地方)
原文連接:https://mp.weixin.qq.com/s/vG0aaQmDy7Cqfv0CwJ_d0Q
往期文章推薦