Rxjs 響應式編程-第二章:序列的深刻研究

Rxjs 響應式編程-第一章:響應式
Rxjs 響應式編程-第二章:序列的深刻研究
Rxjs 響應式編程-第三章: 構建併發程序
Rxjs 響應式編程-第四章 構建完整的Web應用程序
Rxjs 響應式編程-第五章 使用Schedulers管理時間
Rxjs 響應式編程-第六章 使用Cycle.js的響應式Web應用程序css

序列的深刻研究

童年的回憶中的益智視頻遊戲,你必須使用各類技巧在屏幕上引導降低的水流。您能夠拆分流,稍後將它們合併,或者使用傾斜的木板來改變它們的方向。你必需要有創造力才能使水達到最終目標。html

我發現該遊戲與使用Observable序列有不少類似之處。 Observable只是咱們能夠轉換,組合和查詢的事件流。 不管咱們是在處理簡單的Ajax回調仍是在Node.js中處理字節數據都不要緊。 咱們發現流的方式是同樣的。 一旦咱們在流中思考,咱們程序的複雜性就會下降。數據庫

在本章中,咱們將重點介紹如何在程序中有效地使用序列。 到目前爲止,咱們已經介紹瞭如何建立Observable並使用它們進行簡單的操做。爲了釋放它們的力量,咱們必須知道將咱們的程序輸入和輸出轉換爲帶有咱們程序流程的序列。編程

在咱們弄清楚以前,咱們將會遇到一些能夠幫助咱們開始操做序列的基本operator。接下來,咱們將實現一個真實的應用程序,顯示(幾乎)實時發生的地震。 開始吧!json

可視化的Observables

您將要學習咱們在RxJS程序中最常使用的一些運算符。 談論對序列的操做可能感受很抽象。 爲了幫助開發人員以簡單的方式理解Operator,咱們將使用標準的可視化表示序列,稱爲大理石圖。 它們直觀地表示異步數據流,您能夠在RxJS的每一個資源中找到它們。segmentfault

讓咱們使用範圍運算符,它返回一個Observable,它獲得指定範圍內的整數:Rx.Observable.range(1,3);api

它的大理石圖看起來像這樣:數組

image

長箭頭表示Observable,x軸表示時間。每一個圓圈表示Observable經過內部調用onNext()傳出的值。生成第三個值後,range調用了onCompleted,在圖中用垂直線表示。promise

讓咱們看一個涉及幾個Observable的例子。合併運算符采用兩個不一樣的Observable並返回一個具備合併值的新Observable。 interval運算符返回一個Observable,它在給定的時間間隔內產生增量數,以毫秒爲單位。服務器

在下面的代碼中,咱們將合併兩個不一樣的Observable,它們使用interval來以不一樣的間隔生成值:

var a = Rx.Observable.interval(200).map(function(i) { 
    return 'A' + i;
});
var b = Rx.Observable.interval(100).map(function(i) {
    return 'B' + i; 
    
});
Rx.Observable.merge(a, b).subscribe(function(x) {
    console.log(x);
});
B0, A0, B1, B2, A1, B3, B4...

合併運算符的大理石圖以下所示:

image

這裏,沿y軸的虛線箭頭指向應用於序列A和B中每一個元素的變換的最終結果。獲得的Observable由C表示,其中包含A和B的合併元素。若是不一樣Observables同時傳出元素,合併序列中這些元素的順序是隨機的。

基本序列運算符

在RxJS中轉換Observables的數十個運算符中,最經常使用的是具備良好收集處理能力的其餘語言也具備:map,filter和reduce。在JavaScript中,您能夠在Array中找到這些operator。

RxJS遵循JavaScript約定,所以您會發現如下運算符的語法與數組運算符的語法幾乎相同。實際上,咱們將使用數組和Observables同時實現,以顯示兩個API的類似程度。

Map

map是最經常使用的序列轉換運算符。它接受一個Observable和一個函數,並將該函數應用於源Observable中的每一個值。 它返回一個帶有轉換值的新Observable。

image

JS Arrays

var src = [1, 2, 3, 4, 5];
var upper = src.map(function(name) {
    return name * 2; 
});
upper.forEach(logValue);

Observables

var src = Rx.Observable.range(1, 5); 
var upper = src.map(function(name) {
    return name * 2; 
});
upper.subscribe(logValue);

