Please Stop Updating, I Can't Keep Up: Multithreading in Node.js

With the release of 10.5.0, Node.js added experimental support for multithreading through the worker_threads module.

Why multithreading?

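Because JavaScript in Node.js runs on a single thread, a CPU-intensive task can keep the main thread busy and degrade the performance of the whole service. Spawning child processes with the child_process module works around this, but processes cannot share memory, and creating a process is not cheap either. Node.js 10.5.0 therefore introduced the worker_threads module to support multithreading, and Node.js's long-criticized weakness at CPU-intensive computation may finally become a thing of the past.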
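To see the blocking described above, here is a small sketch that is not from the original article. It schedules a 10 ms timer and then runs a synchronous busy loop on the main thread. The helper name measureTimerDelay and the durations are illustrative assumptions. The point is that the timer cannot fire until the loop releases the only JavaScript thread.

```javascript
// Schedule a 10 ms timer, then keep the main thread busy for busyMs milliseconds.
// Resolves with how many milliseconds actually passed before the timer callback ran.
function measureTimerDelay(busyMs) {
  return new Promise((resolve) => {
    const scheduledAt = Date.now();
    setTimeout(() => resolve(Date.now() - scheduledAt), 10);

    const end = Date.now() + busyMs;
    while (Date.now() < end) {
      // CPU-bound work: the event loop cannot run the timer callback meanwhile
    }
  });
}

measureTimerDelay(200).then((delay) => {
  console.log(`a 10 ms timer fired after ${delay} ms`); // at least 200 ms
});
```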

How to enable multithreading?

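Multithreading is still experimental, so Node.js has to be started with the --experimental-worker flag for it to take effect (for example, node --experimental-worker main.js). This applies to Node.js 10.x; the flag is no longer required as of Node.js 11.7.0.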
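If the same script has to run on several Node.js versions, one option is to feature-detect the module. This is a sketch, assuming that require('worker_threads') throws when the module is not enabled (as on 10.x without the flag). loadWorkerThreads is a hypothetical helper name.

```javascript
// Hypothetical helper: returns the worker_threads module, or null if this
// Node.js build does not expose it (assumed: 10.x started without the flag).
function loadWorkerThreads() {
  try {
    return require('worker_threads');
  } catch (err) {
    return null;
  }
}

const workerThreads = loadWorkerThreads();
if (workerThreads) {
  console.log('worker_threads is available');
} else {
  console.log('worker_threads is missing; on Node.js 10.x, restart with --experimental-worker');
}
```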

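How to create threads?

The most important classes in the worker_threads module:

MessageChannel: creates an asynchronous, two-way communication channel. A MessageChannel instance has two properties, port1 and port2, both of which are MessagePort instances.

MessagePort: represents one end of a MessageChannel. It is used to transfer structured data, memory regions, and other MessagePorts between Workers. MessagePort inherits from EventEmitter, so messages are sent with postMessage and received with on.

Worker: creates a separate JavaScript thread.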
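MessageChannel can be tried out without any Worker at all. In the sketch below, roundTrip is a hypothetical helper. It sends a value from port1 and receives a structured-clone copy of it on port2, then closes the channel so the process can exit.

```javascript
const { MessageChannel } = require('worker_threads');

// Send one value through a fresh channel and resolve with what arrives on the other end.
function roundTrip(value) {
  return new Promise((resolve) => {
    const { port1, port2 } = new MessageChannel();
    port2.once('message', (msg) => {
      port1.close(); // closing either side closes the channel, so the event loop can exit
      resolve(msg);
    });
    port1.postMessage(value); // the value is cloned, not shared
  });
}

roundTrip({ name: 'Randal', nums: [1, 2, 3] }).then((msg) => {
  console.log('port2 received:', msg);
});
```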

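The most important properties of the worker_threads module:

parentPort: inside a worker, parentPort is the MessagePort used to communicate with the main thread (in the main thread it is null).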

Sending a message from the worker to the parent thread:

parentPort.postMessage(...)

Receiving messages from the parent thread in the worker:

parentPort.on('message', (msg) => ...)

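isMainThread: tells whether the current code is running in the main thread (true) or in a worker (false).

workerData: a clone of the data passed to the Worker constructor's workerData option. Inside the worker, workerData gives access to the data passed in by the parent thread.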
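The following sketch puts parentPort, isMainThread and workerData together. It also shows that workerData is a copy: the worker modifies its own copy while the parent's object stays unchanged. To keep the example in one file, the worker's code is passed as a string with the eval: true option. runWorker and workerSource are hypothetical names.

```javascript
const { Worker } = require('worker_threads');

// Worker body, evaluated as a script because of eval: true.
const workerSource = `
  const { parentPort, workerData, isMainThread } = require('worker_threads');
  workerData.name = workerData.name.toUpperCase(); // changes the worker's own copy only
  parentPort.postMessage({ isMainThread, name: workerData.name });
`;

function runWorker(data) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: data });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

const data = { name: 'Randal' };
runWorker(data).then((reply) => {
  console.log(reply);     // { isMainThread: false, name: 'RANDAL' }
  console.log(data.name); // prints Randal: the parent's object is unchanged
});
```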

With these common classes and properties in mind, let's look at a code example:

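const { Worker, parentPort, isMainThread, workerData } = require('worker_threads');
if (isMainThread) {
  // Main thread: start a worker that runs this same file and pass it some initial data
  const w = new Worker(__filename, {
    workerData: {
      name: 'Randal'
    }
  });
  // Ask the worker to sum the integers from 0 to 1e10 - 1
  w.postMessage(1e10);
  const startTime = Date.now();
  w.on('message', function(msg) {
    console.log('main thread get message: ' + msg);
    console.log('compute time elapsed: ' + (Date.now() - startTime) / 1000);
    w.terminate(); // the worker keeps listening on parentPort, so stop it explicitly
  });
  // Printed first: the main thread is not blocked while the worker computes
  console.log('main thread executing');
} else {
  // Worker thread: a deliberately CPU-heavy loop
  const longComputation = (val) => {
    let sum = 0;
    for (let i = 0; i < val; i++) {
      sum += i;
    }
    return sum;
  };
  parentPort.on('message', (msg) => {
    console.log(`${workerData.name} worker get message: ` + msg);
    parentPort.postMessage(longComputation(msg));
  });
}

// Output (the exact sum, 49999999995000000000, exceeds Number.MAX_SAFE_INTEGER, so the floating-point total is approximate)
main thread executing
Randal worker get message: 10000000000
main thread get message: 49999999990067860000
compute time elapsed: 14.954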
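The example above hands the whole loop to a single worker. One possible extension, sketched here and not part of the original article, splits the range into chunks and sums them in several workers at once. It assumes the work divides into independent pieces. parallelSum and sumRange are hypothetical helpers. n is kept small enough that the total stays below Number.MAX_SAFE_INTEGER, so the result is exact.

```javascript
const { Worker } = require('worker_threads');
const os = require('os');

// Worker body: sum the integers in [start, end) and post the result back.
const rangeSumSource = `
  const { parentPort, workerData } = require('worker_threads');
  let sum = 0;
  for (let i = workerData.start; i < workerData.end; i++) sum += i;
  parentPort.postMessage(sum);
`;

function sumRange(start, end) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(rangeSumSource, { eval: true, workerData: { start, end } });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

// Split [0, n) into roughly equal chunks, one worker per chunk, and add the partial sums.
async function parallelSum(n, workers = os.cpus().length) {
  const chunk = Math.ceil(n / workers);
  const jobs = [];
  for (let start = 0; start < n; start += chunk) {
    jobs.push(sumRange(start, Math.min(start + chunk, n)));
  }
  const partials = await Promise.all(jobs);
  return partials.reduce((a, b) => a + b, 0);
}

parallelSum(1e6).then((total) => console.log(total)); // 499999500000
```

Spawning a fresh worker per chunk keeps the sketch short. For repeated jobs, a pool of long-lived workers would avoid paying the thread start-up cost every time.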

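How is data transferred between threads?

port.postMessage(value[, transferList])

Besides value, postMessage also accepts a transferList argument, which is a list of ArrayBuffer and MessagePort objects. Objects in transferList are transferred rather than copied: once the message is sent, they can no longer be used in the sending thread (a transferred ArrayBuffer is detached and its byteLength becomes 0).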

const { Worker, isMainThread } = require('worker_threads');
// Main thread
if (isMainThread) {
  // Despite its name, sab here is an ordinary (non-shared) ArrayBuffer
  const sab = new ArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 100);
  const ia = new Int32Array(sab);

  for (let i = 0; i < ia.length; i++) {
    ia[i] = i;
  }
  console.log("this is the main thread");
  const w = new Worker(__filename);
  console.log('before transfer: ', sab);
  // sab is listed in transferList, so it moves to the worker and is detached here
  w.postMessage(null, [
    sab
  ]);
  setTimeout(() => {
    console.log('after transfer: ', sab);
  }, 1000);
} else {
  console.log("this isn't main thread");
}
// Output
this is the main thread
before transfer:  ArrayBuffer { byteLength: 400 }
this isn't main thread
after transfer:  ArrayBuffer { byteLength: 0 }

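If the ArrayBuffer is passed as value and is not listed in transferList, the receiver gets a copy and the original remains usable in the sending thread, as shown below: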

w.postMessage(sab);

// Output
this is the main thread
before transfer:  ArrayBuffer { byteLength: 400 }
this isn't main thread
after transfer:  ArrayBuffer { byteLength: 400 }
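The outputs above only show the sending side. The sketch below also reports what the receiving worker sees. sendBuffer and the inline echo worker are hypothetical. In both cases the worker gets a 400-byte buffer, but only the copy leaves the sender's buffer intact.

```javascript
const { Worker } = require('worker_threads');

// Echo worker: replies with the byteLength of the ArrayBuffer it receives.
const echoSource = `
  const { parentPort } = require('worker_threads');
  parentPort.once('message', (buf) => parentPort.postMessage(buf.byteLength));
`;

function sendBuffer(useTransferList) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(echoSource, { eval: true });
    const buf = new ArrayBuffer(400);
    worker.postMessage(buf, useTransferList ? [buf] : []);
    const senderBytes = buf.byteLength; // 0 right after a transfer, 400 after a copy
    worker.once('message', (receiverBytes) => {
      worker.terminate();
      resolve({ senderBytes, receiverBytes });
    });
    worker.once('error', reject);
  });
}

sendBuffer(true).then((r) => console.log('transfer:', r)); // { senderBytes: 0, receiverBytes: 400 }
sendBuffer(false).then((r) => console.log('copy:', r));    // { senderBytes: 400, receiverBytes: 400 }
```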

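How do threads share memory?

This is where SharedArrayBuffer comes in. If the value passed to postMessage is a SharedArrayBuffer, the threads share the same memory instead of each receiving a copy, as the following example shows: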

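const { Worker, isMainThread, parentPort } = require('worker_threads');
// Main thread
if (isMainThread) {
  const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 5);
  const ia = new Int32Array(sab);

  for (let i = 0; i < ia.length; i++) {
    ia[i] = i;
  }
  for (let i = 0; i < 2; i++) {
    const w = new Worker(__filename);
    // A SharedArrayBuffer is shared with the worker, not copied
    w.postMessage(sab);
    w.on('message', () => {
      console.log(ia);
      w.terminate(); // the worker keeps listening on parentPort, so stop it explicitly
    });
  }
} else {
  parentPort.on('message', (msg) => {
    // View over the first element of the shared memory
    const ia = new Int32Array(msg, 0, 1);
    // Both workers update ia[0]; Atomics.add keeps concurrent increments from being lost
    Atomics.add(ia, 0, 1);
    parentPort.postMessage('done');
  });
}

// Output (one possible run)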
Int32Array [ 1, 1, 2, 3, 4 ]
Int32Array [ 2, 1, 2, 3, 4 ]
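In the example above, two workers increment the same slot. When several workers update shared memory concurrently, a plain ia[0] = ia[0] + 1 can lose updates, because the read and the write are separate steps. Atomics.add performs them as one atomic operation. The sketch below makes this concrete under those assumptions. countInParallel is a hypothetical helper, and the SharedArrayBuffer is passed through workerData, which shares it rather than copying it.

```javascript
const { Worker } = require('worker_threads');

// Worker body: add 1 to the shared counter `iterations` times, atomically.
const counterSource = `
  const { workerData } = require('worker_threads');
  const counter = new Int32Array(workerData.sab);
  for (let i = 0; i < workerData.iterations; i++) {
    Atomics.add(counter, 0, 1); // atomic read-modify-write: no lost updates
  }
`;

function countInParallel(workers, iterations) {
  const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
  const counter = new Int32Array(sab);
  const finished = [];
  for (let w = 0; w < workers; w++) {
    const worker = new Worker(counterSource, { eval: true, workerData: { sab, iterations } });
    finished.push(new Promise((resolve, reject) => {
      worker.once('exit', resolve);
      worker.once('error', reject);
    }));
  }
  // Once every worker has exited, read the final value from shared memory
  return Promise.all(finished).then(() => Atomics.load(counter, 0));
}

countInParallel(4, 100000).then((total) => console.log(total)); // 400000
```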

Open-source libraries

A recommended open-source library that wraps threads: microjob

Microjob is a tiny wrapper for Node.js threads and is intended to perform heavy CPU loads using anonymous functions. So, Microjob treats Node.js threads as temporary working units: if you need to spawn a long-living thread, then you should use the default API.

The official example:

(async () => {
  const { job } = require('microjob')

  try {
    // this function will be executed in another thread
    const res = await job(() => {
      let i = 0
      for (i = 0; i < 1000000; i++) {
        // heavy CPU load ...
      }

      return i
    })

    console.log(res) // 1000000
  } catch (err) {
    console.error(err)
  }
})()
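To show roughly what a wrapper like microjob does, here is a minimal sketch built directly on worker_threads. It is not microjob's implementation or API. runJob is a hypothetical helper that sends the function's source text to a throwaway worker. As a result, the function must be self-contained (it cannot capture outer variables), and its return value must be structured-cloneable.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical helper: run a self-contained function in a new worker and
// resolve with its return value (errors thrown in the worker reject the promise).
function runJob(fn) {
  const source = `
    const { parentPort } = require('worker_threads');
    const result = (${fn.toString()})();
    parentPort.postMessage(result);
  `;
  return new Promise((resolve, reject) => {
    const worker = new Worker(source, { eval: true });
    worker.once('message', resolve);
    worker.once('error', reject);
  });
}

(async () => {
  const res = await runJob(() => {
    let i = 0;
    for (i = 0; i < 1000000; i++) {
      // heavy CPU load ...
    }
    return i;
  });
  console.log(res); // 1000000
})();
```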

