Node.js 多線程徹底指南

翻譯:瘋狂的技術宅
原文: https://blog.logrocket.com/a-...

本文首發微信公衆號:jingchengyideng
歡迎關注,天天都給你推送新鮮的前端技術文章javascript


不少人都想知道單線程的 Node.js 怎麼能與多線程後端競爭。考慮到其所謂的單線程特性,許多大公司選擇 Node 做爲其後端彷佛違反直覺。要想知道緣由,必須理解其單線程的真正含義。前端

JavaScript 的設計很是適合在網上作比較簡單的事情,好比驗證表單,或者說建立彩虹色的鼠標軌跡。 在2009年,Node.js的創始人 Ryan Dahl使開發人員能夠用該語言編寫後端代碼。java

一般支持多線程的後端語言具備各類機制,用於在線程和其餘面向線程的功能之間同步數據。要向 JavaScript 添加對此類功能的支持,須要修改整個語言,這不是 Dahl 的目標。爲了讓純 JavaScript 支持多線程,他必須想一個變通方法。接下來讓咱們探索一下其中的奧祕……node

Node.js 是如何工做的

Node.js 使用兩種線程:event loop 處理的主線程和 worker pool 中的幾個輔助線程。git

事件循環是一種機制,它採用回調(函數)並註冊它們,準備在未來的某個時刻執行。它與相關的 JavaScript 代碼在同一個線程中運行。當 JavaScript 操做阻塞線程時,事件循環也會被阻止。程序員

工做池是一種執行模型,它產生並處理單獨的線程,而後同步執行任務,並將結果返回到事件循環。事件循環使用返回的結果執行提供的回調。github

簡而言之,它負責異步 I/O操做 —— 主要是與系統磁盤和網絡的交互。它主要由諸如 fs(I/O 密集)或 crypto(CPU 密集)等模塊使用。工做池用 libuv 實現,當 Node 須要在 JavaScript 和 C++ 之間進行內部通訊時,會致使輕微的延遲,但這幾乎不可察覺。面試

基於這兩種機制,咱們能夠編寫以下代碼:算法

fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
 if (err) {
   return null;
 }

 console.log(content.toString());
});

前面提到的 fs 模塊告訴工做池使用其中一個線程來讀取文件的內容,並在完成後通知事件循環。而後事件循環獲取提供的回調函數,並用文件的內容執行它。typescript

以上是非阻塞代碼的示例,咱們沒必要同步等待某事的發生。只需告訴工做池去讀取文件,並用結果去調用提供的函數便可。因爲工做池有本身的線程,所以事件循環能夠在讀取文件時繼續正常執行。

在不須要同步執行某些複雜操做時,這一切都相安無事:任何運行時間太長的函數都會阻塞線程。若是應用程序中有大量這類功能,就可能會明顯下降服務器的吞吐量,甚至徹底凍結它。在這種狀況下,沒法繼續將工做委派給工做池。

在須要對數據進行復雜的計算時(如AI、機器學習或大數據)沒法真正有效地使用 Node.js,由於操做阻塞了主(且惟一)線程,使服務器無響應。在 Node.js v10.5.0 發佈以前就是這種狀況,在這一版本增長了對多線程的支持。

簡介:worker_threads

worker_threads 模塊容許咱們建立功能齊全的多線程 Node.js 程序。

thread worker 是在單獨的線程中生成的一段代碼(一般從文件中取出)。

注意,術語 thread workerworkerthread 常常互換使用,他們都指的是同一件事。

要想使用 thread worker,必須導入 worker_threads 模塊。讓咱們先寫一個函數來幫助咱們生成這些thread worker,而後再討論它們的屬性。

type WorkerCallback = (err: any, result?: any) => any;

export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
 const worker = new Worker(path, { workerData });

 worker.on('message', cb.bind(null, null));
 worker.on('error', cb);

 worker.on('exit', (exitCode) => {
   if (exitCode === 0) {
     return null;
   }

   return cb(new Error(`Worker has stopped with code ${exitCode}`));
 });

 return worker;
}

