Rxjs 響應式編程-第五章 使用Schedulers管理時間

Rxjs 響應式編程-第一章:響應式
Rxjs 響應式編程-第二章:序列的深刻研究
Rxjs 響應式編程-第三章: 構建併發程序
Rxjs 響應式編程-第四章 構建完整的Web應用程序
Rxjs 響應式編程-第五章 使用Schedulers管理時間
Rxjs 響應式編程-第六章 使用Cycle.js的響應式Web應用程序react

使用Schedulers管理時間

自從接觸RxJS,就開始在個人項目中使用它。有一段時間我覺得我知道如何有效地使用它,但有一個使人煩惱的問題:我怎麼知道我使用的運算符是同步仍是異步?換句話說,Operators到底何時發出通知?這彷佛是正確使用RxJS的關鍵部分,但對我來講感受有點模糊。編程

我認爲,間隔運算符顯然是異步的,因此它在內部使用相似setTimeout的東西來發出項目。可是,若是我使用範圍怎麼辦?它也是異步發射的嗎?它會阻止事件循環嗎?來自哪裏?我處處都在使用這些運算符,但我對它們的內部併發模型知之甚少。canvas

而後我瞭解了Schedulers。segmentfault

Schedulers是一種強大的機制,能夠精確管理應用程序中的併發性。它們容許您隨時更改其併發模型,從而對Observable如何發出通知進行細粒度控制。在本章中,您將學習如何使用調度程序並在常見場景中應用它們。咱們將專一於測試,調度程序特別有用,您將學習如何製做本身的Schedulers。數組

使用Schedulers

Schedulers是一種「安排」未來發生的操做的機制。 RxJS中的每一個運算符在內部使用一個Schedulers,選擇該Schedulers以在最可能的狀況下提供最佳性能。瀏覽器

讓咱們看看咱們如何改變運算符中的Schedulers以及這樣作的後果。 首先讓咱們建立一個包含1,000個整數的數組:微信

var arr = [];
for (var i=0; i<1000; i++) {
    arr.push(i);
}

而後,咱們從arr建立一個Observable並強制它經過訂閱它來發出全部通知。 在代碼中,咱們還保存了發出全部通知所需的時間:併發

var timeStart = Date.now();
Rx.Observable.from(arr).subscribe(
    function onNext() {},
    function onError() {},
    function onCompleted() {
        console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
    }
);
"Total time: 6ms」

六毫秒 - 不壞! from在內部使用Rx.Scheduler.currentThread,它計劃在任何當前工做完成後運行。 一旦啓動,它將同步處理全部通知。app

在讓咱們將Scheduler更改成Rx.Scheduler.default框架

var timeStart = Date.now();
Rx.Observable.from(arr, null, null, Rx.Scheduler.default).subscribe(
    function onNext() {},
    function onError() {},
    function onCompleted() {
        console.log('Total time: ' + (Date.now() - timeStart) + 'ms');
    }
);
"Total time: 5337ms」

哇,咱們的代碼運行速度比使用currentThread Scheduler慢幾千倍。 那是由於默認的Scheduler異步運行每一個通知。 咱們能夠經過在訂閱後添加一個簡單的日誌語句來驗證這一點。

使用currentThread Scheduler:

Rx.Observable.from(arr).subscribe( ... );
console.log('Hi there!’);
"Total time: 8ms"
"Hi there!"

使用默認Scheduler:

Rx.Observable.from(arr, null, null, Rx.Scheduler.timeout).subscribe( ... );
console.log('Hi there!’);
"Hi there!"
"Total time: 5423ms"

由於使用默認Schedule的Observer以異步方式發出其項目,因此咱們的console.log語句(它是同步的)在Observable甚至開始發出任何通知以前執行。 使用currentThread Scheduler,全部通知都會同步發生,所以只有在Observable發出全部通知時纔會執行console.log語句。

所以,Scheduler確實能夠改變咱們的Observable的工做方式。 在咱們的例子中,性能確實受到異步處理一個已經可用的大型陣列的影響。 但咱們實際上可使用Scheduler來提升性能。 例如,咱們能夠在對Observable執行昂貴的操做以前動態切換Scheduler:

arr.groupBy(function(value) {
    return value % 2 === 0;
})
.map(function(value) {
    return value.observeOn(Rx.Scheduler.default);
})
.map(function(groupedObservable) {
    return expensiveOperation(groupedObservable);
});

在前面的代碼中,咱們將數組中的全部值分爲兩組:偶數和非偶數。 groupBy返回一個Observable,它爲每一個建立的組發出一個Observable。 這裏是很酷的部分:在運行以前對每一個分組的Observable中的項目進行昂貴的操做,咱們使用observeOn將Scheduler切換到默認值,這樣昂貴的操做將異步執行,而不是阻塞事件循環

observeOn和subscribeOn

在上一節中,咱們使用observeOn運算符來更改某些Observable中的Scheduler。 observeOn和subscribeOn是返回Observable實例副本的運算符,但它使用的Scheduler咱們做爲參數傳遞的。

observeOn接受一個Scheduler並返回一個使用該Scheduler的新Observable。 它將使每一個onNext調用在新的Scheduler中運行。

subscribeOn強制Observable的訂閱和取消訂閱工做(而不是通知)在特定的Scheduler上運行。 與observeOn同樣,它接受Scheduler做爲參數。 例如,當咱們在瀏覽器中運行並在訂閱調用中執行重要工做時,卻不但願用它來阻止UI線程,subscribeOn很是有用。

基本的Rx Scheduler

讓咱們在咱們剛剛使用的Scheduler中深刻了解一下。 RxJS的運算符最經常使用的是immediate,default和currentThread。

Immediate Scheduler

Immediate Scheduler同步發出來自Observable的通知,所以不管什麼時候在Immediate Scheduler上調度操做,它都將當即執行,從而阻塞該線程。 Rx.Observable.range是內部使用Immediate Scheduler序的運算符之一:

console.log('Before subscription');

Rx.Observable.range(1, 5)
.do(function(a) {
    console.log('Processing value', a);
})
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });

console.log('After subscription');
Before subscription
Processing value 1
Emitted 1
Processing value 2
Emitted 4
Processing value 3
Emitted 9
Processing value 4
Emitted 16
Processing value 5
Emitted 25
After subscription

程序輸出按咱們指望的順序發生。 每一個console.log語句在當前項的通知以前運行。

什麼時候使用它

Immediate Scheduler很是適合於在每一個通知中執行可預測且很是昂貴的操做的Observable。 此外,Observable最終必須調用onCompleted。

Default Scheduler

Default Scheduler以異步方式運行操做。 您能夠將其視爲setTimeout的等價物,其延遲爲零毫秒,從而保持序列中的順序。 它使用其運行的平臺上可用的最有效的異步實現(例如,Node.js中的process.nextTick或瀏覽器中的setTimeout)。

讓咱們使用前一個使用了range示例,並使其在默認的Scheduler上運行。 爲此,咱們將使用observeOn運算符:

console.log('Before subscription');

Rx.Observable.range(1, 5)
.do(function(value) {
    console.log('Processing value', value);
})
.observeOn(Rx.Scheduler.default)
.map(function(value) { return value * value; })
.subscribe(function(value) { console.log('Emitted', value); });

console.log('After subscription');
Before subscription
Processing value 1
Processing value 2
Processing value 3
Processing value 4
Processing value 5
After subscription
Emitted 1
Emitted 4
Emitted 9
Emitted 16
Emitted 25

這個輸出有很大的不一樣。 咱們的同步console.log語句輸出每一個值,但咱們使Observable在默認的Scheduler上運行,它會異步生成每一個值。 這意味着咱們在do運算符中的日誌語句在平方值以前處理。

什麼時候使用它

Default Scheduler永遠不會阻塞事件循環,所以它很是適合涉及時間的操做,如異步請求。 它也能夠在從未完成的Observable中使用,由於它不會在等待新通知時阻塞程序(這可能永遠不會發生)。

Current Thread Scheduler

currentThread Scheduler與Immediate Scheduler同樣是同步的,可是若是咱們使用遞歸運算符,它會將要執行的操做排入隊列,而不是當即執行它們。 遞歸運算符是一個本身調度另外一個運算符的運算符。 一個很好的例子就是repeatrepeat運算符 - 若是沒有給出參數 - 將無限期地重複鏈中的先前Observable序列。

