Rx.js實現原理淺析

前言

上次給你們分享了cycle.js的內容,這個框架核心模塊的代碼其實只有一百多行,要理解這個看似複雜的框架,其實最核心的是理解它依賴的異步數據流處理框架——rx.js。今天,給你們分享一下rx.js的實現原理,你們有興趣能夠繼續研究它的源碼,會讓你對異步和響應式編程有更深的理解,進而將rx.js、cycle.js或者僅僅是函數式、響應式編程的思想融入到本身手裏的業務中。
爲了更好地理解rx.js,須要先談談異步編程的實現方案。web

異步實現方案

1. 回調函數

makeHttpCall('/items', 
   items => {
      for (itemId of items) {
         makeHttpCall(`/items/${itemId}/info`,
           itemInfo => {        
              makeHttpCall(`/items/${itemInfo.pic}`,
                img => {
                    showImg(img);
              });   
           });
      }
});

beginUiRendering();

一旦你須要多塊數據時你就陷入了流行的」末日金字塔「或者回調地獄。這段代碼有不少的問題。 其中之一就是風格。當你在這些嵌套的回調函數中添加愈來愈多的邏輯,這段代碼就會變得很複雜很難理解。由於循環還產生了一個更加細微的問題。for循環是同步的控制流語句,這並不能很好的配合異步調用,由於會有延遲,這可能會產生很奇怪的bug。express

2. Promise

makeHttpCall('/items')
    .then(itemId => makeHttpCall(`/items/${itemId}/info`))
    .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`))
    .then(showImg);

鏈式調用毫無疑問是一個進步,理解這段代碼的難度顯著降低。然而,儘管Promises在處理這種單值(或單個錯誤)時很是高效,它有也一些侷限性。Promises在處理用戶連續輸入的數據流時效率怎麼樣呢? 這時Promises處理起來也並不高效,由於它沒有事件的刪除、分配、重試等等的語法定義。編程

3. async/await

使用async/await,配合Promise能夠以同步的方式編寫異步代碼,是我如今最喜歡也最經常使用的異步編程方式。好比:上述實如今請求加載完圖片後再顯示圖片的邏輯,也能夠這樣實現:promise

const showLoadedImg = async () => {
  let getImgInfo = makeHttpCall('/items')
      .then(itemId => makeHttpCall(`/items/${itemId}/info`))
      .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`));
  let loadedImg = await getImgInfo;
  showImg(loadedImg);
}

4. generator

generator我用的不多,以前用過一段時間koa(express原班團隊搞的基於generator的現代web開發框架),不太喜歡代碼在各個模塊間跳來跳去的編寫思路。整個 Generator 函數就是一個封裝的異步任務,或者說是異步任務的容器。異步操做須要暫停的地方,都用 yield 語句註明。調用 Generator 函數,會返回一個內部指針(即遍歷器 )g,調用指針 g 的 next 方法,會移動內部指針(即執行異步任務的第一段),指向第一個遇到的 yield 語句。下面貼一段koa中間件級聯的代碼,你們感覺一下:數據結構

var koa = require('koa');
var app = koa();

// x-response-time

app.use(function *(next){
  var start = new Date;
  yield next;
  var ms = new Date - start;
  this.set('X-Response-Time', ms + 'ms');
});

// logger

app.use(function *(next){
  var start = new Date;
  yield next;
  var ms = new Date - start;
  console.log('%s %s - %s', this.method, this.url, ms);
});

// response

app.use(function *(){
  this.body = 'Hello World';
});

app.listen(3000);

上面的例子在頁面中返回 "Hello World",然而當請求開始時,請求先通過 x-response-time 和 logging 中間件,並記錄中間件執行起始時間。 而後將控制權交給 reponse 中間件。當中間件運行到 yield next 時,函數掛起並將控制前交給下一個中間件。當沒有中間件執行 yield next 時,程序棧會逆序喚起被掛起的中間件來執行接下來的代碼。app

5. rxjs

RxJS是一個解決異步問題的JS開發庫.它起源於 Reactive Extensions 項目,它帶來了觀察者模式和函數式編程的相結合的最佳實踐。 觀察者模式是一個被實踐證實的模式,基於生產者(事件的建立者)和消費者(事件的監聽者)的邏輯分離關係。框架

何況函數式編程方式的引入,如說明性編程,不可變數據結構,鏈式方法調用會使你極大的簡化代碼量。(和回調代碼方式說再見吧)koa

若是你熟悉函數式編程,請把RxJS理解爲異步化的Underscore.js。異步

RxJS 引入了一個重要的數據類型——流(stream)。async

rxjs實現原理

觀察者模式

觀察者模式在 Web 中最多見的應該是 DOM 事件的監聽和觸發。
訂閱:經過 addEventListener 訂閱 document.body 的 click 事件。
發佈:當 body 節點被點擊時,body 節點便會向訂閱者發佈這個消息。

document.body.addEventListener('click', function listener(e) {
    console.log(e);
},false);
 