在這兩種狀況下,src都不會發生改變。

這段代碼和後面的代碼使用的logValue函數:

var logValue = function(val) { 
    console.log(val) 
};

有些狀況下,咱們傳遞給map的函數會進行一些異步計算來轉換值。在這種狀況下,map將沒法按預期工做。 對於這些狀況,最好使用flatMap,後續會介紹到。

Filter

filter接受一個Observable和一個函數,並使用該函數檢測Observable中的每一個元素。它返回一個Observable序列,其中包含函數返回true的全部元素。

image

JS Arrays

var isEven = (function(val) { return val % 2 !== 0; });
var src = [1, 2, 3, 4, 5];
var even = src.filter(isEven);
even.forEach(logValue);

Observables

var isEven = (function(val) { return val % 2 !== 0; });
var src = Rx.Observable.range(1, 5); 
var even = src.filter(isEven);
even.subscribe(logValue);

Reduce

reduce(也稱爲fold)接受一個Observable並返回一個始終包含單個項的新項,這是在每一個元素上應用函數的結果。 該函數接收當前元素和函數先前調用的結果。

image

JS Arrays

var src = [1, 2, 3, 4, 5];
var sum = src.reduce(function(a, b) {
    return a + b;
});
console.log(sum);

Observables

var src = Rx.Observable.range(1, 5);
var sum = src.reduce(function(acc, x) {
    return acc + x;
});
sum.subscribe(logValue);

reduce是操做序列的強大操做符。事實上,它是稱爲聚合運算符的基本實現。

聚合運算符

聚合運算符處理序列並返回單個值。例如, Rx.Observable.first接受一個Observable和一個可選函數,並返回知足函數條件布爾值的第一個元素。

計算序列的平均值也是一個聚合操做.RxJS提供了實例運算符的平均值,可是爲了本節的目的,咱們想看看如何使用reduce實現它。每一個聚合運算符均可以經過僅使用reduce來實現:

sequences/marble.js

var avg = Rx.Observable.range(0, 5)
    .reduce(function(prev, cur) {
        return {
            sum: prev.sum + cur,
            count: prev.count + 1
        };
    }, { sum: 0, count: 0 })
    .map(function(o) {
        return o.sum / o.count;
    });
    
var subscription = avg.subscribe(function(x) {
    console.log('Average is: ', x);
});
Average is: 2

在此代碼中,咱們使用reduce將每一個新值添加到前一個值。由於reduce不能爲咱們提供序列中元素的總數,因此咱們須要對它們進行計數。咱們使用包含兩個字段sum和count的對象組成的初始值調用reduce,其中咱們將存儲到目前爲止的元素總數和總數。每一個新元素都將返回具備更新值的同一對象。

當序列結束時,reduce能夠經過調用onNex返回t包含最終總和和最終計數的對象。但在這裏咱們使用map來返回將總和除以計數的結果。

咱們能夠聚合無限Observables嗎?

想象一下,咱們正在編寫一個程序,讓用戶在行走時得到平均速度。即便用戶還沒有完成行走,咱們也須要可以使用咱們目前所知的速度值進行計算。咱們想要實時記錄無限序列的平均值。 問題是若是序列永遠不會結束,像reduce這樣的聚合運算符將永遠不會調用其Observers的onNext運算符。

對咱們來講幸運的是,RxJS團隊已經考慮過這種狀況,併爲咱們提供了scan操做符,其做用相似於reduce可是會發出每一箇中間結果:

var avg = Rx.Observable.interval(1000)
    .scan(function (prev, cur) {
        return {
            sum: prev.sum + cur,
            count: prev.count + 1
        };
    }, { sum: 0, count: 0 })
    .map(function(o) {
        return o.sum / o.count;
    });
    
var subscription = avg.subscribe( function (x) {
    console.log(x);
});

這樣,咱們能夠聚合須要很長時間才能完成或無限的序列。在前面的示例中,咱們每秒生成一個增量整數,並調用scan替換先前的reduce。咱們如今每秒獲得生成值的平均值。

flatMap

若是你的Observable的結果是仍是Observables,你要怎麼處理?大多數狀況下,您但願在單個序列中統一這些嵌套Observable中的項目。 這正是flatMap的做用。

