DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)進羣
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)
本篇是 RxJS 源碼解析的第四篇文章,使用源碼的版本是 6.6.0 。本篇文章的內容仍然可能會比較多,請耐心閱讀。爲了方便閱讀,文中的相關代碼均通過裁剪和處理。若有不妥,還請指正。前端
在本文開始以前,先定義一些自定義術語,方便閱讀。git
我並不打算像上一篇那樣,抓着幾個操做符一頓輸出。從這篇開始,不管是 Join Operator、仍是 Transformation Operator,都有很大的規律性。因此我想先總結出來它們的規律,再來對 operator 進行分析。github
爲了讓操做符能夠控制下游流,RxJS 經過委託模式,讓操做符生成的了一個特定的 Subscriber,它在內部就能拿到全部傳入的下游流的訂閱。所以,在這裏先介紹兩個 Subscriber: OuterSubscriber 和 InnerSubscriber 。typescript
其內部實現實際上就是把 InnerSubscriber 的 next,error,complete 轉發給 OuterSubscriber 。segmentfault
export class InnerSubscriber<T, R> extends Subscriber<R> { private index = 0; constructor(private parent: OuterSubscriber<T, R>, public outerValue: T, public outerIndex: number) { super(); } protected _next(value: R): void { this.parent.notifyNext(this.outerValue, value, this.outerIndex, this.index++, this); } protected _error(error: any): void { this.parent.notifyError(error, this); this.unsubscribe(); } protected _complete(): void { this.parent.notifyComplete(this); this.unsubscribe(); } }
而 OuterSubscriber 的默認實現則是將數據交由終結訂閱轉發出去。數組
export class OuterSubscriber<T, R> extends Subscriber<T> { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R>): void { this.destination.next(innerValue); } notifyError(error: any, innerSub: InnerSubscriber<T, R>): void { this.destination.error(error); } notifyComplete(innerSub: InnerSubscriber<T, R>): void { this.destination.complete(); } }
不一樣的操做符可能會要生成不一樣的 Subscriber,而生成這些 Subscriber 都會調用 subscribeToResult。這個函數會根據傳入的 ObservableInput ,進行類型判斷,並返回一個正確處理後的訂閱。這裏爲了能夠複用,就調用了以前 from 也使用過的 subscribeTo ,在這個函數中,會處理列表、Promise、以及生成器等數據並返回一個訂閱。緩存
export function subscribeToResult<T, R>( outerSubscriber: OuterSubscriber<T, R>, result: any, outerValue?: T, outerIndex: number = 0, innerSubscriber: Subscriber<R> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) ): Subscription | undefined { if (innerSubscriber.closed) { return undefined; } if (result instanceof Observable) { return result.subscribe(innerSubscriber); } return subscribeTo(result)(innerSubscriber) as Subscription; }
經過這種設計,使得生成的 Subscriber 擁有控制下游流狀態的能力。這種能力可使得數據裝箱和拆箱都放在同一個 Subscriber 中,同時這樣作也把反作用集中在一個訂閱器中處理,使得操做符在表現上像純函數同樣。微信
下面以及下一篇的內容中,會出現大量 subscribeToResult ,咱們只須要知道,這個函數將訂閱的數據或信息轉發到了 OuterSubscriber 的相關接口中,它的功能再也不贅述。前端工程師
最後,咱們仍是要回歸到 operators 的源碼分析上。由於總體規律和設計已經瞭解完畢,那麼分析每個 operator 的時候,也能經過這些規律來理解某一部分的 operators 爲何要這樣設計。併發
在這裏,咱們繼續沿着上一篇的內容,先分析 Join Creation Operators。
所謂 race,意味着全部的流都在進行一場賽跑,跑贏的流能夠留下並繼續發送數據,沒跑贏的只能取消訂閱。
const first = interval(1000).pipe(take(1), mapTo('first')); const second = interval(2000).pipe(take(1), mapTo('second')); const race$ = race(first, second); race$.subscribe((v) => console.log(v)); // 打印結果 // first
race 經過 fromArray 的方式,將輸入的 Observable 交由內部訂閱器來處理。
export function race<T>(...observables: ObservableInput<any>[]): Observable<T> { return fromArray(observables).lift(new RaceOperator<T>()); }
RaceSubscriber 保存了這麼幾個狀態。
private hasFirst: boolean = false; private observables: Observable<any>[] = []; private subscriptions: Subscription[] = [];
訂閱後上遊流輸出 Observable 會由 observables 緩存起來,然後在上游流輸出完成時,對他們進行訂閱,並保存訂閱對象。
protected _complete() { const observables = this.observables; const len = observables.length; if (len === 0) { this.destination.complete(); } else { for (let i = 0; i < len && !this.hasFirst; i++) { let observable = observables[i]; let subscription = subscribeToResult(this, observable, observable as any, i); if (this.subscriptions) { this.subscriptions.push(subscription); } this.add(subscription); } this.observables = null; } }
在 notifyNext 中,RaceSubscriber 能夠獲取下游流的訂閱數據。並對 hasFirst 進行判斷。若是該數據是第一個到達,更新 hasFirst 狀態,並將其他下游流的訂閱取消,這樣作的目的是爲了只讓這個下游流的數據發送給終結訂閱。
notifyNext( outerValue: T, innerValue: T, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, T> ): void { if (!this.hasFirst) { // 更新狀態 this.hasFirst = true; // for (let i = 0; i < this.subscriptions.length; i++) { if (i !== outerIndex) { let subscription = this.subscriptions[i]; subscription.unsubscribe(); this.remove(subscription); } } this.subscriptions = null; } this.destination.next(innerValue); }
zip 是這樣的一種操做符,它如下游流中數據量最少的流爲基準,按照前後順序與其他的下游流結合成新的流。
let age$ = of<number>(27, 25, 29, 30, 35, 40); let name$ = of<string>('Foo', 'Bar', 'Beer'); let isDev$ = of<boolean>(true, true); zip(age$, name$, isDev$).pipe( map(([age, name, isDev]) => ({ age, name, isDev })), ) .subscribe(x => console.log(x)); // outputs // { age: 27, name: 'Foo', isDev: true } // { age: 25, name: 'Bar', isDev: true }
zip 也同樣,經過 fromArray 的方式,將輸入內容交由內部訂閱器處理。
export function zip<O extends ObservableInput<any>, R>( ...observables: O[] ): Observable<ObservedValueOf<O>[]|R> { // 經過 fromArray 將傳入的參數以流的形式進入到訂閱中 return fromArray(observables, undefined).lift(new ZipOperator()); }
訂閱開始,生成 ZipSubscriber,調用 _next。根據輸入流的類型,將其傳入到不一樣的迭代器中,輸入的流的數據類型包含了如下幾種:
protected _next(value: any) { const iterators = this.iterators; if (isArray(value)) { iterators.push(new StaticArrayIterator(value)); } else if (typeof value[Symbol_iterator] === 'function') { iterators.push(new StaticIterator(value[Symbol_iterator]())); } else { iterators.push(new ZipBufferIterator(this.destination, this, value)); } }
相較於 靜態數據而言,Observable 纔是咱們關注的重點。在前面已經講過 OuterSubscriber 的做用,我在這裏再也不贅述。 ZipBufferIterator
經過繼承 OuterSubscriber,並實現了相應的操做,而後維護了這些 Observable 並進行訂閱。
在 zip 中,上游流爲 fromArray 生成的 Observable。當它完成時,會把 next 中存儲的迭代器進行循環調用。在 next 的時候咱們能夠看到,會生成與 ObservableInput 相對應的內容 ,的內部若是實現了訂閱功能,那麼就訂閱這些迭代器,不然,直接按照靜態處理。
protected _complete() { const iterators = this.iterators; const len = iterators.length; this.unsubscribe(); if (len === 0) { this.destination.complete(); return; } this.active = len; for (let i = 0; i < len; i++) { let iterator: ZipBufferIterator<any, any> = <any>iterators[i]; if (iterator.stillUnsubscribed) { const destination = this.destination as Subscription; // 持有並管理該迭代器的訂閱 destination.add(iterator.subscribe(iterator, i)); } else { // 不是 Observable this.active--; } } }
ZipBufferIterator
繼承了 OuterSubscriber ,那麼它確定也是經過內部維護一個 InnerSubscriber 來將下游流中的數據轉發出去。
class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAheadIterator<T> { ... subscribe(value: any, index: number) { const subscriber = new InnerSubscriber(this, index as any, undefined); return subscribeToResult<any, any>(this, this.observable, undefined, undefined, subscriber); } ... }
ZipBufferIterator
其內部維護了 InnerSubscriber ,那麼意味着數據會由發送到 notifyNext 中,這裏使用了一個數組將數據緩存起來。
notifyNext(outerValue: T, innerValue: any, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R>): void { this.buffer.push(innerValue); this.parent.checkIterators(); }
然後,會調用 ZipSubscriber.checkIterators, 這個方法決定了終結訂閱的數據來源,同時也給出了終結訂閱完成所須要的條件。
checkIterators() { const iterators = this.iterators; const len = iterators.length; const destination = this.destination; // 是否是全部的迭代器都存在數據。 for (let i = 0; i < len; i++) { let iterator = iterators[i]; if (typeof iterator.hasValue === 'function' && !iterator.hasValue()) { return; } } let shouldComplete = false; // 終結訂閱最終拿到的數據 const args: any[] = []; for (let i = 0; i < len; i++) { let iterator = iterators[i]; let result = iterator.next(); // 判斷迭代器是否已經完成數據輸出 if (iterator.hasCompleted()) { shouldComplete = true; } // 若是結果已經到了末尾,意味着最短的數據已經輸出完畢。 // 有可能數據沒到末尾,可是該迭代器已經結束。 if (result.done) { destination.complete(); return; } // 收集全部迭代器中的數據。 args.push(result.value); } // 發送給終結訂閱 destination.next(args); // if (shouldComplete) { destination.complete(); } }
當某一個下游流完成的時候,緩衝區的存在與否會決定終結訂閱的是否完成。
notifyComplete() { if (this.buffer.length > 0) { this.isComplete = true; this.parent.notifyInactive(); } else { this.destination.complete(); } }
若是緩衝區存在數據,那麼還得去調用 ZipSubscriber.notifyInactive ,將信息返回給 ZipSubscriber。到了這一步,意味着某一個下游流已經徹底發送完數據了,那麼還得更新 active 的記錄。若是 active 最終爲 0 ,那麼將通知終結訂閱這個流已經完成了。
notifyInactive() { this.active--; if (this.active === 0) { this.destination.complete(); } }
跟 zip 不同,在 CombineLatest 中,每個下游流的新數據都會和其他下游流的當前的數據相結合,從而造成新的數據並重新的流中轉發出去。
export function combineLatest<O extends ObservableInput<any>, R>( ...observables: O[] ): Observable<R> { return fromArray(observables).lift(new CombineLatestOperator<ObservedValueOf<O>, R>()); } export class CombineLatestOperator<T, R> implements Operator<T, R> { constructor() {} call(subscriber: Subscriber<R>, source: any): any { return source.subscribe(new CombineLatestSubscriber()); } }
起始狀態跟 zip 同樣,也是經過 fromArray 將 ObservableInput 做爲上游流的數據輸入到 CombineLatestSubscriber 中。把目光鎖定這個 Subscriber,深刻了解一下它的心路歷程。
當數據到來的時候,CombineLatestSubscriber 把下游流集體緩存到一個 observables 數組中。
protected _next(observable: any) { this.values.push(NONE); this.observables.push(observable); }
當下遊流緩存完畢的時候,上游流也輸出完畢,那麼便會調用 complete。 在這裏,complete 作的事情僅僅是將全部的下游流進行訂閱,並記錄這些流的訂閱狀態。
protected _complete() { const observables = this.observables; const len = observables.length; if (len === 0) { this.destination.complete(); } else { this.active = len; this.toRespond = len; for (let i = 0; i < len; i++) { const observable = observables[i]; const innerSub = new InnerSubscriber(this, observable, i); this.add(subscribeToResult(this, observable, undefined, undefined, innerSub)); } } }
在訂閱完畢全部的下游流後,它們的數據全都會流到 notify 中。
CombineLatestSubscriber 每接收到一個下游流的數據,都會觸發 notifyNext。toRespond 記錄的是剩餘未收到數據的下游流的數量, 當全部下游流都有數據的時候,那麼纔會開始結合。
values 經過初始化的索引緩存了每個下游流當前的數據,當任意一個下游流的數據到來後,都將會更新 values 中對應索引中的緩存數據。
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber<T, R>): void { const values = this.values; const oldVal = values[outerIndex]; let toRespond = 0; if (this.toRespond) { // 若是這個數據爲NONE,那麼則表明當前的 // 下游流是首次發送數據,則 toRespond // 要減一。 if (oldVal === NONE) { this.toRespond -= 1; } toRespond = this.toRespond; } values[outerIndex] = innerValue; if (toRespond === 0) { this.destination.next(values.slice()); } }
以上即是 combineLastest 的核心設計。
至於 notifyComplete ,則是處理了當前正在運行的下游流和終結訂閱的關係。當 active 減小到零的時候,意味着須要通知終結訂閱全部數據已經輸出完畢了。
notifyComplete(unused: Subscriber<R>): void { this.active -= 1; if (this.active === 0) { this.destination.complete(); } }
相較於 combineLatest ,forkJoin 是一種更爲激進的實現。爲何說它激進,由於它判斷合併的條件,從下游流有數據輸出變成了下游流完成數據輸出。它的實現很簡單,只須要計算每一個結束輸出數據的下游流的數量 completed,經過比較 completed 和下游流總數,就能判斷何時結束。須要注意的一點,若是全部流都輸出了數據,那麼 forkJoin 才能把數據發送。
function forkJoinInternal(sources: ObservableInput<any>[], keys: string[] | null): Observable<any> { return new Observable(subscriber => { const len = sources.length; if (len === 0) { subscriber.complete(); return; } const values = new Array(len); let completed = 0; let emitted = 0; // 循環訂閱全部的下游流 for (let i = 0; i < len; i++) { // 將輸入轉換成 Observable const source = from(sources[i]); let hasValue = false; subscriber.add(source.subscribe({ next: value => { if (!hasValue) { hasValue = true; emitted++; } // 記錄當前訂閱的值 values[i] = value; }, error: err => subscriber.error(err), // 處理完成時所須要作的工做 complete: () => { // 更新下游流訂閱完成數 completed++; // 判斷是否全部的下游流訂閱都已經完成 if (completed === len || !hasValue) { if (emitted === len) { // 若是所有的下游流都發送了數據, // 那麼終結訂閱將收到全部的下游流 // 的數據。 subscriber.next(values); } subscriber.complete(); } } })); } }); }
merge 經過調用 mergeMap 來建立合併流,concat 也是經過 mergeMap 來建立相同的合併流。這一部分會在下一章講到。它們兩個惟一不一樣的點就是在於併發的數量上。merge能夠併發訂閱多個下游流,而 concat 同一時間只能訂閱一個下游流。
type Any = ObservableInput<any>; export function merge<T, R>(...observables: Array<ObservableInput<any> | number>): Observable<R> { let concurrent = Number.POSITIVE_INFINITY; let last: any = observables[observables.length - 1]; if (typeof last === 'number') { concurrent = <number>observables.pop(); } return mergeMap<Any, Any>(x => x, concurrent)(fromArray<any>(observables)); }
export function concat1<O extends ObservableInput<any>, R>(...observables: Array<O>): Observable<R> { return mergeMap<O, O>(x => x, 1)(of(...observables)); }
partion 是一種分割操做,經過傳入一個判斷函數,使得輸出的流一分爲二。它經過 filter 來實現,將兩個不一樣的流分離。其中須要注意的是,第二個 filter 中,傳入的是一個求反操做。
export function partition<T>( predicate: (value: T, index: number) => boolean, thisArg?: any ): UnaryFunction<Observable<T>, [Observable<T>, Observable<T>]> { return (source: Observable<T>) => [ filter(predicate, thisArg)(source), // 此處傳入的是一個 not,他把整個 predicate 封裝。 filter(not(predicate, thisArg) as any)(source) ] as [Observable<T>, Observable<T>]; }
總結一下,本章首先給出一些 operators 的綜合規律,而後再對 Join Creation Operators 進行分析,下面分別用一句話將它們總結一下,就結束本篇的內容。
咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。
做者:zcx(公衆號:Coder寫字的地方)
原文連接:https://mp.weixin.qq.com/s/1b141waT_tAxZR-PZC79kg
往期文章推薦