Rxjs 響應式編程-第三章: 構建併發程序

Rxjs 響應式編程-第一章:響應式
Rxjs 響應式編程-第二章:序列的深刻研究
Rxjs 響應式編程-第三章: 構建併發程序
Rxjs 響應式編程-第四章 構建完整的Web應用程序
Rxjs 響應式編程-第五章 使用Schedulers管理時間
Rxjs 響應式編程-第六章 使用Cycle.js的響應式Web應用程序html

構建併發程序

併發是正確有效地同時作幾件事的藝術。爲了實現這一目標,咱們構建咱們的程序來利用時間,以最有效的方式一塊兒運行任務。 應用程序中的平常併發示例包括在其餘活動發生時保持用戶界面響應,有效地處理數百個客戶的訂單。react

在本章中,咱們將經過爲瀏覽器製做一個用於射擊的太空飛船遊戲來探索RxJS中的併發性和純函數。咱們將首先介紹Observable管道,這是一種連接Observable運算符並在它們之間傳遞狀態的技術。而後,我將向您展現如何使用管道來構建程序,而不依賴於外部狀態或反作用,將全部邏輯和狀態封裝在Observables自己中。編程

視頻遊戲是須要保持不少狀態的計算機程序,可是咱們將使用Observable管道和一些優秀的RxJS運算符的功能編寫咱們的遊戲,沒有任何外部狀態。canvas

簡潔和可觀察的管道

Observable管道是一組連接在一塊兒的運算符,其中每一個運算符都將Observable做爲輸入並返回Observable做爲輸出。 咱們一直在使用本書中的管道; 在使用RxJS進行編程時,它們無處不在。 下面是一個簡單的事例:segmentfault

spaceship_reactive/pipeline.js數組

Rx.Observable
    .from(1, 2, 3, 4, 5, 6, 7, 8)
    .filter(function(val) { return val % 2; })
    .map(function(val) { return val * 10; });

管道是獨立的。 全部狀態從一個運算符流向下一個運算符,而不須要任何外部變量。可是當咱們構建咱們的響應式程序時,咱們可能會想要將狀態存儲在Observable管道以外(咱們在Side Effects和External State中討論了外部狀態)。這迫使咱們跟蹤咱們在管道外設置的變量,全部這些bean計數都很容易致使錯誤。爲避免這種狀況,管道中的運算符應始終使用純函數。promise

在相同輸入的狀況下,純函數始終返回相同的輸出。當咱們能夠保證程序中的函數不能修改其餘函數依賴的狀態時,設計具備高併發性的程序更容易。這就是純粹的功能給咱們帶來的東西。瀏覽器

避免外部狀態

在下面的例子中,咱們計算到目前爲止每隔一秒產生的偶數。咱們經過從interval建立一個Observable並在咱們收到的值是偶數時增長evenTicks:緩存

spaceship_reactive/state.js服務器

var evenTicks = 0;

function updateDistance(i) {
    if (i % 2 === 0) {
        evenTicks += 1;
    }
    return evenTicks;
}

var ticksObservable = Rx.Observable
    .interval(1000)
    .map(updateDistance)
    
ticksObservable.subscribe(function() {
    console.log('Subscriber 1 - evenTicks: ' + evenTicks + ' so far');
});

這是程序運行四秒後獲得的輸出:

Subscriber 1 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 2 so far

如今,測試一下,讓咱們爲ticksObservable添加另外一個訂閱者:

spaceship_reactive/state.js

var evenTicks = 0;
function updateDistance(i) {
    if (i % 2 === 0) {
        evenTicks += 1;
    }
    return evenTicks;
}

var ticksObservable = Rx.Observable
    .interval(1000)
    .map(updateDistance)
    
ticksObservable.subscribe(function() {
    console.log('Subscriber 1 - evenTicks: ' + evenTicks + ' so far');
});

ticksObservable.subscribe(function() {
    console.log('Subscriber 2 - evenTicks: ' + evenTicks + ' so far');
});

輸出如今以下:

Subscriber 1 - evenTicks: 1 so far
Subscriber 2 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 2 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 3 so far
Subscriber 2 - evenTicks: 4 so far
Subscriber 1 - evenTicks: 4 so far
Subscriber 2 - evenTicks: 4 so far

等等--第二個訂閱者的偶數計數不該該起做用的!他應該跟第一個訂閱者的計數徹底一致。正如您可能已經猜到的那樣,Observable管道將爲每一個訂戶運行一次,增evenTicks兩次。

共享外部狀態引發的問題一般比這個例子更微妙。在複雜的應用程序中,打開通向管道外部狀態的大門會致使代碼變得複雜,而且很快就會出現錯誤。解決方案是儘量多地封裝管道內的信息。 這是咱們能夠重構前面的代碼以免外部狀態的方法:

spaceship_reactive/state.js

function updateDistance(acc, i) {
    if (i % 2 === 0) {
        acc += 1;
    }
    return acc;
}

var ticksObservable = Rx.Observable
    .interval(1000)
    .scan(updateDistance, 0);
    
ticksObservable.subscribe(function(evenTicks) {
    console.log('Subscriber 1 - evenTicks: ' + evenTicks + ' so far');
});

ticksObservable.subscribe(function(evenTicks) {
    console.log('Subscriber 2 - evenTicks: ' + evenTicks + ' so far');
});

預期輸出:

Subscriber 1 - evenTicks: 1 so far
Subscriber 2 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 1 so far
Subscriber 2 - evenTicks: 1 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 2 - evenTicks: 2 so far
Subscriber 1 - evenTicks: 2 so far
Subscriber 2 - evenTicks: 2 so far

使用scan,咱們徹底避免外部狀態。咱們將累計的偶數傳遞給updateDistance而不是依賴外部變量來保持累積值。 這樣咱們就不會增長每一個新訂戶的計數。

大多數時候咱們能夠避免依賴外部狀態。使用它的常見方案是緩存值或跟蹤程序中更改的值。 可是,正如您將在前面的Spaceship Reactive!中看到的那樣,能夠經過其餘幾種方式處理這些場景。例如,當咱們須要緩存值時,RxJS的Subject Class(後面會講到)能夠提供不少幫助,當咱們須要跟蹤遊戲的先前狀態時,咱們可使用像Rx.Observable.scan這樣的方法。