flatMap運算符接收參數Observable A,其元素也是Observables,並返回一個子元素也是Observable的Observable。讓咱們用圖表可視化它:

image

咱們能夠看到A(A1,A2,A3)中的每一個元素也是可觀察序列。 一旦咱們使用變換函數將flatMap應用於A,咱們獲得一個Observable,其中包含A的不一樣子元素中的全部元素。

flatMap是一個功能強大的運算符,但它比咱們迄今爲止看到的運算符更難理解。能夠把它想象成Observables的concatAll()

concatAll是一個函數,它接受一個數組數組並返回一個「flattened」單個數組,其中包含全部子數組的值,而不是子數組自己。 咱們可使用reduce來實現這樣的功能:

function concatAll(source) {
    return source.reduce(function(a, b) {
        return a.concat(b); 
    });
}

咱們會像這樣使用它:

concatAll([[0, 1, 2], [3, 4, 5], [6, 7, 8]]);
// [0, 1, 2, 3, 4, 5, 6, 7, 8]

flatMap作一樣的事情,但它使Observables而不是數組變扁平。它須要一個源Observable和一個返回一個新的Observable的函數,並將該函數應用於源Observable中的每一個元素,就像map同樣。若是程序在這裏中止,咱們最終會獲得一個會發出Observables的Observable。 可是flatMap向主序列發出每一個新Observable發出的值,將全部Observable「扁平化」爲一個主序列。 最後,咱們得到了一個Observable。

取消序列

在RxJS中,咱們能夠取消正在運行的Observable。 這是一種優於其餘異步通訊形式的優點,例如回調和Promise,一旦被調用就沒法直接取消(儘管某些Promise實現支持取消)。

咱們能夠經過兩種主要方式取消Observable:隱式和顯式。

顯式取消:Disposable

Observables自己沒有取消的方法。相反,當咱們訂閱Observable時,咱們會獲得一個表明該特定訂閱的Disposable對象。而後咱們能夠在該對象中調用方法dispose,而且該訂閱將中止從Observable接收通知。

在下面的示例中,咱們將兩個Observers訂閱到計數器Observable,它每秒發出一個遞增的整數。 兩秒後,咱們取消第二個訂閱,咱們能夠看到它的輸出中止但第一個訂閱者的輸出繼續:

sequences/disposable.js

var counter = Rx.Observable.interval(1000);

var subscription1 = counter.subscribe(function(i) {
    console.log('Subscription 1:', i);
});

var subscription2 = counter.subscribe(function(i) {
    console.log('Subscription 2:', i);
});

setTimeout(function() { 
    console.log('Canceling subscription2!');
    subscription2.dispose();
}, 2000);
Subscription 1: 0 
Subscription 2: 0 
Subscription 1: 1 
Subscription 2: 1 
Canceling subscription2! 
Subscription 1: 2 
Subscription 1: 3 
Subscription 1: 4
...

隱式取消:經過Operater

大多數時候,Operater會自動取消訂閱。當序列結束或知足操做條件時,rangetake等操做符將取消訂閱。更高級的操做符,如withLatestFromflatMapLatest,將根據須要在內部建立和銷燬訂閱,由於它們處理的是運行中的幾個可觀察的內容。簡而言之,大部分訂閱的取消都不該該是你該擔憂的。

被封裝以後的Observables

當您使用包含不提供取消的外部API的Observable時,Observable仍會在取消時中止發出通知,但基礎API不必定會被取消。例如,若是您正在使用封裝Promise的Observable,則Observable將在取消時中止發出,但不會取消基礎Promise。

在下面的代碼中,咱們嘗試取消對包含promise p的Observable的訂閱,同時咱們以傳統的方式設置一個動做來解決promise。 promise應在五秒內resolve,但咱們在建立後當即取消訂閱:

var p = new Promise(function(resolve, reject) {
    window.setTimeout(resolve, 5000);
});

p.then(function() {
    console.log('Potential side effect!');
});

var subscription = Rx.Observable.fromPromise(p).subscribe(function(msg) {
    console.log('Observable resolved!');
});

subscription.dispose();

5秒後,咱們看到:

Potential side effect!

若是咱們取消對Observable的訂閱,它會有效地阻止它接收通知。 可是promise的then方法仍在運行,代表取消Observable並不會取消關聯的Promsie。