若是對使用Immediate Scheduler的運算符(例如return)調用repeat,則會遇到麻煩。 讓咱們經過重複值10來嘗試這個,而後使用take只取重複的第一個值。 理想狀況下,代碼將打印10次而後退出:

// Be careful: the code below will freeze your environment!
Rx.Observable.return(10).repeat().take(1)
.subscribe(function(value) {
    console.log(value);
});
Error: Too much recursion

此代碼致使無限循環。 在訂閱時,如return調用onNext(10)而後onCompleted,這使得repeat再次訂閱return。 因爲返回正在Immediate Scheduler上運行,所以該過程會重複,致使無限循環而且永遠不會結束。

可是若是相反咱們經過將它做爲第二個參數傳遞給currentThread Scheduler給return,咱們獲得:

var scheduler = Rx.Scheduler.currentThread;
Rx.Observable.return(10, scheduler).repeat().take(1)
.subscribe(function(value) {
    console.log(value);
});
10

如今,當repeat從新訂閱返回時,新的onNext調用將排隊,由於以前的onCompleted仍在發生。 repeat而後返回一個可使用的一次性對象,它調用onCompleted並經過重複處理取消repeat,最終從subscribe返回調用。

什麼時候使用它

currentThread Scheduler對於涉及遞歸運算符(如repeat)的操做很是有用,而且一般用於包含嵌套運算符的迭代。

動畫調度

對於諸如canvas或DOM動畫之類的快速視覺更新,咱們可使用具備很是小時間間隔的interval運算符,或者咱們能夠在內部使用相似setTimeout的函數來調度通知。

但這兩種方法都不理想。 在他們兩個中咱們都在瀏覽器上拋出全部這些更新,這可能沒法足夠快地處理它們。之因此會發生這種狀況,是由於瀏覽器正在嘗試渲染一個幀,而後它會收到渲染下一幀的指令,所以它會丟棄當前幀以保持速度。 結果是致使動畫的不流暢,卡頓。

瀏覽器具備處理動畫的原生方式,而且它們提供了一個使用它的API,稱爲requestAnimationFramerequestAnimationFrame容許瀏覽器經過在最合適的時間排列動畫來優化性能,並幫助咱們實現更流暢的動畫。

有專門的Scheduler處理requestAnimationFrame

RxDOM庫附帶了一些額外的調度程序,其中一個是requestAnimationFrame Scheduler。

是的,你猜對了。 咱們可使用此Scheduler來改進咱們的太空飛船視頻遊戲。 在其中,咱們創建了40ms的刷新速度 - 大約每秒25幀 - 經過在該速度下建立一個interval Observable,而後使用combineLatest以間隔設置的速度更新整個遊戲場景(由於它是最快速更新的Observable) )...但誰知道瀏覽器使用這種技術丟幀了多少幀! 使用requestAnimationFrame能夠得到更好的性能。

讓咱們建立一個使用Rx.Scheduler.requestAnimationFrame做爲其調度程序的Observable。 請注意,它與interval運算符的工做方式相似:

ch_schedulers/starfield_raf.js

function animationLoop(scheduler) {
    return Rx.Observable.generate(
        0,
        function() { return true; }, // Keep generating forever
        function(x) { return x + 1; }, // Increment internal value
        function(x) { return x; }, // Value to return on each notification
        Rx.Scheduler.requestAnimationFrame
    ); // Schedule to requestAnimationFrame
}

如今,不管什麼時候咱們使用了25 FPS動畫,咱們均可以使用animationLoop函數。 因此咱們的Observable繪製了星星,以前看起來像這樣:

spaceship_reactive/spaceship.js

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.flatMap(function(arr) {
    return Rx.Observable.interval(SPEED).map(function() {
        return arr.map(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0;
            }
            star.y += 3;
            return star;
        });
    });
});

變成這樣:

ch_schedulers/starfield_raf.js

var StarStream = Rx.Observable.range(1, 250)
.map(function() {
    return {
        x: parseInt(Math.random() * canvas.width),
        y: parseInt(Math.random() * canvas.height),
        size: Math.random() * 3 + 1
    };
})
.toArray()
.flatMap(function(arr) {
    return animationLoop().map(function() {
        return arr.map(function(star) {
            if (star.y >= canvas.height) {
                star.y = 0;
            }
            star.y += 3;
            return star;
        });
    });
});