管道是高效的

我第一次將一堆操做符連接到管道中來轉換序列,個人直覺是它不可能有效。我知道經過連接運算符在JavaScript中轉換數組是很昂貴的。然而在本書中,咱們經過將序列轉換爲新序列來設計程序。 這會不會很低效呢?

連接在Observables和數組中看起來相似; 也都有filtermap等方法。可是有一個相當重要的區別:數組方法因爲每一個操做而建立一個新數組,而且徹底由下一個操做符轉換。另外一方面,可觀察的管道不會建立中間的Observable,而且能夠一次性將全部操做應用於每一個元素。所以,Observable僅被遍歷一次,這使得Observable連接變得高效。 看看如下示例:

spaceship_reactive/array_chain.js

stringArray // represents an array of 1,000 strings
    .map(function(str) {
        return str.toUpperCase();
    })
    .filter(function(str) {
        return /^[A-Z]+$/.test(str);
    })
    .forEach(function(str) {
        console.log(str);
    });

假設stringArray是一個包含1,000個字符串的數組,咱們要將其轉換爲大寫,而後過濾掉包含字母字符之外的任何字符串(或根本沒有字母)。而後咱們要將結果數組的每一個字符串打印到控制檯。

這是背後發生的事情:

  1. 遍歷數組並建立一個包含全部項大寫的新數組。
  2. 遍歷大寫數組,建立另外一個包含1,000個元素的數組。
  3. 遍歷篩選的數組並將每一個結果記錄到控制檯。

在轉換數組的過程當中,咱們迭代了三次數組並建立了兩個全新的大數組。 這很是低效! 若是您關注性能或者處理大量項目,則不該該以這種方式編程。

spaceship_reactive/array_chain.js

stringObservable // represents an observable emitting 1,000 strings
    .map(function(str) {
        return str.toUpperCase();
    })
    .filter(function(str) {
        return /^[A-Z]+$/.test(str);
    })
    .subscribe(function(str) {
        console.log(str);
    });

Observable的管道看起來與數組鏈很是類似,可是又不一樣。 在一個Observable中,在咱們訂閱它以前,沒有任何事情發生過,不管咱們應用了多少查詢和轉換。 當咱們調用像map這樣的變換時,咱們其實只運行了一個函數,它將對數組的每一個項目進行一次操做。 所以,在前面的代碼中,這將是會發生的事情:

  1. 建立一個大寫函數,該函數將應用於Observable的每一個項目,並在Observer訂閱它時返回將發出這些新項目的Observable。
  2. 使用先前的大寫函數組合過濾器函數,並返回一個Observable,它將發出新項目,大寫和過濾,但僅在Observable訂閱時候,纔會運行。
  3. 經過訂閱Observable來發布,經過咱們全部操做器的數據將會被髮布出來。

使用Observables,咱們只會查看一次列表,只有在絕對須要時纔會應用轉換。 例如,假設咱們在上一個示例中添加了一個take運算符:

spaceship_reactive/array_chain.js

stringObservable
    .map(function(str) {
        return str.toUpperCase();
    })
    .filter(function(str) {
        return /^[A-Z]+$/.test(str);
    })
    .take(5)
    .subscribe(function(str) {
        console.log(str);
    });

take使得Observable只發出咱們指定的前n個項目。在咱們的例子中,n是五,因此在數千個數據中,咱們只會收到前五個。 很酷的部分是咱們的代碼永遠不會遍歷全部項目; 只會遍歷前5個。

這使開發人員的生活更加輕鬆。 你能夠放心,在操做序列時,RxJS只會作必要的工做。 這種操做方式稱爲惰性評估,在Haskell和Miranda等函數式語言中很是常見。

RxJS的主體類

Subject是一種實現Observer和Observable類型的類型。 做爲Observer,它能夠訂閱Observable,而且做爲Observable,它能夠生成值並讓Observers訂閱它。

在某些狀況下,單個Subject能夠執行Observers和Observables組合的工做。例如,爲了在數據源和Subject的偵聽器之間建立代理對象,咱們可使用:

spaceship_reactive/subjects.js

var subject = new Rx.Subject();
var source = Rx.Observable.interval(300)
    .map(function(v) { return 'Interval message #' + v; })
    .take(5);
    
source.subscribe(subject);

var subscription = subject.subscribe(
    function onNext(x) { console.log('onNext: ' + x); },
    function onError(e) { console.log('onError: ' + e.message); },
    function onCompleted() { console.log('onCompleted'); }
);

subject.onNext('Our message #1');
subject.onNext('Our message #2');

setTimeout(function() {
    subject.onCompleted();
}, 1000);

輸出:

onNext: Our message #1
onNext: Our message #2
onNext: Interval message #0
onNext: Interval message #1
onNext: Interval message #2
onCompleted

在前面的示例中,咱們建立了一個新的Subject和一個每秒發送一個整數的源Observable。 而後咱們給Subject訂閱Observable。以後,咱們訂閱了一個Observer到Subject自己。 Subject自己如今表現爲Observable。

接下來,咱們使Subject發出本身的值(message1和message2)。在最終結果中,咱們獲取Subject本身的消息,而後從源Observable獲取代理值。來自Observable的值後來由於它們是異步的,而咱們當即使Subject的本身的值。請注意,即便咱們告訴源Observable採用前五個值,輸出也只顯示前三個。那是由於在一秒以後咱們在主題上調用onCompleted。 這將完成對全部訂閱的通知,並在這種狀況下覆蓋take操做符。

Subject類爲建立更專業的Subject提供了基礎。事實上,RxJS帶有一些有趣的:AsyncSubjectReplaySubjectBehaviorSubject

AsyncSubject

僅當序列完成時,AsyncSubject纔會僅發出序列的最後一個值。而後永遠緩存此值,而且在發出值以後訂閱的任何Observer將當即接收它。AsyncSubject便於返回單個值的異步操做,例如Ajax請求。