瞭解咱們在Observable中使用的外部API的詳細信息很是重要。您可能認爲已取消序列,但底層API會繼續運行並在程序中引發一些反作用。 這些錯誤真的很難捕捉到。

錯誤處理

咱們不能在回調中使用傳統的try / catch機制,由於它是同步的。 它將在任何異步代碼以前運行,而且沒法捕獲任何錯誤。

在回調函數中,能夠經過將錯誤(若是有)做爲參數傳遞到回調函數。這是有用的,但它使代碼很是脆弱。

讓咱們看看如何捕獲Observables中的錯誤。

onError處理程序

還記得咱們在上面上討論了第一次與觀察者聯繫的觀察者能夠調用的三種方法嗎? 咱們熟悉onNextonCompleted,可是咱們尚未使用onError; 它是有效處理Observable序列中錯誤的關鍵。

爲了瞭解它是如何工做的,咱們將編寫一個簡單的函數來獲取JSON字符串數組,並使用JSON.parse返回一個Observable,它發出從這些字符串解析的對象:

爲了瞭解它是如何工做的,咱們將編寫一個簡單的函數來獲取JSON字符串組成的數組,並使用JSON.parse返回一個Observable,它發出從這些字符串解析的對象:

function getJSON(arr) {
    return Rx.Observable.from(arr).map(function(str) {
        var parsedJSON = JSON.parse(str);
        return parsedJSON;
    });
}

咱們將帶有三個JSON字符串的數組傳遞給getJSON,其中數組中的第二個字符串包含語法錯誤,所以JSON.parse將沒法解析它。 而後咱們將訂閱結果,爲onNext和onError提供處理程序:

getJSON([
    '{"1": 1, "2": 2}',
    '{"success: true}', // Invalid JSON string
    '{"enabled": true}'
]).subscribe(
    function(json) {
        console.log('Parsed JSON: ', json);
    },
    function(err) {
        console.log(err.message);
    }
)
Parsed JSON: { 1: 1, 2: 2 }
JSON.parse: unterminated string at line 1 column 8 of the JSON data

Observable爲第一個結果發出解析的JSON,但在嘗試解析第二個結果時拋出異常。 onError處理程序捕獲並打印出來。默認行爲是,每當發生錯誤時,Observable都會中止發出項目,而且不會調用onCompleted。

錯誤捕獲

到目前爲止,咱們已經看到如何檢測錯誤已經發生並對該信息作了些什麼,可是咱們沒法對它作出響應並繼續咱們正在作的事情。Observable察實例具備catch運算符,它容許咱們對Observable中的錯誤作出反應並繼續使用另外一個Observable。

catch接受一個Observable或一個接收錯誤的函數做爲參數並返回另外一個Observable。 在咱們的場景中,若是原始Observable中存在錯誤,咱們但願Observable發出包含error屬性的JSON對象:

function getJSON(arr) {
    return Rx.Observable.from(arr).map(function(str) {
        var parsedJSON = JSON.parse(str);
        return parsedJSON;
    });
}

var caught = getJSON(['{"1": 1, "2": 2}', '{"1: 1}']).catch(
    Rx.Observable.return({
        error: 'There was an error parsing JSON'
    })
);

caught.subscribe(
    function(json) {
        console.log('Parsed JSON: ', json);
    },
    // Because we catch errors now, `onError` will not be executed
    function(e) {
        console.log('ERROR', e.message);
    }
);

在前面的代碼中,咱們建立了一個新的Observable,它使用catch運算符來捕獲原始Observable中的錯誤。 若是出現錯誤,它將使用僅發出一個項目的Observable繼續序列,並使用描述錯誤的error屬性。 這是輸出:

Parsed JSON: Object { 1: 1, 2: 2 }
Parsed JSON: Object { error: "There was an error parsing JSON" }

這是catch操做符的大理石圖:

image

注意X表示序列出錯。 在這種狀況下,Observable值 - 三角形的不一樣形狀意味着它們是來自另外一個Observable的值。在這裏,這是咱們在發生錯誤時返回的Observable。

catch對於對序列中的錯誤做出反應很是有用,它的行爲與傳統的try / catch塊很是類似。 可是,在某些狀況下,忽略Observable中的項目發生的錯誤並讓序列繼續,這將是很是方便的。 在這些狀況下,咱們可使用重試運算符。

