使用進程池提升Node.js並行處理能力

🖥 背景

Node是單線程模型,當須要執行多個獨立且耗時任務的時候,只能經過child_process來分發任務,提升處理速度;不像Java這種多線程語言,能夠經過線程來解決並行問題,Node只能建立進程來進行處理;可是進程相對於線程來講,開銷太大。一旦進程數較多時,CPU和內存消耗嚴重(影響我幹其餘的事情),因此作了一個簡易版的進程池,用來解決並行任務的處理。javascript

適用場景:相同且獨立且耗時的任務,例如,拿到某網站1000個用戶的帳號密碼,我如今想要他們的信息,爬他,node-process-pool很是適合。java

🤔思路

主控進程+工做進程羣node

ProcessPool是咱們管理進程的地方,咱們經過傳遞配置參數(任務腳本、腳本須要的參數、最大並行進程數)生成一個ProcessPool實例,而後經過這個實例來管控進程池。npm

ProcessItem是咱們進程池裏的進程對象,ProcessItem對象除了process的信息,咱們還增長了惟一標識和狀態(忙碌、任務失敗、任務完成、進程不可用)。數組

一批任務開始時,咱們會一次性fork到最大並行進程數,而後開始監控是否有工做進程完成任務,若是有工做進程完成了任務,那咱們就能夠複用這個工做進程,讓其執行新任務;若是任務執行失敗,咱們會將任務歸還給進程池,等待下一次分發。bash

因爲是相同且獨立且耗時的任務,因此當某個工做進程完成任務時,咱們頗有必要去檢測全部的工做進程是否已完成任務,而不僅是複用這個工做進程,咱們要一批一批的複用!!!多線程

由於差很少的時間開始執行相同的任務,當一個工做進程完成時,徹底能夠相信其餘工做進程也完成了任務,因此檢測一輪全部的工做進程,若空閒,給他們分配新任務。async

既然是批量分配任務,就不會存在只有某個工做進程在辛苦的運行,其餘工做進程袖手旁,哈哈哈哈哈,總得雨露均沾嘛。函數

