nodejs多線程的探索和實踐

1 背景

需求中有如下場景
1 對稱解密、非對稱解密
2 壓縮、解壓
3 大量文件的增刪改查
4 處理大量的字符串,解析協議
上面的場景都是很是耗時間的,解密、壓縮、文件操做,nodejs使用了內置的線程池支持了異步。可是處理字符串和解析協議是單純消耗cpu的操做。並且nodejs對解密的支持彷佛不是很好。我使用了純js的解密庫,因此沒法在nodejs主線程裏處理。尤爲rsa解密,很是耗時間。node

因此這時候就要探索解決方案,nodejs提供了多線程的能力。因此天然就選擇了這種方案。可是這只是初步的想法和方案。由於nodejs雖然提供了多線程能力,可是沒有提供一個應用層的線程池。因此若是咱們單純地使用多線程,一個請求一個線程,這顯然不現實。咱們不得不實現本身的線程池。本文分享的內容是這個線程池的實現。git

線程池的設計涉及到不少方面,對於純cpu型的任務,線程數和cpu核數要相等才能達到最優的性能,不然過多的線程引發的上下文切換反而會致使性能降低。而對於io型的任務,更多的線程理論上是會更好,由於能夠更早地給硬盤發出命令,磁盤會優化並持續地處理請求,想象一下,若是發出一個命令,硬盤處理一個,而後再發下一個命令,再處理一個,這樣顯然效率很低。固然,線程數也不是越多越好。線程過多會引發系統負載太高,過多上下文切換也會帶來性能的降低。下面看一下線程池的實現方案。github

2 設計思路

首先根據配置建立多個線程(分爲預建立和懶建立),而後對用戶暴露提交任務的接口,由調度中心負責接收任務,而後根據策略選擇處理該任務的線程。子線程一直在輪詢是否有任務須要處理。處理完通知調度中心。
web


下面看一下具體的實現


2.1 和用戶通訊的數據結構編程

class UserWork extends EventEmitter {
    constructor({ workId, threadId }) {
        super();
        this.workId = workId;
        this.threadId = threadId;
        workPool[workId] = this;
    }
}

用戶提交任務的時候,調度中心返回一個UserWork對象。用戶可使用該對象和調度中心通訊。服務器

2.2 調度中心的實現
調度中心的實現大體分爲如下幾個邏輯。
2.2.1 初始化微信

 constructor(options = {}) {
        this.options = options;
        // 線程池總任務數
        this.totalWork = 0;
        // 子線程隊列
        this.workerQueue = [];
        // 核心線程數
        this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
        // 線程池最大線程數,若是不支持動態擴容則最大線程數等於核心線程數
        this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
        // 工做線程處理任務的模式
        this.sync = options.sync !== false;
        // 超過任務隊列長度時的處理策略
        this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
        // 是否預建立子線程
        this.preCreate = options.preCreate === true;
        this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
        this.pollIntervalTime = ~~options.pollIntervalTime || config.POLL_INTERVAL_TIME;
        this.maxWork = ~~options.maxWork || config.MAX_WORK;
        // 是否預建立線程池
        this.preCreate && this.preCreateThreads();
    }

從初始化代碼中咱們看到線程池大體支持的能力。數據結構

  1. 核心線程數多線程

  2. 最大線程數dom

  3. 過載時的處理策略,和過載的閾值

  4. 子線程空閒退出的時間和輪詢任務的時間

  5. 是否預建立線程池

  6. 是否支持動態擴容
    核心線程數是任務數沒有達到閾值時的工做線程集合。是處理任務的主力軍。任務數達到閾值後,若是支持動態擴容(可配置)則會建立新的線程去處理更多的任務。一旦負載變低,線程空閒時間達到閾值則會自動退出。若是擴容的線程數達到閾值,還有新的任務到來,則根據丟棄策略進行相關的處理。

2.2.2 建立線程

 newThread() {
        let { sync } = this;
        const worker = new Worker(workerPath, {workerData: { sync, maxIdleTime: this.maxIdleTime, pollIntervalTime: this.pollIntervalTime, }});
        const node = {
            worker,
            // 該線程處理的任務數量
            queueLength: 0,
        };
        this.workerQueue.push(node);
        const threadId = worker.threadId;
        worker.on('exit', (status) => {
            // 異常退出則補充線程,正常退出則不補充
            if (status) {
                this.newThread();
            }
            this.totalWork -= node.queueLength;
            this.workerQueue = this.workerQueue.filter((worker) => {
                return worker.threadId !== threadId;
            });
        });
        // 和子線程通訊
        worker.on('message', (result) => {
            const {
                work,
                event,
            } = result;
            const { data, error, workId } = work;
            // 經過workId拿到對應的userWorker
            const userWorker = workPool[workId];
            delete workPool[workId];
            // 任務數減一
            node.queueLength--;
            this.totalWork--;
            switch(event) {
                case 'done':
                    // 通知用戶,任務完成
                    userWorker.emit('done', data);
                    break;
                case 'error':
                    // 通知用戶,任務出錯
                    if (EventEmitter.listenerCount(userWorker, 'error')) {
                        userWorker.emit('error', error);
                    }
                    break;
                default: break;
            }
        });
        worker.on('error', (...rest) => {
            console.log(...rest)
        });
        return node;
    }

建立線程主要是調用nodejs提供的模塊進行建立。而後監聽子線程的退出和message、error事件。若是是異常退出則補充線程。調度中心維護了一個子線程的隊列。記錄了每一個子線程(worker)的實例和任務數。
2.2.3 選擇執行任務的線程

selectThead() {
        let min = Number.MAX_SAFE_INTEGER;
        let i = 0;
        let index = 0;
        // 找出任務數最少的線程,把任務交給他
        for (; i < this.workerQueue.length; i++) {
            const { queueLength } = this.workerQueue[i];
            if (queueLength < min) {
                index = i;
                min = queueLength;
            }
        }
        return this.workerQueue[index];
    }

選擇策略目前是選擇任務數最少的,原本還支持隨機和輪詢方式,可是貌似沒有什麼場景和必要,就去掉了。
2.2.4 暴露提交任務的接口

submit(filename, options = {}) {
        return new Promise(async (resolve, reject) => {
            let thread;
            // 沒有線程則建立一個
            if (this.workerQueue.length) {
                thread = this.selectThead();
                // 任務隊列非空
                if (thread.queueLength !== 0) {
                    // 子線程個數尚未達到核心線程數,則新建線程處理
                    if (this.workerQueue.length < this.coreThreads) {
                        thread = this.newThread();
                    } else if (this.totalWork + 1 > this.maxWork){
                        // 總任務數已達到閾值,尚未達到線程數閾值,則建立
                        if(this.workerQueue.length < this.maxThreads) {
                            thread = this.newThread();
                        } else {
                            // 處理溢出的任務
                            switch(this.discardPolicy) {
                                case DISCARD_POLICY.ABORT: 
                                    return reject(new Error('queue overflow'));
                                case DISCARD_POLICY.CALLER_RUNS: 
                                    const userWork =  new UserWork({workId: this.generateWorkId(), threadId}); 
                                    try {
                                        const asyncFunction = require(filename);
                                        if (!isAsyncFunction(asyncFunction)) {
                                            return reject(new Error('need export a async function'));
                                        }
                                        const result = await asyncFunction(options);
                                        resolve(userWork);
                                        setImmediate(() => {
                                            userWork.emit('done', result);
                                        });
                                    } catch (error) {
                                        resolve(userWork);
                                        setImmediate(() => {
                                            userWork.emit('error', error);
                                        });
                                    }
                                    return;
                                case DISCARD_POLICY.DISCARD_OLDEST: 
                                    thread.worker.postMessage({cmd: 'delete'});
                                    break;
                                case DISCARD_POLICY.DISCARD:
                                    return reject(new Error('discard'));
                                case DISCARD_POLICY.NOT_DISCARD:
                                    break;
                                default: 
                                    break;
                            }
                        }
                    }
                }
            } else {
                thread = this.newThread();
            }
            // 生成一個任務id
            const workId = this.generateWorkId();
            // 新建一個work,交給對應的子線程
            const work = new Work({ workId, filename, options });
            const userWork = new UserWork({workId, threadId: thread.worker.threadId});
            thread.queueLength++;
            this.totalWork++;
            thread.worker.postMessage({cmd: 'add', work});
            resolve(userWork);
        })
    }

提交任務的函數比較複雜,提交一個任務的時候,調度中心會根據當前的負載狀況和線程數,決定對一個任務作如何處理。若是能夠處理,則把任務交給選中的子線程。最後給用戶返回一個UserWorker對象。
2.3調度中心和子線程的通訊數據結構

class Work {
    constructor({workId, filename, options}) {
        // 任務id
        this.workId = workId;
        // 文件名
        this.filename = filename;
        // 處理結果,由用戶代碼返回
        this.data = null;
        // 執行出錯
        this.error = null;
        // 執行時入參
        this.options = options;
    }
}

一個任務對應一個id,目前只支持文件的執行模式,後續會支持字符串。
2.4 子線程的實現
子線程的實現主要分爲幾個部分
2.4.1 監聽調度中心分發的命令

parentPort.on('message', ({cmd, work}) => {
    switch(cmd) {
        case 'delete':
            return queue.shift();
        case 'add':
            return queue.push(work);
    }
});

2.4.2 輪詢是否有任務須要處理

function poll() {
    const now = Date.now();
    if (now - lastWorkTime > maxIdleTime && !queue.length) {
        process.exit(0);
    }
    setTimeout(async () => {
        // 處理任務
        poll();
    }
    }, pollIntervalTime);
}
// 輪詢判斷是否有任務
poll();

不斷輪詢是否有任務須要處理,若是沒有而且空閒時間達到閾值則退出。
2.4.3 處理任務
處理任務模式分爲同步和異步

     while(queue.length) {
          const work = queue.shift();
          try {
              const { filename, options } = work;
              const asyncFunction = require(filename);
              if (!isAsyncFunction(asyncFunction)) {
                  return;
              }
              lastWorkTime = now;

              const result = await asyncFunction(options);
              work.data = result;
              parentPort.postMessage({event: 'done', work});
          } catch (error) {
              work.error = error.toString();
              parentPort.postMessage({event: 'error', work});
          }
      }

用戶須要導出一個async函數,使用這種方案主要是爲了執行時能夠給用戶傳入參數。而且實現同步。處理完後通知調度中心。下面是異步處理方式,子線程不須要同步等待用戶的代碼結果。

       const arr = [];
       while(queue.length) {
           const work = queue.shift();
           try {
               const { filename } = work;
               const asyncFunction = require(filename);
               if (!isAsyncFunction(asyncFunction)) {
                   return;
               }
               arr.push({asyncFunction, work});
           } catch (error) {
               work.error = error.toString();
               parentPort.postMessage({event: 'error', work});
           }
       }
       arr.map(async ({asyncFunction, work}) => {
           try {
               const { options } = work;
               lastWorkTime = now;
               const result = await asyncFunction(options);
               work.data = result;
               parentPort.postMessage({event: 'done', work});
           } catch (e) {
               work.error = error.toString();
               parentPort.postMessage({event: 'done', work});
           }
       })

最後還有一些配置和定製化的功能。

module.exports = {
    // 最大的線程數
    MAX_THREADS: 50,
    // 線程池最大任務數
    MAX_WORK: Infinity,
    // 默認核心線程數
    CORE_THREADS: 10,
    // 最大空閒時間
    MAX_IDLE_TIME: 10 * 60 * 1000,
    // 子線程輪詢時間
    POLL_INTERVAL_TIME: 10,
};
// 丟棄策略
const DISCARD_POLICY = {
    // 報錯
    ABORT: 1,
    // 在主線程裏執行
    CALLER_RUNS: 2,
    // 丟棄最老的的任務
    DISCARD_OLDEST: 3,
    // 丟棄
    DISCARD: 4,
    // 不丟棄
    NOT_DISCARD: 5,
};

支持多個類型的線程池

class AsyncThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, sync: false});
    }
}

class SyncThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, sync: true});
    }
}
// cpu型任務的線程池,線程數和cpu核數同樣,不支持動態擴容
class CPUThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, coreThreads: cores, expansion: false});
    }
}
// 線程池只有一個線程,相似消息隊列
class SingleThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, coreThreads: 1, expansion: false });
    }
}
// 線程數固定的線程池,不支持動態擴容線程
class FixedThreadPool extends ThreadPool {
    constructor(options) {
        super({ ...options, expansion: false });
    }
}

這就是線程池的實現,有不少細節還須要思考。下面是一個性能測試的例子。

3 測試

const { MAX } = require('./constants');
module.exports = async function() {
    let ret = 0;
    let i = 0;
    while(i++ < MAX) {
        ret++;
        Buffer.from(String(Math.random())).toString('base64');
    }
    return ret;
}

在服務器以單線程和多線程的方式執行以上代碼,下面是MAX爲10000和100000時,使用CPUThreadPool類型線程池的性能對比(具體代碼參考https://github.com/theanarkh/nodejs-threadpool)。
10000
單線程 [ 358.35, 490.93, 705.23, 982.6, 1155.72 ]
多線程 [ 379.3, 230.35, 315.52, 429.4, 496.04 ]

100000
單線程 [ 2485.5, 4454.63, 6894.5, 9173.16, 11011.16 ]
多線程 [ 1791.75, 2787.15, 3275.08, 4093.39, 3674.91 ]
咱們發現這個數據差異很是明顯。而且隨着處理時間的增加,性能差距越明顯。


本文分享自微信公衆號 - 編程雜技(theanarkh)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索