這給了咱們一個更流暢的動畫。 代碼也更簡潔!

使用Scheduler進行測試

測試多是咱們可使用Scheduler的最引人注目的場景之一。 到目前爲止,在本書中,咱們一直在編寫咱們的核心代碼而不考慮後果。 可是在現實世界的軟件項目中,咱們將編寫測試以確保咱們的代碼按照咱們的意圖運行。

測試異步代碼很難。 咱們常常遇到如下問題之一:

  • 模擬異步事件很複雜且容易出錯。 測試的重點是避免bug和錯誤,但若是你的測試自己有錯誤,那這顯然是有問題的。
  • 若是咱們想要準確測試基於時間的功能,自動化測試變得很是緩慢。 例如,若是咱們須要準確測試在嘗試檢索遠程文件四秒後調用錯誤,則每一個測試至少須要花費很長時間才能運行結束。 若是咱們不斷運行咱們的測試套件,那將影響咱們的開發時間。

TestScheduler

RxJS爲咱們提供了TestScheduler,一個旨在幫助測試的Scheduler。 TestScheduler容許咱們在方便時模擬時間並建立肯定性測試,確保它們100%可重複。 除此以外,它容許咱們執行須要花費大量時間並將其壓縮到瞬間的操做,同時保持測試的準確性。

TestScheduler是VirtualTimeScheduler的專業化。 VirtualTimeSchedulers在「虛擬」時間而不是實時執行操做。 計劃的操做進入隊列並在虛擬時間內分配一個時刻。 而後,Scheduler在其時鐘前進時按順序運行操做。 由於它是虛擬時間,因此一切都當即運行,而沒必要等待指定的時間。 咱們來看一個例子:

var onNext = Rx.ReactiveTest.onNext;
QUnit.test("Test value order", function(assert) {
    var scheduler = new Rx.TestScheduler();
    var subject = scheduler.createColdObservable(
        onNext(100, 'first'),
        onNext(200, 'second'),
        onNext(300, 'third')
    );
    var result = '';
    subject.subscribe(function(value) { result = value });
    scheduler.advanceBy(100);
    assert.equal(result, 'first');
    scheduler.advanceBy(100);
    assert.equal(result, 'second');
    scheduler.advanceBy(100);
    assert.equal(result, 'third');
});

在前面的代碼中,咱們測試了來自冷Observable的一些值以正確的順序到達。 爲此,咱們在TestScheduler中使用helper方法createColdObservable來建立一個Observable,它回放咱們做爲參數傳遞的onNext通知。 在每一個通知中,咱們指定應該發出通知值的時間。 在此以後,咱們訂閱此Observable,手動提早調度程序中的虛擬時間,並檢查它是否確實發出了預期值。 若是示例在正常時間運行,則須要300毫秒,但因爲咱們使用TestScheduler來運行Observable,它將當即運行,但徹底按照咱們的順序。

寫一個真實的測試案例

沒有比在現實世界中爲時間敏感的任務編寫測試更好的方法來理解如何使用虛擬時間來縮短期。 讓咱們從咱們在緩衝值中製做的地震查看器中恢復一個Observable:

quakes
.pluck('properties')
.map(makeRow)
.bufferWithTime(500)
.filter(function(rows) { return rows.length > 0; })
.map(function(rows) {
    var fragment = document.createDocumentFragment();
    rows.forEach(function(row) {
        fragment.appendChild(row);
    });
    return fragment;
})
.subscribe(function(fragment) {
    table.appendChild(fragment);
});

爲了使代碼更易於測試,讓咱們將Observable封裝在一個函數中,該函數接受咱們在bufferWithTime運算符中使用的Scheduler。在Obpectables中參數化將要測試的Scheduler老是一個好主意。

ch_schedulers/testscheduler.js

function quakeBatches(scheduler) {
    return quakes.pluck('properties')
    .bufferWithTime(500, null, scheduler || null)
    .filter(function(rows) {
        return rows.length > 0;
    });
}

讓咱們經過採起一些步驟來簡化代碼,但保持本質。 此代碼採用包含屬性屬性的Observable JSON對象,將它們緩衝到每500毫秒釋放的批次中,並過濾掉空的批次。

咱們想要驗證此代碼是否有效,但咱們絕對不但願每次運行測試時都等待幾秒鐘,以確保咱們的緩衝按預期工做。 這是虛擬時間和TestScheduler將幫助咱們的地方:

ch_schedulers/testscheduler.js

❶ var onNext = Rx.ReactiveTest.onNext;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;
❷ var scheduler = new Rx.TestScheduler();
❸ var quakes = scheduler.createHotObservable(
    onNext(100, { properties: 1 }),
    onNext(300, { properties: 2 }),
    onNext(550, { properties: 3 }),
    onNext(750, { properties: 4 }),
    onNext(1000, { properties: 5 }),
    onCompleted(1100)
);
❹ QUnit.test("Test quake buffering", function(assert) {
    ❺ var results = scheduler.startScheduler(function() {
        return quakeBatches(scheduler)
    }, {
        created: 0,
        subscribed: 0,
        disposed: 1200
    });
    ❻ var messages = results.messages;
    console.log(results.scheduler === scheduler);
    ❼ assert.equal(
        messages[0].toString(),
        onNext(501, [1, 2]).toString()
    );
    assert.equal(
        messages[1].toString(),
        onNext(1001, [3, 4, 5]).toString()
    );
    assert.equal(
        messages[2].toString(),
        onCompleted(1100).toString()
    );
});

讓咱們一步一步地剖析代碼:

  1. 咱們首先從ReactiveTest加載一些輔助函數。 這些在虛擬時間內註冊onNext,onCompleted和訂閱事件。
  2. 咱們建立了一個新的TestScheduler,它將推進整個測試。
  3. 咱們使用TestScheduler中的方法createHotObservable建立一個假的熱Observable,它將在虛擬時間內模擬特定點的通知。 特別是,它在第一秒發出五個通知,並在1100毫秒完成。 每次它發出一個具備特定屬性的對象。
  4. 咱們可使用任何測試框架來運行測試。 對於咱們的例子,我選擇了QUnit。
  5. 咱們使用startScheduler方法建立一個使用測試調度程序的Observable。 第一個參數是一個函數,它建立Observable以使用咱們的Scheduler運行。 在咱們的例子中,咱們只返回咱們傳遞TestScheduler的quakeBatches函數。 第二個參數是一個對象,它包含咱們想要建立Observable的不一樣虛擬時間,訂閱它並處理它。 對於咱們的示例,咱們在虛擬時間0開始和訂閱,而且咱們在1200(虛擬)毫秒處理Observable。
  6. startScheduler方法返回一個帶有scheduler和messages屬性的對象。 在消息中,咱們能夠在虛擬時間內找到Observable發出的全部通知。
  7. 咱們的第一個斷言測試在501毫秒以後(在第一個緩衝時間限制以後),咱們的Observable產生值1和2。
    咱們的第二個斷言測試在1001毫秒後,咱們的Observable產生剩餘的值3,4和5.最後,咱們的第三個斷言檢查序列是否徹底在1100毫秒完成,正如咱們在熱的Observable地震中所指出的那樣。

該代碼以很是可靠的方式有效地測試咱們的高度異步的Observable,而且無需跳過箍來模擬異步條件。咱們只是指定咱們但願代碼在虛擬時間內做出反應的時間,咱們使用測試調度程序來運行整個操做。

總結

Scheduler是RxJS的重要組成部分。 即便您能夠在沒有明確使用它們的狀況下走很長的路,它們也是一種先進的概念,它可讓您在程序中微調併發性。虛擬時間的概念是RxJS獨有的,對於測試異步代碼等任務很是有用。

在下一章中,咱們將使用Cycle.js,這是一種基於稱爲單向數據流的概念來建立使人驚歎的Web應用程序的反應方式。有了它,咱們將使用現代技術建立一個快速的Web應用程序,從而顯着改進傳統的Web應用程序製做方式。

關注個人微信公衆號,更多優質文章定時推送
clipboard.png

相關文章
相關標籤/搜索