Rxjs 響應式編程-第一章:響應式

Rxjs 響應式編程-第一章:響應式
Rxjs 響應式編程-第二章:序列的深刻研究
Rxjs 響應式編程-第三章: 構建併發程序
Rxjs 響應式編程-第四章 構建完整的Web應用程序
Rxjs 響應式編程-第五章 使用Schedulers管理時間
Rxjs 響應式編程-第六章 使用Cycle.js的響應式Web應用程序javascript

響應式

現實世界至關混亂:事件不按照順序發生,應用崩潰,網絡不通。幾乎沒有應用是徹底同步的,因此咱們不得不寫一些異步代碼保持應用的可響應性。大多數的時候是很痛苦的,但也並非不可避免。java

現代應用須要超級快速的響應速度,而且但願可以不漏掉一個字節的處理來自不一樣數據源的數據。然而並無現成的解決方案,由於它們不會隨着咱們添加併發和應用程序狀態而擴展代碼變得愈來愈複雜。程序員

本章向您介紹反應式編程,這是一種天然,簡單的方法處理異步代碼的方式。我會告訴你事件的流程 - 咱們稱之爲Observables - 是處理異步代碼的一種很好的方式。
而後咱們將建立一個Observable,看看響應式思惟和RxJS是怎麼樣改善現有技術,讓你成爲更快樂,更多高效的程序員。數據庫

什麼是響應式?

讓咱們從一個小的響應性RxJS程序開始。 這個程序須要經過單擊按鈕檢索來自不一樣來源的數據,它具備如下要求:編程

  • 它必須統一來自使用不一樣源的JSON結構
  • 最終結果不該包含任何副本
  • 爲了不屢次請求數據,用戶不能重複點擊按鈕

使用RxJS,咱們的代碼相似這樣:json

var button = document.getElementById('retrieveDataBtn');
var source1 = Rx.DOM.getJSON('/resource1').pluck('name');
var source2 = Rx.DOM.getJSON('/resource2').pluck('props', 'name');
function getResults(amount) {
    return source1.merge(source2)
        .pluck('names')
        .flatMap(function(array) { return Rx.Observable.from(array); })
        .distinct()
        .take(amount);
}
var clicks = Rx.Observable.fromEvent(button, 'click');
clicks.debounce(1000)
    .flatMap(getResults(5))
    .subscribe(
        function(value) { console.log('Received value', value); },
        function(err) { console.error(err); },
        function() { console.log('All values retrieved!'); }
    );

不要擔憂不理解這裏的代碼。只要關注於成果便可。你看到的第一件事是咱們使用更少的代碼實現更多的功能。咱們經過使用Observable來實現這一目標。segmentfault

Observable表示數據流。程序也能夠能夠主要表示爲數據流。在前面的示例中,兩個遠程源是Observables,用戶點擊鼠標也是如此。實際上,咱們的程序本質上是一個由按鈕的單擊事件構成的Observable,咱們把它轉變成得到咱們想要的結果。api

響應式編程具備很強的表現力,舉個例子來講,限制鼠標重複點擊的例子。想象一下咱們使用咱們使用promise和callback實現這個功能是有多複雜:咱們須要每秒重置一下點擊次數,而且在用戶點擊以後每秒都要保存點擊狀態。可是這樣子,對於這個小功能來講就顯得過於複雜了,而且所寫代碼與業務功能並無直觀的聯繫。爲了彌補基礎代碼庫的功能不足,在一個大型應用中,這些很小的複雜功能會增長的很是快。數組

經過響應式編,咱們使用debounce方法來限制點擊流次數。這樣就保證每次點擊的間隔時間至少1s,忽略1s之間的點擊次數。咱們不關心內部如何實現,咱們只是表達咱們但願代碼執行的操做,而不是如何操做。promise

這就變得更有趣了。接下來,您將看到反應式編程如何幫助咱們提升課程效率和表現力。

電子表格是可響應的

讓咱們從這樣一個響應性系統的典型例子開始考慮:點子表格。咱們都是使用過吧,但咱們不多停下來思考它們是多麼使人震驚的直觀。假設咱們在電子表格的單元格A1中有一個值,而後咱們能夠在電子表格中的其餘單元格中引用它,而且每當咱們更改A1時,每一個依賴於A1的單元格都會自動更新與A1同步。

