[譯] 經過構建 Observable 來學習 Observable

原文連接: medium.com/@benlesh/le…
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】javascript

經過社交媒體或在活動現場,我常常會被問到關於「熱的」 vs 「冷的」 observables,或者 observable 到底是「多播」仍是「單播」。對於 Rx.Observable,人們以爲它內部的工做原理徹底是黑魔法,這令他們感到十分困惑。當被問及如何描述 observable 時,人們會說:「他們是流」或「他們相似於 promises 」。事實上,我在不少場合甚至在公開演講中都談論過這些內容。java

與 promises 進行比較是必要的,但也是不幸的。鑑於 promieses 和 observables 都是異步基本類型 ( async primitives ),而且 promises 已被 JavaScript 社區普遍使用和熟悉,這一般是一個很好的起點。將 promise 的 then 與 observable 的 subscribe 進行比較,promise 是當即執行,而 observable 是惰性執行,是可取消、可複用的,等等。這是向初學者介紹 observables 的理想方式。git

但這樣有個問題: Observables 與 promises 的不一樣點要遠多於它們之間的相同點。Promises 永遠是多播的。Promise 的解析 ( resolution ) 和拒絕 ( rejection ) 永遠是異步的。當人們處理 observables 時,彷彿就是在處理 promises,因此他們指望二者的行爲也是類似的,但這不併老是對的。Observables 有時是多播的。Observables 一般是異步的。我也有些自責,由於我滋長了這種誤解的蔓延。github

Observables 只是一個函數,它接收 observer 並返回函數

若是你真的想要理解 observable,你能夠本身寫個簡單的。這並無聽上去那麼困難,真心的。將 observable 歸結爲最精簡的部分,無外乎就是某種特定類型的函數,該函數有其針對性的用途。編程

模型:

  • 函數
  • 接收 observer: observer 是有 nexterrorcomplete 方法的對象
  • 返回一個可取消的函數

目的:

將觀察者 ( observer ) 與生產者 ( producer ) 鏈接,並返回一種手段來拆解與生產者之間的鏈接。觀察者其實是處理函數的註冊表,處理函數能夠隨時間推移推送值。數組

基本實現:

function myObservable(observer) {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    return () => {
        datasource.destroy();
    };
}複製代碼

(你能夠點擊這裏進行在線調試)promise

如你所見,並無太多東西,只是一個至關簡單的契約。安全

安全的觀察者: 讓觀察者變得更好

當談論到 RxJS 或響應式編程時,一般 observables 出現的頻率是最高的。但實際上,觀察者的實現纔是這類響應式編程的中流砥柱。Observables 是惰性的。它們只是函數而已。它們什麼也不作直到你 subscribe 它們,它們裝配好了觀察者,而後就完事了,與沉悶的老式函數並沒有差異,等待着被調用而已。另外一方面,觀察者保持活躍狀態並監聽來自生產者的事件。異步

你可使用任何有 nexterrocomplete 方法的簡單 JavaScript 對象 (POJO) 來訂閱 observable,但你所用來訂閱 observable 的 POJO 觀察者真的只是個開始。在 RxJS 5中,咱們須要爲你提供一些保障。下面羅列了一些重要的保障:async

觀察者保障

  1. 若是你傳遞的觀察者徹底沒有以上所述的三個方法,也是能夠的。
  2. 你不想在 completeerror 以後調用 next
  3. 若是取消訂閱了,那麼你不想任何方法被調用。
  4. 調用 completeerror 須要調用取消訂閱邏輯。
  5. 若是 nextcompleteerror 處理方法拋出異常,你想要調用取消訂閱邏輯,以確保不會泄露資源。
  6. nexterrorcomplete 實際上都是可選的。你無需處理每一個值、錯誤或完成。你可能只是想要處理其中一二。

爲了完成列表中的任務,咱們須要將你提供的匿名觀察者包裝在 「SafeObserver」 中以實施上述保障。由於上面的#2,咱們須要追蹤 completeerror 是否被調用過。由於#3,咱們須要使 SafeObserver 知道消費者什麼時候要取消訂閱。最後,由於#4,SafeObserver 實際上須要瞭解取消訂閱邏輯,這樣當 completeerror 被調用時才能夠調用它。

若是咱們想要用上面臨時實現的 observable 函數來作這些的話,會變得有些粗糙... 這裏有個 JSBin 代碼片斷,你能夠看看並感覺下有多粗糙。我並無想要在這個示例中實現很是正宗的 SafeObserver,由於那麼佔用整篇文章篇幅,下面是咱們的 observable,此次它使用了 SafeObserver:

