最近在面試求職的時候有被問到 Node 有沒有辦法實現多線程,我一拍腦殼,害,這個我會:javascript
"利用 cluster 模式經過 fork() 實例化多個 node 服務實例, 好比一個8核服務器,就能夠啓動8個獨立實例,相互之間並不影響,十分穩定.最典型的工具就是 pm2."html
"恩恩,你說的是 cluster 多進程模式,有沒有多線程方式呢?"java
"啊...這個...那就經過 child_process 去 fork/spawn 一個子進程..."node
"這也是調起了一個子進程,並非真正的多線程,有關注過 Node 的一些新特性/api 嗎?"git
"餓...這個(此時的我是當真不知道 worker_threads...."github
而後結局很明顯,在歡聲笑語中打出GG...面試
不甘心的我去Node中文文檔查閱了相關資料,果不其然,我仍是太年輕了:)算法
害! 爲啥 Node中文文檔竟然英文API,因而我學習的同時順便翻譯了一哈.有興趣學習或者瞭解 worker_threads 模塊的同窗能夠點擊這裏查閱~~(谷歌翻譯+做者人肉翻譯,若有錯謬,還望海涵)~~segmentfault
ps: Node不是單線程的麼? 實際上並非, Node 單線程是指 v8 引擎執行 js 時是單線程的,就好像瀏覽器一個Tab進程中,就有GUI渲染線程, JS引擎線程,事件觸發線程,定時器觸發線程,異步請求http線程. Node.js 異步原理實際上是經過 libuv 的線程池完成異步調用的;當你啓動一個Node服務時,打開任務管理器,你會發現 Node 任務的線程數爲7(主線程,編譯/優化線程,分析器線程,垃圾回收線程等)api
首先確認本身 Node 版本環境支持 worker_threads 模塊.如不支持,能夠經過nvm下個最新的 Node.
筆者在 Node.js 開發版本v12.6.0上測試該模塊. 首先引入模塊:
// main.js
const { Worker, isMainThread, parentPort, workerData, MessageChannel } = require('worker_threads');
if (isMainThread) {
console.log('我是主線程', isMainThread);
const worker = new Worker(__filename);
} else {
console.log('我不是主線程', isMainThread);
}
=> 我是主線程 true
=> 我不是主線程 false
複製代碼
例子經過 new Worker 生成子線程從新執行了 main.js ,執行完畢, worker子線程自動銷燬.
__filename
你能夠寫成你所要執行的具體 worker.js
所在的路徑.
除了經過 new Worker 去加載執行 js 文件,有沒有辦法直接執行 js 代碼呢, 以下所示:
let code = ` for(let i = 0;i < 5;i++) { console.log('worker線程執行中:', i); } `
let worker = new Worker(code, { eval: true });
console.log('主線程執行完畢');
=> 主線程執行完畢
=> worker線程執行中: 0
=> worker線程執行中: 1
=> worker線程執行中: 2
=> worker線程執行中: 3
=> worker線程執行中: 4
複製代碼
若是經過 port 設置了 port.on 監聽事件, 除非手動 terminate 終結, 不然線程不會自動中斷(或者和我同樣使用 port.once 即監聽一次)
即經過workerData
完成完成線程數據初始化
const data = {
name: 'ego同窗',
age: 23,
sex: 'male',
addr: '深圳南山',
arr: [{ skill: 'coding'}, { hobby: 'basketball' }]
}
if (isMainThread) {
const worker = new Worker(__filename, { workerData: data });
} else {
workerData.age = 16;
workerData.arr[0].skill = 'sleep';
console.log(data);
console.log(workerData);
}
=>
{
name: 'ego同窗',
age: 23,
sex: 'male',
addr: '深圳南山',
arr: [ { skill: 'coding' }, { hobby: 'basketball' } ]
}
{
name: 'ego同窗',
age: 16,
sex: 'male',
addr: '深圳南山',
arr: [ { skill: 'sleep' }, { hobby: 'basketball' } ]
}
複製代碼
if (isMainThread) {
const worker = new Worker(__filename);
worker.postMessage({name: 'ego同窗'});
worker.once('message', (message) => {
console.log('主線程接收信息:', message);
});
} else {
parentPort.once('message', (obj) => {
console.log('子線程接收信息:', obj);
parentPort.postMessage(obj.name);
})
}
=> 子線程接收信息: { name: 'ego同窗' }
=> 主線程接收信息: ego同窗
複製代碼
parentPort
是生成 worker 線程時自動建立的MessagePort
實例,用於與父進程進行通訊.
//main.js
const path = require('path');
const { port1, port2 } = new MessageChannel();
if (isMainThread) {
const worker1 = new Worker(__filename);
const worker2 = new Worker(path.join(__dirname, 'worker.js'));
worker1.postMessage({ port1 }, [ port1 ]);
worker2.postMessage({ port2 }, [ port2 ]);
} else {
parentPort.once('message', ({ port1 }) => {
console.log('子線程1收到port1', port1);
port1.once('message', (msg) => {
console.log('子線程1收到', msg);
})
port1.postMessage('port1 向 port2 發消息啦');
})
}
// worker.js
const { parentPort } = require('worker_threads');
parentPort.once('message', ({ port2 }) => {
console.log('子線程2收到port2');
port2.once('message', (msg) => {
console.log('子線程2收到', msg);
})
port2.postMessage('這裏是port2, over!');
})
=>
子線程1收到port1
子線程2收到port2
子線程1收到 這裏是port2, over!
子線程2收到 port1 向 port2 發消息啦
複製代碼
簡單來講就是父線程將MessageChannel
類生成的MessagePort
對象實例分別發送到子線程中,兩個子線程便可經過 port1
,port2
進行通訊.
菜徐坤疑問:
worker線程實例可不能夠經過workerData
傳遞到另外一個worker線程裏直接使用呢?試一下:
// main.js
const worker1 = new Worker(__filename);
const worker2 = new Worker(path.join(__dirname, 'worker.js'), { workerData: worker1 });
// worker.js
const { workerData } = require('worker_threads');
console.log(workerData);
=>
internal/worker.js:144
this[kPort].postMessage({
^
DOMException [DataCloneError]: (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
...<omitted>... } could not be cloned.
複製代碼
拋出數據克隆錯誤 !
OK,咱們來看看 Node worker_thread模塊源碼 以下:
// node/lib/internal/worker.js
...
const url = options.eval ? null : pathToFileURL(filename);
// Set up the C++ handle for the worker, as well as some internal wiring.
// 爲工做程序設置C ++句柄以及一些內部連線。
this[kHandle] = new WorkerImpl(url, options.execArgv);
...
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
...
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
setupPortReferencing(this[kPublicPort], this, 'message');
this[kPort].postMessage({
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// Actually start the new thread now that everything is in place.
// 如今,一切就緒,實際上開始新線程。
this[kHandle].startThread();
複製代碼
workerData
是經過 port.postMessagePort(value[, transferList])
克隆副本傳輸給目標線程,即workerData
經過結構化克隆算法進行復制:
It builds up a clone by recursing through the input object while maintaining a map of previously visited references in order to avoid infinitely traversing cycles.
經過遞歸遍歷輸入對象而創建一個副本,同時保持先前訪問的引用的映射,以免無限遍歷循環。
該算法不復制函數、錯誤、屬性描述符或原型鏈,能夠包含循環引用和類型化數組.
port.postMessage(value[, transferList])
transferList
能夠是ArrayBuffer
和MessagePort
對象的列表, 傳遞ArrayBuffer
後,訪問權限被修改,歸屬於消息接收方,它們將再也不可用於頻道的發送方 !!!
線程間經過 clone 第一個參數來互相傳遞消息, 那若是我不想處處 clone 處處傳遞數據呢, 有辦法解決嗎? 答案是有的.
cluster
和 child_process
時一般使用 SharedArrayBuffer
來實現須要多進程共享的內存, 一樣的 value
能夠包含SharedArrayBuffer
實例,從而能夠在任一線程訪問這些實例 !
const { Worker, isMainThread, parentPort, MessageChannel, threadId } = require('worker_threads');
if (isMainThread) {
const worker1 = new Worker(__filename);
const worker2 = new Worker(__filename);
const { port1, port2 } = new MessageChannel();
const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4));
// 輸出一下sharedUint8Array
console.log(sharedUint8Array);
worker1.postMessage({ uPort: port1, data: sharedUint8Array }, [ port1 ]);
worker2.postMessage({ uPort: port2, data: sharedUint8Array }, [ port2 ]);
worker2.once('message', (message) => {
console.log(`${message}, 查看共享內存:${sharedUint8Array}`);
});
} else {
parentPort.once('message', ({ uPort, data }) => {
uPort.postMessage(`我是${threadId}號線程`);
uPort.on('message', (msg) => {
console.log(`${threadId}號收到:${msg}`);
if (threadId === 2) {
data[1] = 2;
parentPort.postMessage('2號線程修改了共享內存!!!');
}
console.log(`${threadId}號查看共享內存:${data}`);
})
})
}
=>
Uint8Array [ 0, 0, 0, 0 ]
2號收到:我是1號線程
2號線程修改了共享內存!!!, 查看共享內存:0,2,0,0
1號收到:我是2號線程
2號查看共享內存:0,2,0,0
1號查看共享內存:0,2,0,0
複製代碼
經過共享內存, 咱們在一個線程中修改它, 意味全部線程中中進行了修改, 意味着數據的傳遞修改無需屢次序列化clone, 是否是方便不少呢.
若是不知足共享一個 Buffer 數組, 一般數據都是以對象的形式來存儲傳遞的, 咱們能夠建立相似的結構來達到咱們的目的.
worker 工做線程通常有兩種使用方法:
官方推薦線程池的使用方法, 畢竟一次次的建立銷燬 worker 須要佔用不小的開銷, 咱們能夠根據實際業務狀況來選擇本身的使用方式.
下面咱們來實現一個簡單的 worker_thread 線程池:
// main.js
const path = require('path');
const { Worker } = require('worker_threads');
class WorkerPool {
_workers = []; // 線程引用數組
_activeWorkers = []; // 激活的線程數組
_queue = []; // 任務隊列
constructor(workerPath, numOfThreads) {
this.workerPath = workerPath;
this.numOfThreads = numOfThreads;
this.init();
}
// 初始化多線程
init() {
if (this.numOfThreads < 1) {
throw new Error('線程池最小線程數應爲1');
}
for (let i = 0;i < this.numOfThreads; i++) {
const worker = new Worker(this.workerPath);
this._workers[i] = worker;
this._activeWorkers[i] = false;
}
}
// 結束線程池中全部線程
destroy() {
for (let i = 0; i < this.numOfThreads; i++) {
if (this._activeWorkers[i]) {
throw new Error(`${i}號線程仍在工做中...`);
}
this._workers[i].terminate();
}
}
// 檢查是否有空閒worker
checkWorkers() {
for (let i = 0; i < this.numOfThreads; i++) {
if (!this._activeWorkers[i]) {
return i;
}
}
return -1;
}
run(getData) {
return new Promise((resolve, reject) => {
const restWorkerId = this.checkWorkers();
const queueItem = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
}
}
// 線程池已滿,將任務加入任務隊列
if (restWorkerId === -1) {
this._queue.push(queueItem);
return null;
}
// 空閒線程執行任務
this.runWorker(restWorkerId, queueItem);
})
}
async runWorker(workerId, queueItem) {
const worker = this._workers[workerId];
this._activeWorkers[workerId] = true;
// 線程結果回調
const messageCallback = (result) => {
queueItem.callback(null, result);
cleanUp();
};
// 線程錯誤回調
const errorCallback = (error) => {
queueItem.callback(error);
cleanUp();
};
// 任務結束消除舊監聽器,若還有待完成任務,繼續完成
const cleanUp = () => {
worker.removeAllListeners('message');
worker.removeAllListeners('error');
this._activeWorkers[workerId] = false;
if (!this._queue.length) {
return null;
}
this.runWorker(workerId, this._queue.shift());
}
// 線程建立監聽結果/錯誤回調
worker.once('message', messageCallback);
worker.once('error', errorCallback);
// 向子線程傳遞初始data
worker.postMessage(queueItem.getData);
}
}
複製代碼
建立一個workerPool
類,在構造函數裏傳入要執行的js文件路徑和要啓動的線程池數,而後經過init()
方法初始化多線程,並將它們的引用存儲在_workers
數組裏,初始狀態默認都爲false
不活躍存儲在_activeWorkers
數組中.
run
方法分配執行任務,返回Promise
調用任務的回調去resolve/reject
,使用空閒線程runWorker
執行任務,若是暫時沒有空閒線程,就把任務push
進_queue
任務隊列等待執行.
runWorker
使用空閒線程指定任務,定義好結果回調和error
回調,經過設置子線程的監聽事件傳遞迴調結果,把子線程的計算結果傳遞出來
而後咱們建立一個 worker.js
在裏面寫CPU密集耗時操做:
// worker.js
const { isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
throw new Error('Its not a worker');
}
const doCalcs = (data) => {
const collection = [];
for (let i = 0; i < 10; i++) {
collection[i] = Math.round(Math.random() * 1000);
}
return collection.sort((a, b) => { return a - b });
};
parentPort.on('message', (data) => {
const result = doCalcs(data);
parentPort.postMessage(result);
});
複製代碼
這裏就簡單寫了點排序,方便輸出...
而後經過new WorkerPool
生成實例執行任務:
const pool = new WorkerPool(path.join(__dirname, 'worker.js'), 4);
const items = [...new Array(10)].fill(null);
Promise.all(items.map(async (_, i) => {
const res = await pool.run();
console.log(`任務${i}完成結果:`, res);
})).then(() => {
console.log('全部任務完成 !');
// 銷燬線程池
pool.destroy();
});
=>
任務1完成結果: [
45, 96, 197, 314,
606, 631, 648, 648,
658, 874
]
任務4完成結果: [
68, 86, 124, 330,
330, 469, 533, 766,
772, 900
]
任務5完成結果: [
107, 344, 370, 499,
504, 627, 750, 840,
873, 972
]
任務6完成結果: [
218, 257, 282, 284,
500, 607, 699, 723,
739, 826
]
任務7完成結果: [
31, 98, 141, 190,
428, 507, 685, 686,
794, 945
]
任務8完成結果: [
27, 100, 188, 245,
471, 497, 514, 620,
645, 993
]
任務9完成結果: [
193, 336, 407, 455,
478, 534, 564, 651,
755, 963
]
任務2完成結果: [
319, 337, 398, 549,
587, 659, 670, 781,
792, 843
]
任務3完成結果: [
173, 188, 273, 406,
445, 450, 582, 678,
727, 882
]
任務0完成結果: [
38, 76, 134, 239,
439, 468, 568, 696,
910, 923
]
全部任務完成 !
複製代碼
worker_threads
模塊提供了真正的單進程多線程使用方法,咱們能夠將CPU密集的任務交給線程去解決,等有告終果後經過MessageChannel
跨線程通訊/或者使用共享內存.
固然,上面這些例子都是最簡單最基本的使用方式,真正運用到生產中根據不一樣的業務複雜度,worker_threads
可能會有各類花裏胡哨的運用和實現.
害 每日扎心一問:今天你找到新工做了嘛
這些文章在我學習過程給予我很是大的幫助,有些做者已經在實際生產業務中有所實踐,他們的學習過程和經驗很是值得咱們學習~有興趣的同窗能夠選讀一些加以學習,有所收穫!
Node.js 真·多線程 Worker Threads 初探