要建立一個 worker,首先必須建立一個 Worker 類的實例。它的第一個參數提供了包含 worker 的代碼的文件的路徑;第二個參數提供了一個名爲 workerData 的包含一個屬性的對象。這是咱們但願線程在開始運行時能夠訪問的數據。

請注意:無論你是用的是 JavaScript, 仍是最終要轉換爲 JavaScript 的語言(例如,TypeScript),路徑應該始終引用帶有 .js.mjs 擴展名的文件。

我還想指出爲何使用回調方法,而不是返回在觸發 message 事件時將解決的 promise。這是由於 worker 能夠發送許多 message 事件,而不是一個。

正如你在上面的例子中所看到的,線程間的通訊是基於事件的,這意味着咱們設置了 worker 在發送給定事件後調用的偵聽器。

如下是最多見的事件:

worker.on('error', (error) => {});

只要 worker 中有未捕獲的異常,就會發出 error 事件。而後終止 worker,錯誤能夠做爲提供的回調中的第一個參數。

worker.on('exit', (exitCode) => {});

在 worker 退出時會發出 exit 事件。若是在worker中調用了 process.exit(),那麼 exitCode 將被提供給回調。若是 worker 以 worker.terminate() 終止,則代碼爲1。

worker.on('online', () => {});

只要 worker 中止解析 JavaScript 代碼並開始執行,就會發出 online 事件。它不經常使用,但在特定狀況下能夠提供信息。

worker.on('message', (data) => {});

只要 worker 將數據發送到父線程,就會發出 message 事件。

如今讓咱們來看看如何在線程之間共享數據。

在線程之間交換數據

要將數據發送到另外一個線程,能夠用 port.postMessage() 方法。它的原型以下:

port.postMessage(data[, transferList])

port 對象能夠是 parentPort,也能夠是 MessagePort 的實例 —— 稍後會詳細講解。

數據參數

第一個參數 —— 這裏被稱爲 data —— 是一個被複制到另外一個線程的對象。它能夠是複製算法所支持的任何內容。

數據由結構化克隆算法進行復制。引用自 Mozilla:

它經過遞歸輸入對象來進行克隆,同時保持以前訪問過的引用的映射,以免無限遍歷循環。

該算法不復制函數、錯誤、屬性描述符或原型鏈。還須要注意的是,以這種方式複製對象與使用 JSON 不一樣,由於它能夠包含循環引用和類型化數組,而 JSON 不能。

因爲可以複製類型化數組,該算法能夠在線程之間共享內存。

在線程之間共享內存

人們可能會說像 clusterchild_process 這樣的模塊在好久之前就開始使用線程了。這話對,也不對。

cluster 模塊能夠建立多個節點實例,其中一個主進程在它們之間對請求進行路由。集羣可以有效地增長服務器的吞吐量;可是咱們不能用 cluster 模塊生成一個單獨的線程。

人們傾向於用 PM2 這樣的工具來集中管理他們的程序,而不是在本身的代碼中手動執行,若是你有興趣,能夠研究一下如何使用 cluster 模塊。

child_process 模塊能夠生成任何可執行文件,不管它是不是用 JavaScript 寫的。它和 worker_threads 很是類似,但缺乏後者的幾個重要功能。

具體來講 thread workers 更輕量,而且與其父線程共享相同的進程 ID。它們還能夠與父線程共享內存,這樣能夠避免對大的數據負載進行序列化,從而更有效地來回傳遞數據。

如今讓咱們看一下如何在線程之間共享內存。爲了共享內存,必須將 ArrayBufferSharedArrayBuffer 的實例做爲數據參數發送到另外一個線程。

這是一個與其父線程共享內存的 worker:

import { parentPort } from 'worker_threads';

