DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)進羣
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)
本篇是 RxJS 源碼解析的第三篇文章,使用源碼的版本是 6.6.0 。本篇文章的內容可能會比較多,請耐心閱讀。爲了方便閱讀,文中的相關代碼均通過裁剪和處理。若有不妥,還請指正。前端
在 RxJS 中,Creation Operator 主要分爲如下兩類:git
在 pipe 中使用的 operator ,我稱之爲 Pipe Operator ,它主要分爲如下幾類:github
限於篇幅本篇將先介紹 Normal Creation Operator ,它的主要做用是幫助開發者快速建立 Observable。typescript
of 、empty 、throwError ,首先講這三個 operator 的重要緣由是,它提供了一系列基礎的操做:next、complete、以及 error。json
const observableA = of(1); const observableB = empty(); const observableC = throwError(Error('test')); observableA.subscribe({ next: (v) => console.log('A: ' + v), complete: () => console.log('A: complete'); error: (e) => console.log('A: error is ' + e); }); observableB.subscribe({ next: (v) => console.log('B: ' + v), complete: () => console.log('B: complete'); error: (e) => console.log('B: error is ' + e); }); observableC.subscribe({ next: (v) => console.log('C: ' + v), complete: () => console.log('C: complete'), error: (e) => console.log(`C: error is (${e}).`), }); // 打印結果 // A: 1 // A: complete // B: complete // C: error is Error: test
它的構建方式以下,其中,調度器是最後一個參數。segmentfault
export function of<T>(...args: Array<T | SchedulerLike>): Observable<T> { let scheduler = args[args.length - 1] as SchedulerLike; if (isScheduler(scheduler)) { args.pop(); return scheduleArray(args as T[], scheduler); } else { return fromArray(args as T[]); } }
of 由兩個函數 fromArray 和 scheduleArray。fromArray 是一個簡單循環的函數,它將數據循環發送給 Observable 的訂閱者。數組
export function fromArray<T>(input: ArrayLike<T>) { return new Observable<T>((subscriber: Subscriber<T>) => { // 循環獲取數據 for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) { subscriber.next(array[i]); } subscriber.complete(); }); }
這部分的代碼很簡單,scheduler 部分能夠忽略。實際上就是在 Observable 中調用 subscriber.complete()
。promise
export function empty(scheduler?: SchedulerLike) { if (scheduler) { return new Observable<never>(subscriber => scheduler.schedule( () => subscriber.complete() )); } else { return new Observable<never>(subscriber => subscriber.complete()); } }
throwError 跟 empty 的實現是一致的,只不過 complete 換成了 error 。瀏覽器
export function throwError(error: any, scheduler?: SchedulerLike): Observable<never> { if (!scheduler) { return new Observable(subscriber => subscriber.error(error)); } else { return new Observable(subscriber => scheduler.schedule( dispatch, 0, { error, subscriber } )); } } // 如下是 調度器中想要執行的狀態。 interface DispatchArg { error: any; subscriber: Subscriber<any>; } // 最終執行的是 subcriber 的 error 方法。 function dispatch({ error, subscriber }: DispatchArg) { subscriber.error(error); }
iif 和 defer 的表現是一致的。微信
let test = false; const observableA = iif( () => test, of('1'), of('2'), ); const observableB = defer(function() { return test ? of('1') : of('2'); });
看到 iif 的源碼的那一刻我震驚了,什麼叫大道至簡(戰術後仰)。
export function iif<T = never, F = never>( condition: () => boolean, trueResult: SubscribableOrPromise<T> = EMPTY, falseResult: SubscribableOrPromise<F> = EMPTY ): Observable<T|F> { // 直接調用了 defer return defer(() => condition() ? trueResult : falseResult); }
defer 原理上比較簡單:在構造 Observable 的時候,在傳入的訂閱函數中返回一個 Subscription。那麼在這個傳入的訂閱函數中,defer 的過程分爲如下三步:
export function defer<R extends ObservableInput<any> | void>(observableFactory: () => R): Observable<ObservedValueOf<R>> { return new Observable<ObservedValueOf<R>>(subscriber => { let input: R | void; try { // 調用工廠函數,獲取輸入的數據。 input = observableFactory(); } catch (err) { subscriber.error(err); return undefined; } // 經過 from 將 input 轉換爲 observable。 const source = input ? from(input as ObservableInput<ObservedValueOf<R>>) : empty(); // 返回一個訂閱器到外部。 return source.subscribe(subscriber); }); }
其中的 ObservedValueOf 是這樣定義的,使用了 ts 的 infer 來推導出 ObservableInput<T> 中 T 的具體類型。
export type ObservedValueOf<OV> = OV extends ObservableInput<infer T> ? T : never;
from 提供了一種映射的功能,能夠將傳入的數據映射成 Observables 。它能夠接受如下參數:
Iterable<T>
Promise<T>
Observable<T>
稍微的修剪了一下,源碼以下:
export function from<T>(input: ObservableInput<T>): Observable<T> { return new Observable<T>(subscribeTo(input)); }
它直接建立一個新的 Observable,而且調用了 subscribeTo ,根據輸入類型,對輸入進行不一樣的處理。
若是輸入是原生數組或者是實現了數組功能的數據結構,那麼直接調用 subscriber.next
把全部數據依次發送給訂閱者。
export const subscribeToArray = <T>(array: ArrayLike<T>) => (subscriber: Subscriber<T>) => { for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) { subscriber.next(array[i]); } subscriber.complete(); };
若是輸入是 Obervable,那麼要經過一個特定的 Symbol 取出 Observable,而後再訂閱它。
(基於 Symbol 的特性,當前不少項目都會使用一個固定的 Symbol 對特定數據取值,來驗證這個數據是否是符合類型)。
export const subscribeToObservable = <T>(obj: any) => (subscriber: Subscriber<T>) => { const obs = obj[Symbol_observable](); if (typeof obs.subscribe !== 'function') { throw new TypeError('Provided object does not correctly implement Symbol.observable'); } else { return obs.subscribe(subscriber); } };
若是輸入是一個 Promise,那麼經過 then 獲取到 Promise 的內容,並將內容發送給訂閱者。
export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => { promise.then( (value) => { if (!subscriber.closed) { subscriber.next(value); subscriber.complete(); } }, (err: any) => subscriber.error(err) ); return subscriber; };
生成器跟數組的方式相似,也是經過循環的方式將數據發送給訂閱者。
export const subscribeToIterable = <T>(iterable: Iterable<T>) => (subscriber: Subscriber<T>) => { const iterator = (iterable as any)[Symbol_iterator](); do { let item: IteratorResult<T>; try { item = iterator.next(); } catch (err) { subscriber.error(err); return subscriber; } if (item.done) { subscriber.complete(); break; } subscriber.next(item.value); if (subscriber.closed) { break; } } while (true); return subscriber; };
generate 可讓你用一種相似 for 循環的方式得到數據流。不過,我目前尚未遇到過很是須要這種方式生成流的方式,若是你遇到這種狀況,歡迎交流。通常來講,我習慣於這樣調用它。
const observable = generate({ initialState: 1, condition: x => x < 5, iterate: x => x + 1, }); observable.subscribe((value) => { console.log(value); }); // 打印結果 // 1 // 2 // 3 // 4
原來的源碼包含了較多的參數判斷,把內部邏輯梳理一下,實際上就是分爲三個大步驟:
export function generate<S>(options: GenerateOptions<S>): Observable<S> { const initialState = options.initialState; const condition = options.condition; const iterate = options.iterate; // 返回 Observable return new Observable<S>(subscriber => { let state = initialState; try { while (true) { // 判斷結束條件 if (condition && !condition(state)) { subscriber.complete(); break; } // 發送數據給訂閱者 subscriber.next(state); // 調用迭代,獲取下一組數據 state = iterate(state); if (subscriber.closed) { break; } } } catch (err) { subscriber.error(err); } return undefined; }); }
其中 GenerateOptions 包含了三個成員,initialState,condition 以及 iterate 。
export interface GenerateOptions<S> { // 初始狀態 initialState: S; // 結束條件 condition?: (x: S) => boolean; // 迭代方式 iterate: (x: S) => S; }
range 能夠建立一個給定範圍的數字流。這個主要就是提供了一個簡單的語義化函數,主要就是經過循環給訂閱者喂數據。
export function range(start: number = 0, count?: number): Observable<number> { return new Observable<number>(subscriber => { if (count === undefined) { count = start; start = 0; } for (let index = 0; index < count; ++index) { subscriber.next(start + index); if (subscriber.closed) { break; } } return undefined; }); }
fromEvent 是的 Observable 能夠封裝一系列的系統事件。既能夠接受 NodeJS EventEmitter,也能夠接受 DOM EventTarget, JQuery-like event target, NodeList 或者 HTMLCollection 等瀏覽器對象。
const clicksA = fromEvent(document, 'click'); const clicksB = fromEvent($(document), 'click'); clicksA.subscribe(x => console.log('A: ', x)); clicksB.subscribe(x => console.log('B: ', x)); // 每當點擊一下頁面,都會打印出 event 。
它的實現很簡單,根據 target 的對象類型調用其對應的事件監聽函數,而後經過 subscriber 調用 next 獲取到訂閱的輸出。爲了方便閱讀,我稍微的改了一下,讓 fromEvent 只支持 DOM EventTarget。
export interface HasEventTargetAddRemove<E> { addEventListener(type: string, listener: ((evt: E) => void) | null, options?: boolean | AddEventListenerOptions): void; removeEventListener(type: string, listener?: ((evt: E) => void) | null, options?: EventListenerOptions | boolean): void; } // 一個只支持 DOM EventTarget 的 fromEvent export function fromEvent<T>( target: HasEventTargetAddRemove<T>, eventName: string, options?: EventListenerOptions, ): Observable<T> { return new Observable<T>(subscriber => { // 處理結果 const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e); // 調用 addEventListener,並讓其在 handler 中處理。 target.addEventListener(eventName, handler, options); // 取消訂閱的時候,直接調用 removeEventListener 對 dom 取消訂閱。 // 返回的是一個函數,這個函數負責了取消訂閱的時,所作的內容。 return () => { target.removeEventListener(eventName, handler, options); }; }); }
上面的代碼能夠分解成這三個步驟:
對於其餘的事件監聽,再也不贅述,流程徹底是同樣。
fromEventPattern 則是對 fromEvent 的泛化。
function addClickHandler(handler) { document.addEventListener('click', handler); } function removeClickHandler(handler) { document.removeEventListener('click', handler); } const clicks = fromEventPattern( addClickHandler, removeClickHandler ); clicks.subscribe(x => console.log(x)); // 點擊的時候,就會輸出點擊事件。
它的源碼的與 fromEvent 相似。
export type NodeEventHandler = (...args: any[]) => void; export function fromEventPattern<T>( addHandler: (handler: NodeEventHandler) => any, removeHandler?: (handler: NodeEventHandler, signal?: any) => void, ): Observable<T | T[]> { return new Observable<T | T[]>(subscriber => { const handler = (...e: T[]) => subscriber.next(e.length === 1 ? e[0] : e); // 有一點不一樣的地方在於,獲取了返回值 addHandler 的返回值 let retValue: any; try { retValue = addHandler(handler); } catch (err) { subscriber.error(err); return undefined; } if (!isFunction(removeHandler)) { return undefined; } // 而後在這裏傳入 removeHandler 中 return () => removeHandler(handler, retValue); }); }
它們都是一種特殊的 Operator ,思路應該是源於 Function.bind ,提供一種轉換操做,將帶有回調的函數轉換成 Observable Factory。
function setTimeoutWithCallback(callback: () => void) { setTimeout(() => { callback(); }, 2000); } const obfactory = bindCallback(setTimeoutWithCallback); const ob1 = obfactory(); const ob2 = obfactory(); const now = Date.now(); ob1.subscribe(() => { console.log('ob1' + (Date.now() - now)); }); setTimeout(() => { ob1.subscribe(() => { console.log('ob1 later: ' + (Date.now() - now)); }); ob2.subscribe(() => { console.log('ob2: ' + (Date.now() - now)); }); }, 3000); // 打印結果: // ob1: 2001 // ob1 later: 3004 // ob2: 5008
如下是 bindNodeCallback 的例子。
/* file: ~/desktop/test.json { "name": "Hello World" } */ import * as fs from 'fs'; const readerFactory = bindNodeCallback(fs.readFile); const reader$ = readerFactory('./src/person.json'); reader$.subscribe({ next: (value) => console.log(value.toString()), error: (err) => console.log(err), complete: () => console.log('complete') }); // 若是沒有錯誤,打印結果以下: // { name: 'Hello World' } // complete // 若是有錯誤,打印結果以下: // [Error: ENOENT: no such file or directory, open './src/person.json'] { // errno: -2, // code: 'ENOENT', // syscall: 'open', // path: './src/person' // }
bindCallback 和 bindNodeCallback 的源碼很是相似。
export function bindCallback<T>( callbackFunc: Function ): (...args: any[]) => Observable<T> { return function (this: any, ...args: any[]): Observable<T> { const context = this; // let subject: AsyncSubject<T>; return new Observable<T>(subscriber => { if (!subject) { subject = new AsyncSubject<T>(); const handler = (...innerArgs: any[]) => { subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs); subject.complete(); }; try { callbackFunc.apply(context, [...args, handler]); } catch (err) { subject.error(err); } } return subject.subscribe(subscriber); }); }; }
bindCallback 和 bindNodeCallback 的源碼惟一不一樣的地方就是在於 handler 這個函數處理的內容不一樣,bindNodeCallback 傳入的函數的回調,第一個參數爲是錯誤信息。
const handler = (...innerArgs: any[]) => { const err = innerArgs.shift(); // 若是第一個參數存在,說明有問題。 if (err) { subject.error(err); return; } subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs); subject.complete(); };
源碼中比較有趣的地方在於,建立的時候,返回的工廠函數包含了一個 AsyncSubject。這個 AsyncSubject 保存了已經到來數據,能夠看看例子中,ob1 被訂閱了2次,第二次訂閱後其實是馬上就能拿到返回值;而 ob2 仍要執行一次 setTimeoutWithCallback。這種設計與這個 bind 的語義相吻合。
上面的 operators 中,我已經把 scheduler 相關的內容進行了裁剪,基本上與 scheduler 無關。而 interval 和 timer 都必須經過 scheduler 來相應的定時操做,因此這部分放到了最後。它們是用於建立定時數據源的 operators 。
const observableA = interval(1000).pipe(take(2)); const observableB = timer(500, 1000).pipe(take(3)); console.log('hello'); observableA.subscribe(value => { console.log('A: ' + value); }); observableB.subscribe(value => { console.log('B: ' + value); }); // 打印結果 // hello // B: 0 // A: 0 // B: 1 // A: 1 // B: 2
interval 和 timer 都使用了一個默認的異步調度器,這個異步調度器主要是經過 setInterval 來實現相應的功能,實際上 Rx 把異步調度器經過 interval 和 timer 轉化成 Observable 的形式提供到給用戶。
timer 的實現以下圖所示。它首先建立了一個 Observable ,而後在訂閱函數中,返回調度器的訂閱。在這裏, scheduler 的 schedule 函數返回了一個 Subscription。
export function timer( dueTime: number | Date = 0, period: number, scheduler: SchedulerLike = async ): Observable<number> { return new Observable(subscriber => { let due = 0; // 判斷是否是 Date 類型 if (dueTime instanceof Date) { due = +dueTime - scheduler.now(); } // 判斷是否是 number 類型 if (isNumeric(dueTime)) { due = dueTime as number; } // 此處調用跟 interval 相似。 return scheduler.schedule(dispatch, due, { index: 0, period, subscriber }); }); }
dispatch 其實是一個遞歸函數,這個函數綁定了 SchedulerAction ,經過傳入訂閱者,使得 Action 內部的 setInterval 可以一直調用 subscriber.next
。
interface TimerState { index: number; period: number; subscriber: Subscriber<number>; } function dispatch(this: SchedulerAction<TimerState>, state: TimerState) { const { index, period, subscriber } = state; subscriber.next(index); if (subscriber.closed) { return; } else if (period === -1) { return subscriber.complete(); } state.index = index + 1; this.schedule(state, period); }
如下是 interval 的源碼。
export function interval(period = 0): Observable<number> { if (!isNumeric(period) || period < 0) { period = 0; } const scheduler = async; return new Observable<number>(subscriber => { // 訂閱器接收 scheduler 的訂閱結果。 subscriber.add( scheduler.schedule(dispatch, period, { subscriber, counter: 0, period }) ); return subscriber; }); }
仔細的分析上面的代碼,我發現 interval 的實現實際上就是 timer 的一個約束版本,它能夠改寫成這樣。
export function interval( period = 0, scheduler: SchedulerLike = async, ): Observable<number> { if (!isNumeric(period) || period < 0) { period = 0; } return timer(period, period, sch); }
總體而言,這部分的源碼並無寫得很繞,刪去了 scheduler 相關的內容後,邏輯馬上就變得清晰了起來。同時,從源碼的風格上能夠看到它們由不一樣的人來編寫。
最後,限於本人的水平,若是文章中有錯誤的地方,歡迎指正。
咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。
做者:zcx(公衆號:Coder寫字的地方)
原文連接:https://mp.weixin.qq.com/s/6fVoI_JtSXu6YfZur1TDNw
往期文章推薦