RxJS 源碼解析(一): Observable & Subscription

DevUI是一支兼具設計視角和工程視角的團隊,服務於華爲雲 DevCloud平臺和華爲內部數箇中後臺系統,服務於設計師和前端工程師。
官方網站: devui.design
Ng組件庫: ng-devui(歡迎Star)
官方交流羣:添加DevUI小助手(微信號:devui-official)進羣
DevUIHelper插件:DevUIHelper-LSP(歡迎Star)

前言

ReactiveX 是 Reactive Extensions 的縮寫,通常簡寫爲 Rx ,最初是 LINQ 的一個擴展,由微軟的架構師 Erik Meijer 領導的團隊開發,在2012年11月開源。Rx 是一個編程模型,目標是提供一致的編程接口,幫助開發者更方便的處理異步數據流。前端

首先,先給出官方對於 Rx 的定義。git

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

翻譯起來有點麻煩,簡而言之,就是基於觀察者隊列實現了對異步和基礎事件的編程。github

Rxjs 是 Rx 的 JavaScript 的實現。本篇文章將簡單的分析一下 Obersvable 和 Subscription 的源碼是怎麼進行的。編程

Observable

可觀察對象是整個 Rx 的核心,主要的做用就是提供了一個觀察者模式,使得調用者能夠經過響應式的方式獲取數據。segmentfault

Observable 實際上就是一個單向鏈表,基本的數據結構以下:後端

class Observable<T> {
  source: Observable<any>;
}

其構造方法與 Promise 相似,經過傳入一個函數包裹操做,並讓這個函數來決定數據傳遞,這個函數的參數包含了一個訂閱器。數組

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.error(Error('error message'));
  subscriber.complete();
});

訂閱器提供了三個主要方法:nexterrorcomplete。訂閱器的實現很巧妙,其內部實現是一個鏈表。微信

跟 Promise 不一樣,Observable 不會馬上運行這個函數,而是等到它被訂閱後,這個函數纔會被執行,這種惰性求值的特性使得 Observable 能夠在它僅被須要的地方進行計算。前端工程師

lift

lift 方法提供了一個這樣的功能,傳入一個映射函數,並返回一個新的 Observable,這個新的 Observable 的 source 會指向建立它的 Observable。實際上,這種作法就是將這個映射函數用一個外覆類包裹起來,這個外覆類,正是 Observable。那麼,看看它是如何實現。數據結構

lift<R>(operator: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

pipe

Rxjs 跟其餘語言實現的 ReactiveX 不同的地方就是在於,它的映射方法再也不是放在 Observable 內部,而是經過參數的形式傳入到一個管道函數pipe中,在這個函數中,經過對管道函數的數組進行 reduce 後,就可以獲得最終的Observable。這個 reduce 的過程也很巧妙,傳入的函數的參數就是上游的 Observable,返回的就是一個給下游接收的 Observable,那麼就能夠把一個又一個的 Observable 串聯起來

pipe(...operations: OperatorFunction<any, any>\[\]): Observable<any> {
  if (operations.length === 0) {
    return this as any;
  }
  if (operations.length == 1) {
     return operation\[0\];
  }
  
  return operations.reduce((prev, fn) => fn(prev), this);
}

那麼在使用過程當中,pipe 經過重載給傳入的函數提供類型信息。

export function pipe<T>(): UnaryFunction<T, T>;
export function pipe<T, A>(fn1: UnaryFunction<T, A>): UnaryFunction<T, A>;
export function pipe<T, A, B>(fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>): UnaryFunction<T, B>;
// ...

其中 UnaryFunction 表示一元函數,經過這種鏈式操做,使得鏈條上的全部函數均可以拿到上游的類型,並把類型轉化傳遞給下游。

subscribe

當 Observable 一旦調用 subscribe,那麼就意味着其開始執行鏈條中的全部函數。subscribe 傳入的參數是一個包含了 next ,error , complete 三個屬性的對象;也能夠是三個函數,分別對應 next,error,complete。

observable.subscribe((value) {
  console.log(value);
}, (error) {
  console.error(error);
}, () {
  console.log('complete');
});


observable.subscribe({
  next: (value) {
    console.log(value);
  },
  error: (error) {
    console.error(error);
  },
  complete: () {
    console.log('complete');
  },
});

其具體實現是經過將傳入的函數(對象)參數轉化成 Subscriber 對象,而 Subscriber 繼承了 Subscription。最後,返回的就是一個 subscription 給到調用者。

subscribe(
  observerOrNext?: PartialObserver<T> | ((value: T) => void),
  error?: (error: any) => void,
  complete?: () => void): Subscription
) {
  // operator 是一個映射函數
  const {operator} = this;
  const sink = new Subscriber(observerOrNext, error, complete);
  
  if (operator) {
    sink.add(operator.call(sink, this.source));
  } else {
    sink.add(this.source || !sink.syncErrorThrowable ?
      this.\_subscribe(sink) :
      this.\_trySubscribe(sink)
    );
  }
  // 省略了錯誤處理
  
  return sink;
}


\_subscribe(subscriber: Subscriber<any>): TeardownLogic {
  const { source } = this;
  return source && source.subscribe(subscriber);
}


