許多人想知道單線程的Node.js如何與多線程後端競爭。所以,考慮到Node既有的單線程特性,那麼多的大公司選擇Node做爲它們的後端彷佛是違反直覺的。要知道爲何,咱們必須理解Node單線程的真正含義。javascript
當初建立JavaScript的目的只是爲了在web上作一些簡單的事情,好比驗證表單,或者建立一個彩虹色的鼠標軌跡。直到2009年,Ryan Dahl才建立了Node,使開發人員可以使用該語言編寫後端代碼。前端
後端語言一般支持多線程,有各類機制能夠在線程之間同步數據,以及支持線程相關的其餘特性。要在JavaScript中支持這些東西,就須要改變整個語言,而這並非Dahl真正的目標。爲了讓普通JavaScript支持多線程,他必須建立一個變通方案。讓咱們瞭解一下……java
Node.js 使用兩種線程:一個主線程由event loop處理,其餘輔助線程由worker pool處理。node
事件循環是一種獲取回調(函數)並將其註冊以備未來執行的機制。它與日常的JavaScript代碼在相同的線程中運行。當JavaScript操做阻塞線程時,事件循環也被阻塞。git
Worker pool 是一種執行模型,它生成並處理單獨的線程,而後這些線程同步執行任務並將結果返回給事件循環。而後,事件循環使用上述結果執行提供的回調。github
簡而言之,它負責異步I/O操做——主要是與系統的磁盤和網絡的交互。它主要用於 fs
(I/O密集型)或crypto
(CPU密集型)等模塊。Worker pool 是在libuv中實現的,每當Node須要在JavaScript和C++之間進行內部通訊時,它都會致使輕微的延遲,但這並不明顯。web
有了這兩種機制,咱們能夠這樣寫代碼:算法
fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
if (err) {
return null;
}
console.log(content.toString());
});
複製代碼
前面提到的fs模塊告訴worker pool使用它的一個線程來讀取文件的內容,並在完成時通知事件循環。而後,事件循環接受提供的回調函數並使用文件的內容執行它。json
以上是一個非阻塞代碼的例子。所以,咱們沒必要同步等待某件事發生。咱們告訴worker pool讀取文件並使用執行結果調用提供的函數。因爲worker pool有本身的線程,因此在讀取文件時,事件循環能夠繼續正常執行。後端
在須要同步執行某些複雜的操做以前,一切都是正常的:任何花費太長時間運行的函數都會阻塞線程。若是一個應用程序有不少這樣的功能,它可能會顯著下降服務器的吞吐量,或者徹底卡死。在這種狀況下,沒法將工做分配給worker pool。
須要複雜計算的領域——如AI、機器學習或大數據——實際上不能有效地使用Node.js,由於這些操做阻塞了僅有的一個線程(主線程),使服務器無響應。這種狀況一直持續到Node.js v10.5.0出現,它增長了對多線程的支持。
worker_threads
模塊是一個包,它容許咱們建立全功能的多線程Node.js應用程序。
線程worker是在單獨的線程中生成的一段代碼(一般從文件中獲取)。
注意,術語thread worker、worker和thread常常交替使用;它們都指的是同一件事。
要開始使用線程worker,咱們必須導入 worker_threads
模塊。咱們先建立一個函數來幫助生成這些線程worker,而後再討論它們的屬性。
type WorkerCallback = (err: any, result?: any) => any;
export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
const worker = new Worker(path, { workerData });
worker.on('message', cb.bind(null, null));
worker.on('error', cb);
worker.on('exit', (exitCode) => {
if (exitCode === 0) {
return null;
}
return cb(new Error(`Worker has stopped with code ${exitCode}`));
});
return worker;
}
複製代碼
要建立worker,咱們必須建立Worker
類的一個實例。在第一個參數中,咱們提供了包含worker代碼的文件的路徑;第二個參數,咱們提供了一個對象,其中包含一個名爲workerData
的屬性。這是咱們但願線程在開始運行時可以訪問的數據。
注意,不管你使用的是JavaScript自己,仍是可轉換爲JavaScript的語言(好比TypeScript),路徑都應該是指向帶有 .js
或.mjs
擴展名的文件。
我還想指出爲何咱們使用回調方法,而不是返回一個在message
事件觸發時被解決的promise。這是由於worker能夠發送多個 message
事件,而不是一個。
正如你在上面的示例中所看到的,線程之間的通訊是基於事件的,這就是說咱們能夠設置事件監聽器,以便在worker觸發指定事件時調用它。
如下是最多見的事件:
worker.on('error', (error) => {});
複製代碼
當worker中出現未捕獲的異常時,就會發出error
事件。而後終止worker,錯誤做爲回調函數的第一個參數傳遞。
worker.on('exit', (exitCode) => {});
複製代碼
當worker 退出時,會發送exit
事件。若是在worker內部調用了process.exit()
,則會向回調函數提供狀態碼exitCode
。若是使用worker.terminate()
終止worker ,狀態碼爲1。
worker.on('online', () => {});
複製代碼
在worker 中止解析JavaScript代碼並開始執行時發送online
事件。它不經常使用,但能夠在特定的狀況下提供有效信息。
worker.on('message', (data) => {});
複製代碼
worker 向父線程發送數據時會發送message
事件。
如今,咱們來看看如何在線程之間共享數據。
要將數據發送到另外一個線程,咱們使用port.postMessage()
方法。函數簽名以下:
port.postMessage(data[, transferList])
複製代碼
端口對象能夠是parentPort
,也能夠是 MessagePort
的一個實例。稍後再詳細介紹。
第一個參數 data
是一個複製到另外一個線程的對象。它能夠包含複製算法支持的任何內容。
數據由結構化克隆算法複製。據Mozilla:
它經過遞歸遍歷輸入對象來構建克隆,同時維護之前訪問過的引用的映射,以免無限遍歷循環。
該算法不復制函數、錯誤對象、屬性描述符或原型鏈。還應該注意,以這種方式複製對象與JSON不一樣,由於它能夠包含循環引用和類型化數組,而JSON不能。
經過支持類型化數組的複製,該算法使得在線程之間共享內存成爲可能。
人們可能會說,cluster
或child_process
之類的模塊在好久之前就啓用了線程。對,也不對。
cluster
模塊能夠建立多個node實例,由一個主進程在它們之間路由分發收到的請求。集羣應用程序有效地成倍增長服務器吞吐量;可是,咱們不能使用 cluster
模塊派生一個單獨的線程。
人們傾向於使用PM2這樣的工具管理集羣應用程序,而不是在代碼中手動處理。可是若是你有興趣,你能夠看下個人這篇關於如何使用cluster
模塊的帖子。
child_process
模塊能夠生成任何可執行文件,無論它是否是JavaScript。它很是相似,可是它缺乏worker_threads
所具備的幾個重要特性。
具體來講,線程worker更輕量級,而且與父線程共享相同的進程ID。它們還能夠與父線程共享內存,這使它們能夠避免序列化大的數據負載,從而更有效地來回發送數據。
如今讓咱們看一個如何在線程之間共享內存的示例。爲了共享內存,必須將ArrayBuffer
或SharedArrayBuffer
的實例做爲數據參數或置於數據參數內部發送給另外一個線程。
這是一個與父線程共享內存的worker :
import { parentPort } from 'worker_threads';
parentPort.on('message', () => {
const numberOfElements = 100;
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
const arr = new Int32Array(sharedBuffer);
for (let i = 0; i < numberOfElements; i += 1) {
arr[i] = Math.round(Math.random() * 30);
}
parentPort.postMessage({ arr });
});
複製代碼
首先,咱們建立一個SharedArrayBuffer
,其中的內存須要包含100個32位整數。接下來,咱們建立一個Int32Array
的實例,它將使用緩衝區來保存它的結構,而後咱們用一些隨機數填充數組並將其發送給父線程。
父線程:
import path from 'path';
import { runWorker } from '../run-worker';
const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
if (err) {
return null;
}
arr[0] = 5;
});
worker.postMessage({});
複製代碼
經過將'arr[0]
更改成5
,咱們實際上在兩個線程中都修改了它。
固然,經過共享內存,咱們可能面臨一個風險:在一個線程中更改一個值,另外一個線程中也隨之改變了。可是咱們同時也得到了一個很是好的特性:不須要序列化值就能夠在另外一個線程中使用,這極大地提升了效率。只需記住正確地管理對數據的引用,以便在完成數據處理後對其進行垃圾收集。
共享一個整數數組是能夠的,但咱們真正感興趣的是共享對象——存儲信息的默認方式。不幸的是,沒有SharedObjectBuffer
或相似的東西,但咱們能夠本身建立一個相似的結構。
transferList
只能包含ArrayBuffer
和MessagePort
。一旦它們被轉移到另外一個線程,就不能再在發送線程中使用:內存被移動到另外一個線程,所以在發送線程中不可用。
目前,咱們還不能經過將它們包含在transferList
中來傳輸網絡套接字(這個能夠經過child_process
模塊來實現)。
線程之間的通訊經過端口進行,端口是MessagePort
類的實例,支持基於事件的通訊。
使用端口在線程之間進行通訊有兩種方法。第一個是默認的,也是兩個中比較簡單的一個。在worker的代碼中,咱們從worker_threads
模塊導入一個名爲parentPort
的對象,並使用該對象的.postMessage()
方法向父線程發送消息。
這裏有一個例子:
import { parentPort } from 'worker_threads';
const data = {
// ...
};
parentPort.postMessage(data);
複製代碼
parentPort
是Node.js在後臺爲咱們建立的MessagePort
的一個實例,它支持與父線程的通訊。這樣,咱們能夠經過使用parentPort
和 worker
對象在線程之間進行通訊。
線程之間通訊的第二種方式是實際建立一個本身的MessageChannel
並將它發送給worker。下面演示瞭如何建立一個新的MessagePort
,並與worker共享:
import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';
const worker = new Worker(path.join(__dirname, 'worker.js'));
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => {
console.log('message from worker:', message);
});
worker.postMessage({ port: port2 }, [port2]);
複製代碼
建立port1
和port2
以後,咱們在port1
上設置事件監聽器,並將port2
發送給wroker。咱們必須把它包括在transferList
中,以便轉移到worker一方。
如今,在worker內部:
import { parentPort, MessagePort } from 'worker_threads';
parentPort.on('message', (data) => {
const { port }: { port: MessagePort } = data;
port.postMessage('heres your message!');
});
複製代碼
經過這種方式,咱們使用父線程發送的端口。
使用parentPort
不必定是一個錯誤的方法,但更好的方法是建立一個新的MessagePort
,其中包含一個MessageChannel
實例,而後與派生的worker共享它(即:關注點分離)。
請注意,在下面的示例中,我使用了 parentPort
來簡化。
咱們有兩種使用worker的方法。第一種方法是生成一個worker,執行它的代碼,而後將結果發送給父線程。使用這種方法,每次出現新任務時,咱們都必須從新建立一個worker。
第二種方法是派生一個worker併爲message
事件設置偵聽器。每次觸發message
時,它都會執行工做並將結果發送回父線程,父線程將保持worker的活動狀態,以供之後使用。
Node.js 文檔推薦使用第二種方法,由於實際建立一個線程worker須要花費不少精力,這須要建立一個虛擬機並解析和執行代碼。這種方法也比不斷建立worker更有效。
這種方法稱爲線程池,由於咱們建立一個線程池並讓他們等待,在須要時發送message
事件來完成工做。
下面是一個文件的例子,其中包含了一個worker的建立、執行和關閉的過程:
import { parentPort } from 'worker_threads';
const collection = [];
for (let i = 0; i < 10; i += 1) {
collection[i] = i;
}
parentPort.postMessage(collection);
複製代碼
將collection
發送到父線程後,它就退出了。
這裏有一個worker的例子,它能夠等待很長一段時間才獲得任務:
import { parentPort } from 'worker_threads';
parentPort.on('message', (data: any) => {
const result = doSomething(data);
parentPort.postMessage(result);
});
複製代碼
在worker_threads
模塊中有一些可用的屬性:
當不在worker線程內操做時,該屬性爲true
。若是須要,能夠在worker文件的開頭包含一個簡單的if
語句,以確保它只是做爲worker運行。
import { isMainThread } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
複製代碼
包含在生成的線程worker構造函數中的數據。
const worker = new Worker(path, { workerData });
複製代碼
在worker線程裏:
import { workerData } from 'worker_threads';
console.log(workerData.property);
複製代碼
前面提到的用於與父線程通訊的MessagePort
實例。
分配給worker的惟一標識符。
如今咱們知道了技術細節,讓咱們實現一些東西並在實踐中測試咱們的知識。
setTimeout
是一個無限循環,顧名思義,它會讓應用程序超時。實際上,它會在每次迭代中檢查起始時間和給定毫秒數的總和是否小於當前時間。
import { parentPort, workerData } from 'worker_threads';
const time = Date.now();
while (true) {
if (time + workerData.time <= Date.now()) {
parentPort.postMessage({});
break;
}
}
複製代碼
這個特定的實現生成一個線程,執行它的代碼,而後在完成以後退出。
讓咱們嘗試實現使用這個worker的代碼。首先,讓咱們建立一個狀態,在這個狀態中,咱們將跟蹤生成的worker:
const timeoutState: { [key: string]: Worker } = { };
複製代碼
接下來是負責建立worker並將其保存到狀態中的函數:
export function setTimeout(callback: (err: any) => any, time: number) {
const id = uuidv4();
const worker = runWorker(
path.join(__dirname, './timeout-worker.js'),
(err) => {
if (!timeoutState[id]) {
return null;
}
timeoutState[id] = null;
if (err) {
return callback(err);
}
callback(null);
},
{
time,
},
);
timeoutState[id] = worker;
return id;
}
複製代碼
首先,咱們使用UUID包爲worker建立一個唯一的標識符,而後使用前面定義的幫助函數runWorker
來獲取worker。咱們還向worker傳遞一個回調函數,該函數在worker發送一些數據時觸發。最後,咱們將worker保存到狀態並返回id
。
在回調函數中,咱們必須檢查worker是否仍然存在於狀態中,由於可能存在cancelTimeout()
,會把它刪除。若是它確實存在,咱們將它從狀態中刪除,並調用傳遞給setTimeout
函數的callback
。
cancelTimeout
函數使用.terminate()
方法強迫worker退出,並將worker從狀態中移除:
export function cancelTimeout(id: string) {
if (timeoutState[id]) {
timeoutState[id].terminate();
timeoutState[id] = undefined;
return true;
}
return false;
}
複製代碼
我建立了一小段測試代碼,用於檢查這種方法與原生方法有多大差異。你能夠查看這裏的代碼。結果以下:
native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
複製代碼
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }
複製代碼
咱們能夠看到在setTimeout
中有一個輕微的延遲(大約40ms),這是由正在建立的worker形成的。平均CPU成本也稍微高一些,可是還能夠接受(CPU成本是整個進程期間CPU使用量的平均值)。
若是咱們能夠重用worker,咱們將下降延遲和CPU使用率,這就是爲何咱們如今要研究如何實現咱們本身的線程池。
如前所述,一個線程池是一個給定數量的先前建立的worker線程,他們等待並監聽message
事件。一旦觸發了message
事件,它們就執行工做併發回結果。
爲了更好地說明咱們將要作什麼,下面是如何建立一個8個worker線程的線程池:
const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);
複製代碼
若是你熟悉限制併發操做,那麼你將看到這裏的邏輯幾乎是相同的,只是不一樣的應用場景。
如上面的代碼片斷所示,咱們向WorkerPool
的構造函數傳遞worker的路徑和要建立的worker的數量。
export class WorkerPool<T, N> {
private queue: QueueItem<T, N>[] = [];
private workersById: { [key: number]: Worker } = {};
private activeWorkersById: { [key: number]: boolean } = {};
public constructor(public workerPath: string, public numberOfThreads: number) {
this.init();
}
}
複製代碼
在這裏,咱們有額外的屬性,好比workersById
和activeWorkersById
,在這些屬性中,咱們能夠分別保存現有worker和當前運行worker的id。還有queue
,咱們能夠用下面的結構保存對象:
type QueueCallback<N> = (err: any, result?: N) => void;
interface QueueItem<T, N> {
callback: QueueCallback<N>;
getData: () => T;
}
複製代碼
callback
只是默認的node回調,它的第一個參數是error,第二個參數是可能的結果。getData
是傳遞給線程池的.run()
方法的函數(下面將對此進行解釋),該方法在項目開始處理時調用。getData
函數返回的數據將被傳遞給worker線程。
在.init()
方法中,咱們建立worker並將他們保存在狀態中:
private init() {
if (this.numberOfThreads < 1) {
return null;
}
for (let i = 0; i < this.numberOfThreads; i += 1) {
const worker = new Worker(this.workerPath);
this.workersById[i] = worker;
this.activeWorkersById[i] = false;
}
}
複製代碼
爲了不無限循環,咱們首先確保線程的數量是>1。而後,咱們建立有效數量的worker,並經過它們在workersById
狀態下的索引保存它們。咱們保存它們當前是否運行在 activeWorkersById
狀態的信息,這個狀態在默認狀況下老是爲false。
如今,咱們必須實現前面提到的.run()
方法來設置一個任務,以便在worker可用時運行。
public run(getData: () => T) {
return new Promise<N>((resolve, reject) => {
const availableWorkerId = this.getInactiveWorkerId();
const queueItem: QueueItem<T, N> = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
},
};
if (availableWorkerId === -1) {
this.queue.push(queueItem);
return null;
}
this.runWorker(availableWorkerId, queueItem);
});
}
複製代碼
在傳遞給promise的函數內部,咱們首先檢查是否有一個worker能夠經過調用.getInactiveWorkerId()
來處理數據:
private getInactiveWorkerId(): number {
for (let i = 0; i < this.numberOfThreads; i += 1) {
if (!this.activeWorkersById[i]) {
return i;
}
}
return -1;
}
複製代碼
接下來,咱們建立了一個queueItem
,其中保存傳遞給 .run()
方法的getData
函數和回調函數。在回調中,咱們要麼resolve
要麼reject
該promise,這取決於worker是否將錯誤傳遞給回調。
若是 availableWorkerId
是-1,那麼沒有可用的worker,咱們將queueItem
添加到queue
。若是有一個可用的worker,咱們調用 . runworker()
方法來執行這個worker。
在.runWorker()
方法中,咱們必須在activeWorkersById
狀態中設置當前正在使用的worker;爲 message
和error
事件設置事件監聽器(並在隨後清除);最後,將數據發送給worker。
private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
const worker = this.workersById[workerId];
this.activeWorkersById[workerId] = true;
const messageCallback = (result: N) => {
queueItem.callback(null, result);
cleanUp();
};
const errorCallback = (error: any) => {
queueItem.callback(error);
cleanUp();
};
const cleanUp = () => {
worker.removeAllListeners('message');
worker.removeAllListeners('error');
this.activeWorkersById[workerId] = false;
if (!this.queue.length) {
return null;
}
this.runWorker(workerId, this.queue.shift());
};
worker.once('message', messageCallback);
worker.once('error', errorCallback);
worker.postMessage(await queueItem.getData());
}
複製代碼
首先,經過使用傳遞的workerId
,咱們從workersById
狀態得到worker引用。而後,在activeWorkersById
內部,咱們將[workerId]
屬性設置爲true,這樣咱們就知道在worker忙碌時不運行任何其餘東西。
接下來,咱們分別在message
和error
事件上建立messageCallback
和errorCallback
,而後註冊這些函數來監聽事件並將數據發送給工做者。
在回調函數中,咱們調用queueItem
的回調,而後調用cleanUp
函數。在cleanUp
函數中,咱們確保刪除事件監聽器,由於咱們屢次重用同一個worker。若是咱們不刪除監聽器,就會出現內存泄漏;實際上,咱們會慢慢地耗盡內存。
在activeWorkersById
狀態中,咱們將[workerId]
屬性設置爲false
,並檢查隊列是否爲空。若是不是,則從隊列中刪除第一個項目,並使用另外一個queueItem
再次調用worker。
讓咱們建立一個worker,在接收到message
事件的數據後執行一些計算:
import { isMainThread, parentPort } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
const doCalcs = (data: any) => {
const collection = [];
for (let i = 0; i < 1000000; i += 1) {
collection[i] = Math.round(Math.random() * 100000);
}
return collection.sort((a, b) => {
if (a > b) {
return 1;
}
return -1;
});
};
parentPort.on('message', (data: any) => {
const result = doCalcs(data);
parentPort.postMessage(result);
});
複製代碼
worker建立了一個由100萬個隨機數組成的數組,而後對它們進行排序。只要花點時間完成,會發生什麼事可有可無。
下面是一個簡單的使用線程池的例子:
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);
const items = [...new Array(100)].fill(null);
Promise.all(
items.map(async (_, i) => {
await pool.run(() => ({ i }));
console.log('finished', i);
}),
).then(() => {
console.log('finished all');
});
複製代碼
咱們首先建立一個包含8個worker的線程池。而後咱們建立一個包含100個元素的數組,對於每一個元素,咱們在工做池中運行一個任務。首先,8個任務將被當即執行,其他的將被放入隊列並逐步執行。經過使用一個線程池,咱們沒必要每次都建立一個worker,這極大地提升了效率。
worker_threads
爲應用程序提供了一種很是簡單的方法支持多線程。經過將繁重的CPU計算任務委託給其餘線程,咱們能夠顯著提升服務器的吞吐量。有了官方的線程支持,咱們能夠期待更多來自AI、機器學習和大數據等領域的開發人員和工程師開始使用Node.js。
歡迎關注微信公衆號「1204譯站」,獲取更多前端技術乾貨文章。