序列重試

有時錯誤就會發生,咱們無能爲力。例如,可能存在請求遠程數據的超時,由於用戶具備不穩定的Internet鏈接,或者咱們查詢的遠程服務器可能崩潰。在這些狀況下,若是咱們可以繼續請求咱們須要的數據直到成功,那將是很好的。 重試操做符的確如此:

sequences/error_handling.js

// This will try to retrieve the remote URL up to 5 times.
Rx.DOM.get('/products').retry(5)
.subscribe(
    function(xhr) { console.log(xhr); },
    function(err) { console.error('ERROR: ', err); }
);

在前面的代碼中,咱們建立了一個函數,該函數返回一個Observable,它使用XMLHttpRequest從URL檢索內容。 由於咱們的鏈接可能有點不穩定,因此咱們在訂閱它以前添加retry(5),確保在出現錯誤的狀況下,它會在放棄並顯示錯誤以前嘗試最多五次。

使用重試時須要瞭解兩件重要事項。首先,若是咱們不傳遞任何參數,它將無限期地重試,直到序列完成沒有錯誤。 若是Observable產生錯誤,這對性能是危險的。 若是咱們使用同步Observable,它將具備與無限循環相同的效果。

其次,重試將始終從新嘗試整個Observable序列,即便某些項目沒有錯誤。若是您在處理項目時形成任何反作用,這一點很重要,由於每次重試都會從新應用它們。

製做實時地震可視化器

使用咱們在本章中到目前爲止所涵蓋的概念,咱們將構建一個使用RxJS的Web應用程序,以向咱們展現實時發生地震的位置。咱們首先要創建一個功能性的反應性實施方案,咱們將隨着時間的推移對其進行改進。 最終結果以下:

image

準備環境

咱們將使用USGS(美國地質調查局)地震數據庫,該數據庫提供多種格式的實時地震數據集。 咱們將以JSONP格式從每週數據集中獲取數據。

咱們還將使用Leaflet(一個JavaScript庫)來渲染交互式地。讓咱們看看咱們的index.html看起來如何,並重點介紹:

examples_earthquake/index.html

<!DOCTYPE html>
<html lang="en-us">
<head>
    <meta charset="utf-8">
    <link rel="stylesheet"
    href="http://cdn.leafletjs.com/leaflet-0.7.3/leaflet.css" />
    <script src="http://cdn.leafletjs.com/leaflet-0.7.3/leaflet.js"></script>
    <script src="../rx.all-4.0.0.js"></script>
    <title>Earthquake map</title>
    <style type="text/css">
        html, body {
        margin: 0;
        padding: 0;
        height: 100%;
        }
        #map { height: 100%; }
    </style>
</head>
<body>
<div id="map"></div>
    <script>
        var QUAKE_URL = 'http://earthquake.usgs.gov/earthquakes/feed/v1.0/' +
        'summary/all_day.geojsonp';
        function loadJSONP(url) {
            var script = document.createElement('script');
            script.src = url;
            var head = document.getElementsByTagName('head')[0];
            head.appendChild(script);
        }
        var map = L.map('map').setView([33.858631, -118.279602], 7);
        L.tileLayer('http://{s}.tile.osm.org/{z}/{x}/{y}.png').addTo(map);
    </script>
    <script src="code.js"></script>
</body>
</html>

檢索地震位置

如今咱們的HTML已準備就緒,咱們能夠爲咱們的應用程序編寫邏輯。首先,咱們須要知道咱們得到了什麼樣的數據以及在地圖上表明地震所需什麼樣的數據。

USGS網站給咱們的JSONP數據看起來像這樣:

examples_earthquake/jsonp_example.txt

eqfeed_callback({
    "type": "FeatureCollection",
    "metadata": {
        "generated": 1408030886000,
        "url": "http://earthquake.usgs.gov/earthquakes/...",
        "title": "USGS All Earthquakes, Past Day",
        "status": 200, "api": "1.0.13", "count": 134
    },
    "features": [
        {
            "type": "Feature",
            "properties": {
                "mag": 0.82,
                "title": "M 0.8 - 3km WSW of Idyllwild-Pine Cove, California",
                "place": "3km WSW of Idyllwild-Pine Cove, California",
                "time": 1408030368460,
                ...
            },
            "geometry": {
                "type": "Point",
                "coordinates": [ -116.7636667, 33.7303333, 17.33 ]
            },
            "id": "ci15538377"
        },
        ...
    ]
})