document.body.click(); // 模擬用戶點擊

image

迭代器模式

迭代器模式能夠用 JavaScript 提供了 Iterable Protocol 可迭代協議來表示。Iterable Protocol 不是具體的變量類型,而是一種可實現協議。JavaScript 中像 Array、Set 等都屬於內置的可迭代類型,能夠經過 iterator 方法來獲取一個迭代對象,調用迭代對象的 next 方法將獲取一個元素對象,以下示例。

var iterable = [1, 2];
 
var iterator = iterable[Symbol.iterator]();
 
iterator.next(); // => { value: "1", done: false}
iterator.next(); // => { value: "2", done: false}
 
iterator.next(); // => { value: undefined, done: true}

元素對象中:value 表示返回值,done 表示是否已經到達最後。
遍歷迭代器可使用下面作法。

var iterable = [1, 2];
var iterator = iterable[Symbol.iterator]();
 
while(true) {
    let result;
    try {
        result = iterator.next();  // <= 獲取下一個值
    } catch (err) {
        handleError(err);  // <= 錯誤處理
    }
    if (result.done) {
        handleCompleted();  // <= 無更多值(已完成)
        break;
    }
    doSomething(result.value);
}

主要對應三種狀況:
獲取下一個值
調用 next 能夠將元素一個個地返回,這樣就支持了返回屢次值。
無更多值(已完成)
當無更多值時,next 返回元素中 done 爲 true。
錯誤處理
當 next 方法執行時報錯,則會拋出 error 事件,因此能夠用 try catch 包裹 next 方法處理可能出現的錯誤。

RxJS 的觀察者 + 迭代器模式

RxJS 中含有兩個基本概念:Observables 與 Observer。Observables 做爲被觀察者,是一個值或事件的流集合;而 Observer 則做爲觀察者,根據 Observables 進行處理。
Observables 與 Observer 之間的訂閱發佈關係(觀察者模式) 以下:
訂閱:Observer 經過 Observable 提供的 subscribe() 方法訂閱 Observable。
發佈:Observable 經過回調 next 方法向 Observer 發佈事件。

下面爲 Observable 與 Observer 的僞代碼:

// Observer
var Observer = {
    next(value) {
        alert(`收到${value}`);
    }
};
 
// Observable
function Observable (Observer) {
    setTimeout(()=>{
        Observer.next('A');
    },1000)
}
 
// subscribe
Observable(Observer);

上面實際也是觀察者模式的表現,那麼迭代器模式在 RxJS 中如何體現呢?
在 RxJS 中,Observer 除了有 next 方法來接收 Observable 的事件外,還能夠提供了另外的兩個方法:error() 和 complete(),與迭代器模式一一對應。

var Observer = {
    next(value) { /* 處理值*/ },
    error(error) { /* 處理異常 */ },
    complete() { /* 處理已完成態 */ }
};

結合迭代器 Iterator 進行理解:
next()
Observer 提供一個 next 方法來接收 Observable 流,是一種 push 形式;而 Iterator 是經過調用 iterator.next() 來拿到值,是一種 pull 的形式。
complete()
當再也不有新的值發出時,將觸發 Observer 的 complete 方法;而在 Iterator 中,則須要在 next 的返回結果中,當返回元素 done 爲 true 時,則表示 complete。
error()
當在處理事件中出現異常報錯時,Observer 提供 error 方法來接收錯誤進行統一處理;Iterator 則須要進行 try catch 包裹來處理可能出現的錯誤。

下面是 Observable 與 Observer 實現觀察者 + 迭代器模式的僞代碼,數據的逐漸傳遞傳遞與影響其實就是流的表現。

// Observer
var Observer = {
    next(value) {
        alert(`收到${value}`);
    },
    error(error) {
        alert(`收到${value}`);
    },
    complete() {
        alert("complete");
    },
};
 
// Observable
function Observable (Observer) {
    [1,2,3].map(item=>{
        Observer.next(item);
    });
 
    Observer.complete();
    // Observer.error("error message");
}
 
// subscribe
Observable(Observer);

RxJS 基礎實現

有了上面的概念及僞代碼,那麼在 RxJS 中是怎麼建立 Observable 與 Observer 的呢?
建立 Observable
RxJS 提供 create 的方法來自定義建立一個 Observable,可使用 next 來發出流。

var Observable = Rx.Observable.create(observer => {
    observer.next(2);
    observer.complete();
    return  () => console.log('disposed');
});

建立 Observer
Observer 能夠聲明 next、err、complete 方法來處理流的不一樣狀態。

var Observer = Rx.Observer.create(
    x => console.log('Next:', x),
    err => console.log('Error:', err),
    () => console.log('Completed')
);

最後將 Observable 與 Observer 經過 subscribe 訂閱結合起來。

var subscription = Observable.subscribe(Observer);

關於rx.js的使用和API就再也不贅述了,理解了其實現原理,使用起來就很簡單了!

相關文章
相關標籤/搜索