讓咱們看一個訂閱range的AsyncSubject的簡單示例:

spaceship_reactive/subjects.js

var delayedRange = Rx.Observable.range(0, 5).delay(1000);
var subject = new Rx.AsyncSubject();
delayedRange.subscribe(subject);
subject.subscribe(
    function onNext(item) { console.log('Value:', item); },
    function onError(err) { console.log('Error:', err); },
    function onCompleted() { console.log('Completed.'); }
);

在該示例中,delayedRange在延遲一秒以後發出值0到4。而後咱們建立一個新的AsyncSubject主題並將其訂閱到delayedRange。 輸出以下:

Value: 4
Completed.

按照預期,咱們只獲得Observer發出的最後一個值。如今讓咱們使用AsyncSubject來實現更真實的場景。 咱們將獲取一些遠程內容:

spaceship_reactive/subjects.js

function getProducts(url) {
    var subject;
    return Rx.Observable.create(function(observer) {
        if (!subject) {
            subject = new Rx.AsyncSubject();
            Rx.DOM.get(url).subscribe(subject);
        }
        return subject.subscribe(observer);
    });
}

var products = getProducts('/products');

// Will trigger request and receive the response when read
products.subscribe(
    function onNext(result) { console.log('Result 1:', result.response); },
    function onError(error) { console.log('ERROR', error); }
);

// Will receive the result immediately because it's cached
setTimeout(function() {
    products.subscribe(
        function onNext(result) { console.log('Result 2:', result.response); },
        function onError(error) { console.log('ERROR', error); }
    );
}, 5000)

在此代碼中,當使用URL調用getProducts時,它將返回一個Observer,該Observer將發出HTTP GET請求的結果。 如下是它如何分解:

  1. getProducts返回一個Observable序列。 咱們在這裏建立它。
  2. 若是咱們尚未建立AsyncSubject,咱們建立它並將訂閱Rx.DOM.Request.get(url)返回的Observable。
  3. 咱們將Observer訂閱到AsyncSubject。每次Observer訂閱Observable時,它實際上都會訂閱AsyncSubject,它做爲Observable檢索URL和Observers之間的代理。
  4. 咱們建立Observable來檢索URL「products」並將其存儲在products變量中。
  5. 這是第一個訂閱,將啓動URL檢索並在檢索URL時記錄結果。
  6. 這是第二個訂閱,在第一個訂閱後運行五秒鐘。因爲此時已經檢索到URL,所以不須要其餘網絡請求。 它將當即收到請求的結果,由於它已存儲在AsyncSubject中了。

有趣的是,咱們正在使用一個訂閱Rx.DOM.Request.get這個Observable的AsyncSubject。 因爲AsyncSubject緩存最後的結果,所以對產品的任何後續訂閱都將當即收到結果,而不會致使其餘網絡請求。每當咱們指望單個結果並但願保留它時,咱們就可使用AsyncSubject。

這是否意味着AsyncSubject像Promise同樣?

確實。

AsyncSubject表示異步操做的結果,您能夠將其用做promise的替代。內部的區別在於promise只會處理單個值,而AsyncSubject處理序列中的全部值,只會發送(和緩存)最後一個值。

可以如此輕鬆地模擬Promise顯示了RxJS模型的靈活性。(即便沒有AsyncSubject,使用Observables模擬一個承諾也很容易。)

BehaviorSubject

當Observer訂閱BehaviorSubject時,它接收最後發出的值,而後接收全部後續值。BehaviorSubject要求咱們提供一個起始值,以便全部Observers在訂閱BehaviorSubject時始終會收到一個值。

想象一下,咱們想要檢索一個遠程文件並在HTML頁面上輸出它的內容,但咱們在等待內容時須要佔位符文本。 咱們可使用BehaviorSubject

spaceship_reactive/behavior_subject.js

var subject = new Rx.BehaviorSubject('Waiting for content');
subject.subscribe(
    function(result) {
        document.body.textContent = result.response || result;
    },
    function(err) {
        document.body.textContent = 'There was an error retrieving content';
    }
);
Rx.DOM.get('/remote/content').subscribe(subject);

如今,HTML正文包含咱們的佔位符文本,它將保持這種狀態,直到Subject發出新值。最後,咱們請求咱們想要的資源,並將咱們的Subject訂閱到生成的Observer。

BehaviorSubject保證始終至少發出一個值,由於咱們在其構造函數中提供了一個默認值。一旦BehaviorSubject完成,它將再也不發出任何值,釋放緩存值使用的內存。

ReplaySubject

ReplaySubject緩存其值並將其從新發送到任何較晚的Observer。 與AsyncSubject不一樣,不須要爲此完成序列。

Subject

var subject = new Rx.Subject();

subject.onNext(1);
subject.subscribe(function(n) {         
    console.log('Received value:', n);
});
subject.onNext(2);
subject.onNext(3);

輸出以下:

Received value: 2 
Received value: 3

ReplaySubject

var subject = new Rx.ReplaySubject();

subject.onNext(1);

subject.subscribe(function(n) {     
    console.log('Received value:', n);
});

subject.onNext(2);
subject.onNext(3);

輸出以下:

Received value: 1
Received value: 2
Received value: 3

ReplaySubject有助於確保Observers從一開始就獲取Observable發出的全部值。它使咱們免於編寫凌亂的代碼來緩存之前的值,從而幫助咱們減小了不少錯誤。

固然,要實現該行爲,ReplaySubject會將全部值緩存在內存中。 爲了防止它佔用太多內存,咱們能夠經過緩衝區大小限制它存儲的數據量,或者經過將特定參數傳遞給構造函數來限制它。

ReplaySubject構造函數的第一個參數接受一個數字,表示咱們要緩衝的值的數量:

var subject = new Rx.ReplaySubject(2); // Buffer size of 2

subject.onNext(1);
subject.onNext(2);
subject.onNext(3);

subject.subscribe(function(n) { 
    console.log('Received value:', n);
});

輸出以下:

Received value: 2 
Received value: 3

第二個參數採用一個數字來表示咱們想要緩衝值的時間(以毫秒爲單位):

var subject = new Rx.ReplaySubject(null, 200); // Buffer size of 200ms

