Node.js 真·多線程 Worker Threads 初探

基本信息

筆者在 Node.js 最新的開發版本 v11.4.0 上測試該特性,目前須要添加 flag 才能引入 Worker Threads,例如:javascript

node --experimental-worker index.js
複製代碼

Worker Threads 特性是在2018年6月20日的 v10.5.0 版本引入的:html

node/CHANGELOGjava

目前該模塊處於 Stability 1 - Experimental 階段,改動會較大,不建議用於生產環境。node

模塊接口git

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');
複製代碼

該模塊對象和類很是少,只有4個對象和3個類。github

  • isMainThread:false 表示當前爲 worker 線程,false 表示爲主線程
  • parentPort: 在 worker 線程裏是表示父進程的 MessagePort 類型的對象,在主線程裏爲 null
  • workerData: 在 worker 線程裏是父進程建立 worker 線程時的初始化數據,在主線程裏是 undefined
  • threadId: 在 worker 線程裏是線程 ID,在父進程裏是 0
  • MessageChannel: 包含兩個已經互相可以誇線程通訊的 MessagePort 類型對象,可用於建立自定義的通訊頻道,可參考樣例二的實現。
  • MessagePort: 用於跨線程通訊的句柄,繼承了 EventEmitter,包括 close message 事件用於接收對象關閉和發送的消息,以及 close postMessage 等操做。
  • Worker: 主線程用於建立 worker 線程的對象類型,包含全部的 MessagePort 操做以及一些特有的子線程 meta data 操做。構造函數的第一個參數是子線程執行的入口腳本程序,第二個參數包含一些配置項,能夠指定一些初始參數。 詳細內容見文檔

內存模型的變動

在使用 clusterchild_process 時一般使用 SharedArrayBuffer 來實現須要多進程共享的內存。api

port.postMessage(value[, transferList])
複製代碼

如今 Worker Threads 模塊在 API 層不建議多線程共享內存,第一個參數 value 的值會被 clone 一份在接受消息的線程。transferList 只能傳遞 ArrayBuffer 或者 MessagePort 對象,傳遞 ArrayBuffer 會修改該 Buffer 的訪問權限給接受消息的線程,傳遞 MessagePort 能夠參考樣例二。瀏覽器

全部跨線程消息的通訊都經過走底層的 v8 序列化實現,更具體的 Worker Threads 和 v8 多線程模型以及和瀏覽器的 Web Worker 標準的關係暫不展開。bash

樣例一:主線程和 worker 線程通訊計數,計數到 5 後 worker 線程自殺

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');

function mainThread() {
  const worker = new Worker(__filename, { workerData: 0 });
  worker.on('exit', code => { console.log(`main: worker stopped with exit code ${code}`); });
  worker.on('message', msg => {
    console.log(`main: receive ${msg}`);
    worker.postMessage(msg + 1);
  });
}

function workerThread() {
  console.log(`worker: threadId ${threadId} start with ${__filename}`);
  console.log(`worker: workerDate ${workerData}`);
  parentPort.on('message', msg => {
    console.log(`worker: receive ${msg}`);
    if (msg === 5) { process.exit(); }
    parentPort.postMessage(msg);
  }),
  parentPort.postMessage(workerData);
}

if (isMainThread) {
  mainThread();
} else {
  workerThread();
}
複製代碼

輸出結果:多線程

worker: threadId 1 start with /Users/azard/test/index.js
worker: workerDate 0
main: receive 0
worker: receive 1
main: receive 1
worker: receive 2
main: receive 2
worker: receive 3
main: receive 3
worker: receive 4
main: receive 4
worker: receive 5
main: receive 5
main: worker stopped with exit code 0
複製代碼

樣例二:使用 MessageChannel 讓兩個子線程直接通訊

const {
  isMainThread, parentPort, workerData, threadId,
  MessageChannel, MessagePort, Worker
} = require('worker_threads');

if (isMainThread) {
  const worker1 = new Worker(__filename);
  const worker2 = new Worker(__filename);
  const subChannel = new MessageChannel();
  worker1.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  worker2.postMessage({ hereIsYourPort: subChannel.port2 }, [subChannel.port2]);
} else {
  parentPort.once('message', (value) => {
    value.hereIsYourPort.postMessage('hello');
    value.hereIsYourPort.on('message', msg => {
      console.log(`thread ${threadId}: receive ${msg}`);
    });
  });
}
複製代碼

輸出:

thread 2: receive hello
thread 1: receive hello
複製代碼

總結

如今 Node.js 有了真多線程,不須要再用 cluster 或者 child_process 的多進程來處理一些問題了,相關的框架、庫的模型在 Worker Threads 穩定後也能夠開始進行迭代更新。

相關文章
相關標籤/搜索