function myObservable(observer) {
   const safeObserver = new SafeObserver(observer);
   const datasource = new DataSource();
   datasource.ondata = (e) => safeObserver.next(e);
   datasource.onerror = (err) => safeObserver.error(err);
   datasource.oncomplete = () => safeObserver.complete();

   safeObserver.unsub = () => {
       datasource.destroy();
   };

   return safeObserver.unsubscribe.bind(safeObserver);
}複製代碼

設計 Observable: 確保觀察者安全

將 observables 做爲類/對象使咱們可以輕鬆地將 SafeObserver 應用於傳入的匿名觀察者(和處理函數,若是你喜歡 RxJS 中的 subscribe(fn, fn, fn) 簽名的話) 併爲開發人員提供更好的開發體驗。經過在 Observable 的 subscribe 實現中處理 SafeObserver 的建立,Observables 能夠再次以最簡單的方式來定義:

const myObservable = new Observable((observer) => {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();
    return () => {
        datasource.destroy();
    };
});複製代碼

你會注意到上面的代碼片斷與第一個示例看起來幾乎同樣。但它更容易閱讀,也更容易理解。我擴展了 JSBin 示例來展現 Observable 的最小化實現

操做符: 一樣只是函數

RxJS 中的「操做符」只不過是接收源 observable 的函數,並返回一個新的 observable,當你訂閱該 observable 時它會訂閱源 observable 。咱們能夠實現一個基礎、獨立的操做符,如這個在線 JSBin 示例所示:

function map(source, project) {
  return new Observable((observer) => {
    const mapObserver = {
      next: (x) => observer.next(project(x)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    };
    return source.subscribe(mapObserver);
  });
}複製代碼

最重要的是要注意此操做符在作什麼: 當你訂閱它返回的 observable 時,它會建立 mapObserver 來完成工做並將 observermapObserver 鏈接起來。
構建操做符鏈實際上只是建立一個將觀察者與訂閱 ( subscription ) 鏈接起來的模板。

設計 Observable: 更優雅的操做符鏈

若是咱們全部的操做符都是像上面示例中那樣用獨立函數實現的話,將操做符連接起來會有些難看:

map(map(myObservable, (x) => x + 1), (x) => x + 2);複製代碼

能夠想象一下上面的代碼,嵌套了5個或6個更復雜的操做符將會產生更多的參數。致使代碼徹底不可讀。

你可使用簡單的 pipe 實現 (正如 Texas Toland 所建議的那樣),它會對操做符數組進行累加以生成最終的 observable,但這意味着要編寫更復雜的、返回函數的操做符,(點擊這裏查看 JSBin 示例)。這一樣沒有使得一切變得完美:

pipe(myObservable, map(x => x + 1), map(x => x + 2));複製代碼

理想的是咱們可以將操做符以一種更天然的方式連接起來,好比這樣:

myObservable.map(x => x + 1).map(x => x + 2);複製代碼

幸運的是,咱們的 Observable 類已經支持這種操做符的鏈式行爲。它不會給操做符的實現代碼任何額外的複雜度,但它的代價是違背了我所提倡的「摒棄原型 ( prototype )」,一旦添加了足夠你使用的操做符,原型中的方法或許就太多了。點擊 (這裏的 JSBin 示例) 查看咱們添加到 Observable 實現原型中的 map 操做符:

Observable.prototype.map = function (project) {
    return new Observable((observer) => {
        const mapObserver = {
            next: (x) => observer.next(project(x)),
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return this.subscribe(mapObserver);
    });
};複製代碼

如今咱們擁有了更好的語法。這種方法還有其餘好處,也更高級。例如,咱們能夠將 Observable 子類化爲特定類型的 observables (例如包裝 Promise 或一組靜態值的 observables),並經過覆蓋它們來對咱們的運算符進行優化。

TLDR: Observable 是函數,它接收 observer 並返回函數

記住,閱讀以上全部內容後,全部的這一切都是圍繞一個簡單的函數設計的。Observables 是函數,它接收 observer 並返回函數。僅此而已。若是你編寫了一個函數,它接收 observer 並返回函數,那它是異步的,仍是同步的?都不是,它就是個函數。任何函數的行爲都徹底取決於它是如何實現的。因此,當處理 Observable 時,就像你所傳遞的函數引用那樣對待它,而不是一些所謂的魔法,有狀態的外星類型。當你構建操做符鏈式,你真正要作的是構成一個函數,該函數會設置連接在一塊兒的觀察者鏈,並將值傳遞給觀察者。

注意: 示例中的 Observable 實現仍然返回的是函數,而 RxJS 和 es-observable 規範返回的是 Subscription 對象。Subscription 對象是一種更好的設計,但我又得寫一整篇文章來說它。因此我只保留了它的取消訂閱功能,以保持本文中全部示例的簡單性。

相關文章
相關標籤/搜索