[書籍翻譯] 《JavaScript併發編程》第七章 抽取併發邏輯

本文是我翻譯《JavaScript Concurrency》書籍的第七章 抽取併發邏輯,該書主要以Promises、Generator、Web workers等技術來說解JavaScript併發編程方面的實踐。
完整書籍翻譯地址: https://github.com/yzsunlei/javascript_concurrency_translation 。因爲能力有限,確定存在翻譯不清楚甚至翻譯錯誤的地方,歡迎朋友們提issue指出,感謝。

到本書這裏,咱們已經在代碼中明確地模擬了併發問題。使用promises,咱們同步化了兩個或更多異步操做。使用生成器,咱們按需建立數據,避免沒必要要的內存分配。最後,咱們瞭解到Web worker是利用多個CPU內核的主要工具。javascript

在本章中,咱們將採用全部這些方法並將它們放入應用程序代碼的上下文中。也就是說,若是併發是默認的,那麼咱們須要使併發儘量不那麼明顯。咱們將首先探索各類技術,這些技術將幫助咱們在使用的組件中封裝併發機制。而後,咱們將經過使用promises來幫助worker通訊,直接改進前兩章的代碼。前端

一旦咱們可以使用promises抽象worker通訊,咱們將嘗試在生成器的幫助下實現惰性的worker。咱們還將使用Parallel.js庫來介紹worker抽象,而後是worker線程池的概念。java

編寫併發代碼

併發編程很難作到。即便是人爲的示例應用程序,大部分複雜性來自併發代碼。咱們顯然但願咱們的代碼可讀性好,同時保持併發的好處。咱們但願充分利用系統上的每一個CPU。咱們只想在須要時計算咱們須要的東西。咱們不但願意大利麪條式的代碼將多個異步操做混在一塊兒。在開發應用程序的同時關注併發編程的全部這些方面會削弱咱們應該關注的內容 - 提供應用程序有價值的功能。node

在本節中,咱們將介紹可能用於將咱們的應用程序的其他部分與棘手的併發隔離的方法。這一般意味着將併發做爲默認模式 - 即便在引擎下沒有發生真正的併發時也是如此。最後,咱們不但願咱們的代碼包含90%的併發處理技巧,而只有10%的功能。git

隱藏併發機制

在咱們全部的代碼中暴露併發機制的難度是,他們每個都稍微不一樣於另外一個。這擴大了咱們可能已經發現所在的回調地獄的狀況。例如,不是全部的併發操做都是從一些遠程資源獲取數據的網絡請求。異步數據可能來自一個worker或一些自己就是異步的回調。想象一下場景咱們使用了三個不一樣的數據源來計算一個咱們須要的值,全部的這些都是異步的。這裏是這個問題的示圖:github

image143.gif

此圖中的數據是咱們在應用程序代碼中關注的內容。從咱們正在構建的功能的角度來看,咱們並不關心上述任何事情。所以,咱們的前端架構須要封裝與併發相關的複雜性。這意味着咱們的每一個組件都應該可以以相同的方式訪問數據。除了咱們全部的異步數據源以外,還有另外一個要考慮的複雜因素 - 當數據不是異步的而且來自本地數據源呢?那麼同步本地數據源和HTTP請求呢?咱們將在下一節中介紹這些。web

沒有併發性

僅僅由於咱們正在編寫併發JavaScript應用程序,並不是每一個操做自己都是併發的。例如,若是一個組件向另外一個組件詢問它已經在內存中的數據,則它不是異步操做並會當即返回。咱們的應用程序可能處處都是這些操做,其中併發性根本就沒有意義。其中存在的挑戰 - 咱們如何將異步操做與同步操做無縫混合?編程

簡單的答案是咱們在每處作出併發的默認假設。promise使這個問題易於處理。如下是使用promise來封裝異步和同步操做的示圖說明:segmentfault

image144.gif

這看起來很像前面的那個圖,但有兩個重要區別。咱們添加了一個synchronous()操做; 這沒有回調函數,由於它不須要回調函數。它不是在等待其餘任何東西,因此它會直接地返回。其餘兩個函數就像在上圖中同樣;二者都依賴回調函數將數據提供給咱們的應用程序。第二個重要區別是有一個promise對象。這取代了sync()操做和數據概念。或者更確切地說,它將它們融合到同一個概念中。後端

這是promise的關鍵做用 - 它們爲咱們抽象同步問題提供能力。這不只適用於網絡請求,還適用於Web worker消息或依賴於回調的任何其餘異步操做。它須要一些調整來考慮下咱們的數據,由於咱們得保證它最終會到達這裏。可是,一旦咱們消除了這種心理差距,默認狀況下就會啓用併發。就咱們的功能而言,併發是默認的,而咱們在操做背後所作的事情並非最具破壞性的。

如今讓咱們看一些代碼。咱們將建立兩個函數:一個是異步的,另外一個是簡單返回值的普通函數。這裏的目標是使運行這些函數的代碼相同,儘管生成值的方式有很大不一樣:

//一個異步「fetch」函數。咱們使用「setTimeout()」
//在1秒後經過「callback()」返回一些數據。
function fetchAsync(callback) {
    setTimeout(() => {
        callback({hello: 'world'});
    }, 1000);
}

//同步操做只簡單地返回數據。
function fetchSync() {
    return {hello: 'world'};
}

//對「fetchAsync()」調用的promise。
//咱們經過了「resolve」函數做爲回調。
var asyncPromise = new Promise((resolve, reject) => {
    fetchAsync(resolve);
});