image

這些操做對咱們感受很天然,咱們並不會去告訴計算機去根據A1更新單元格或者如何更新;這些單元格就自動這樣子作了。在點子表格中,咱們只須要簡單的聲明咱們須要處理的問題,不用操心計算機如何處理。

鼠標輸入做爲streams

理解如何把事件做爲流,咱們回想一下本章開頭的那個程序。在那裏,咱們使用鼠標點擊做爲用戶點擊時實時生成的無限事件流。這個想法起源於Erik Meijer,也就是Rxjs的做者。他認爲:你的鼠標就是一個數據庫。

在響應式編程中,我把鼠標點擊事件做爲一個咱們能夠查詢和操做的持續的流事件。想象成流而不是一個孤立的事件,這種想法開闢了一種全新的思考方式。咱們能夠在其中操縱還沒有建立的整個值的流。

好好想一想。這種方式有別於咱們以往的編程方式,以前咱們把數據存儲在數據庫,或者數組而且等待這些數據可用以後在使用它們。若是它們尚不可用(舉個例子:一個網絡請求),咱們只能等它們好了纔可使用。

image

咱們能夠將流視爲所在由時間而不是存儲位置分開的數組。不管是時間仍是存儲位,咱們都有元素序列:

image

將您的程序視爲流動的數據序列是理解的RxJS程序的關鍵。這須要一些練習,但並不難。事實上,大多數咱們在任何應用程序中使用的數據均可以表示爲序列。

序列查詢

讓咱們使用傳統javascript傳統的事件綁定技術來實現一個鼠標點擊流。要記錄鼠標點擊的x和y座標,咱們能夠這樣寫:

ch1/thinking_sequences1.js

document.body.addEventListener('mousemove', function(e) {
    console.log(e.clientX, e.clientY);
});

此代碼將按順序打印每次鼠標單擊的x座標和y座標。

輸出以下:

252 183
211 232
153 323
...

看起來像一個序列,不是嗎? 固然,問題在於操縱事件並不像操縱數組那麼容易。 例如,若是咱們想要更改前面的代碼,使其僅記錄前10次位於屏幕右側的單擊事件(至關隨機的目標),咱們會寫像這樣的東西:

var clicks = 0;
document.addEventListener('click', function registerClicks(e) {
    if (clicks < 10) {
        if (e.clientX > window.innerWidth / 2) {
            console.log(e.clientX, e.clientY);
            clicks += 1;
        }
    } else {
        document.removeEventListener('click', registerClicks);
    }
});

爲了知足咱們的要求,咱們經過引入一個全局變量做爲擴展狀態來記錄當前點擊數。 咱們還須要使用嵌套的條件來檢查兩個不一樣的條件。當咱們完成時,咱們必須註銷事件,以避免泄漏內存。

反作用和外部狀態

若是一個動做在其發生的範圍以外產生影響,咱們稱之爲一方反作用。更改函數外部的變量,打印到控制檯或更新數據庫中的值,這些都是反作用。例如改變函數內部的變量是安全的,可是若是該變量超出了咱們函數的範圍,那麼其餘函數也能夠改變它的值,這就意味着這個功能再也不受控制,由於你沒法預測外部會對這個變量做何操做。因此咱們須要跟蹤它,添加檢查以確保它的變化符合咱們的預期。可是這樣子添加的代碼其實與咱們程序無關,確增長程序的複雜度也更容易出錯。雖然反作用老是會有的,可是咱們應該努力減小。這在響應式編程中尤爲重要,由於咱們隨着時間變換會產生不少狀態片斷。因此避免外部狀態和反作用是貫穿本書一條宗旨。

咱們設法知足了咱們的簡單要求,可是爲了實現這樣一個簡單的目標,最終獲得了至關複雜的代碼。對於首次查看它的開發人員來講,不容易懂且維護代碼很困難。 更重要的是,由於咱們仍然須要保存外部撞他,因此咱們很容易在將來發展出玄妙的錯誤。

在這種狀況下咱們想要的只是查詢點擊的「數據庫」。若是咱們是使用關係數據庫,咱們使用聲明性語言SQL:

SELECT x, y FROM clicks LIMIT 10

若是咱們將點擊事件流視爲能夠查詢和轉變的數據源,該怎麼辦?畢竟,它與數據庫沒有什麼不一樣,都是一個能夠處理數據的東西。咱們所須要的只是一個爲咱們提供抽象概念的數據類型。

輸入RxJS及其Observable數據類型:

Rx.Observable.fromEvent(document, 'click')
    .filter(function(c) { return c.clientX > window.innerWidth / 2; })
    .take(10)
    .subscribe(function(c) { console.log(c.clientX, c.clientY) })

這段代碼功能同以前,它能夠這樣子解讀:

建立一個Observable的點擊事件,並過濾掉在點擊事件上發生屏幕左側的點擊。而後只在控制檯打印前10次點擊的座標。

注意即便您不熟悉代碼也很容易閱讀,也沒有必要建立外部變量來保持狀態。這樣使咱們的代碼是自包含的,不容易產生bug。因此也就不必去清除你的狀態。咱們能夠合併,轉換或者單純的傳遞Observables。咱們已經將不容易處理的事件轉變爲有形數據結構,這種數據結構與數組同樣易於使用,但更加靈活。

在下一節,咱們將看到使Observables如此強大的原理。

觀察者和迭代者

要了解Observable的來源,咱們須要查看他們的基礎:Observer和Iterator軟件模式。在本節中咱們將快速瀏覽它們,而後咱們將看到Observables如何結合,簡單而有力。

觀察者模式

對於軟件開發人員來講,很難不聽到Observables就想起觀察者模式。在其中咱們有一個名爲Producer的對象,內部保留訂閱者的列表。當Producer對象發生改變時,訂閱者的update方法會被自動調用。(在觀察者模式的大部分解釋中,這個實體被叫作Subject,爲了不你們和RxJs的本身Subject混淆,咱們稱它爲Producer)。

ch1/observer_pattern.js

function Producer() {
    this.listeners = [];
}

Producer.prototype.add = function(listener) {
    this.listeners.push(listener);
};

Producer.prototype.remove = function(listener) {
    var index = this.listeners.indexOf(listener);
    this.listeners.splice(index, 1);
};

Producer.prototype.notify = function(message) {
    this.listeners.forEach(function(listener) {
        listener.update(message);
    });
};

Producer對象在實例的偵聽器中保留一個動態的Listener列表,每當Producer更新的時候都會調用其notify方法。在下面的代碼中,咱們建立了兩個對象來監聽
notifie,一個Producer的實例。

ch1/observer_pattern.js

// Any object with an 'update' method would work.
var listener1 = {
    update: function(message) {
    console.log('Listener 1 received:', message);
}
};
var listener2 = {
    update: function(message) {
    console.log('Listener 2 received:', message);
}
};
var notifier = new Producer();
notifier.add(listener1);
notifier.add(listener2);
notifier.notify('Hello there!');

當咱們運行這個程序的時候:

Listener 1 received: Hello there!
Listener 2 received: Hello there!

notifier更新內部狀態的時候,listener1listener2都會被更新。這些都不須要咱們去操心。

咱們的實現很簡單,但它說明了觀察者模式容許觀察者和監聽器解耦。

迭代器模式

Observable的另外一主要部分來自Iterator模式。一個Iterator是一個爲消費者提供簡單的遍象它內容的方式,隱藏了消費者內部的實現。

Iterator接口很簡單。它只須要兩個方法:next()來獲取序列中的下一個項目,以及hasNext()來檢查是否還有項目序列。

下面是咱們如何編寫一個對數字數組進行操做的迭代器,而且只返回divisor參數的倍數的元素:

ch1/iterator.js

function iterateOnMultiples(arr, divisor) {
    this.cursor = 0;
    this.array = arr;
    this.divisor = divisor || 1;
}
iterateOnMultiples.prototype.next = function() {
    while (this.cursor < this.array.length) {
    var value = this.array[this.cursor++];
    if (value % this.divisor === 0) {
        return value;
    }
}
};
iterateOnMultiples.prototype.hasNext = function() {
    var cur = this.cursor;
    while (cur < this.array.length) {
        if (this.array[cur++] % this.divisor === 0) {
            return true;
        }
    }
    return false;
};