parentPort.on('message', () => {
 const numberOfElements = 100;
 const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
 const arr = new Int32Array(sharedBuffer);

 for (let i = 0; i < numberOfElements; i += 1) {
   arr[i] = Math.round(Math.random() * 30);
 }

 parentPort.postMessage({ arr });
});

首先,咱們建立一個 SharedArrayBuffer,其內存須要包含100個32位整數。接下來建立一個 Int32Array 實例,它將用緩衝區來保存其結構,而後用一些隨機數填充數組並將其發送到父線程。

在父線程中:

import path from 'path';

import { runWorker } from '../run-worker';

const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
 if (err) {
   return null;
 }

 arr[0] = 5;
});

worker.postMessage({});

arr [0] 的值改成5,實際上會在兩個線程中修改它。

固然,經過共享內存,咱們冒險在一個線程中修改一個值,同時也在另外一個線程中進行了修改。可是咱們在這個過程當中也獲得了一個好處:該值不須要進行序列化就能夠另外一個線程中使用,這極大地提升了效率。只需記住管理數據正確的引用,以便在完成數據處理後對其進行垃圾回收。

共享一個整數數組當然很好,但咱們真正感興趣的是共享對象 —— 這是存儲信息的默認方式。不幸的是,沒有 SharedObjectBuffer 或相似的東西,但咱們能夠本身建立一個相似的結構

transferList參數

transferList 中只能包含 ArrayBufferMessagePort。一旦它們被傳送到另外一個線程,就不能再次被傳送了;由於內存裏的內容已經被移動到了另外一個線程。

目前,還不能經過 transferList(可使用 child_process 模塊)來傳輸網絡套接字。

建立通訊渠道

線程之間的通訊是經過 port 進行的,port 是 MessagePort 類的實例,並啓用基於事件的通訊。

使用 port 在線程之間進行通訊的方法有兩種。第一個是默認值,這個方法比較容易。在 worker 的代碼中,咱們從worker_threads 模塊導入一個名爲 parentPort 的對象,並使用對象的 .postMessage() 方法將消息發送到父線程。

這是一個例子:

import { parentPort } from 'worker_threads';
const data = {
 // ...
};

parentPort.postMessage(data);

parentPort 是 Node.js 在幕後建立的 MessagePort 實例,用於與父線程進行通訊。這樣就能夠用 parentPortworker 對象在線程之間進行通訊。

線程間的第二種通訊方式是建立一個 MessageChannel 並將其發送給 worker。如下代碼是如何建立一個新的 MessagePort 並與咱們的 worker 共享它:

import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';

const worker = new Worker(path.join(__dirname, 'worker.js'));

const { port1, port2 } = new MessageChannel();

port1.on('message', (message) => {
 console.log('message from worker:', message);
});

worker.postMessage({ port: port2 }, [port2]);

在建立 port1port2 以後,咱們在 port1 上設置事件監聽器並將 port2 發送給 worker。咱們必須將它包含在 transferList 中,以便將其傳輸給 worker 。

在 worker 內部:

import { parentPort, MessagePort } from 'worker_threads';

parentPort.on('message', (data) => {
 const { port }: { port: MessagePort } = data;

 port.postMessage('heres your message!');
});

這樣,咱們就能使用父線程發送的 port 了。

使用 parentPort 不必定是錯誤的方法,但最好用 MessageChannel 的實例建立一個新的 MessagePort,而後與生成的 worker 共享它。

請注意,在後面的例子中,爲了簡便起見,我用了 parentPort

使用 worker 的兩種方式

能夠經過兩種方式使用 worker。第一種是生成一個 worker,而後執行它的代碼,並將結果發送到父線程。經過這種方法,每當出現新任務時,都必須從新建立一個工做者。

第二種方法是生成一個 worker 併爲 message 事件設置監聽器。每次觸發 message 時,它都會完成工做並將結果發送回父線程,這會使 worker 保持活動狀態以供之後使用。