//對「fetchSync()」調用的promise。
//這個promise當即完成使用返回值。
var syncPromise = new Promise((resolve, reject) => {
    resolve(fetchSync());
});

//建立一個等待兩個promise完成的promise。
//這讓咱們無縫混合同步值和異步值。
Promise.all([
    asyncPromise,
    syncPromise
    ]).then((results) => {
        var [asyncResult, syncResult] = results;
        console.log('async', asyncResult);
        //→async {hello: 'world'}
    });

console.log('sync', syncResult);
//→sync {hello:'world'}

在這裏權衡的是增長promise的複雜性,包裹它們而不是讓簡單的返回值函數立刻返回。但在現實中,封裝promise的複雜性中,若是咱們不是寫一個併發應用,咱們顯然須要關心這類問題自己。這些的好處是巨大的。當一切都是promise的值時,咱們能夠安全地排除使人討厭的致使不一致的併發錯誤。

worker與promise通訊

咱們如今已經知道了爲何將原始值視爲promise有益於咱們的代碼。是時候將這個概念應用於web workers了。在前兩章中,咱們的代碼同步來自Web worker的響應看起來有點棘手。這是由於咱們基本上試圖模仿許多promise善於處理的樣板工做。咱們首先嚐試經過建立輔助函數來解決這些問題,這些函數爲咱們包裝worker通訊,返回promise。而後咱們將嘗試另外一種涉及在較低級別擴展Web worker的方法。最後,咱們將介紹一些涉及多個worker的更復雜的同步方案,例如上一章中的那些worker方案。

輔助函數

若是咱們可以以promise解決的形式得到Web worker響應,那將是理想的。可是,咱們須要首先創造promise - 咱們該怎麼作這個?好吧,咱們能夠手動建立promise,其中發送給worker的消息是從promise executor函數中發送的。可是,若是咱們採用這種方法,咱們就不會比引入promise以前好多少了。

技巧是在單個輔助函數中封裝發佈到worker的消息和從worker接收的任何消息,以下所示:

image145.gif

咱們來看一個實現這種模式的輔助函數示例。首先,咱們須要一個執行某項任務的worker - 咱們將從這開始:

//吃掉一些CPU循環...
//源自http://adambom.github.io/parallel.js/
function work(n) {
    var i = 0;
    while (++i < n * n) {}
    return i;
}

//當咱們收到消息時,咱們會發布一條消息id,
//以及在「number」上執行「work()」的結果。
addEventListener('message', (e) => {
    postMessage({
        id: e.data.id,
        result: work(e.data.number)
    });
});

在這裏,咱們有一個worker,它會對咱們傳遞的任何數字進行平方。這個work()函數特地很慢,以便咱們能夠看到咱們的應用程序做爲一個總體在Web worker花費比平時更長的時間來完成任務時的表現。它還使用咱們在以前的Web worker示例中看到的id,所以它能夠與發送消息的代碼協調。讓咱們如今實現使用此worker的輔助函數:

//這將生成惟一ID。
//咱們須要它們將Web worker執行的任務
//映射到更大的建立它們的操做。
function* genID() {
    var id = 0;
    while (true) {
        yield id++;
    }
}

//建立全局「id」生成器。
var id = genID();

//這個對象包含promises的解析器函數,
//當結果從worker那裏返回時,咱們經過id在這裏查看。
var resolvers = {};

//開始咱們的worker...
var worker = new Worker('worker.js');
worker.addEventListener('message', (e) => {

    //找到合適的解析器函數。
    var resolver = resolvers[e.data.id];

    //從「resolvers」對象中刪除它。
    delete resolvers[e.data.id];
    
    //經過調用解析器函數將worker數據傳遞給promise。
    resolver(e.data.result);
});

//這是咱們的輔助函數。
//它處理向worker發送消息,
//並將promise綁定到worker的響應。
function square(number) {
    return new Promise((resolve, reject) => {
        //用於將Web worker響應和解析器函數綁定在一塊兒的id。
        var msgId = id.next().value;

        //存儲解析器以便之後在Web worker消息回調中可使用。
        resolvers[msgId] = resolve;

        //發佈消息 - id和number參數
        worker.postMessage({
            id: msgId,
            number: number
        });
    });
}

square(10).then((result) => {
    console.log('square(10)', result);
    //→square(10) 100
});

square(100).then((result) => {
    console.log('square(100)', result);
    //→square(100) 10000
});

square(1000).then((result) => {
    console.log('square(1000)', result);
    //→square(1000) 1000000
});

若是咱們關注square()函數的使用方式,傳遞一個數字參數並將一個promise做爲返回值,咱們能夠看到這符合咱們以前關於默認狀況下使代碼併發的討論。例如,咱們能夠從這個場景中徹底刪除worker,只需更改輔助函數解析它返回的promise的方式,咱們的其他代碼將繼續保持不變。

輔助函數策略只是一種使用promises簡化worker通訊的方法。也許咱們能夠決定咱們不必定要維護一堆輔助函數。接下來,咱們將看一個比輔助函數更細粒度的方法。

擴展postMessage()

咱們能夠採用更通用的方法,而不是積聚大量輔助功能。輔助函數自己沒有什麼問題;他們是直接並且重要的。若是咱們達到了數百個這樣的函數,它們的做用就會開始大打折扣了。更通用的方法是繼續使用worker.postMessage()。