features數組包含一個對象,其中包含今天發生的每次地震的數據。 那是一大堆數據! 一天以內發生了多少次地震是使人驚訝的(而且可怕)。對於咱們的程序,咱們只須要每次地震的座標,標題和大小。

咱們首先要建立一個Observable來檢索數據集併發出單個地震。 這是第一個版本:

examples_earthquake/code.js

var quakes = Rx.Observable.create(function(observer) {
    window.eqfeed_callback = function(response) {
        var quakes = response.features;
        quakes.forEach(function(quake) {
            observer.onNext(quake);
        });
    };
    loadJSONP(QUAKE_URL);
});

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

等等,那個明顯的全局函數window.eqfeed_callback在咱們的代碼中作了什麼? 好吧,事實證實,JSONP URL一般在URL中添加查詢字符串,以指定處理響應的函數名稱,但USGS站點不容許這樣作,所以咱們須要建立一個全局函數 他們決定咱們必須使用的名稱,即eqfeed_callback

咱們的Observable按順序發出全部地震。咱們如今有地震數據生成器!咱們沒必要關心異步流程或者必須將全部邏輯放在同一個函數中。只要咱們訂閱Observable,就會獲得地震數據。

經過在地震觀測中將地震檢索「黑箱」,咱們如今能夠訂閱並處理每次地震。 而後咱們將爲每一個地震繪製一個圓,其大小與其大小成比例。

深刻一些

咱們能夠作得更好嗎?你打賭!在前面的代碼中,咱們仍然經過遍歷數組並調用onNext來管理每一個地震,即便咱們在Observable中將其隔離。

這是可使用flatMap的完美狀況。咱們將使用Rx.Observable.from檢索數據並從features數組中生成一個Observable。 而後咱們將Observable合併回主Observable中:

var quakes = Rx.Observable.create(function(observer) {
    window.eqfeed_callback = function(response) {
        observer.onNext(response);
        observer.onCompleted();
    };
    loadJSONP(QUAKE_URL);
}).flatMap(function transform(dataset) {
    return Rx.Observable.from(dataset.response.features);
});

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

咱們再也不手動管理流程了。 沒有循環或條件來提取單個地震對象並將其傳遞出去。 這是就是發生了什麼:

  1. onNext只發生一次,它產生整個JSON字符串。
  2. 因爲咱們只會產生一次,所以咱們在onNext以後發出完成信號。
  3. 咱們將flatMap調用連接到create的結果,所以flatMap將從Observable中獲取每一個結果(在這種狀況下只有一個),將它用做transform函數的參數,並將該函數產生的Observable合併到源Observable。
  4. 這裏咱們採用包含全部地震的features數組,並從中建立一個Observable。因爲flatMap,這將成爲quakes變量將包含的實際Observable。

5.訂閱不會改變; 它像之前同樣繼續處理地震的數據流。

始終有一種方法

到目前爲止,咱們已經使用了rx.all.js中包含的RxJS運算符,但一般仍是須要借鑑其餘基於RxJS的庫附帶的運算符。在咱們的例子中,咱們將看看RxJS-DOM。RxJS-DOM是一個外部庫,其中包含一個處理JSONP請求的運算符:jsonpRequest。這爲咱們節省了一些代碼,由於咱們不須要使用討厭的全局函數:

examples_earthquake/code1_2.js

var quakes = Rx.DOM.jsonpRequest({
    url: QUAKE_URL,
    jsonpCallback: 'eqfeed_callback'
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.map(function(quake) {
    return {
        lat: quake.geometry.coordinates[1],
        lng: quake.geometry.coordinates[0],
        size: quake.properties.mag * 10000
    };
});

quakes.subscribe(function(quake) {
    L.circle([quake.lat, quake.lng], quake.size).addTo(map);
});

請記住,要運行此代碼,您須要在HTML中包含RxJS-DOM中的文件rx.dom.js。請注意咱們如何添加一個map運算符,將地震對象轉換爲僅包含咱們可視化所需信息的簡單對象:緯度,經度和地震震級。 咱們在subscribeoperator中寫的功能越少越好。

實時標記

咱們地震應用的版本不會實時更新地震圖。爲了實現這一點,咱們將使用咱們在本章前面看到的interval運算符 - 以及有用的distinct運算符。下面的代碼,而後咱們將完成更改:

examples_earthquake/code1_3.js

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    }).retry(3);
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; });

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