setTimeout(function() { 
    subject.onNext(1); 
}, 100);

setTimeout(function() { 
    subject.onNext(2); 
}, 200); 

setTimeout(function() { 
    subject.onNext(3); 
}, 300); 

setTimeout(function() {
    subject.subscribe(function(n) { 
        console.log('Received value:', n);
    });
     subject.onNext(4);
}, 350);

在這個例子中,咱們根據時間而不是值的數量設置緩衝區。 咱們的ReplaySubject將緩存最多200毫秒前發出的值。 咱們發出三個值,每一個值相隔100毫秒,350毫秒後咱們訂閱一個Observer,而後咱們發出另外一個值。 在訂閱時,緩存的項目是2和3,由於1發生在好久之前(大約250毫秒前),因此它再也不被緩存。

Subject是一個強大的工具,能夠爲您節省大量時間。 它們爲緩存和重複等常見場景提供了很好的解決方案。 由於他們的核心只是觀察者和觀察者,因此你不須要學習任何新東西。

響應式的飛船

爲了展現咱們如何保持一個應用程序的純粹,咱們將構建一個視頻遊戲,其中咱們的英雄將和無盡的敵人宇宙飛船戰鬥。 咱們將大量使用Observable管道,而且我會指出在可能很容易將狀態存儲在管道外的狀況以及如何避免它。

衆所周知,視頻遊戲會保留不少外部狀態分數,字符,定時器等的屏幕座標。咱們的計劃是在不依賴於保持狀態的單個外部變量的狀況下構建整個遊戲。

在咱們的遊戲中,玩家將使用鼠標水平移動飛船,並經過單擊鼠標或點擊空格鍵進行射擊。咱們的遊戲將有四個主要角色:背景中的移動星球場,玩家的宇宙飛船,敵人,以及來自玩家和敵人的子彈。

它看起來像這樣:

image

在屏幕截圖中,紅色三角形是咱們的宇宙飛船,綠色三角形是敵人。 較小的三角形是子彈。

讓咱們從設置階段開始; 這將是咱們的HTML文件:

spaceship_reactive/spaceship.html

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>Spaceship Reactive!</title>
        <script src="../rx.all-4.0.0.js"></script> <style>
            html, body {
                margin: 0;
                padding: 0;
            }
        </style>
    </head>
    <body>
        <script src="spaceship.js"></script>
    </body>
</html>

它只是一個簡單的HTML文件,能夠加載咱們將在本章其他部分使用的JavaScript文件。在那個JavaScript文件中,咱們首先設置一個canvas元素,咱們將在其中渲染咱們的遊戲:

spaceship_reactive/starfield_1.js

var canvas = document.createElement('canvas');
var ctx = canvas.getContext("2d");
document.body.appendChild(canvas); 
canvas.width = window.innerWidth; 
canvas.height = window.innerHeight;

有了這個,咱們就能夠開始編寫咱們遊戲的組件了。 首先讓咱們畫出咱們的星空背景。

建立星空背景

咱們在太空中設置遊戲所需的第一件事就是星空。咱們將建立一個向下滾動的星空,以提供穿越太空的感受。 爲此,咱們首先使用range運算符生成星標:

spaceship_reactive/starfield_1.js

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() { 
    return {
          x: parseInt(Math.random() * canvas.width),
          y: parseInt(Math.random() * canvas.height),
          size: Math.random() * 3 + 1
    }; 
})

每一個星形將由一個包含隨機座標和1到4之間大小的對象表示。這段代碼將爲咱們提供一個生成250個這些「星星」的流。

咱們但願這些星星保持前進。一種方法是每隔幾毫秒增長全部星星的y座標。咱們將使用toArray將StarStream Observable轉換爲數組,而後將發出一個包含range生成的全部對象的數組。而後咱們可使用flatMap運算符來獲取該數組,該運算符將Observable轉換爲每隔幾毫秒產生一個值的數據。使用map咱們能夠增長原始數組的每一個項目中的y座標:

spaceship_reactive/starfield_1.js

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() { 
    return {
          x: parseInt(Math.random() * canvas.width),
          y: parseInt(Math.random() * canvas.height),
          size: Math.random() * 3 + 1
    }; 
})
.toArray() 
.flatMap(function(starArray) {
    return Rx.Observable.interval(SPEED)
    .map(function() { 
        starArray.forEach(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0; // Reset star to top of the screen
            }
            star.y += 3; // Move star 
        });
        return starArray; 
    });
})

在地圖中,咱們檢查星醒y座標是否已經在屏幕以外,若是是的話,咱們將其重置爲0.經過改變每一個星星中的座標,咱們能夠始終使用相同的星星陣列。

如今咱們須要一個小的輔助函數,在畫布上「繪製」一系列星星:

spaceship_reactive/starfield_1.js

function paintStars(stars) {
ctx.fillStyle = '#000000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
ctx.fillStyle = '#ffffff';
stars.forEach(function(star) {
    ctx.fillRect(star.x, star.y, star.size, star.size);
  });
}

paintStars繪製黑色背景並在畫布上繪製星星。 實現移動星星的惟一方法是訂閱Observable並使用生成的數組調用paintStars。 這是最終的代碼:

spaceship_reactive/starfield_1.js

function paintStars(stars) {
    ctx.fillStyle = '#000000';
    ctx.fillRect(0, 0, canvas.width, canvas.height);
    ctx.fillStyle = '#ffffff';
    stars.forEach(function(star) {
        ctx.fillRect(star.x, star.y, star.size, star.size);
    });
}

var SPEED = 40;
var STAR_NUMBER = 250;
var StarStream = Rx.Observable.range(1, STAR_NUMBER)
.map(function() { 
    return {
          x: parseInt(Math.random() * canvas.width),
          y: parseInt(Math.random() * canvas.height),
          size: Math.random() * 3 + 1
    }; 
})
.toArray() 
.flatMap(function(starArray) {
    return Rx.Observable.interval(SPEED).map(function() { 
        starArray.forEach(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0; // Reset star to top of the screen
            }
            star.y += 3; // Move star 
        });
        return starArray; });
}).subscribe(function(starArray) {
    paintStars(starArray);
});