因此讓咱們看看是否可使這個方法返回一個promise,就像咱們上一節中的helper函數同樣。這樣,咱們繼續使用細粒度postMessage()方法,但改進了咱們的同步語義。首先,看看這裏的worker代碼:

addEventListener('message', (e) => {

    //咱們將發回主線程的結果,
    //它應該始終包含消息id。
    var result = {id: e.data.id};

    //基於「action」,計算響應值「value」。
    //選項是單獨保留文本,
    //將其轉換爲大寫,或將其轉換爲小寫。
    if (e.data.action === 'echo') {
        result.value = e.data.value;
    } else if (e.data.action === 'upper') {
        result.value = e.data.value.toUpperCase();
    } else if (e.data.action === 'lower') {
        result.value = e.data.value.toLowerCase();
    }
});

//經過等待延時模擬一個運行時間很長的worker,
//它在1秒後返回結果。
setTimeout(() => {
    postMessage(result);
}, 1000);

這與咱們迄今爲止在Web worker代碼中看到的徹底不一樣。如今,在主線程中,咱們必須弄清楚如何改變Worker的接口。咱們如今就這樣作。而後,咱們將嘗試向此worker發佈一些消息並將處理promises做爲響應:

//這個對象包含promises的解析器函數,
//當結果從worker那裏返回時,咱們經過id在這裏查看。
var resolvers = {};

//保持「postMessage()」的原始實現,
//因此咱們能夠稍後在咱們的自定義「postMessage()」中調用它。
var postMessage = Worker.prototype.postMessage;

//用咱們的自定義實現替換「postMessage()」。
Worker.prototype.postMessage = function(data) {
    return new Promise((resolve, reject) => {

    //用於將Web worker響應和解析器函數綁定在一塊兒的id。
    var msgId = id.next().value;

    //存儲解析器以便之後能夠在Web worker消息回調使用。
    resolvers[msgId] = resolve;

    //運行原始的「Worker.postMessage()」實現,
    //實際上負責將消息發佈到worker線程。
    postMessage.call(this, Object.assign({
            id: msgId
        }, data));
    });
};

//開始咱們的worker...
var worker = new Worker('worker.js');

worker.addEventListener('message', (e) => {

    //找到合適的解析器函數。
    var resolver = resolvers[e.data.id];

    //從「resolvers」對象中刪除它。
    delete resolvers[e.data.id];
    
    //經過調用解析器函數將worker數據傳遞給promise。
    resolver(e.data.value);
});

worker.postMessage({
    action: 'echo',
    value: 'Hello World'
}).then((value) => {
    console.log('echo', `"${value}"`);
    //→echo 「Hello World」
});

worker.postMessage({
    action: 'upper',
    value: 'Hello World'
}).then((value) => {
    console.log('upper', `"${value}"`);
    //→upper 「HELLO WORLD」
});

worker.postMessage({
    action: 'lower',
    value: 'Hello World'
}).then((value) => {
    console.log('lower',`"${value}"`);
    //→lower 「hello world」
});

嗯,這正是咱們須要的,對吧?咱們能夠直接將消息數據發佈給worker,並經過promise解析將響應數據發送給咱們。做爲一個額外的好處,若是咱們如此傾向,咱們實際上能夠圍繞這個新的postMessage()函數實現包裝輔助函數。主要參與完成這項工做的技巧是存儲對原始postMessage()的引用。而後,咱們覆蓋web worker屬性postMessage,而不是函數自己。最後,咱們能夠複用它並添加必要的協調來保證好用。

同步worker結果

該代碼在最後2段已經充分下降了web workers回調地獄到可接受的水平。在事實上,如今咱們已經有了一個方法處理如何封裝web workers通訊由具備的postMessage()返回一個promise,咱們準備要開始簡化一些未使用這種方法的混亂的worker代碼。咱們已經瞭解了這些例子的,因此到目前爲止,已經從promise中獲益良多,他們是簡單的; 沒有這些抽象不會是世界末日。

那麼咱們映射數據集合而後映射和迭代集合的場景呢?咱們能夠回顧map/reduce代碼在「第6章,實用的並行」。這主要是因爲全部worker通訊模板代碼與嘗試執行map/reduce操做的代碼混合在一塊兒。讓咱們看看使用promise技術是否更好。首先,咱們將建立一個很是基本的worker:

//返回一個輸入數組的映射,
//它經過平方數組中的每一個數字。
addEventListener('message', (e) => {
    postMessage({
        id: e.data.id,
        value: e.data.value.map(v => v * v)
    });
});

咱們可使用此worker傳遞數組進行映射。所以,咱們將建立其中兩個並在兩個worker之間拆分工做負載,以下所示:

function onMessage(e) {

    //找到合適的解析器函數。
    var resolver = resolvers[e.data.id];

    //從「resolvers」對象中刪除它。
    delete resolvers[e.data.id];

    //經過調用解析器函數將worker數據傳遞給promise
    resolver(e.data.value);
}

//開始咱們的worker...
var worker1 = new Worker('worker.js'),
    worker2 = new Worker('worker.js');

//建立一些要處理的數據。
var array = new Array(50000).fill(null).map((v, i) => i);

//當worker返回數據時,找到適當的解析器函數來調用。
worker1.addEventListener('message', onMessage);
worker2.addEventListener('message', onMessage);

//將輸入數據拆分爲2,給出前半部分到第一個worker,
//給出後一部分到第二個worker。在這一點上,咱們有兩個promises。
var promise1 = worker1.postMessage({
    value: array.slice(0, Math.floor(array.length / 2))
});