Node.js 文檔推薦第二種方法,由於在建立 thread worker 時須要建立虛擬機並解析和執行代碼,這會產生比較大的開銷。因此這種方法比不斷產生新 worker 的效率更高。

這種方法被稱爲工做池,由於咱們建立了一個工做池並讓它們等待,在須要時調度 message 事件來完成工做。

如下是一個產生、執行而後關閉 worker 例子:

import { parentPort } from 'worker_threads';

const collection = [];

for (let i = 0; i < 10; i += 1) {
 collection[i] = i;
}

parentPort.postMessage(collection);

collection 發送到父線程後,它就會退出。

下面是一個 worker 的例子,它能夠在給定任務以前等待很長一段時間:

import { parentPort } from 'worker_threads';

parentPort.on('message', (data: any) => {
 const result = doSomething(data);

 parentPort.postMessage(result);
});

worker_threads 模塊中可用的重要屬性

worker_threads 模塊中有一些可用的屬性:

isMainThread

當不在工做線程內操做時,該屬性爲 true 。若是你以爲有必要,能夠在 worker 文件的開頭包含一個簡單的 if 語句,以確保它只做爲 worker 運行。

import { isMainThread } from 'worker_threads';

if (isMainThread) {
 throw new Error('Its not a worker');
}

workerData

產生線程時包含在 worker 的構造函數中的數據。

const worker = new Worker(path, { workerData });

在工做線程中:

import { workerData } from 'worker_threads';

console.log(workerData.property);

parentPort

前面提到的 MessagePort 實例,用於與父線程通訊。

threadId

分配給 worker 的惟一標識符。


如今咱們知道了技術細節,接下來實現一些東西並在實踐中檢驗學到的知識。

實現 setTimeout

setTimeout 是一個無限循環,顧名思義,用來檢測程序運行時間是否超時。它在循環中檢查起始時間與給定毫秒數之和是否小於實際日期。

import { parentPort, workerData } from 'worker_threads';

const time = Date.now();

while (true) {
    if (time + workerData.time <= Date.now()) {
        parentPort.postMessage({});
        break;
    }
}

這個特定的實現產生一個線程,而後執行它的代碼,最後在完成後退出。

接下來實現使用這個 worker 的代碼。首先建立一個狀態,用它來跟蹤生成的 worker:

const timeoutState: { [key: string]: Worker } = {};

而後時負責建立 worker 並將其保存到狀態的函數:

export function setTimeout(callback: (err: any) => any, time: number) {
 const id = uuidv4();

 const worker = runWorker(
   path.join(__dirname, './timeout-worker.js'),
   (err) => {
     if (!timeoutState[id]) {
       return null;
     }

     timeoutState[id] = null;

     if (err) {
       return callback(err);
     }

     callback(null);
   },
   {
     time,
   },
 );

 timeoutState[id] = worker;

 return id;
}

首先,咱們使用 UUID 包爲 worker 建立一個惟一的標識符,而後用先前定義的函數 runWorker 來獲取 worker。咱們還向 worker 傳入一個回調函數,一旦 worker 發送了數據就會被觸發。最後,把 worker 保存在狀態中並返回 id

在回調函數中,咱們必須檢查該 worker 是否仍然存在於該狀態中,由於有可能會 cancelTimeout(),這將會把它刪除。若是確實存在,就把它從狀態中刪除,並調用傳給 setTimeout 函數的 callback

cancelTimeout 函數使用 .terminate() 方法強制 worker 退出,並從該狀態中刪除該這個worker:

export function cancelTimeout(id: string) {
 if (timeoutState[id]) {
   timeoutState[id].terminate();

   timeoutState[id] = undefined;

   return true;
 }

 return false;
}

若是你有興趣,我也實現了 setInterval,代碼在這裏,但由於它對線程什麼都沒作(咱們重用setTimeout的代碼),因此我決定不在這裏進行解釋。