如今咱們已經繪製好了舞臺,如今是咱們的英雄出場的時候了。

添加玩家的太空飛船

如今咱們擁有美麗的星空背景,咱們已準備好對英雄的宇宙飛船編程。雖然咱們的宇宙飛船看似簡單,但它是遊戲中最重要的對象。它是鼠標移動的觀察者,它發出當前的鼠標x座標和恆定的y座標(玩家只能水平移動,因此咱們永遠不會改變y座標):

spaceship_reactive/hero_1.js

var HERO_Y = canvas.height - 30;
var mouseMove = Rx.Observable.fromEvent(canvas, 'mousemove'); 
var SpaceShip = mouseMove
.map(function(event) { 
    return {
        x: event.clientX,
        y: HERO_Y
    };
})
.startWith({
    x: canvas.width / 2,
    y: HERO_Y 
});

請注意,我使用了startWith。這將設置Observable中的第一個值,並將其設置爲屏幕中間的位置。沒有startWith咱們的Observable只有在玩家移動鼠標時纔開始發射。

讓咱們在屏幕上渲染咱們的英雄。在這個遊戲中,全部角色都是三角形(個人圖形設計技巧不是很使人印象深入),因此咱們將定義一個輔助函數來在畫布上渲染三角形,給定座標,大小和顏色,以及它們的朝向:

spaceship_reactive/hero_1.js

function drawTriangle(x, y, width, color, direction) { 
    ctx.fillStyle = color;
    ctx.beginPath();
    ctx.moveTo(x - width, y);
    ctx.lineTo(x, direction === 'up' ? y - width : y + width); 
    ctx.lineTo(x + width, y);
    ctx.lineTo(x - width,y);
    ctx.fill();
}

咱們還將定義paintSpaceShip,它使用輔助函數:

spaceship_reactive/hero_1.js

function paintSpaceShip(x, y) { 
    drawTriangle(x, y, 20, '#ff0000', 'up');
}

但咱們如今面臨一個問題。 若是咱們訂閱了SpaceShip Observable並在訂閱中調用了drawTriangle,咱們的太空船隻有在咱們移動鼠標時才能看到,並且只是瞬間。 這是由於starStream每秒更新畫布不少次,若是咱們不移動鼠標就擦除咱們的太空船。因爲starStream沒法直接訪問太空船,所以咱們沒法在starStream訂閱中渲染太空船。 咱們能夠將最新的太空船座標保存到starStream能夠訪問的變量中,可是咱們將修改外部狀態的規則。 該怎麼辦?

一般狀況下,RxJS有一個很是方便的operator,咱們能夠用它來解決咱們的問題。

Rx.Observable.combineLatest是一個方便的operator。 它須要兩個或更多Observable並在每一個Observable發出新值時發出每一個Observable的最後結果。知道starStream如此頻繁地發出一個新項目(星星數組),咱們能夠刪除starStream的訂閱並使用combineLatest結合starStream和SpaceShip Observables,並在它們發出任何新項目時當即更新它們:

spaceship_reactive/hero_1.js

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship.x, actors.spaceship.y);
}
var Game = Rx.Observable.combineLatest(StarStream, SpaceShip, function(stars, spaceship) {
    return { stars: stars, spaceship: spaceship }; 
});

Game.subscribe(renderScene);

咱們如今使用renderScene函數繪製屏幕上的全部內容,所以您能夠刪除StarStream的如下訂閱代碼:

.subscribe(function(starArray) {
    paintStars(starArray);
});

有了這個,每當Observable發出新項目時,咱們都會畫出星空背景和宇宙飛船。咱們如今有一艘宇宙飛船在太空中飛行,咱們可使用咱們的鼠標隨意移動它。這麼簡短的代碼還不錯吧!可是咱們的英雄宇宙飛船在浩瀚的太空中太孤獨了。 給它一些同伴怎麼樣?

生成敵人

若是咱們沒有任何敵人,這將是一個很是無聊的遊戲。 因此讓咱們創造一個無限的流!咱們想要每兩秒半創造一個新的敵人。讓咱們看一下Enemies Observable的代碼,而後仔細閱讀:

spaceship_reactive/enemy_1.js

var ENEMY_FREQ = 1500;
var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan(function(enemyArray) { 
    var enemy = {
        x: parseInt(Math.random() * canvas.width),
        y: -30
    };
    enemyArray.push(enemy);
    return enemyArray; 
}, []);

var Game = Rx.Observable.combineLatest(
StarStream, SpaceShip, Enemies, function(stars, spaceship, enemies) {
    return {
        stars: stars, 
        spaceship: spaceship, 
        enemies: enemies
    }; 
});
Game.subscribe(renderScene);

爲了創造敵人,咱們使用interval運算符每1,500毫秒運行一次,而後咱們使用scan運算符建立一個敵人陣列。

每次Observable發出一個值時,scan聚合結果,併發出每一箇中間結果。 在Enemies Observable中,咱們從一個空數組開始,做爲scan的第一個參數,咱們在每次迭代中將一個新對象推送到它。 該對象包含隨機x座標和可見屏幕外的固定y座標。 有了這個,敵人將每1500毫秒發出一個包含全部當前敵人的陣列。

剩下的惟一的事情事渲染enemies的輔助函數。此函數也將更新enemies數組中每一個項目的座標:

spaceship_reactive/enemy_1.js

// Helper function to get a random integer
function getRandomInt(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

function paintEnemies(enemies) { 
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down'); 
    });
}

你能夠在paintEnemies中看到咱們也在隨機改變x座標,這樣敵人就會沒法預測地移動到兩側。如今咱們須要更新函數renderScene以包含對paintEnemies的調用。

你可能已經注意到了咱們到目前爲止玩遊戲時的一個奇怪的效果:若是你移動鼠標,敵人會更快地向你走來!這多是遊戲中的一個很好的功能,但咱們絕對不打算這樣作。你能猜出致使這個bug的緣由嗎?