var promise2 = worker2.postMessage({
    value: array.slice(Math.floor(array.length / 2))
});

//使用「Promise.all()」來同步workers
//比手動嘗試協調整個worker回調函數要容易得多。
Promise.all([promise1, promise2]).then((values) => {
    console.log('reduced', [].concat(...values).reduce((r, v) => r + v));
    //→reduced 41665416675000
});

這就是咱們須要向worker發佈數據以及同步來自兩個或更多worker的數據時,咱們實際上就有動力編寫併發代碼 - 它看起來與如今的其餘應用程序代碼相同。

惰性workers

如今是咱們從不一樣角度看待web workers的時候了。咱們使用worker的根本原​​因是咱們想要在相同的時間內計算比過去更多的數據。正如咱們如今所知,這樣作涉及消息傳遞錯綜複雜,能夠說是分而治之的策略。咱們必須經過將數據輸入和輸出worker,一般使用數組。

生成器幫助咱們實現惰性地計算。也就是說,咱們不想在內存中計算內容或分配數據,直到咱們確實須要它。web workers難以實現這一目標嗎?或者咱們能夠利用生成器來惰性地並行計算嗎?

在本節中,咱們將探討有關在Web worker中使用生成器的方法。首先,咱們將研究與Web worker相關的開銷問題。而後,咱們將編寫一些代碼經過使用生成器來將數據輸入或者輸出worker。最後,咱們將看看咱們是否能夠惰性地經過一個生成器鏈在web worker上傳遞全部數據。

減小開銷

主線程能夠拆分開銷大的Web workers操做,在另外一個線程中運行它們。這意味着DOM可以渲染掛起的更新並處理掛起的用戶事件。可是,咱們仍然面臨分配大型數組的開銷和更新UI所需的時間。儘管與Web worker並行處理,但咱們的用戶仍然可能面臨運行緩慢,由於在處理完整個數據集以前,UI沒有更新。這是常見的模式的示圖:

image146.gif

這是具備單個worker的數據所採用的通用路徑; 當有多個worker時,一樣的方法也適用。使用這種方法,咱們沒法避免須要將數據序列化兩次這一事實,咱們必須分配兩次。這些開銷僅僅是爲了促進worker的通訊,而與咱們試圖實現的應用程序功能幾乎沒有關係。

worker通訊所需的數組和序列化開銷一般不是什麼大問題。可是,對於更大的集合,咱們可能會面臨真正的性能問題,這源於咱們用於提升性能的機制。所以,從另外一個角度看worker通訊不會受到損失,即便最初沒有必要。

這是大多數worker採用的通用路徑的變體。不是預先分配和序列化大量數據,而是將單個項傳入和傳出worker。這使得UI有機會在全部處理的數據到達以前使用已處理的數據進行更新。

image147.gif

在workers中生成值

若是咱們想要在workers生成結果時更新UI,那麼他們沒法將結果集打包爲數組,以便在完成全部計算後發送回主線程。當發生這種狀況時,UI就停在那裏而不響應用戶。咱們但願一個惰性的方法,其中值是在一段時間產生一個,這樣UI就能夠越快被更新。讓咱們創建一個例子,將輸入發送到該web workers,而後將結果以一個比咱們以前在這本書已經看到的更細微的水平發送回來:

首先,咱們將創造一個worker; 它的代碼以下:

//消耗一些CPU循環...
//源自http://adambom.github.io/parallel.js/
function work(n) {
    var i = 0;
    while(++i < n * n) {}
    return i;
}

//將調用「work()」的結果發回給主線程
addEventListener('message', (e) => {
    postMessage(work(e.data));
});

這裏沒有什麼可大不了的。它與咱們已經習慣的經過低效率地對數字進行減慢運行的代碼的work()函數相同。worker內部沒有使用實際的生成器。這是由於咱們真的不須要,咱們立刻就會明白爲何:

//建立一個「update()」協程,
//在生成結果時持續更新UI。
var update = coroutine(function* () {
    var input;
    
    while (true) {
        input = yield;
        console.log('result', input.data);
    }
});

//建立worker,並指定「update()」協程
//做爲「message」回調處理程序。
var worker = new Worker('worker.js');
worker.addEventListener('message', update);

//一個數字逐漸變大的數組
var array = new Array(10).fill(null).map((v, i) => i * 10000);

//迭代數組,將每一個數字做爲私有消息傳遞給worker。
for(let item of array) {
    worker.postMessage(item);
}
//→
//result 1
//result 100000000
//result 400000000
//result 900000000
//result 1600000000
//result 2500000000
//result 3600000000
//result 4900000000
//result 6400000000
//result 8100000000

傳遞給咱們worker的每一個數字的處理成本都比前一個數字要高。總的來講,在向用戶顯示任何內容以前處理整個輸入數組會以爲應用程序掛起或出錯了。可是,這不是這種狀況,由於雖然每一個數字的處理開銷很高,但咱們會在結果可用時將結果發佈回來。

咱們經過傳入一個數組來執行和將數組做爲輸出返回來執行有着相同的工做量。然而,這種方法只是改變了事情發生的順序。咱們在演示中引入了協做式多任務 - 在一個任務中計算一些數據並在另外一個任務中更新UI。完成工做所花費的總時間是相同的,但對於用戶來講,感受要快得多。總得說來,用戶可感知的應用程序性能是惟一重要的性能指標。

