原文連接: medium.com/@benlesh/le…
本文爲 RxJS 中文社區 翻譯文章,如需轉載,請註明出處,謝謝合做!
若是你也想和咱們一塊兒,翻譯更多優質的 RxJS 文章以奉獻給你們,請點擊【這裏】javascript
經過社交媒體或在活動現場,我常常會被問到關於「熱的」 vs 「冷的」 observables,或者 observable 到底是「多播」仍是「單播」。對於 Rx.Observable
,人們以爲它內部的工做原理徹底是黑魔法,這令他們感到十分困惑。當被問及如何描述 observable 時,人們會說:「他們是流」或「他們相似於 promises 」。事實上,我在不少場合甚至在公開演講中都談論過這些內容。java
與 promises 進行比較是必要的,但也是不幸的。鑑於 promieses 和 observables 都是異步基本類型 ( async primitives ),而且 promises 已被 JavaScript 社區普遍使用和熟悉,這一般是一個很好的起點。將 promise 的 then
與 observable 的 subscribe
進行比較,promise 是當即執行,而 observable 是惰性執行,是可取消、可複用的,等等。這是向初學者介紹 observables 的理想方式。git
但這樣有個問題: Observables 與 promises 的不一樣點要遠多於它們之間的相同點。Promises 永遠是多播的。Promise 的解析 ( resolution ) 和拒絕 ( rejection ) 永遠是異步的。當人們處理 observables 時,彷彿就是在處理 promises,因此他們指望二者的行爲也是類似的,但這不併老是對的。Observables 有時是多播的。Observables 一般是異步的。我也有些自責,由於我滋長了這種誤解的蔓延。github
若是你真的想要理解 observable,你能夠本身寫個簡單的。這並無聽上去那麼困難,真心的。將 observable 歸結爲最精簡的部分,無外乎就是某種特定類型的函數,該函數有其針對性的用途。編程
next
、error
和 complete
方法的對象將觀察者 ( observer ) 與生產者 ( producer ) 鏈接,並返回一種手段來拆解與生產者之間的鏈接。觀察者其實是處理函數的註冊表,處理函數能夠隨時間推移推送值。數組
function myObservable(observer) {
const datasource = new DataSource();
datasource.ondata = (e) => observer.next(e);
datasource.onerror = (err) => observer.error(err);
datasource.oncomplete = () => observer.complete();
return () => {
datasource.destroy();
};
}複製代碼
(你能夠點擊這裏進行在線調試)promise
如你所見,並無太多東西,只是一個至關簡單的契約。安全
當談論到 RxJS 或響應式編程時,一般 observables 出現的頻率是最高的。但實際上,觀察者的實現纔是這類響應式編程的中流砥柱。Observables 是惰性的。它們只是函數而已。它們什麼也不作直到你 subscribe
它們,它們裝配好了觀察者,而後就完事了,與沉悶的老式函數並沒有差異,等待着被調用而已。另外一方面,觀察者保持活躍狀態並監聽來自生產者的事件。異步
你可使用任何有 next
、erro
和 complete
方法的簡單 JavaScript 對象 (POJO) 來訂閱 observable,但你所用來訂閱 observable 的 POJO 觀察者真的只是個開始。在 RxJS 5中,咱們須要爲你提供一些保障。下面羅列了一些重要的保障:async
complete
或 error
以後調用 next
。complete
和 error
須要調用取消訂閱邏輯。next
、complete
或 error
處理方法拋出異常,你想要調用取消訂閱邏輯,以確保不會泄露資源。next
、error
和 complete
實際上都是可選的。你無需處理每一個值、錯誤或完成。你可能只是想要處理其中一二。爲了完成列表中的任務,咱們須要將你提供的匿名觀察者包裝在 「SafeObserver」 中以實施上述保障。由於上面的#2,咱們須要追蹤 complete
或 error
是否被調用過。由於#3,咱們須要使 SafeObserver 知道消費者什麼時候要取消訂閱。最後,由於#4,SafeObserver 實際上須要瞭解取消訂閱邏輯,這樣當 complete
或 error
被調用時才能夠調用它。
若是咱們想要用上面臨時實現的 observable 函數來作這些的話,會變得有些粗糙... 這裏有個 JSBin 代碼片斷,你能夠看看並感覺下有多粗糙。我並無想要在這個示例中實現很是正宗的 SafeObserver,由於那麼佔用整篇文章篇幅,下面是咱們的 observable,此次它使用了 SafeObserver:
function myObservable(observer) {
const safeObserver = new SafeObserver(observer);
const datasource = new DataSource();
datasource.ondata = (e) => safeObserver.next(e);
datasource.onerror = (err) => safeObserver.error(err);
datasource.oncomplete = () => safeObserver.complete();
safeObserver.unsub = () => {
datasource.destroy();
};
return safeObserver.unsubscribe.bind(safeObserver);
}複製代碼
將 observables 做爲類/對象使咱們可以輕鬆地將 SafeObserver 應用於傳入的匿名觀察者(和處理函數,若是你喜歡 RxJS 中的 subscribe(fn, fn, fn)
簽名的話) 併爲開發人員提供更好的開發體驗。經過在 Observable 的 subscribe
實現中處理 SafeObserver 的建立,Observables 能夠再次以最簡單的方式來定義:
const myObservable = new Observable((observer) => {
const datasource = new DataSource();
datasource.ondata = (e) => observer.next(e);
datasource.onerror = (err) => observer.error(err);
datasource.oncomplete = () => observer.complete();
return () => {
datasource.destroy();
};
});複製代碼
你會注意到上面的代碼片斷與第一個示例看起來幾乎同樣。但它更容易閱讀,也更容易理解。我擴展了 JSBin 示例來展現 Observable 的最小化實現。
RxJS 中的「操做符」只不過是接收源 observable 的函數,並返回一個新的 observable,當你訂閱該 observable 時它會訂閱源 observable 。咱們能夠實現一個基礎、獨立的操做符,如這個在線 JSBin 示例所示:
function map(source, project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return source.subscribe(mapObserver);
});
}複製代碼
最重要的是要注意此操做符在作什麼: 當你訂閱它返回的 observable 時,它會建立 mapObserver
來完成工做並將 observer
和 mapObserver
鏈接起來。
構建操做符鏈實際上只是建立一個將觀察者與訂閱 ( subscription ) 鏈接起來的模板。
若是咱們全部的操做符都是像上面示例中那樣用獨立函數實現的話,將操做符連接起來會有些難看:
map(map(myObservable, (x) => x + 1), (x) => x + 2);複製代碼
能夠想象一下上面的代碼,嵌套了5個或6個更復雜的操做符將會產生更多的參數。致使代碼徹底不可讀。
你可使用簡單的 pipe
實現 (正如 Texas Toland 所建議的那樣),它會對操做符數組進行累加以生成最終的 observable,但這意味着要編寫更復雜的、返回函數的操做符,(點擊這裏查看 JSBin 示例)。這一樣沒有使得一切變得完美:
pipe(myObservable, map(x => x + 1), map(x => x + 2));複製代碼
理想的是咱們可以將操做符以一種更天然的方式連接起來,好比這樣:
myObservable.map(x => x + 1).map(x => x + 2);複製代碼
幸運的是,咱們的 Observable 類已經支持這種操做符的鏈式行爲。它不會給操做符的實現代碼任何額外的複雜度,但它的代價是違背了我所提倡的「摒棄原型 ( prototype )」,一旦添加了足夠你使用的操做符,原型中的方法或許就太多了。點擊 (這裏的 JSBin 示例) 查看咱們添加到 Observable 實現原型中的 map 操做符:
Observable.prototype.map = function (project) {
return new Observable((observer) => {
const mapObserver = {
next: (x) => observer.next(project(x)),
error: (err) => observer.error(err),
complete: () => observer.complete()
};
return this.subscribe(mapObserver);
});
};複製代碼
如今咱們擁有了更好的語法。這種方法還有其餘好處,也更高級。例如,咱們能夠將 Observable 子類化爲特定類型的 observables (例如包裝 Promise 或一組靜態值的 observables),並經過覆蓋它們來對咱們的運算符進行優化。
記住,閱讀以上全部內容後,全部的這一切都是圍繞一個簡單的函數設計的。Observables 是函數,它接收 observer 並返回函數。僅此而已。若是你編寫了一個函數,它接收 observer 並返回函數,那它是異步的,仍是同步的?都不是,它就是個函數。任何函數的行爲都徹底取決於它是如何實現的。因此,當處理 Observable 時,就像你所傳遞的函數引用那樣對待它,而不是一些所謂的魔法,有狀態的外星類型。當你構建操做符鏈式,你真正要作的是構成一個函數,該函數會設置連接在一塊兒的觀察者鏈,並將值傳遞給觀察者。
注意: 示例中的 Observable 實現仍然返回的是函數,而 RxJS 和 es-observable 規範返回的是 Subscription 對象。Subscription 對象是一種更好的設計,但我又得寫一整篇文章來說它。因此我只保留了它的取消訂閱功能,以保持本文中全部示例的簡單性。