因爲主控進程即要負責IPC又要不斷監聽批任務完成的狀況,目前我採用的方式是setInterval切割,讓IPC和監控能交替進行(ps:應該有更好的方法測試

咱們真的須要setInterval來去輪詢任務狀態嗎,何時才須要輪詢任務狀態而後調度? 工做進程狀態發生改變的時候,纔是咱們須要去檢測任務狀態和調度的時機;因此,咱們也能夠利用IPC來通知主控進程進行檢測任務狀態和調度。 ps:固然,還有更好的方法,嘿嘿

✨ Features

  • 某大學某系統,12000個用戶,每一個用戶登錄須要兩次API訪問,兩次API訪問直接必須有0.5s間隔,而後將信息寫入文本。

    單線程(未測試,理論上):12000*0.5 —> 至少須要6000s

    進程池(已測試,容量爲50的進程池):206s,平均每一個任務耗時17.1ms

    效率提高:接近30倍

💻 實現

ProcessPool.js

const fs = require('fs')
const ProcessItem = require('./ProcessItem')
const isCorrectType = require('./util').isCorrectType
/**
 * 進程池類
 * @param maxParallelProcess,最大並行工做進程數
 * @param timeToClose,任務最長耗時時間
 * @param taskParams,全部任務腳本須要的參數
 * @param dependency,任務腳本所需依賴
 * @param taskName, 工做腳本名稱
 * @param script 腳本內容
 * @param workDir 工做目錄
 */
function ProcessPool({
    maxParallelProcess = 50,
    timeToClose = 60 * 1000,
    taskParams = [],
    dependency = '',
    workDir ='',
    taskName = Date.now(),
    script = '',}) {
  try {
    isCorrectType('task', script, 'function')
    isCorrectType('maxParallelProcess', maxParallelProcess, 'number')
    isCorrectType('timeToClose', timeToClose, 'number')
    isCorrectType('dependency', dependency, 'string')
    isCorrectType('workDir', workDir, 'string')
  } catch (e) {
    throw new Error('參數不合法' + e)
  }
  this.timeToClose = timeToClose
  this.processList = new Map() // 使用Map存儲進程對象
  this.currentProcessNum = 0 // 當前活動進程數
  this.dependency = dependency // 任務腳本依賴
  this.workDir = workDir // 主控函數工做目錄
  this.taskName = taskName // 任務腳本名稱
  this.task = `${this.workDir}/${this.taskName}.js`// 任務腳本路徑
  this.taskParamsTodo = taskParams // 待完成的任務參數數組,包含了n個小任務所需參數,因此是一個二維數組
  this.maxParallelProcess = maxParallelProcess // 最大進程並行數
  this.script = script // 任務腳本內容
  this.ready = false // 任務腳本是否構建完成
  try {
    this.buildTaskScript() // 根據模版建立任務腳本
  } catch (e) {
    throw new Error('建立任務腳本失敗' + e)
  }
}
/**
 * 啓動進程池
 */
ProcessPool.prototype.run = function() {
  if (this.ready) {
    let flag = this.hasWorkProcessRunning() // 判斷是否有工做進程正在執行或是不是第一次處理任務
    const taskTodoNum = this.taskParamsTodo.length
    if (flag === 1 && taskTodoNum) {
      // 初始階段,fork min{任務數,最大進程數} 的進程
      while (this.currentProcessNum < this.maxParallelProcess && this.currentProcessNum < taskTodoNum) {
        this.addProcess()
      }
    } else if (flag === 2 && !taskTodoNum) {
      // 有忙碌的工做進程,且任務已下發完
    } else if (flag === 2 && taskTodoNum) {
      // 有忙碌的工做進程,但還有任務需下發
      const processList = this.processList.values()
      for (const p of processList) {
        if (p.state !== 1 || p.state !== 4) {
          this.reuseProcess(p.id)
        }
      }
    } else if (flag === -1 && taskTodoNum) {
      // 全部工做進程空閒,但還有任務需下發
      const processList = this.processList.values()
      for (const p of processList) {
        if (p.state !== 1 || p.state !== 4) {
          this.reuseProcess(p.id)
        }
      }
    } else if (flag < 0 && !taskTodoNum) {
      // 全部進程空閒,且任務已下發完
      this.closeProcessPool()
    }
  }
}
/**
 * 生成任務腳本
 */
ProcessPool.prototype.buildTaskScript = function() {
  const taskDir = this.task
  const templateDir = `${__dirname}/task.js`
  const dependency = `${this.dependency}\n`
  const taskBody = this.script.toString()
  const templateReadStream = fs.createReadStream(templateDir)
  const taskWriteStream = fs.createWriteStream(taskDir)
  taskWriteStream.write(dependency)
  templateReadStream.pipe(taskWriteStream).write(taskBody)
  taskWriteStream.on('finish', () => {
    this.ready = true
    this.run()
  })
}
/**
 * 添加一個工做進程、指派任務且監聽IPC
 */
ProcessPool.prototype.addProcess = function() {
  if (this.currentProcessNum <= this.maxParallelProcess) {
    let workParam = this.taskParamsTodo.shift()
    const newProcess = new ProcessItem({task: this.task, workParam})
    this.processList.set(newProcess.id, newProcess)
    this.currentProcessNum++
    this.listenProcessState(newProcess, workParam)
  }
}
/**
 * 工做進程與主控進程IPC
 * @param workProcess
 * @param params
 */
ProcessPool.prototype.listenProcessState = function(workProcess, params) {
  workProcess.process.on('message', message => {
    if (message === 'finish') {
      workProcess.finishTask()
    } else if (message === 'failed') {
      this.taskParamsTodo.unshift(params)
      workProcess.unFinishTask()
    }
    this.run()
  })
}
/**
 * 監測當前是否有正在處理任務的工做進程
 * @returns {number}
 */
ProcessPool.prototype.hasWorkProcessRunning = function() {
  if (!this.processList) return -1
  if (this.processList && !this.processList.size) return 1 // 進程池剛啓動,尚無工做進程
  const processList = this.processList.values()
  for (const p of processList) {
    if (p.state === 1) return 2 // 有忙碌的進程
  }
  return -1
}
/**
 * 複用空閒進程
 * @param id,工做進程的pid
 */
ProcessPool.prototype.reuseProcess = function(id) {
  const workProcess = this.processList.get(id)
  if (this.taskParamsTodo.length && workProcess && workProcess.state !== 1) {
    const taskParam = this.taskParamsTodo.shift()
    workProcess.state = 1 // 設置爲忙碌
    workProcess.process.send(taskParam)
  }
}
/**
 * 關閉工做進程
 * @param id
 */
ProcessPool.prototype.removeProcess = function(id) {
  let workProcess = this.processList.get(id)
  if (workProcess) {
    workProcess.terminate()
    this.currentProcessNum--
  }
}
/**
 * 關閉全部工做進程
 */
ProcessPool.prototype.removeAllProcess = function() {
  const processItems = this.processList.values()
  for (const processItem of processItems) {
    processItem.terminate()
  }
}
/**
 * 關閉進程池
 */
ProcessPool.prototype.closeProcessPool = function() {
  this.removeAllProcess()
  this.ready = false
  this.processList = null
}
module.exports = ProcessPool
### ProcessItem.js
```javascript
const ChildProcess = require('child_process')
/**
 * 工做進程類
 */
function ProcessItem({ task = './task.js', workParam = [] }) {
  /**
   * state 狀態碼
   * 1: 忙碌
   * 2: 完成任務
   * 3: 未完成任務
   * 4: 不可用
   */
  if (!Array.isArray(workParam)) {
    throw new Error('workParam must be a array')
  }
  if (typeof task !== 'string') {
    throw new Error('workParam must be a string')
  }
  this.process = this.createProcess(task, workParam)
  this.state = 1
  this.id = this.process.pid
}
ProcessItem.prototype.finishTask = function() {
  if (this.state === 1) {
    this.state = 2
  }
}
ProcessItem.prototype.unFinishTask = function() {
  this.state = 3
}
ProcessItem.prototype.terminate = function() {
  try {
    this.process.kill()
    this.state = 4
  } catch (e) {
    throw new Error(`關閉進程${this.id}失敗`)
  }
}
ProcessItem.prototype.createProcess = function (task, workParam) {
  let childProcess = ChildProcess.fork(task, workParam)
  if (childProcess) {
    return childProcess
  } else {
    throw new Error('create process failed')
  }
}

module.exports = ProcessItem
複製代碼

task.js

/** * 當進程被子進程建立後,馬上執行工做任務 */
async function firstTask() {
  const workParam = process.argv.slice(2)
  await task(workParam)
}
/** * 完成任務,提示進程池已完成,工做進程空閒 */
async function finishTask() {
  await process.send('finish')
}
/** * 任務失敗,提示進程池未完成,歸還任務 */
async function unFinishTask() {
  await process.send('failed')
}
/** * 監聽進程池後續指派的任務 */
process.on('message', async workParam => {
  await task(workParam)
  try {
    await finishTask()
  } catch (e) {
    await unFinishTask()
  }
})
/** * 進程被建立時當即執行進程池指派的任務 * @returns {Promise<void>} */
async function main() {
  try {
    await firstTask()
    await finishTask()
  } catch (e) {
    await unFinishTask()
  }
}
main()
/** * @name 工做進程負責的任務 * @param workParam // 執行任務所需的參數數組 * 動態添加任務腳本到此文件尾部 */
複製代碼

util.js

function isCorrectType(name,value, type) {
  if (type === 'array') {
    if (!Array.isArray(value)) {
      throw new Error(`${name} must be a array`)
    }
  } else {
    if (typeof value !== type) {
      throw new Error(`${name} must be a ${type}`)
    }
  }
}

exports.isCorrectType = isCorrectType
複製代碼

📦 安裝

npm install node-process-pool
複製代碼

🔨使用

// 進程池使用示例
const ProcessPool = require('../src/ProcessPool')
const taskParams = []
for (let i = 0; i < 100; i++) {
  taskParams[i] = [i]
}
// 建立進程池實例
const processPool = new ProcessPool({
  maxParallelProcess: 50, // 支持最大進程並行數
  timeToClose: 60 * 1000, // 單個任務被執行最大時長
  dependency: `const path = require('path')`, // 任務腳本依賴
  workDir: __dirname, // 當前目錄
  taskName: 'test', // 任務腳本名稱
  script: async function task(workParam) {
    console.log(workParam)
  }, // 任務腳本內容
  taskParams // 須要執行的任務參數列表,二維數組
})
// 利用進程池進行處理大規模任務
processPool.run()
複製代碼
相關文章
相關標籤/搜索