咱們將輸入做爲單獨的消息傳遞。咱們能夠將輸入做爲數組傳遞,單獨發佈結果,並得到相同的效果。可是,這可能
僅僅是沒必要要的複雜性。對於模式有天然的對應關係,由於它是 - 項目輸入,項目輸出。若是你不須要就不要改變它。

惰性worker鏈

正如咱們在「第4章,使用Generator實現惰性計算」看到,咱們能夠組裝生成器鏈。這就是咱們惰性地實現複雜函數的方式;一個項流經一系列生成器函數,這些函數在生成以前將項轉換爲下一個生成器,直到它到達調用者。沒有生成器,咱們必須分配大量的中間數據結構,只是爲了將數據從一個函數傳遞到下一個函數。

在本文以前的部分中,咱們看到Web worker可使用相似於生成器的模式。因爲咱們在這裏面臨相似的問題,咱們不但願分配大型數據結構。咱們能夠經過在更細粒度級別傳遞項目來避免這樣作。這具備保持UI響應的額外好處,由於咱們可以在最後一個項目從worker到達以前更新它。鑑於咱們能夠與worker作不少事情,咱們難道不能基於在這個想法並組裝更復雜的worker處理節點鏈嗎?

例如,假設咱們有一組數字和幾個轉換。咱們在UI中顯示這些轉換以前,咱們須要按特定順序進行這些轉換。理想狀況下,咱們會設置一個worker鏈,每一個worker負責執行其指定的轉換,而後將輸出傳遞給下一個worker。最終,主線程得到一個能夠在DOM中顯示的值。

這個目標的問題在於它所涉及的很棘手的通訊。因爲專用worker只與建立它們的主線程進行通訊,所以將結果發送回主線程,而後發送到鏈中的下一個worker線程,這幾乎沒有什麼益處。好吧,事實證實,專用worker能夠直接通訊而不涉及主線程。咱們能夠在這裏使用稱爲頻道消息的東西。這個想法很簡單; 它涉及建立一個頻道,它有兩個端口 - 消息在一個端口上發佈並在另外一個端口上接收。

咱們一直在使用消息傳遞頻道和端口。他們被捲入web workers。這是消息事件和postMessage()方法模式的來源。如下是咱們如何使用頻道和端口鏈接咱們的Web worker的示圖:

image151.gif

咱們能夠看到,每一個頻道使用兩個消息傳遞端口。第一端口是用於發佈消息,而所述第二端口被使用來接收消息事件。主線程惟一一次使用是當所述處理鏈首先被用於發佈一個消息給第一信道和當該消息從第三信道被接收到的消息。不要讓worker通訊所需的六個端口嚇倒咱們,讓咱們寫一些代碼; 也許,那裏看起來會更易於理解。首先,咱們將建立鏈中使用的worker。實際上,他們是同一個worker的兩個實例。下面是代碼:

addEventListener('message', (e) => {

    //獲取用於發送和接收消息的端口。
    var [port1, port2] = e.ports;

    //偵聽第一個端口的傳入消息。
    port1.addEventListener('message', (e) => {
        
        //在第二個端口上響應,結果爲調用「work()」。
        port2.postMessage(work(e.data));
    });

    //啓動兩個端口。
    port1.start();
    port2.start();
});

這是頗有趣的。在這個worker中,咱們有消息端口可使用。第一個端口用於接收輸入,第二個端口用於發送輸出。該work()函數簡單地使用咱們如今熟悉的平方數消耗CPU週期來看workers如何表現。咱們在主線程中想要作的是建立這個worker的兩個實例,這樣咱們就能夠傳遞第一個平方數的實例。而後,在不將結果傳遞迴主線程的狀況下,它將結果傳遞給下一個worker,並再次對數字進行平方。通訊路線應該與前面的圖表很是類似。讓咱們看一下使用消息傳遞通道鏈接worker的一些代碼:

//開始咱們的worker...
var worker1 = new Worker('worker.js');
var worker2 = new Worker('worker.js');

//建立通訊所需的在兩個worker之間的消息通道。
var channel1 = new MessageChannel();
var channel2 = new MessageChannel();
var channel3 = new MessageChannel();

//咱們的「update()」協程會記錄worker的響應
var update = coroutine(function* () {
    var input;
    
    while (true) {
        input = yield;
        console.log('result', input.data);
    }
});

//使用「worker1」鏈接「channel1」和「channel2」。
worker1.postMessage(null, [
    channel1.port2,
    channel2.port1
]);

//使用「worker2」鏈接「channel2」和「channel3」。
worker2.postMessage(null, [
    channel2.port2,
    channel3.port1
]);

//將咱們的協程「update()」鏈接到收到「channel3」任何消息。
channel3.port2.addEventListener('message', update);
channel3.port2.start();

//咱們的輸入數據 - 一組數字。
var array = new array(25)
    .fill(null)
    .map((v, i) => i*10);

//將每一個數組項發佈到「channel1」。
for (let item of array) {
    channel1.port1.postMessage(item);
}

除了咱們要發送給worker的數據以外,咱們還能夠發送一個消息端口列表,咱們但願將這些消息端口傳輸到worker上下文。這就是咱們對發送給worker的前兩條消息的處理方式。消息數據爲空,由於咱們沒有對它作任何事情。實際上,這些是咱們發送的惟一消息直接給worker。通訊的其他部分經過咱們建立的消息通道進行。開銷大的計算髮生在worker上,由於那是消息處理程序所在的位置。