若是你猜到它與paintEnemies功能有關,你就是對的。只要任何Observable產生一個值,combineLatest就會渲染咱們的場景。若是咱們不移動鼠標,最快的發射器將始終是starStream,由於它的間隔爲40毫秒(Enemies Observable僅每1,500毫秒發出一次)。可是,當咱們移動鼠標時,SpaceShip將比starStream發射得更快(你的鼠標每秒發射屢次座標),而後paintEnemies將執行屢次,更快地增長敵人的座標。

爲了不這種狀況以及將來的相似問題,咱們須要規範遊戲的速度,以便Observable不會比咱們的鼠標速度更快地發出值。

是的,正如您可能已經猜到的那樣,RxJS有一個operator。

Avoid Drinking from the Firehose

咱們是否是接收數據的速度太快了。大多數狀況下,咱們但願得到全部速度,可是根據Observable流值的頻率,咱們可能但願刪除一些咱們收到的值。咱們如今處於其中一種狀況中。咱們在屏幕上渲染事物的速度與咱們擁有的最快Observable的速度成正比。事實證實,咱們最快的Observable對咱們來講太快了,咱們須要在遊戲中創建一個恆定的更新速度。

sample是Observable實例中的一個方法,給定一個以毫秒爲單位的時間參數,返回一個Observable,它發出每一個時間間隔內父Observable發出的最後一個值。

image

請注意sample如何在間隔時刻丟棄最後一個值以前的任何值。 認清您是否須要此行爲很是重要。在咱們的例子中,咱們不關心刪除值,由於咱們只想每40毫秒渲染一個元素的當前狀態。若是全部值對您都很重要,您可能須要考慮緩衝區運算符:

spaceship_reactive/enemy_2.js

Rx.Observable.combineLatest( StarStream, SpaceShip, Enemies, function(stars, spaceship, enemies) {
    return {
        stars: stars, 
        spaceship: spaceship, 
        enemies: enemies
    }; 
    
})
.sample(SPEED)
.subscribe(renderScene);

經過在combineLatest以後調用sample,咱們確保combineLatest永遠不會在前一個以後的40毫秒內產生任何值(咱們的常量SPEED設置爲40)。

射擊

看到成羣的敵人來到咱們身邊有點可怕;咱們能作的就是走開,但願他們不要看到咱們。若是讓讓咱們的英雄有能力射擊邪惡的外星人宇宙飛船會怎麼樣?

咱們但願咱們的太空船在咱們點擊鼠標或按空格鍵時進行射擊,所以咱們將爲每一個事件建立一個Observable並將它們合併到一個名爲playerShots的Observable中。 請注意,咱們經過空格鍵,空格鍵的鍵代碼事32:

spaceship_reactive/hero_shots.js

var playerFiring = Rx.Observable .merge(
Rx.Observable.fromEvent(canvas, 'click'),
Rx.Observable.fromEvent(canvas, 'keydown')
.filter(function(evt) { 
    return evt.keycode === 32; 
}) )

如今咱們已經瞭解了sample,咱們能夠用它來增長遊戲的趣味並限制咱們太空船的射擊頻率。不然,玩家能夠高速射擊並輕易摧毀全部敵人。咱們這樣作是爲了讓玩家最多隻能每200毫秒射擊一次:

spaceship_reactive/hero_shots.js

var playerFiring = Rx.Observable.merge(
    Rx.Observable.fromEvent(canvas, 'click'),
    Rx.Observable.fromEvent(canvas, 'keydown')
    .filter(function(evt) { 
        return evt.keycode === 32; 
    })
)
.sample(200)
.timestamp();

咱們還添加了一個時間戳操做符,它在咱們的Observable發出的每一個值中設置一個屬性時間戳,以及它發出的確切時間。 咱們稍後會用它。

最後,爲了從咱們的宇宙飛船發射射擊,咱們須要知道射擊時刻宇宙飛船的x座標。這樣咱們就能夠將設計子彈渲染到正確的x座標。 從SpaceShip Observable設置一個外部變量看起來比較簡單,它會始終包含最後發出的x座標,但這會破壞咱們不成文的協議,永遠不會改變外部狀態!

相反,咱們將經過再次使用咱們的好朋友combineLatest來實現這一目標:

spaceship_reactive/hero_shots.js

var HeroShots = Rx.Observable
.combineLatest(
playerFiring,
SpaceShip,
function(shotEvents, spaceShip) {
    return { x: spaceShip.x };
})
.scan(function(shotArray, shot) {
    shotArray.push({x: shot.x, y: HERO_Y});
    return shotArray;
}, []);

咱們如今從SpaceShip和playerFiring獲取更新的值,這樣咱們就能夠獲得咱們想要的x座標。 咱們使用掃描的方式與咱們用於Enemy Observable的方式相同,爲每一個子彈建立一個當前座標數組。有了這個,咱們應該準備好在屏幕上繪製咱們的鏡頭。 咱們使用輔助函數繪製子彈數組中的每一個子彈:

spaceship_reactive/hero_shots.js