在前面的代碼中,咱們使用interval來發出新請求並以5秒的固定間隔處理它們。 interval建立一個Observable,每隔五秒發出一個遞增的數字。咱們對這些數字沒有作任何事情; 相反,咱們使用flatMap來檢索jsonpRequest的數據。另請注意咱們如何在首先檢索列表時出現問題時再次嘗試重試。

咱們應用的最後一個運算符是distinct,它只發出以前未發出的元素。 它須要一個函數來返回屬性以檢查是否相等。 這樣咱們就不會重繪已經繪製過的地震。

在不到20行中,咱們編寫了一個應用程序,按期輪詢外部JSONP URL,從其內容中提取具體數據,而後過濾掉已導入的地震。在那以後,咱們在地圖上表示地震,其大小與其大小成比例-全部這些都以獨立,清晰和簡潔的方式編寫,而不依賴於外部狀態。這代表了Observables的表現力。

改進的想法

這裏有一些想法可使用你新得到的RxJS技能,並使這個小應用程序更有趣:

  • 當用戶將鼠標懸停在地震上時,提供一個彈出窗口,顯示有關該特定地震的更多信息。 一種方法是從只有你想要顯示的屬性的地震中建立一個新的Observable,並在懸停時動態過濾它。
  • 在頁面頂部放置一個計數器,顯示當前到目前爲止的地震次數,並天天重置

Operator詳解

本章向您介紹了一些新的運算符,因此這裏是對它們的回顧,以及咱們在應用程序中使用它們的方法。 請記住,您始終能夠在RxJS GitHub站點上找到Operator的完整API文檔。

  • Rx.Observable.from

默認行爲:同步

因爲您在應用程序中使用的許多數據源都來自數組或迭代器,所以有一個運算符能夠從中建立Observable。 from是您最常使用的Operator之一。

使用from,咱們能夠從數組,相似數組的對象(例如,arguments對象或DOM NodeLists)建立Observable,甚至能夠實現可迭代協議的類型,例如StringMapSet

  • Rx.Observable.range

默認行爲:同步

range運算符生成有限的Observable,它發出特定範圍內的整數。它功能多樣,可用於許多場景。 例如,您可使用範圍在像掃雷同樣的遊戲板上生成初始方塊。

  • Rx.Observable.interval

默認行爲:異步

每次須要生成時間間隔的值時,您可能會以interval運算符做爲生成器開始。因爲interval每x毫秒發出一次順序整數(其中x是咱們傳遞的參數),咱們只須要將值轉換爲咱們想要的任何值。 咱們在第3章「構建併發程序」中的遊戲很大程度上基於該技術。

  • Rx.Observable.distinct

默認行爲:與filter的Observable相同

distinct是這些很是簡單的Operator之一,能夠節省大量的開發工做。它會過濾掉已經發出的任何值。 這使咱們避免編寫容易出錯的樣板代碼,咱們將對比傳入的結果決定返回值。就是返回不一樣值。

image

distinct容許咱們使用指定比較方法的函數。另外,咱們能夠不傳遞任何參數,它將使用嚴格的比較來比較數字或字符串等基本類型,並在更復雜的對象的狀況下運行深度比較。

總結

在本章中,咱們介紹瞭如何使用大理石圖表直觀地表示和理解Observable流程。咱們已經介紹了最多見的運算符來轉換Observables,更重要的是,咱們只使用Observable序列構建了一個真實的世界應用程序,避免設置任何外部狀態,循環或條件分支。咱們以聲明的方式表達了咱們的整個程序,而沒必要編碼完成手頭任務的每一步。

在下一章中,咱們將繼續探索Observable序列,此次咱們將介紹更高級的運算符,它們容許您控制程序中的流和數據,用以前沒法想象的代碼!

關注個人微信公衆號,更多優質文章定時推送
clipboard.png

相關文章
相關標籤/搜索