使用Parallel.js

使用Parallel.js庫的目的是爲了使與Web worker交互儘量的無縫。在事實上,它完成了這本書的一個關鍵目標,它隱藏併發機制,並讓咱們可以專一於咱們正在構建的應用程序。

在本節中,咱們將介紹Parallel.js對worker通訊採起的方法以及將代碼傳遞給worker的通用方法。而後,咱們將介紹一些使用Parallel.js生成新worker線程的代碼。最後,咱們將探索這個庫已經提供的內置map/reduce功能。

它怎麼工做的

在本書中到目前爲止咱們使用的全部worker都是咱們本身創造的。咱們在worker中實現了消息事件處理,計算某些值,而後發佈響應。使用Parallel.js,咱們不實現worker。相反,咱們實現函數,而後將函數傳遞給由庫管理的workers。

這給咱們帶來了一些麻煩。咱們全部的代碼都在主線程中實現,這意味着更容易使用在主線程中實現的函數,由於咱們不須要使用importScripts()將它們導入到Web worker中。咱們也不須要經過腳本目錄建立Web worker並手動啓動它們。相反,咱們讓Parallel.js爲咱們生成新的worker,而後咱們能夠經過將函數和數據傳遞給他們來告訴worker該作什麼。那麼,這到底是如何工做的呢?

workers須要一個腳本參數。沒有有效的腳本,worker根本沒法工做。Parallel.js有一個簡單的eval腳本。這是傳遞給庫建立的worker的內容。而後,主線程中的API將在worker中進行評估代碼,並在須要與workers通訊時將其發送。

這是可行的,由於Parallel.js的目的不是暴露worker支持的大量功能。相反,目標是使worker通訊機制儘量無縫,同時提供最小的功能。這樣能夠輕鬆構建與咱們的應用程序相關的併發功能,而不是咱們永遠不會使用的許多其餘功能。

如下是咱們如何使用Parallel.js和它的eval腳本將數據和代碼傳遞給worker的說明:

image152.gif

生成workers

Parallel.js庫有一個做業的概念。做業的主要輸入是做業要處理的數據。做業的建立並不直接與後臺worker的建立聯繫在一塊兒。workers與Parallel.js中的做業不一樣;使用庫時,咱們不直接與worker交互。一旦咱們有了做業實例,而且它提供了咱們的數據,咱們就會使用一個做業方法來調用workers。

最基本的方法是spawn(),它將一個函數做爲參數並在Web worker中運行它。咱們傳遞給它一個函數做爲參數而且在web worker中運行。咱們傳遞給它的函數能夠返回結果,而後將它們解析爲一個thenable對象被spawn()函數返回。讓咱們看一下使用Parallel.js生成由一個web worker返回的新做業的代碼:

//一個數字輸入數組。
var array = new Array(2500)
    .fill(null)
    .map((v, i) => i);

//建立一個新的並行做業。
//在這裏沒有worker的建立 - 
//咱們只傳遞咱們正在使用的構造數據。
var job = new Parallel(array);

//爲咱們的「spawn()」做業啓動一個定時器。
console.time(`${array.length} items`);

//建立一個新的Web worker,並將咱們的數據和這個函數傳遞給它。
//咱們正在慢慢映射數組的每一個數字到它的平方。
job.spawn((coll) => {
    return coll.map((n) => {
        var i = 0;
        while(++i < n*n) {}
        return i;
    });

    //「spawn()」的返回值是thenable。含義
    //咱們能夠分配一個「then()」回調函數,
    //就像返回的promise那樣。
}).then((result) => {
    console.timeEnd(`${array.length} items`);
    //→2500 items:3408.078ms
});

那麼如今,這很不錯; 咱們沒必要擔憂任何單調的Web worker生命週期任務。咱們有一些數據和一些咱們想要應用於數據的函數,咱們但願與頁面上發生的其餘做業並行運行。最吸引人的是熟悉的thenable,從那裏返回的spawn()方法。它適用於咱們的併發應用程序,其中全部其餘應用程序都被視爲promise。

咱們記錄處理咱們提供的輸入數據的函數所需的時間。咱們只爲這個任務生成一個Web worker,所以在主線程中計算獲得的結果與原來的時間相同。除了釋放主線程來處理DOM事件和重繪以外,沒有實際的性能提高。咱們將看看是否可使用一個不一樣的方法來提高併發級別。

當咱們完成後,spawn()建立的worker當即終止。這爲咱們釋放了內存。可是,沒有併發級別來管理
spawn()的使用,若是咱們願意,咱們能夠連續調用它100次。

Mapping and reducing

在上一節中,咱們使用spawn()方法生成了一個worker線程。Parallel.js還有一個map()方法和一個reduce()方法。這個方法是讓事情變得更輕鬆。經過傳遞map()函數,庫將自動將其應用於做業數據中的每一個項。相似的語義適用於reduce()方法。讓咱們經過編寫一些代碼來看看它是如何工做的:

//一個數字輸入數組。
var array = new Array(2500)
    .fill(null)
    .map((v, i) => i);

//建立一個新的並行做業。
//這裏不會建立workers - 咱們只傳遞咱們正在使用的構造數據。
var job1 = new Parallel(array);

//爲咱們的「spawn()」做業啓動一個計時器。
console.time('JOB1');