var SHOOTING_SPEED = 15;
function paintHeroShots(heroShots) {
    heroShots.forEach(function(shot) {
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

而後咱們從咱們的主要combineLatest操做中調用paintHeroShots

Rx.Observable.combineLatest(
StarStream, SpaceShip, Enemies, HeroShots,
function(stars, spaceship, enemies, heroShots) {
    return {
        stars: stars,
        spaceship: spaceship,
        enemies: enemies,
        eroShots: heroShots
    };
})
.sample(SPEED)
.subscribe(renderScene);

咱們在renderScene中添加對paintHeroShots的調用:

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship.x, actors.spaceship.y);
    paintEnemies(actors.enemies);
    aintHeroShots(actors.heroShots);
}

如今,當你運行遊戲時,你會注意到每次移動鼠標時,咱們的宇宙飛船都會瘋狂的射擊。 效果雖然不錯,但這不是咱們想要的! 讓咱們再看看HeroShots Observable。 在其中,咱們使用combineLatest,以便咱們擁有來自playerFiring和SpaceShip的值。 這與咱們以前遇到的問題相似。每次鼠標移動時,HeroShots中的combineLatest都會發出值,這就轉化爲被射擊的子彈。在這種狀況下,節流無濟於事,由於咱們但願用戶隨時隨地進行射擊,而且節流將限制射擊次數並使其中的許屢次數降低。

每當Observable發出新值時,combineLatest會發出每一個Observable發出的最後一個值。 咱們能夠利用這個優點。每當鼠標移動時,combineLatest會發出新的SpaceShip位置和playerFiring的最後一個發射值,除非咱們發射新子彈,不然它將保持不變。 而後,只有當發射的子彈與前一子彈不一樣時,咱們才能發出一個值。 distinctUntilChanged操做符爲咱們執行髒工做。

運算符distinctdistinctUntilChanged容許咱們過濾掉Observable已經發出的結果。 distinct過濾掉先前發出的任何結果,而distinctUntilChanged過濾掉相同的結果,除非在它們之間發出不一樣的結果。咱們只須要確保新子彈與前一子彈不一樣,因此distinctUntilChanged對咱們來講已經足夠了。(它還使咱們免於更高內存使用的不一樣;不一樣的須要將全部先前的結果保留在內存中。)

咱們修改了heroShots,所以它只根據時間戳發出新子彈:

spaceship_reactive/hero_shots2.js

var HeroShots = Rx.Observable
.combineLatest(
playerFiring,
SpaceShip,
function(shotEvents, spaceShip) {
    return {
        timestamp: shotEvents.timestamp,
        x: spaceShip.x
    };
})
.distinctUntilChanged(function(shot) { return shot.timestamp; })
.scan(function(shotArray, shot) {
    shotArray.push({ x:shot.x, y: HERO_Y });
    return shotArray;
}, []);

若是一切順利,咱們如今可以從咱們的宇宙飛船射擊敵人!

敵人射擊

咱們應該容許敵人射擊; 不然這是一個很是不公平的無聊遊戲。 並且很無聊! 對於敵人射擊,咱們將執行如下操做:

  • 每一個敵人都會保留更新的子彈陣列。
  • 每一個敵人都會以給定的頻率射擊。

爲此,咱們將使用區間運算符來存儲敵人值的新子彈。咱們還將介紹一個新的輔助函數isVisible,它有助於濾除座標在可見屏幕以外的元素。 這就是Enemy Observable如今的樣子:

spaceship_reactive/enemy_shots.js

function isVisible(obj) {
    return obj.x > -40 && obj.x < canvas.width + 40 &&
obj.y > -40 && obj.y < canvas.height + 40;
}
var ENEMY_FREQ = 1500;
var ENEMY_SHOOTING_FREQ = 750;
var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan(function(enemyArray) {
    var enemy = {
        x: parseInt(Math.random() * canvas.width),
        y: -30,
        shots: []
    };
    
    Rx.Observable.interval(ENEMY_SHOOTING_FREQ).subscribe(function() {
        enemy.shots.push({ x: enemy.x, y: enemy.y });
        enemy.shots = enemy.shots.filter(isVisible);
    });
    
    enemyArray.push(enemy);
    return enemyArray.filter(isVisible);
}, []);

在該代碼中,咱們每次建立新敵人時都會建立一個區間。此間隔將繼續向敵方子彈陣列添加子彈,而後它將過濾掉屏幕外的子彈。咱們也可使用isVisible來過濾屏幕外的敵人,就像咱們在return語句中所作的那樣。

咱們須要更新paintEnemies,以便渲染敵人的鏡頭並更新他們的y座標。而後咱們使用咱們方便的drawTriangle函數來繪製鏡頭:

spaceship_reactive/enemy_shots.js

function paintEnemies(enemies) {
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down');
        enemy.shots.forEach(function(shot) {
            shot.y += SHOOTING_SPEED;
            drawTriangle(shot.x, shot.y, 5, '#00ffff', 'down');
        });
    });
}

有了這個,如今每一個人都在射擊其餘人,但沒有人被摧毀!他們只是滑過敵人和咱們的宇宙飛船,由於咱們尚未定義當射擊與太空飛船碰撞時會發生什麼。

碰撞檢測

當射擊擊中敵人時,咱們但願子彈和敵人都能消失?讓咱們定義一個輔助函數來檢測兩個目標是否發生了碰撞:

spaceship_reactive/enemy_shots2.js

function collision(target1, target2) {
    return (target1.x > target2.x - 20 && target1.x < target2.x + 20) &&
(target1.y > target2.y - 20 && target1.y < target2.y + 20);
}

如今讓咱們修改輔助函數paintHeroShots來檢查每一個子彈是否擊中敵人。對於發生命中的狀況,咱們將在已擊中的敵人上將屬性isDead設置爲true,而且咱們將子彈的座標設置爲屏幕外。 子彈最終會被濾除,由於它在屏幕外。

spaceship_reactive/enemy_shots2.js

function paintEnemies(enemies) {
    enemies.forEach(function(enemy) {
        enemy.y += 5;
        enemy.x += getRandomInt(-15, 15);
        if (!enemy.isDead) {
        drawTriangle(enemy.x, enemy.y, 20, '#00ff00', 'down');
        }
        enemy.shots.forEach(function(shot) {
            shot.y += SHOOTING_SPEED;
            drawTriangle(shot.x, shot.y, 5, '#00ffff', 'down');
        });
    });
}