我已經建立了一個短小的測試代碼,目的是檢查這種方法與原生方法的不一樣之處。你能夠在這裏找到代碼。這些是結果:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

咱們能夠看到 setTimeout 有一點延遲 - 大約40ms - 這時 worker 被建立時的消耗。平均 CPU 成本也略高,但沒什麼難以忍受的(CPU 成本是整個過程持續時間內 CPU 使用率的平均值)。

若是咱們能夠重用 worker,就可以下降延遲和 CPU 使用率,這就是要實現工做池的緣由。

實現工做池

如上所述,工做池是給定數量的被事先建立的 worker,他們保持空閒並監聽 message 事件。一旦 message 事件被觸發,他們就會開始工做併發回結果。

爲了更好地描述咱們將要作的事情,下面咱們來建立一個由八個 thread worker 組成的工做池:

const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);

若是你熟悉限制併發操做,那麼你在這裏看到的邏輯幾乎相同,只是一個不一樣的用例。

如上面的代碼片斷所示,咱們把指向 worker 的路徑和要生成的 worker 數量傳給了 WorkerPool 的構造函數。

export class WorkerPool<T, N> {
 private queue: QueueItem<T, N>[] = [];
 private workersById: { [key: number]: Worker } = {};
 private activeWorkersById: { [key: number]: boolean } = {};

 public constructor(public workerPath: string, public numberOfThreads: number) {
   this.init();
 }
}

這裏還有其餘一些屬性,如 workersByIdactiveWorkersById,咱們能夠分別保存現有的 worker 和當前正在運行的 worker 的 ID。還有 queue,咱們可使用如下結構來保存對象:

type QueueCallback<N> = (err: any, result?: N) => void;

interface QueueItem<T, N> {
 callback: QueueCallback<N>;
 getData: () => T;
}

callback 只是默認的節點回調,第一個參數是錯誤,第二個參數是可能的結果。 getData 是傳遞給工做池 .run() 方法的函數(以下所述),一旦項目開始處理就會被調用。 getData 函數返回的數據將傳給工做線程。

.init() 方法中,咱們建立了 worker 並將它們保存在如下狀態中:

private init() {
  if (this.numberOfThreads < 1) {
    return null;
  }

  for (let i = 0; i < this.numberOfThreads; i += 1) {
    const worker = new Worker(this.workerPath);

    this.workersById[i] = worker;
    this.activeWorkersById[i] = false;
  }
}

爲避免無限循環,咱們首先要確保線程數 > 1。而後建立有效的 worker 數,並將它們的索引保存在 workersById 狀態。咱們在 activeWorkersById 狀態中保存了它們當前是否正在運行的信息,默認狀況下該狀態始終爲false。

如今咱們必須實現前面提到的 .run() 方法來設置一個 worker 可用的任務。

public run(getData: () => T) {
  return new Promise<N>((resolve, reject) => {
    const availableWorkerId = this.getInactiveWorkerId();

    const queueItem: QueueItem<T, N> = {
      getData,
      callback: (error, result) => {
        if (error) {
          return reject(error);
        }
return resolve(result);
      },
    };

    if (availableWorkerId === -1) {
      this.queue.push(queueItem);

      return null;
    }

    this.runWorker(availableWorkerId, queueItem);
  });
}

在 promise 函數裏,咱們首先經過調用 .getInactiveWorkerId() 來檢查是否存在空閒的 worker 能夠來處理數據:

private getInactiveWorkerId(): number {
  for (let i = 0; i < this.numberOfThreads; i += 1) {
    if (!this.activeWorkersById[i]) {
      return i;
    }
  }

  return -1;
}

接下來,咱們建立一個 queueItem,在其中保存傳遞給 .run() 方法的 getData 函數以及回調。在回調中,咱們要麼 resolve 或者 reject promise,這取決於 worker 是否將錯誤傳遞給回調。

若是 availableWorkerId 的值是 -1,意味着當前沒有可用的 worker,咱們將 queueItem 添加到 queue。若是有可用的 worker,則調用 .runWorker() 方法來執行 worker。