//這裏的問題是Parallel.js會爲每一個數組元素建立一個新的worker,
//致使並行減速。
job1.map((n) => {
    var i = 0;
    while (++i < n*n) {}
    return i;
}).reduce((pair) => {
 
    //將數組項reduce爲一個總和。
    return pair[0] + pair[1];
}).then((data) => {
    console.log('job1 reduced', data);
    //→job1 reduced 5205208751
    
    console.timeEnd('job1');
    //→job1:59443.863ms
});

哎喲! 這是一個很是重要的性能 - 這裏發生了什麼?咱們在這裏看到的是一種稱爲並行減速的現象。當並行通訊開銷過多時,會發生這種減速。在這個特定示例中發生這種狀況的緣由是因爲Parallel.js在map()中處理數組的方式。每一個數組項都經過一個worker。這並不意味着建立了2500個worker - 一個worker用於數組中的每一個元素。建立的worker數量最多隻能達到4或者咱們在本書前面看到的navigator.hardwareConcurrency值。

在真正的開銷來自於發送的消息並收到了worker-5000個消息!這顯然不是最優的,由於由代碼中的定時器給證實。讓咱們看看是否可以作出一個對這些數字的大幅改善,同時保持大體相同的代碼結構:

//更快的執行。
var job2 = new Parallel(array);

console.time('job2');

//在映射數組以前,將數組拆分爲較小的數組塊。
//這樣,每一個Parallel.js worker都是處理數組而不是數組項。
//這避免了發送數千個Web worker消息。
job2.spawn((data) => {
    var index = 0,
        size = 1000,
        results = [];

    while (true) {
        let chunk = data.slice(index, index + size);

        if (chunk.length) {
            results.push(chunk);
            index += size;
        } else {
            return result;
        }
    }
}).map((array) => {

    //返回數組塊的映射。
    return array.map((n) => {
        var i = 0;
        while(++i < n * n) {}
        return i;
    });
}).reduce((pair) => {

    //將數組塊或數字reduce爲一個總和。
    return(Array.isArray(pair[0]) ? 
        pair[0].reduce((r, v) => r + v) :
        pair[0]) + (Array.isArray(pair[1]) ? 
        pair[1].reduce((r, v) => r + v) : 
        pair[1]);
}).then((data) => {

    console.log('job2 reduced', data);
    //→job2 resuced 5205208751
});

console.timeEnd('job2');
//→job2:2723.661ms

在這裏,咱們能夠看到的是在一樣的結果被產生,而且快得多。不一樣之處在於咱們開始工做以前將數組切片成的陣列較小的數組塊。這些數組就是傳遞給workers的項,而不是單個的數。因此映射做業略微有好的改變,而平方一個數字,它映射一個較小的數組到平方的數組。該reduce的邏輯是稍微複雜一些,但整體來講,咱們的作法是仍然是相同的。最重要的是,咱們已經刪除了大量的消息傳遞瓶頸,他們在第一次執行形成不可接受的性能缺陷。

就像spawn()方法在返回時清理worker同樣,Parallel.js中的map()和reduce()方法也是如此。
釋放worker的缺點是,不管什麼時候調用這些方法,都須要從新建立它們。咱們將在下一節討論這個挑戰。

worker線程池

本章的最後一節介紹了worker線程池的概念。在上一節關於Parallel.js的介紹中,咱們遇到了常常建立和終止worker的問題。這須要不少開銷。若是知道咱們可以運行的併發級別,那麼爲何不分配一個能夠承擔工做的靜態大小的worker線程池?

建立worker線程池的第一個設計任務是分配worker。下一步是經過將做業分發給池中的可用worker來計劃做業。最後,當全部worker都在運行時,咱們須要考慮忙碌狀態。讓咱們開始吧。

分配池

在考慮分配worker線程池以前,咱們須要查看整體worker抽象池。咱們如何但願它的外觀和行爲是怎樣的?理想狀況下,咱們但願抽象池的外觀和行爲相似於普通的專用worker。咱們能夠向線程池發佈消息並得到promise做爲響應。所以,雖然咱們沒法直接擴展Worker原型,但咱們能夠建立一個與Worker API很是類似的新的抽象。

咱們如今來看一些代碼吧。這是咱們將使用的初始抽象:

//表示Web worker線程的「池」,
//隱藏在後面單個Web worker接口的接口。
function WorkerPool(script) {
    //併發級別,或者Web worker要創造的數量。
    //這使用了「hardwareConcurrency」屬性(若是存在)。
    //不然,默認爲4,
    //由於這是對最多見的CPU結構進行的合理猜想。
    var concurrency = navigator.hardwareConcurrency || 4;

    //worker實例自己存儲在Map中,做爲鍵。
    //咱們立刻就會明白爲何。
    var workers = this.workers = new Map();

    //對於發佈的消息存在隊列,全部worker都很忙。
    //因此這可能永遠不會被用到的。
    var queue = this.queue = [];

    //用於下面建立worker程序實例,
    //以及添加事件監聽器。
    var worker;
    for (var i = 0; i < concurrency; i++) {
        worker = new Worker(script);
        worker.addEventListener('message', function(e) {

            //咱們使用「get()」方法來查找promise的「resolve()」函數。
            //該worker是關鍵。咱們調用的從worker返回的數據的解析器
            //而且能夠將其重置爲null。
            //這個很重要,由於它表示worker是空閒的,
            //能夠承擔更多工做。
            workers.get(this)(e.data);
            workers.set(this, null);

            //若是有排隊的數據,咱們獲得第一個
            //隊列中的「data」和「resolver」。
            //咱們用數據調用「postMessage()」以前,
            //咱們使用新的「resolve()」函數更新「workers」映射。
            if (queue.length) {
                var [data, resolver] = queue.shift();
                workers.set(this, resolver);
                this.postMessage(data);
            }
            
            //這是worker的初始設置,做爲在「worker」映射中的鍵。
            //它的值爲null,意味着沒有解析函數,它能夠承擔工做。
            this.workers.set(worker, null);
        }.bind(worker));
    }
}

