Rxjs 響應式編程-第四章 構建完整的Web應用程序

Rxjs 響應式編程-第一章:響應式
Rxjs 響應式編程-第二章:序列的深刻研究
Rxjs 響應式編程-第三章: 構建併發程序
Rxjs 響應式編程-第四章 構建完整的Web應用程序
Rxjs 響應式編程-第五章 使用Schedulers管理時間
Rxjs 響應式編程-第六章 使用Cycle.js的響應式Web應用程序html

構建完整的Web應用程序

在本章中,咱們將構建一個典型的Web應用程序,在前端和後端使用RxJS。咱們將轉換文檔對象模型(DOM)並使用Node.js服務器中的WebSockets進行客戶端 - 服務器通訊。前端

對於用戶界面位,咱們將使用RxJS-DOM庫,這是由RxJS製做的同一團隊的庫,它提供了方便的Operator來處理DOM和瀏覽器相關的東西,這將使咱們的編程更簡潔。對於服務器部分,咱們將使用兩個完善的節點庫,並將一些API與Observables包裝在一塊兒,以便在咱們的應用程序中使用它們。node

在本章以後,您將可以使用RxJS以聲明方式構建用戶界面,使用咱們目前爲止看到的技術並將它們應用於DOM。 您還能夠在任何Node.js項目中使用RxJS,而且可以在任何項目中使用反應式編程和RxJS。程序員

創建一個實時地震Dashboard

咱們將爲地震儀表板應用程序構建服務器和客戶端部件,實時記錄地震的位置並可視化顯示。咱們將在Node.js中構建服務器,而且改進咱們的應用程序,使其更具互動性和更充足的信息量。正則表達式

一開始的代碼以下:數據庫

examples_earthquake/code1_3.jsnpm

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    }).retry(3);
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; });

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    L.circle([coords[1], coords[0]], size).addTo(map);
});

image

這段代碼已經有一個潛在的錯誤:它能夠在DOM準備好以前執行,每當咱們嘗試在代碼中使用DOM元素時就會拋出錯誤。咱們想要的是在觸發DOMContentLoaded事件以後加載咱們的代碼,這表示瀏覽器已經準備好dom了。編程

RxJS-DOM提供Rx.DOM.readyObservable,當觸發DOMContentLoaded時,它會發出一次。 所以,讓咱們將代碼包裝在initialize函數中,並在訂閱Rx.DOM.ready時執行它:json

examples_earthquake_ui/code1.jssegmentfault

function initialize() {
    var quakes = Rx.Observable
    .interval(5000)
    .flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    });
    })
    .flatMap(function(result) {
        return Rx.Observable.from(result.response.features);
    })
    .distinct(function(quake) { return quake.properties.code; });
    
    quakes.subscribe(function(quake) {
        var coords = quake.geometry.coordinates;
        var size = quake.properties.mag * 10000;
        L.circle([coords[1], coords[0]], size).addTo(map);
    });
}
Rx.DOM.ready().subscribe(initialize);

接下來,咱們將在HTML中添加一個空表,咱們將在下一部分填充地震數據:

<table>
    <thead>
        <tr>
            <th>Location</th>
            <th>Magnitude</th>
            <th>Time</th>
        </tr>
    </thead>
    <tbody id="quakes_info">
    </tbody>
</table>

有了這個,咱們準備開始爲咱們的儀表板編寫新代碼。

添加地震列表

新儀表板的第一個功能是顯示地震的實時列表,包括有關其位置,大小和日期的信息。此列表的數據與來自USGS網站的地圖相同。咱們首先建立一個函數,在給定props對象參數的狀況下返回一個row元素:

examples_earthquake_ui/code2.js

function makeRow(props) {
    var row = document.createElement('tr');
    row.id = props.net + props.code;
    var date = new Date(props.time);
    var time = date.toString();
    [props.place, props.mag, time].forEach(function(text) {
        var cell = document.createElement('td');
        cell.textContent = text;
        row.appendChild(cell);
    });
    return row;
}

props參數與咱們從USGS站點檢索的JSON中的properties屬性相同。

爲了生成行,咱們將再次訂閱地震Observable。此訂閱會在表格中爲每次收到的新地震建立一行。 咱們在initialize函數的末尾添加代碼:

examples_earthquake_ui/code2.js

var table = document.getElementById('quakes_info');
quakes
.pluck('properties')
.map(makeRow)
.subscribe(function(row) { table.appendChild(row); });

pluck運算符從每一個地震對象中提取屬性值,由於它包含makeRow所需的全部信息。 而後咱們將每一個地震對象映射到makeRow,將其轉換爲填充的HTML tr元素。 最後,在訂閱中,咱們將每一個發出的行追加到咱們的table中。

每當咱們收到地震數據時,這應該獲得一個數據稠密的表格。

看起來不錯,並且很容易!不過,咱們能夠作一些改進。首先,咱們須要探索RxJS中的一個重要概念:冷熱Observable。

冷熱Observable

不管Observers是否訂閱它們,「熱」Observable都會發出值。另外一方面,「冷」Observables從Observer開始訂閱就發出整個值序列。

熱Observable

訂閱熱Observable的Observer將接收從訂閱它的確切時刻發出的值。在那一刻訂閱的每一個其餘Observer將收到徹底相同的值。 這相似於JavaScript事件的工做方式。

鼠標事件和股票交易代碼是熱的Observables的例子。在這兩種狀況下,Observable都會發出值,不管它是否有訂閱者,而且在任何訂閱者收聽以前可能已經生成了值。這是一個例子:

hot_cold.js

var onMove = Rx.Observable.fromEvent(document, 'mousemove');
var subscriber1 = onMove.subscribe(function(e) {
    console.log('Subscriber1:', e.clientX, e.clientY);
});
var subscriber2 = onMove.subscribe(function(e) {
    console.log('Subscriber2:', e.clientX, e.clientY);
});
// Result:
// Subscriber1: 23 24
// Subscriber2: 23 24
// Subscriber1: 34 37
// Subscriber2: 34 37
// Subscriber1: 46 49
// Subscriber2: 46 49
// ...

在該示例中,兩個訂閱者在發出Observable時都會收到相同的值。 對於JavaScript程序員來講,這種行爲感受很天然,由於它相似於JavaScript事件的工做方式。

如今讓咱們看看冷Observables是如何工做的。

冷Observable

只有當Observers訂閱它時,冷Observable纔會發出值。

例如,Rx.Observable.range返回一個冷Observable。訂閱它的每一個新觀察者都將收到整個範圍:

hot_cold.js

function printValue(value) {
    console.log(value);
}
var rangeToFive = Rx.Observable.range(1, 5);
var obs1 = rangeToFive.subscribe(printValue); // 1, 2, 3, 4, 5
var obs2 = Rx.Observable
.delay(2000)
.flatMap(function() {
    return rangeToFive.subscribe(printValue); // 1, 2, 3, 4, 5
});

瞭解咱們什麼時候處理熱或冷的Observable對於避免細微和隱藏的錯誤相當重要。例如,Rx.Observable.interval返回一個Observable,它以固定的時間間隔生成一個遞增的整數值。 想象一下,咱們想用它來將相同的值推送給幾個觀察者。 咱們能夠像這樣實現它:

hot_cold.js

var source = Rx.Observable.interval(2000);

var observer1 = source.subscribe(function (x) {
    console.log('Observer 1, next value: ' + x);
});

var observer2 = source.subscribe(function (x) {
    console.log('Observer 2: next value: ' + x);
});

輸出

Observer 1, next value: 0
Observer 2: next value: 0
Observer 1, next value: 1
Observer 2: next value: 1
...

這彷佛沒什麼問題。 但如今想象咱們須要第二個用戶在第一個用戶加入後三秒鐘加入:

hot_cold.js

var source = Rx.Observable.interval(1000);
var observer1 = source.subscribe(function (x) {
    console.log('Observer 1: ' + x);
});
setTimeout(function() {
    var observer2 = source.subscribe(function (x) {
        console.log('Observer 2: ' + x);
    });
}, 3000);

輸出

Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 2: 0
Observer 1: 4
Observer 2: 1
...

如今咱們看到有些東西真的沒了。三秒後訂閱時,observer2接收源已經推送過的全部值,而不是從當前值開始並從那裏繼續,由於Rx.Observable.interval是一個冷Observable。 若是熱和冷Observables之間的的區別不是很清楚的話,那麼這樣的場景可能會使人驚訝。

若是咱們有幾個Observers訂閱冷的Observable,他們將收到相同序列值的副本。嚴格來講,儘管觀察者共享相同的Observable,但它們並無共享相同的值序列。若是咱們但願Observers共享相同的序列,咱們須要一個熱的Observable。

從冷到熱使用publish

咱們可使用publish將冷的Observable變成熱的。調用publish會建立一個新的Observable,它充當原始Observable的代理。它經過訂閱原始版本並將其收到的值推送給訂閱者來實現。

已發佈的Observable其實是一個ConnectableObservable,它有一個名爲connect的額外方法,咱們調用它來開始接收值。 這容許咱們在開始運行以前訂閱它:

hot_cold.js

// Create an Observable that yields a value every second
var source = Rx.Observable.interval(1000);
var publisher = source.publish();
// Even if we are subscribing, no values are pushed yet.
var observer1 = publisher.subscribe(function (x) {
    console.log('Observer 1: ' + x);
});
// publisher connects and starts publishing values
publisher.connect();
setTimeout(function() {
    // 5 seconds later, observer2 subscribes to it and starts receiving
    // current values, not the whole sequence.
    var observer2 = publisher.subscribe(function (x) {
        console.log('Observer 2: ' + x);
    });
}, 5000);

共享冷Observable

讓咱們回到咱們的地震示例。到目前爲止,咱們的代碼看起來很合理;咱們有一個帶有兩個訂閱的Observable地震:一個在地圖上繪製地震,另外一個在表格中列出地震。

但咱們可使代碼更有效率。 經過讓兩個地震用戶,咱們實際上要求兩次數據。 您能夠經過在quakesflatMap操做符中放入一個console.log來檢查。

發生這種狀況是由於quakes是一個冷Observable,而且它會將全部值從新發送給每一個新訂閱者,所以新訂閱意味着新的JSONP請求。這會經過網絡請求兩次相同的資源來影響咱們的應用程序性能。

對於下一個示例,咱們將使用`share·運算符,當Observers的數量從0變爲1時,它自動建立對Observable的預訂。 這使咱們免於從新鏈接:

examples_earthquake_ui/code2.js

var quakes = Rx.Observable
.interval(5000)
.flatMap(function() {
    return Rx.DOM.jsonpRequest({
        url: QUAKE_URL,
        jsonpCallback: 'eqfeed_callback'
    });
})
.flatMap(function(result) {
    return Rx.Observable.from(result.response.features);
})
.distinct(function(quake) { return quake.properties.code; })
.share()

如今地震的行爲就像一個熱的Observable,咱們沒必要擔憂咱們鏈接多少觀察者,由於他們都會收到徹底相同的數據。

緩衝值

咱們以前的代碼運行良好,但請注意,每次咱們收到有關地震的信息時都會插入一個tr節點。 這是低效的,由於每次插入咱們都會修改DOM並致使從新繪製頁面,使瀏覽器沒必要要地計算新佈局。 這可能會致使性能降低。

理想狀況下,咱們會批處理幾個傳入的地震對象,並每隔幾秒插入一批地震對象。手動實現會很棘手,由於咱們必須保留計數器和元素緩衝區,咱們必須記住每次批量重置它們。 可是使用RxJS,咱們可使用一個基於緩衝區的RxJS運算符,好比bufferWithTime

使用bufferWithTime,咱們能夠緩衝傳入的值,並在每x個時間段將它們做爲數組釋放:

examples_earthquake_ui/code3.bufferWithTime.js

var table = document.getElementById('quakes_info');
quakes
.pluck('properties')
.map(makeRow)
❶ .bufferWithTime(500)
❷ .filter(function(rows) { return rows.length > 0; }
.map(function(rows) {
var fragment = document.createDocumentFragment();
rows.forEach(function(row) {
❸ fragment.appendChild(row);
});
return fragment;
})
.subscribe(function(fragment) {
❹ table.appendChild(fragment);
});

這是新代碼中正在發生的事情:

  1. B緩存每一個傳入值並每500毫秒釋放一批值。
  2. 不管如何,bufferWithTime每500ms執行一次,若是沒有傳入值,它將產生一個空數組。 咱們會過濾掉這些空數組。
  3. 咱們將每一行插入一個文檔片斷,這是一個沒有父文檔的文檔。這意味着它不在DOM中,而且修改其內容很是快速和有效。
  4. 最後,咱們將片斷附加到DOM。附加片斷的一個優勢是它被視爲單個操做,只會致使一次重繪。 它還將片斷的子元素附加到咱們附加片斷自己的同一元素。

使用緩衝區和片斷,咱們設法保持行插入性能,同時保持應用程序的實時性(最大延遲爲半秒)。 如今咱們已準備好爲咱們的儀表板添加下一個功能:交互性!

添加交互

咱們如今在地圖上和列表中發生地震,但兩個表示之間沒有相互做用。例如,每當咱們點擊列表上的地圖時,就能夠在地圖上居中地震,並在咱們將鼠標移動到其行上時突出顯示地圖上帶圓圈的地震。 咱們開始吧。

在Leaflet中,您能夠在地圖上繪製並將繪圖放在各自的圖層中,以便您能夠單獨操做它們。 讓咱們建立一組名爲quakeLayer的圖層,咱們將存儲全部地震圈。每一個圓圈都是該組內的一個圖層。 咱們還將建立一個對象codeLayers,咱們將存儲地震代碼和內部圖層ID之間的相關性,以便咱們能夠經過地震ID來查找圓圈:

examples_earthquake_ui/code3.js

var codeLayers = {};
var quakeLayer = L.layerGroup([]).addTo(map);

如今,在初始化內部的地震Observable訂閱中,咱們將每一個圓圈添加到圖層組並將其ID存儲在codeLayers中。 若是這看起來有點錯綜複雜,那是由於這是Leaflet容許咱們在地圖中引用圖層的惟一方式。

examples_earthquake_ui/code3.js

quakes.subscribe(function(quake) {
    var coords = quake.geometry.coordinates;
    var size = quake.properties.mag * 10000;
    var circle = L.circle([coords[1], coords[0]], size).addTo(map);
    quakeLayer.addLayer(circle);
    codeLayers[quake.id] = quakeLayer.getLayerId(circle);
});

咱們如今建立懸停效果。咱們將編寫一個新函數isHovering,它返回一個Observable,它發出一個布爾值,表示在任何給定時刻鼠標是否在特定地震圈上:

examples_earthquake_ui/code3.js

❶ var identity = Rx.helpers.identity;
function isHovering(element) {
❷ var over = Rx.DOM.mouseover(element).map(identity(true));
❸ var out = Rx.DOM.mouseout(element).map(identity(false));
❹ return over.merge(out);
}
  1. Rx.helpers.identity是定義函數。 給定參數x,它返回x。 這樣咱們就沒必要編寫返回它們收到的值的函數。
  2. over是一個Observable,當用戶將鼠標懸停在元素上時會發出true。
  3. out是一個Observable,當用戶將鼠標移動到元素以外時,它會發出false。
  4. isHovering將over和out合併,返回一個Observable,當鼠標懸停在元素上時發出true,當它離開時返回false。

使用isHovering,咱們能夠修改建立rows的訂閱,這樣咱們就能夠在建立時訂閱每行中的事件:

examples_earthquake_ui/code3.js

var table = document.getElementById('quakes_info');
quakes
.pluck('properties')
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
    var fragment = document.createDocumentFragment();
    rows.forEach(function(row) {
        fragment.appendChild(row);
    });
    return fragment;
})
.subscribe(function(fragment) {
    var row = fragment.firstChild; // Get row from inside the fragment
    ❶ var circle = quakeLayer.getLayer(codeLayers[row.id]);
    ❷ isHovering(row).subscribe(function(hovering) {
        circle.setStyle({ color: hovering ? '#ff0000' : '#0000ff' });
    });
    ❸ Rx.DOM.click(row).subscribe(function() {
        map.panTo(circle.getLatLng());
    });
    table.appendChild(fragment);
})
  1. 咱們使用從行元素得到的ID在地圖上獲取地震的圓元素。 有了它,codeLayers爲咱們提供了相應的內部ID,它使用quakeLayer.getLayer獲取了circle元素。
  2. 咱們用當前行調用isHovering,而後咱們訂閱生成的Observable。 若是懸停參數爲真,咱們會將圓圈畫成紅色; 否則,它會是藍色的。
  3. 咱們訂閱了從當前行中的click事件建立的Observable。 單擊列表中的行時,地圖將以地圖中相應圓圈爲中心。

使其更高效

經驗豐富的前端開發人員知道在頁面上建立許多事件是致使性能不佳的一個因素。 在前面的示例中,咱們爲每一行建立了三個事件。 若是咱們在列表中得到100次地震,咱們將在頁面周圍浮動300個事件,只是爲了作一些亮點突出工做! 這對於表現來講太糟糕了,咱們能夠作得更好。

由於DOM中的事件老是冒泡(從子元素到父元素),前端開發人員中一個衆所周知的技術是避免將鼠標事件單獨附加到多個元素,而是將它們附加到父元素。 一旦在父項上觸發了事件,咱們就可使用事件的target屬性來查找做爲事件目標的子元素。

由於咱們須要爲事件click和mouseover提供相似的功能,因此咱們將建立一個函數getRowFromEvent:

examples_earthquake_ui/code3.pairwise.js

function getRowFromEvent(event) {
return Rx.Observable
.fromEvent(table, event)
❶ .filter(function(event) {
    var el = event.target;
    return el.tagName === 'TD' && el.parentNode.id.length;
})
❷ .pluck('target', 'parentNode')
❸ .distinctUntilChanged();
}

getRowFromEvent爲咱們提供了事件發生的錶行。 如下是詳細信息:

  1. 咱們確保在表格單元格中發生事件,並檢查該單元格的父級是不是具備ID屬性的行。 這些行是咱們用地震ID標記的行。
  2. pluck運算符在element的target屬性中提取嵌套屬性parentNode。
  3. 這能夠防止屢次得到相同的元素。 例如,使用mouseover事件會發生不少事情。

examples_earthquake_ui/code3.pairwise.js

在上一節中,咱們在每行上附加事件mouseover和mouseout,以便在每次鼠標輸入或退出行時更改地震圈顏色。 如今,咱們將僅使用桌面上的mouseover事件,並結合方便的pairwise運算符:

examples_earthquake_ui/code3.pairwise.js

getRowFromEvent('mouseover')
.pairwise()
.subscribe(function(rows) {
    var prevCircle = quakeLayer.getLayer(codeLayers[rows[0].id]);
    var currCircle = quakeLayer.getLayer(codeLayers[rows[1].id]);
    prevCircle.setStyle({ color: '#0000ff' });
    currCircle.setStyle({ color: '#ff0000' });
});

pairwise將每一個發射值與先前在陣列中發射的值進行分組。 由於咱們老是得到不一樣的行,因此成對將始終產生鼠標剛剛離開的行和鼠標如今懸停的行。 有了這些信息,就能夠相應地爲每一個地震圈着色。

處理click事件更簡單:

examples_earthquake_ui/code3.pairwise.js

getRowFromEvent('click')
.subscribe(function(row) {
    var circle = quakeLayer.getLayer(codeLayers[row.id]);
    map.panTo(circle.getLatLng());
});

咱們能夠回到訂閱quakes來生成行:

examples_earthquake_ui/code3.pairwise.js

quakes
.pluck('properties')
.map(makeRow)
.subscribe(function(row) { table.appendChild(row); });

咱們的代碼如今更加乾淨,而且它不依賴於別處的row。 若是沒有row,getRowFromEvent將不會嘗試產生任何item。

更重要的是,咱們的代碼如今很是高效。 不管咱們檢索的地震信息量如何,咱們老是隻有一個鼠標懸停事件和單擊事件,而不是數百個事件。

從Twitter獲取實時更新

咱們爲地震製做實時儀表板的計劃的第二部分是從Twitter添加與地球上發生的不一樣地震有關的報告和信息。 爲此,咱們將建立一個小型Node.js程序,該程序將獲取與地震相關的文章流。

設置咱們的Node.js環境

讓咱們開始配置咱們的Node.js應用程序吧。除了RxJS,咱們將使用兩個第三方模塊:ws和twit。這種相似的模塊都是讓咱們保持最少的代碼。

首先,讓咱們爲咱們的應用程序建立一個文件夾,並安裝咱們將使用的模塊。 (請注意,npm命令的輸出可能會因軟件包的當前版本而異。)

image

客戶端 - 服務器通訊

如今咱們準備開始構建咱們的應用程序了。讓咱們在tweet_stream文件夾中建立一個名爲index.js的新文件來加載咱們將使用的模塊:

examples_earthquake_ui/tweet_stream/index.js

var WebSocketServer = require('ws').Server; var Twit = require('twit');
var Rx = require('rx');

要使用Twitter API,您須要在Twitter網站中請求使用者密鑰和訪問令牌。 完成後,使用配置對象建立一個新的Twit對象,以下所示:

examples_earthquake_ui/tweet_stream/index.js

var T = new Twit({
    consumer_key: 'rFhfB5hFlth0BHC7iqQkEtTyw',
    consumer_secret: 'zcrXEM1jiOdKyiFFlGYFAOo43Hsz383i0cdHYYWqBXTBoVAr1x', 
    access_token: '14343133-nlxZbtLuTEwgAlaLsmfrr3D4QAoiV2fa6xXUVEwW9', 
    access_token_secret: '57Dr99wECljyyQ9tViJWz0H3obNG3V4cr5Lix9sQBXju1'
});

如今咱們能夠建立一個函數onConnect,它將完成搜索推文和未來與客戶端通訊的全部工做,而且咱們能夠啓動一個WebSocket服務器,一旦WebSocket鏈接並準備好就會調用onConnect:

examples_earthquake_ui/tweet_stream/index.js

function onConnect(ws) {
    console.log('Client connected on localhost:8080');
}
var Server = new WebSocketServer({ port: 8080 });
Rx.Observable.fromEvent(Server, 'connection').subscribe(onConnect);

咱們如今能夠啓動咱們的應用程序,它應該在端口8080上啓動WebSocket鏈接:

~/tweet_stream$ node index.js

因爲咱們還沒有將任何瀏覽器鏈接到此服務器,所以還沒有打印有關客戶端鏈接的消息。如今讓咱們切換到dashboard的代碼並執行此操做。咱們將在RxJS-DOM中使用fromWebSocket運算符:

examples_earthquake_ui/code4.js

function initialize() {
    var socket = Rx.DOM.fromWebSocket('ws://127.0.0.1:8080'); ...

在前面的代碼中,fromWebSocket建立一個Subject,做爲WebSocket服務器的消息的發送者和接收者。 經過調用socket.onNext,咱們將可以向服務器發送消息,經過訂閱套接字,咱們將收到服務器發送給咱們的任何消息。

咱們如今能夠發送包含咱們收到的地震數據的服務器消息:

examples_earthquake_ui/code4.js

quakes.bufferWithCount(100)
.subscribe(function(quakes) {
    console.log(quakes);
    var quakesData = quakes.map(function(quake) {
        return {
            id: quake.properties.net + quake.properties.code,
            lat: quake.geometry.coordinates[1],
            lng: quake.geometry.coordinates[0],
            mag: quake.properties.mag
        };
    });
    socket.onNext(JSON.stringify({quakes: quakesData }));
});

咱們能夠爲來自服務器的消息設置訂閱者:

examples_earthquake_ui/code4.js

socket.subscribe(function(message) {
    console.log(JSON.parse(message.data));
});

如今,當咱們從新加載瀏覽器時,客戶端消息應出如今服務器終端中:

~/tweet_stream$ node index.js
Client connected on localhost:8080

太棒了! 一旦開始從遠程JSONP資源接收地震,瀏覽器就應該向服務器發送命令。 可是如今,服務器徹底忽略了這些消息。 是時候回到咱們的推文流代碼並用它們作點什麼了。

首先,咱們將鏈接到從瀏覽器客戶端到達服務器的消息事件。 每當客戶端發送消息時,WebSocket服務器都會發出包含消息內容的消息事件。 在咱們的例子中,內容是一個JSON字符串。

咱們能夠在onConnect函數中編寫如下代碼:

examples_earthquake_ui/tweet_stream/index.js

var onMessage = Rx.Observable.fromEvent(ws, 'message')
.subscribe(function(quake) {
    quake = JSON.parse(quake);
    console.log(quake);
});

若是咱們從新啓動服務器(終端中的Ctrl-C)並從新加載瀏覽器,咱們應該會看到終端上的地震細節打印出來。這是完美的。 如今咱們已經準備好開始尋找與咱們的地震有關的推文了。

檢索和發送推文

咱們正在使用Node.js twit的流式Twitter客戶端鏈接到Twitter和搜索推文。 從如今開始,服務器中的全部代碼都將在onConnect函數內部發生,由於它假定已經創建了與WebSocket的鏈接。 讓咱們初始化推文流:

examples_earthquake_ui/tweet_stream/index.js

var stream = T.stream('statuses/filter', {
    track: 'earthquake',
    locations: []
});

這告訴咱們的Twit實例T開始流式傳輸Twitter狀態,按關鍵字地震過濾。 固然,這是很是通用的,而不是與如今發生的地震直接相關。 但請注意空位置數組。 這是一個緯度和經度邊界的數組,咱們能夠用它們按地理位置過濾推文,以及地震一詞。 那更加具體! 好的,讓咱們訂閱這個流並開始向瀏覽器發送推文:

examples_earthquake_ui/tweet_stream/index.js

Rx.Observable.fromEvent(stream, 'tweet').subscribe(function(tweetObject) {
    ws.send(JSON.stringify(tweetObject), function(err) {
        if (err) {
            console.log('There was an error sending the message');
        }
    });
});

若是咱們從新啓動服務器並從新加載瀏覽器,咱們應該在瀏覽器中收到推文,開發面板中的控制檯應該打印推文。

這些推文還沒有按地震位置進行過濾。 爲此,咱們須要對收到的每一條地震信息作如下事情:

  • 取每一個地震的經度和緯度對的震中座標,建立一個邊界框,界定咱們認爲與地震相關的推文的地理區域。
  • 累積全部邊界座標,以便發送給客戶端的推文與地圖上的地震保持相關。
  • 每次收到新地震的消息時,都會使用新座標更新twit流。

這是一種方法:

examples_earthquake_ui/tweet_stream/index.js

Rx.Observable
.fromEvent(ws, 'message')
.flatMap(function(quakesObj){
    quakesObj = JSON.parse(quakesObj);
    return Rx.Observable.from(quakesObj.quakes);
    })
❶ .scan([], function(boundsArray, quake) {
❷ var bounds = [
    quake.lng - 0.3, quake.lat - 0.15,
    quake.lng + 0.3, quake.lat + 0.15
].map(function(coordinate) {
    coordinate = coordinate.toString();
    return coordinate.match(/\-?\d+(\.\-?\d{2})?/)[0];
});
boundsArray.concat(bounds);
❸   return boundsArray.slice(Math.max(boundsArray.length - 50, 0));
})
❹ .subscribe(function(boundsArray) {
    stream.stop();
    stream.params.locations = boundsArray.toString();
    stream.start();
});

如下是前面代碼中發生的事情的一步一步:

  1. 咱們再次見到咱們的老朋友scan。 任什麼時候候咱們須要累積結果併產生每一箇中間結果,scan是咱們的朋友。 在這種狀況下,咱們將繼續在boundsArray數組中累積地震座標。
  2. 從地震震中的單緯度/經度座標對,咱們建立一個陣列,其中包含由西北座標和東南座標肯定的區域。 用於近似邊界的數字建立了一個大城市大小的矩形。以後,咱們使用正則表達式將每一個座標的小數精度限制爲兩位小數,以符合Twitter API要求。
  3. 咱們將生成的邊界鏈接到boundsArray,它包含之前每一個地震的邊界。 而後咱們採用最後25對邊界(數組中的50個項目),由於這是Twitter API的限制。
  4. 最後,咱們訂閱了Observable,在onNext函數中,咱們從新啓動當前的twit流來從新加載更新的位置,以便經過咱們新的累積位置數組進行過濾,轉換爲字符串。

從新啓動服務器並從新加載瀏覽器後,咱們應該在瀏覽器應用程序中收到相關的推文。 可是如今,咱們只能看到開發人員控制檯中顯示的原始對象。 在下一節中,咱們將生成HTML以在儀表板中顯示推文。

在Dashboard上顯示推文

既然咱們正在接收來自服務器的推文,那麼剩下要作的就是在屏幕上很好地展現它們。 爲此,咱們將建立一個新的HTML元素,咱們附加傳入的推文:

examples_earthquake_ui/index_final.html

<div id="tweet_container"></div>

咱們還將更新socket Observable訂閱以處理傳入的tweet對象並將它們附加到咱們剛剛建立的tweet_container元素:

examples_earthquake_ui/code5.js

socket
.map(function(message) { return JSON.parse(message.data); })
.subscribe(function(data) {
    var container = document.getElementById('tweet_container');
    container.insertBefore(makeTweetElement(data), container.firstChild);
});

任何新的推文都會出如今列表的頂部,它們將由makeTweetElement建立,這是一個建立推文元素的簡單函數,並使用咱們做爲參數傳遞的數據填充它:

examples_earthquake_ui/code5.js

function makeTweetElement(tweetObj) {
    var tweetEl = document.createElement('div');
    tweetEl.className = 'tweet';
    var content = '<img src="$tweetImg" class="avatar" />' +
    '<div class="content">$text</div>' +
    '<div class="time">$time</div>';
    
    var time = new Date(tweetObj.created_at);
    var timeText = time.toLocaleDateString() + ' ' + time.toLocaleTimeString();
    content = content.replace('$tweetImg', tweetObj.user.profile_image_url);
    content = content.replace('$text', tweetObj.text);
    content = content.replace('$time', timeText);
    tweetEl.innerHTML = content;
    return tweetEl;
}

有了這個,咱們終於有了一個帶有相關的地理定位推文的側邊欄,可讓咱們更深刻地瞭解受地震影響的區域。

改進的想法

此儀表板已經正常運行,但能夠進行許多改進。 一些想法,使它更好:

  • 添加更多地震數據庫。 USGS是一個很棒的資源,但它主要提供在美國發生的地震。 合併來自世界各地的地震報告,而不只僅是美國,並在地圖中將它們所有展現在一塊兒將會頗有趣。 爲此,您可使用mergemergeAll的幫助,並使用distinct與選擇器函數來避免重複。
  • 每當用戶點擊推文時,將地圖置於相關地震中心。 這將涉及經過地震在服務器上對推文進行分組,而且您可能但願使用groupBy運算符將推文分組到特定地理區域。

總結

在本章中,咱們使用RxJS建立了一個響應式用戶界面,使咱們可以實時查看地球上發生的地震的各類數據。咱們在瀏覽器客戶端和Node.js服務器中都使用了RxJS,顯示了使用Observable管理應用程序的不一樣區域是多麼容易。

更重要的是,咱們已經看到咱們能夠在客戶端和服務器上以相同的方式使用RxJS,在咱們的應用程序中隨處可見Observable序列抽象。 不只如此。咱們實際上能夠在其餘編程語言中使用RxJS概念和運算符,由於許多編程語言都支持RxJS。

接下來咱們將介紹Scheduler,它是RxJS中更高級的對象類型,它容許咱們更精確地控制時間和併發性,併爲測試代碼提供了很大的幫助。

關注個人微信公衆號,更多優質文章定時推送
clipboard.png

相關文章
相關標籤/搜索