var SHOOTING_SPEED = 15;
function paintHeroShots(heroShots, enemies) {
    heroShots.forEach(function(shot, i) {
        for (var l=0; l<enemies.length; l++) {
            var enemy = enemies[l];
            if (!enemy.isDead && collision(shot, enemy)) {
                enemy.isDead = true;
                shot.x = shot.y = -100;
                break;
            }
        }
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

接下來讓咱們擺脫任何將屬性isDead設置爲true的敵人。惟一須要注意的是,咱們須要等待那個特定敵人的全部射擊消失;不然,當咱們擊中一個敵人時,它的全部射擊都會隨之消失,這很奇怪。 所以,咱們檢查其射擊的長度,並僅在沒有射擊時過濾掉敵人物體:

spaceship_reactive/enemy_shots2.js

var Enemies = Rx.Observable.interval(ENEMY_FREQ)
.scan(function(enemyArray) {
    var enemy = {
        x: parseInt(Math.random() * canvas.width),
        y: -30,
        shots: []
    };
    Rx.Observable.interval(ENEMY_SHOOTING_FREQ).subscribe(function() {
        if (!enemy.isDead) {
            enemy.shots.push({ x: enemy.x, y: enemy.y });
        }
        enemy.shots = enemy.shots.filter(isVisible);
    });
    enemyArray.push(enemy);
    return enemyArray
    .filter(isVisible)
    .filter(function(enemy) {
        return !(enemy.isDead && enemy.shots.length === 0);
    });
}, []);

爲了檢查玩家的船是否被擊中,咱們建立了一個函數gameOver:

spaceship_reactive/enemy_shots2.js

function gameOver(ship, enemies) {
    return enemies.some(function(enemy) {
        if (collision(ship, enemy)) {
            return true;
        }
        return enemy.shots.some(function(shot) {
            return collision(ship, shot);
        });
    });
}

若是敵人或敵人射擊擊中玩家的宇宙飛船,則此函數返回true。

在繼續以前,讓咱們瞭解一個有用的運算符:takeWhile。當咱們在現有的Observable上調用takeWhile時,Observable將繼續發出值,直到函數做爲參數傳遞給takeWhile返回false。

咱們可使用takeWhile告訴咱們的主要combineLatest Observable繼續獲取值,直到gameOver返回true:

spaceship_reactive/enemy_shots2.js

Rx.Observable.combineLatest(
    StarStream, SpaceShip, Enemies, HeroShots,
    function(stars, spaceship, enemies, heroShots) {
        return {
            stars: stars,
            spaceship: spaceship,
            enemies: enemies,
            heroShots: heroShots
        };
    })
.sample(SPEED)
.takeWhile(function(actors) {
    return gameOver(actors.spaceship, actors.enemies) === false;
})
.subscribe(renderScene);

當gameOver返回true時,combineLatest將中止發射值,從而馬上中止遊戲。

最後一件事:保持分數

若是咱們不能向朋友吹噓咱們的結果,會是什麼樣的遊戲?咱們顯然須要一種方法來跟蹤咱們的表現。 咱們須要得分。

讓咱們建立一個簡單的輔助函數來將分數繪製到屏幕的左上角:

spaceship_reactive/score.js

function paintScore(score) {
    ctx.fillStyle = '#ffffff';
    ctx.font = 'bold 26px sans-serif';
    ctx.fillText('Score: ' + score, 40, 43);
}

爲了保持分數,咱們將使用Subject。咱們能夠在基於combineLatest的主遊戲循環中輕鬆使用它,就像它只是另外一個Observable同樣,咱們能夠隨時將值推送到它。

spaceship_reactive/score.js

var ScoreSubject = new Rx.Subject();
var score = ScoreSubject.scan(function (prev, cur) {
    return prev + cur;
}, 0).concat(Rx.Observable.return(0));

在該代碼中,咱們使用scan運算符將每一個新值與總聚合結果相加。因爲咱們在遊戲開始時不會有任何分數,咱們會鏈接一個返回0的Observable,所以咱們有一個起點。

如今,只要咱們擊中敵人,咱們就必須將分數推向咱們的Subject;這是在paintHeroShots中發生的事情:

spaceship_reactive/score.js

var SCORE_INCREASE = 10;
function paintHeroShots(heroShots, enemies) {
    heroShots.forEach(function(shot, i) {
        for (var l=0; l<enemies.length; l++) {
            var enemy = enemies[l];
            if (!enemy.isDead && collision(shot, enemy)) {
                ScoreSubject.onNext(SCORE_INCREASE);
                enemy.isDead = true;
                shot.x = shot.y = -100;
                break;
            }
        }
        shot.y -= SHOOTING_SPEED;
        drawTriangle(shot.x, shot.y, 5, '#ffff00', 'up');
    });
}

固然,咱們將paintScore添加到renderScene,以便分數顯示在屏幕上:

spaceship_reactive/score.js

function renderScene(actors) {
    paintStars(actors.stars);
    paintSpaceShip(actors.spaceship.x, actors.spaceship.y);
    paintEnemies(actors.enemies);
    paintHeroShots(actors.heroShots, actors.enemies);
    paintScore(actors.score);
}

這完成了咱們的Spaceship Reactive遊戲。咱們已經設法在瀏覽器中對整個遊戲進行編碼,避免經過Observable管道的功能改變任何外部狀態。

改進的想法

我相信你已經有了一些使遊戲更使人興奮的想法,我也有一些改進建議,讓遊戲更好,同時提升你的RxJS技能:

  • 添加以不一樣速度移動的第二個(或第三個!)星形場以建立視差效果。 這能夠經過幾種不一樣的方式完成。 嘗試重用現有代碼並儘量以聲明方式執行。
  • 經過使它們以隨機間隔發射而不是ENEMY_SHOOTING_FREQ中指定的固定敵人來製造更多不可預測的敵人。 若是玩家的分數越高,你可讓他們更快地開火,這是額外的積分!
  • 容許玩家在短期內擊中幾個敵人得到更多積分。

總結

咱們只使用Observables爲瀏覽器構建了一個完整的遊戲,而且沿途咱們已經看到了幾種很是方便的方法來處理併發以及組合和轉換Observable。這是RxJS的優點之一:總有一種方法能夠幫助解決您正在嘗試解決的問題。請隨意在RxJS文檔中探索它們

反應式編程能夠輕鬆編寫併發程序。Observable抽象和強大的RxJS方法使程序的不一樣部分可以有效地進行交互。不依賴外部狀態進行編程可能須要一些時間來適應,但它有很大的好處。咱們能夠將整個行爲封裝在一個Observable管道中,使咱們的程序更加可靠和可靠。

在下一章中,咱們將選擇咱們離開它的地震可視化應用程序並添加一個顯示與地震有關的推文的Node.js服務器部分。咱們還將改進其用戶界面,使其看起來像一個真正的地震儀表板。

關注個人微信公衆號,更多優質文章定時推送
clipboard.png

相關文章
相關標籤/搜索