建立新的WorkerPool時,給定的腳本用於生成線程池中的全部worker。該worker屬性是一個Map實例,worker實例自己是做爲鍵。咱們將worker存儲爲映射鍵的緣由是咱們能夠輕鬆地查找適當的解析器函數來調用。

當給定的worker程序響應時,調用咱們添加到每一個worker的消息事件處理程序,這就是咱們找的等待調用的解析器函數的地方。咱們不可能調用錯誤的解析器,由於給定的worker在完成當前任務以前不會接受新的任務。

調度任務

如今咱們將實現postMessage()方法。這是調用者用於將消息發佈到池中的一個worker。調用者不知道哪一個worker知足了他們的要求,他們也不關心。他們將promise做爲返回值,並以worker響應做爲解析值:

WorkerPool.prototype.postMessage = function(data) {

    //「workers」Map映射實例,其中包含全部存儲的Web worker。
    var workers = this.workers;

    //當全部worker都很忙時消息被放在「queue」隊列中
    var queue = this.queue;

    //嘗試找一個可用的worker。
    var worker = this.getWorker();

    //promise會當即返回給調用者,
    //即便沒有worker可用。
    return new Promise(function(resolve) {

        //若是找到了worker,咱們能夠更新Map映射,
        //使用worker做爲鍵,並使用「resolve()」函數做爲值。
        //若是沒有worker,那麼消息數據以及「resolve()」函數被推送到「queue」隊列。
        if (worker) {
            workers.set(worker, resolve);
            worker.postMessage(data);
        } else {
            queue.push([data, resolve]);
        }
    });
};

它是promise執行器函數,實際負責查找第一個可用的worker並在那裏發佈咱們的消息。當找到可用的worker時,咱們還在咱們的worker映射中設置了worker的解析器函數。若是池中沒有可用的worker程序,已發佈的消息則將進入隊列。此隊列在消息事件處理程序中清空。這是由於當worker返回消息時,這意味着worker是空閒的能夠承擔更多工做,而且在返回空閒狀態以前檢查是否有任何worker排隊。

該getWorker()方法是一個簡單的輔助函數爲咱們查找下一個可用的worker。咱們知道若是一個worker在workers映射中將其解析器函數設置爲null,則能夠執行該任務。最後,讓咱們看看這個worker線程池的應用:

//建立一個新的線程池和一個負載計數器。
var pool = new WorkerPool('worker.js');
var workload = 0;

document.getElementById('work').addEventListener('click', function(e) {

    //獲取咱們要傳遞給worker的數據,
    //併爲此負載建立計數器。
    var amount = +document.getElementById('amount').value,
        timer = 'Workload' + (++workload);

    console.time(timer);

    //將消息傳遞給線程池,並在promise完成時,中止計時器。
    pool.postMessage(amount).then(function(result) {
        console.timeEnd(timer);
    });

    //若是消息開始排隊,
    //咱們的線程池就是過載並顯示警告。
    if (pool.queue.length) {
        console.warn('worker pool is getting busy...');
    }
});

在這種使用場景中,咱們有幾個表單控件將參數化工做發送給worker。數字越大,工做時間越長; 它使用標準的work()函數來緩慢地對數字做平方。若是咱們使用大量數字並頻繁單擊按鈕將消息發佈到線程池中,那麼最終咱們將耗盡線程池中可用的資源。若是是這種狀況,咱們將顯示警告。可是,這僅用於故障排除,當線程池繁忙時,發佈的消息不會丟失,它們只是排隊等候。

小結

本章的重點是從代碼中刪除突兀的併發語法。它只是提升了咱們應用程序成功運行的可能性,由於咱們將擁有易於維護和構建的代碼。咱們解決的第一個問題是經過使全部內容都是併發的方式來編寫併發代碼。當沒有所涉及的猜想成分時,咱們的代碼就是一致的,不易受併發錯誤的影響。

而後,咱們研究了抽象Web worker通訊能夠採起的各類方法。輔助函數是一個選項,所以擴展了postMessage()方法。而後,當咱們須要UI響應時,咱們解決了Web workers的一些限制。即便咱們的大型數據集處理速度更快,咱們仍然存在更新UI的問題。這是經過將Web worker做爲生成器處理來完成的。

咱們沒必要本身編寫全部這些JavaScript併發工具方法。咱們花了一些時間來研究Parallel.js庫的各類功能和限制。咱們以介紹Web worker線程池結束了本章。這些消除了與worker建立和終止相關的大量開銷,而且它們極大地簡化了任務的分配和結果的協調。

這些都是適用於前端的併發話題。如今是時候切換一下,使用NodeJS查看後端的JavaScript併發性。

最後補充下書籍章節目錄

另外還有講解兩章nodeJs後端併發方面的,和一章項目實戰方面的,這裏就再也不貼了,有興趣可轉向https://github.com/yzsunlei/javascript_concurrency_translation查看。

相關文章
相關標籤/搜索