\_trySubscribe(sink: Subscriber<T>): TeardownLogic {
  try {
    return this.\_subscribe(sink);
  } catch (err) {
    // 此處省略了源碼中的一些判斷,不影響閱讀
    sink.error(err);
  }

Subscriber 的 add 方法下面會講。總之,Observable 就像一串或者一個爆竹,只有當它被點燃(subscribe)的時候,纔會把一個又一個的 Observable 點着,最終迸發出巨大聲響,而 subscribe 就是一個找到引線並點燃它們的過程。

Subscription

Subscription 則是經過一種樹結構,它包含了葉節點和一個父節點或者父節點的集合。

class Subscription {
  \_parentOrParents: Subscription;
  \_subscriptions: Subscription\[\];
}

add

add 方法主要的功能是鏈接不一樣的訂閱,配合註釋,其邏輯就是將函數或者訂閱對象包裹後放入成員變量 subscriptions 中,並將這個包裹對象的父訂閱對象設置爲當前對象。

add(logic: Function | Subscription | void): Subscription {
  let subscription = logic;
  if (typeof logic === 'object') {
    // 若是添加進來訂閱已經被取消了,則不進行設置。
    // 若是當前的訂閱已經被取消,添加進來的訂閱也應該要被取消。
    if (subscription === this ||
        subscription.closed ||
        typeof subscription.unsubscribe !== 'function') {
      return subscription;
    } else if (this.closed) {
      subscription.unsubscribe();
      return subscription;
    } else if (!(subscription instanceof Subscription)) {
      const tmp = subscription;
      subscription = new Subscription();
      subscription.\_subscriptions = \[tmp\];
    }
  } else if (typeof logic === 'function' ) {
    subscription = new Subscription(<(() => void)>teardown);
                                    } else {
      // 拋出錯誤。
    }


  // 設置父對象的過程採用懶加載模式。
  let { \_parentOrParents } = subscription;
  if (\_parentOrParents === null) {
    // 若是沒有設置父對象,則設置當前對象爲父對象。
    subscription.\_parentOrParents = this;
  } else if (\_parentOrParents instanceof Subscription) {
    // 若是父對象已是當前的對象,直接返回。
    if (\_parentOrParents === this) {
      return subscription;
    }


    // 添加進來的訂閱的父對象已經存在,那麼用一個數組保存。
    subscription.\_parentOrParents = \[\_parentOrParents, this\];
  } else if (\_parentOrParents.indexOf(this) === -1) {
    // 若是已是數組對象了,而且不存在當前訂閱對象,則設置當前訂閱對象
    \_parentOrParents.push(this);
  } else {
    // 已經設置當前訂閱對象爲父對象
    return subscription;
  }




  // 一樣,設置葉子結點的過程也是用懶加載
  const subscriptions = this.\_subscriptions;
  if (subscriptions === null) {
    this.\_subscriptions = \[subscription\];
  } else {
    subscriptions.push(subscription);
  }


  return subscription

unsubscribe

取消訂閱是訂閱對象的主要功能,它爲觀察者模式提供了終結觀察的方法。

unsubscribe(): void {




  // 已經取消訂閱了。
  if (this.closed) {
    return;
  }




  // 拿到當前想要取消訂閱的相關的對象。
  // 這樣作的目的是防止loop
  let { \_parentOrParents, \_unsubscribe, \_subscriptions } = (<any> this);


  // 設置取消訂閱
  this.closed = true;
  // 設置父對象爲空
  this.\_parentOrParents = null;
  // 設置訂閱爲空
  this.\_subscriptions = null;


  // 父對象多是數組,也多是訂閱對象
  if (\_parentOrParents instanceof Subscription) {
    \_parentOrParents.remove(this);
  } else if (\_parentOrParents !== null) {
    for (let index = 0; index < \_parentOrParents.length; ++index) {
      const parent = \_parentOrParents\[index\];
      parent.remove(this);
    }
  }


  // \_unsubscribe 是一個外部傳入的函數.
  if (isFunction(\_unsubscribe)) {
    try {
      \_unsubscribe.call(this);
    } catch (e) {
      errors = e instanceof UnsubscriptionError ? flattenUnsubscriptionErrors(e.errors) : \[e\];
    }
  }




  // 將全部的子訂閱取消訂閱
  
  if (isArray(\_subscriptions)) {
    let len = \_subscriptions.length;
    for (const sub of \_subscriptions) {
      if (isObject(sub)) {
        try {
          sub.unsubscribe();
        } catch (e) {
          // 省略錯誤處理
        }
      }
    }
  }
  //

結語

第一篇就先介紹這兩個重要的組成類,由這兩個類引伸出來的組合纔是 Rx 的精華,以後會陸續介紹這些操做函數。

加入咱們

咱們是DevUI團隊,歡迎來這裏和咱們一塊兒打造優雅高效的人機設計/研發體系。招聘郵箱:muyang2@huawei.com。

做者:zcx(公衆號:Coder寫字的地方)

原文連接:https://mp.weixin.qq.com/s/6fVoI_JtSXu6YfZur1TDNw

往期文章推薦

《先後端鑑權二三事》

《Web界面深色模式和主題化開發》

《手把手教你搭建一個灰度發佈環境》

相關文章
相關標籤/搜索