咱們能夠這樣子使用咱們的迭代器:

ch1/iterator.js

var consumer = new iterateOnMultiples([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3);

console.log(consumer.next(), consumer.hasNext()); // 3 true
console.log(consumer.next(), consumer.hasNext()); // 6 true
console.log(consumer.next(), consumer.hasNext()); // 9 false

迭代器很是適合封裝任何類型數據結構的遍歷邏輯。 正如咱們在前面的例子中看到的那樣,迭代器在處理不一樣類型的數據的時候就會變得頗有趣,或者在運行的時候作配置,就像咱們在帶有divisor參數的示例中所作的那樣。

Rx模式和Observable

雖然Observer和Iterator模式自己就很強大,可是二者的結合甚至更好。 咱們稱之爲Rx模式,命名爲
在Reactive Extensions庫以後。咱們將在本書的其他部分使用這種模式。

Observable序列或簡單的Observable是Rx模式的核心。Observable按順序傳遞出來它的值 - 就像迭代器同樣 - 而不是消費者要求它傳出來的值。這個和觀察者模式有相同之處:獲得數據並將它們推送到監聽器。

pull和push

在編程中,基於推送的行爲意味着應用程序的服務器組件向其客戶端發送更新,而不是客戶端必須輪詢服務器以獲取這些更新。這就像是說「不要打電話給咱們; 咱們會打電話給你。「
RxJS是基於推送的,所以事件源(Observable)將推進新值給消費者(觀察者),消費者卻不能去主動請求新值。

更簡單地說,Observable是一個隨着時間的推移可使用其數據的序列。Observables,也就是Observers的消費者至關於觀察者模式中的監聽器。當Observe訂閱一個Observable時,它將在序列中接收到它們可用的值,而沒必要主動請求它們。

到目前爲止,彷佛與傳統觀察者沒有太大區別。 但實際上有兩個本質區別:

  • Observable在至少有一個Observer訂閱它以前不會啓動。
  • 與迭代器同樣,Observable能夠在序列完成時發出信號。

使用Observables,咱們能夠聲明如何對它們發出的元素序列作出反應,而不是對單個項目作出反應。咱們能夠有效地複製,轉換和查詢序列,這些操做將應用於序列的全部元素。

建立Observables

有幾種方法能夠建立Observable,建立函數是最明顯的一種。 Rx.Observable對象中的create方法接受一個Observer參數的回調。 該函數定義了Observable將如何傳出值。這是咱們如何建立一個簡單的Observable:

var observable = Rx.Observable.create(function(observer) {
    observer.onNext('Simon');
    observer.onNext('Jen');
    observer.onNext('Sergi');
    observer.onCompleted(); // We are done
});

當咱們訂閱此Observable時,它經過在其偵聽器上調用onNext方法來發出三個字符串。 而後它調用onCompleted來表示序列已完成。 可是咱們究竟如何訂閱Observable呢?咱們使用Observers來作這件事情。

第一次接觸Observers

Observers監聽Observables。每當Observable中觸發一個事件,它都會在全部Observers中調用相關的方法。

Observers有三種方法:onNextonCompletedonError

onNext    至關於觀察者模式中的update。 當Observable發出新值時調用它。請注意該名稱如何反映咱們訂閱序列的事實,而不只僅是離散值。

onCompleted    表示沒有更多可用數據。 調用onCompleted後,對onNext的進一步調用將不起做用。

onError    在Observable中發生錯誤時調用。 在調用以後,對onNext的進一步調用將不起做用

如下是咱們建立基本觀察者的方法:

var observer = Rx.Observer.create(
    function onNext(x) { console.log('Next: ' + x); },
    function onError(err) { console.log('Error: ' + err); },
    function onCompleted() { console.log('Completed'); }
);

Rx.Observer對象中的create方法接受onNext,onCompleted和onError狀況的函數,並返回一個Observer實例。這三個函數是可選的,您能夠決定要包含哪些函數。例如,若是咱們訂閱無限序列(例如點擊按鈕(用戶能夠永久點擊)),則永遠不會調用onCompleted處理程序。 若是咱們確信序列不能出錯(例如,經過從數組中生成一個Observable),咱們就不須要onError方法了。

使用Observable進行Ajax調用

咱們尚未對Observables作過任何實用的事情。如何建立一個檢索遠程內容的Observable?爲此,咱們將使用Rx.Observable.create包裝XMLHttpRequest對象。

function get(url) {
    return rxjs.Observable.create(function(observer) {
        // Make a traditional Ajax request
        var req = new XMLHttpRequest(); req.open('GET', url);
        req.onload = function() { 
            if (req.status == 200) {
                // If the status is 200, meaning there have been no problems,     
                // Yield the result to listeners and complete the sequence     
                observer.next(req.response);
                observer.completed();
            }
            else {
                // Otherwise, signal to listeners that there has been an error
                observer.error(new Error(req.statusText)); }
            };
            req.onerror = function() {
            observer.error(new Error("Unknown Error"));
        };
        req.send();
    });
}
// Create an Ajax Observable
var test = get('/api/contents.json');

在前面的代碼中,get函數使用create來包裝XMLHttpRequest。若是HTTP GET請求成功,咱們emit它的內容並結束序列(咱們的Observable只會發出一個結果)。 不然,咱們會emit一個錯誤。在最後一行,咱們傳入一個url進行調用。 這將建立Observable,但它不會發出任何請求。這很重要:Observable在至少有一個觀察者描述它們以前不會作任何事情。 因此讓咱們要這樣子作:

// Subscribe an Observer to it
test.subscribe(
    function onNext(x) { console.log('Result: ' + x); }, 
    function onError(err) { console.log('Error: ' + err); }, 
    function onCompleted() { console.log('Completed'); }
);

首先要注意的是,咱們沒有像以前的代碼那樣顯式建立Observer。大多數時候咱們都會使用這個更短的版本,咱們在Observable中使用這三個訂閱Observer案例的函數:next,completed和error。

subscribe而後一切就緒。在subscribe以前,咱們只是聲明瞭Observable和Observer將如何交互。只有當咱們調用subscribe方法時,一切纔開始運行。

始終會有一個Operator

在RxJS中,轉換或查詢序列的方法稱爲Operator。Operator位於靜態Rx.Observable對象和Observable實例中。在咱們的示例中,create就是一個這樣的Operator。

當咱們必須建立一個很是具體的Observable時,create是一個很好的選擇,可是RxJS提供了許多其餘Operator,能夠很容易地爲經常使用源建立Observable。

讓咱們再看看前面的例子。對於像Ajax請求這樣的常見操做,一般有一個Operator可供咱們使用。 在這種狀況下,RxJS DOM庫提供了幾種從DOM相關源建立Observable的方法。因爲咱們正在執行GET請求,咱們可使用Rx.DOM.Request.get,而後咱們的代碼就變成了這個:

Rx.DOM.get('/api/contents.json').subscribe(
    function onNext(data) { console.log(data.response); }, 
    function onError(err) { console.error(err); }
);

rxjs-dom自己支持的rxjs版本比較舊,例子只能作爲示意

這段代碼與咱們以前的代碼徹底相同,但咱們沒必要建立XMLHttpRequest的包裝器: 它已經存在了。另請注意,此次咱們省略了onCompleted回調,由於咱們不打算在Observable complete時作出反應。咱們知道它只會產生一個結果,咱們已經在onNext回調中使用它了。

在本書中咱們將使用這樣的大量便利操做符。這都是基於rxjs自己的能量,這也正式rxjs強大的地方之一。

一種能夠約束所有的數據類型

在RxJS程序中,咱們應該努力將全部數據都放在Observables中,而不只僅是來自異步源的數據。 這樣作能夠很容易地組合來自不一樣來源的數據,例如現有數組與回調結果,或者XMLHttpRequest的結果與用戶觸發的某些事件。

例如,若是咱們有一個數組,其項目須要與來自其餘地方的數據結合使用,最好將此數組轉換爲Observable。(顯然,若是數組只是一個不須要組合的中間變量,則沒有必要這樣作。)在本書中,您將瞭解在哪些狀況下值得將數據類型轉換爲Observables。

RxJS爲operators提供了從大多數JavaScript數據類型建立Observable的功能。 讓咱們回顧一下你將一直使用的最多見的:數組,事件和回調。

從數組建立Observable

咱們可使用通用的operators將任何相似數組或可迭代的對象轉換爲Observable。 from將數組做爲參數並返回一個包含他全部元素的Observable。

Rx.Observable
.from(['Adrià', 'Jen', 'Sergi']) 
.subscribe(
    function(x) { console.log('Next: ' + x); }, 
    function(err) { console.log('Error:', err); },
    function() { console.log('Completed'); }
);

from是和fromEvent一塊兒,是RxJS代碼中最方便和最經常使用的operators之一。

從JavaScript事件建立Observable

當咱們將一個事件轉換爲一個Observable時,它就變成了一個能夠組合和傳遞的第一類值。 例如,這是一個Observable,只要它移動就會傳初鼠標指針的座標。

var allMoves = Rx.Observable.fromEvent(document, 'mousemove')

allMoves.subscribe(function(e) {
  console.log(e.clientX, e.clientY);
});

將事件轉換爲Observable會將事件從以前的事件邏輯中釋放出來。更重要的是,咱們能夠基於原始的Observables建立新的Observable。這些新的是獨立的,可用於不一樣的任務。

var movesOnTheRight = allMoves.filter(function(e) { 
    return e.clientX > window.innerWidth / 2;
});
var movesOnTheLeft = allMoves.filter(function(e) { 
    return e.clientX < window.innerWidth / 2;
});
movesOnTheRight.subscribe(function(e) { 
    console.log('Mouse is on the right:', e.clientX);
});
movesOnTheLeft.subscribe(function(e) { 
    console.log('Mouse is on the left:', e.clientX);
});

在前面的代碼中,咱們從原始的allMoves中建立了兩個Observable。 這些專門的Observable只包含原始的過濾項:movesOnTheRight包含發生在屏幕右側的鼠標事件,movesOnTheLeft包含發生在左側的鼠標事件。 它們都沒有修改原始的Observable:allMoves將繼續發出全部鼠標移動。 Observable是不可變的,每一個應用於它們的operator都會建立一個新的Observable。

從回調函數建立Observable

若是您使用第三方JavaScript庫,則可能須要與基於回調的代碼進行交互。 咱們可使用fromCallbackfromNodeCallback兩個函數將回調轉換爲Observable。Node.js遵循的是在回調函數的第一個參數傳入錯誤對象,代表存在問題。而後咱們使用fromNodeCallback專門從Node.js樣式的回調中建立Observable:

var Rx = require('rx'); // Load RxJS
var fs = require('fs'); // Load Node.js Filesystem module
// Create an Observable from the readdir method
var readdir = Rx.Observable.fromNodeCallback(fs.readdir); // Send a delayed message
var source = readdir('/Users/sergi');
var subscription = source.subscribe(
    function(res) { console.log('List of directories: ' + res);},
    function(err) { console.log('Error: ' + err); },
    function() { console.log('Done!'); 
});

前面的代碼中,咱們使用Node.js的fs.readdir方法建立一個Observable readdir。 fs.readdir接受目錄路徑和回調函數delayedMsg,該函數在檢索目錄內容後調用。

咱們使用readdir和咱們傳遞給原始fs.readdir的相同參數,省掉了回調函數。 這將返回一個Observable,當咱們訂閱一個Observer時,它將正確使用onNext,onError和onCompleted。

總結

在本章中,咱們探討了響應式編程,並瞭解了RxJS如何經過Observable解決其餘問題的方法,例如callback或promise。如今您瞭解爲何Observables功能強大,而且您知道如何建立它們。有了這個基礎,咱們如今能夠繼續建立更有趣的響應式程序。下一章將向您展現如何建立和組合基於序列的程序,這些程序爲Web開發中的一些常見場景提供了更「可觀察」的方法。

關注個人微信公衆號,更多優質文章定時推送
clipboard.png

建立了一個程序員交流微信羣,你們進羣交流IT技術

圖片描述

若是已過時,能夠添加博主微信號15706211347,拉你進羣

相關文章
相關標籤/搜索