.runWorker() 方法中,咱們必須把當前 worker 的 activeWorkersById 設置爲使用狀態;爲 messageerror 事件設置事件監聽器(並在以後清理它們);最後將數據發送給 worker。

private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
 const worker = this.workersById[workerId];

 this.activeWorkersById[workerId] = true;

 const messageCallback = (result: N) => {
   queueItem.callback(null, result);

   cleanUp();
 };

 const errorCallback = (error: any) => {
   queueItem.callback(error);

   cleanUp();
 };

 const cleanUp = () => {
   worker.removeAllListeners('message');
   worker.removeAllListeners('error');

   this.activeWorkersById[workerId] = false;

   if (!this.queue.length) {
     return null;
   }

   this.runWorker(workerId, this.queue.shift());
 };

 worker.once('message', messageCallback);
 worker.once('error', errorCallback);

 worker.postMessage(await queueItem.getData());
}

首先,經過使用傳遞的 workerId,咱們從 workersById 中得到 worker 引用。而後,在 activeWorkersById 中,將 [workerId] 屬性設置爲true,這樣咱們就能知道在 worker 在忙,不要運行其餘任務。

接下來,分別建立 messageCallbackerrorCallback 用來在消息和錯誤事件上調用,而後註冊所述函數來監聽事件並將數據發送給 worker。

在回調中,咱們調用 queueItem 的回調,而後調用 cleanUp 函數。在 cleanUp 函數中,要刪除事件偵聽器,由於咱們會屢次重用同一個 worker。若是沒有刪除監聽器的話就會發生內存泄漏,內存會被慢慢耗盡。

activeWorkersById 狀態中,咱們將 [workerId] 屬性設置爲 false,並檢查隊列是否爲空。若是不是,就從 queue 中刪除第一個項目,並用另外一個 queueItem 再次調用 worker。

接着建立一個在收到 message 事件中的數據後進行一些計算的 worker:

import { isMainThread, parentPort } from 'worker_threads';

if (isMainThread) {
 throw new Error('Its not a worker');
}

const doCalcs = (data: any) => {
 const collection = [];

 for (let i = 0; i < 1000000; i += 1) {
   collection[i] = Math.round(Math.random() * 100000);
 }

 return collection.sort((a, b) => {
   if (a > b) {
     return 1;
   }

   return -1;
 });
};

parentPort.on('message', (data: any) => {
 const result = doCalcs(data);

 parentPort.postMessage(result);
});

worker 建立了一個包含 100 萬個隨機數的數組,而後對它們進行排序。只要可以多花費一些時間才能完成,作些什麼事情並不重要。

如下是工做池簡單用法的示例:

const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);

const items = [...new Array(100)].fill(null);

Promise.all(
 items.map(async (_, i) => {
   await pool.run(() => ({ i }));

   console.log('finished', i);
 }),
).then(() => {
 console.log('finished all');
});

首先建立一個由八個 worker 組成的工做池。而後建立一個包含 100 個元素的數組,對於每一個元素,咱們在工做池中運行一個任務。開始運行後將當即執行八個任務,其他任務被放入隊列並逐個執行。經過使用工做池,咱們沒必要每次都建立一個 worker,從而大大提升了效率。

結論

worker_threads 提供了一種爲程序添加多線程支持的簡單的方法。經過將繁重的 CPU 計算委託給其餘線程,能夠顯着提升服務器的吞吐量。經過官方線程支持,咱們能夠期待更多來自AI、機器學習和大數據等領域的開發人員和工程師使用 Node.js.

本文首發微信公衆號:jingchengyideng

歡迎掃描二維碼關注公衆號,天天都給你推送新鮮的前端技術文章

歡迎掃描二維碼關注公衆號,天天都給你推送新鮮的前端技術文章



歡迎繼續閱讀本專欄其它高贊文章: