Node是單線程模型,當須要執行多個獨立且耗時任務的時候,只能經過child_process來分發任務,提升處理速度;不像Java這種多線程語言,能夠經過線程來解決並行問題,Node只能建立進程來進行處理;可是進程相對於線程來講,開銷太大。一旦進程數較多時,CPU和內存消耗嚴重(影響我幹其餘的事情),因此作了一個簡易版的進程池,用來解決並行任務的處理。javascript
適用場景:相同且獨立且耗時的任務,例如,拿到某網站1000個用戶的帳號密碼,我如今想要他們的信息,爬他,node-process-pool很是適合。java
主控進程+工做進程羣node
ProcessPool是咱們管理進程的地方,咱們經過傳遞配置參數(任務腳本、腳本須要的參數、最大並行進程數)生成一個ProcessPool實例,而後經過這個實例來管控進程池。npm
ProcessItem是咱們進程池裏的進程對象,ProcessItem對象除了process的信息,咱們還增長了惟一標識和狀態(忙碌、任務失敗、任務完成、進程不可用)。數組
一批任務開始時,咱們會一次性fork到最大並行進程數,而後開始監控是否有工做進程完成任務,若是有工做進程完成了任務,那咱們就能夠複用這個工做進程,讓其執行新任務;若是任務執行失敗,咱們會將任務歸還給進程池,等待下一次分發。bash
因爲是相同且獨立且耗時的任務,因此當某個工做進程完成任務時,咱們頗有必要去檢測全部的工做進程是否已完成任務,而不僅是複用這個工做進程,咱們要一批一批的複用!!!多線程
由於差很少的時間開始執行相同的任務,當一個工做進程完成時,徹底能夠相信其餘工做進程也完成了任務,因此檢測一輪全部的工做進程,若空閒,給他們分配新任務。async
既然是批量分配任務,就不會存在只有某個工做進程在辛苦的運行,其餘工做進程袖手旁,哈哈哈哈哈,總得雨露均沾嘛。函數
因爲主控進程即要負責IPC又要不斷監聽批任務完成的狀況,目前我採用的方式是setInterval切割,讓IPC和監控能交替進行(ps:應該有更好的方法測試
咱們真的須要setInterval來去輪詢任務狀態嗎,何時才須要輪詢任務狀態而後調度? 工做進程狀態發生改變的時候,纔是咱們須要去檢測任務狀態和調度的時機;因此,咱們也能夠利用IPC來通知主控進程進行檢測任務狀態和調度。 ps:固然,還有更好的方法,嘿嘿
快
某大學某系統,12000個用戶,每一個用戶登錄須要兩次API訪問,兩次API訪問直接必須有0.5s間隔,而後將信息寫入文本。
單線程(未測試,理論上):12000*0.5 —> 至少須要6000s。
進程池(已測試,容量爲50的進程池):206s,平均每一個任務耗時17.1ms
效率提高:接近30倍
const fs = require('fs')
const ProcessItem = require('./ProcessItem')
const isCorrectType = require('./util').isCorrectType
/**
* 進程池類
* @param maxParallelProcess,最大並行工做進程數
* @param timeToClose,任務最長耗時時間
* @param taskParams,全部任務腳本須要的參數
* @param dependency,任務腳本所需依賴
* @param taskName, 工做腳本名稱
* @param script 腳本內容
* @param workDir 工做目錄
*/
function ProcessPool({
maxParallelProcess = 50,
timeToClose = 60 * 1000,
taskParams = [],
dependency = '',
workDir ='',
taskName = Date.now(),
script = '',}) {
try {
isCorrectType('task', script, 'function')
isCorrectType('maxParallelProcess', maxParallelProcess, 'number')
isCorrectType('timeToClose', timeToClose, 'number')
isCorrectType('dependency', dependency, 'string')
isCorrectType('workDir', workDir, 'string')
} catch (e) {
throw new Error('參數不合法' + e)
}
this.timeToClose = timeToClose
this.processList = new Map() // 使用Map存儲進程對象
this.currentProcessNum = 0 // 當前活動進程數
this.dependency = dependency // 任務腳本依賴
this.workDir = workDir // 主控函數工做目錄
this.taskName = taskName // 任務腳本名稱
this.task = `${this.workDir}/${this.taskName}.js`// 任務腳本路徑
this.taskParamsTodo = taskParams // 待完成的任務參數數組,包含了n個小任務所需參數,因此是一個二維數組
this.maxParallelProcess = maxParallelProcess // 最大進程並行數
this.script = script // 任務腳本內容
this.ready = false // 任務腳本是否構建完成
try {
this.buildTaskScript() // 根據模版建立任務腳本
} catch (e) {
throw new Error('建立任務腳本失敗' + e)
}
}
/**
* 啓動進程池
*/
ProcessPool.prototype.run = function() {
if (this.ready) {
let flag = this.hasWorkProcessRunning() // 判斷是否有工做進程正在執行或是不是第一次處理任務
const taskTodoNum = this.taskParamsTodo.length
if (flag === 1 && taskTodoNum) {
// 初始階段,fork min{任務數,最大進程數} 的進程
while (this.currentProcessNum < this.maxParallelProcess && this.currentProcessNum < taskTodoNum) {
this.addProcess()
}
} else if (flag === 2 && !taskTodoNum) {
// 有忙碌的工做進程,且任務已下發完
} else if (flag === 2 && taskTodoNum) {
// 有忙碌的工做進程,但還有任務需下發
const processList = this.processList.values()
for (const p of processList) {
if (p.state !== 1 || p.state !== 4) {
this.reuseProcess(p.id)
}
}
} else if (flag === -1 && taskTodoNum) {
// 全部工做進程空閒,但還有任務需下發
const processList = this.processList.values()
for (const p of processList) {
if (p.state !== 1 || p.state !== 4) {
this.reuseProcess(p.id)
}
}
} else if (flag < 0 && !taskTodoNum) {
// 全部進程空閒,且任務已下發完
this.closeProcessPool()
}
}
}
/**
* 生成任務腳本
*/
ProcessPool.prototype.buildTaskScript = function() {
const taskDir = this.task
const templateDir = `${__dirname}/task.js`
const dependency = `${this.dependency}\n`
const taskBody = this.script.toString()
const templateReadStream = fs.createReadStream(templateDir)
const taskWriteStream = fs.createWriteStream(taskDir)
taskWriteStream.write(dependency)
templateReadStream.pipe(taskWriteStream).write(taskBody)
taskWriteStream.on('finish', () => {
this.ready = true
this.run()
})
}
/**
* 添加一個工做進程、指派任務且監聽IPC
*/
ProcessPool.prototype.addProcess = function() {
if (this.currentProcessNum <= this.maxParallelProcess) {
let workParam = this.taskParamsTodo.shift()
const newProcess = new ProcessItem({task: this.task, workParam})
this.processList.set(newProcess.id, newProcess)
this.currentProcessNum++
this.listenProcessState(newProcess, workParam)
}
}
/**
* 工做進程與主控進程IPC
* @param workProcess
* @param params
*/
ProcessPool.prototype.listenProcessState = function(workProcess, params) {
workProcess.process.on('message', message => {
if (message === 'finish') {
workProcess.finishTask()
} else if (message === 'failed') {
this.taskParamsTodo.unshift(params)
workProcess.unFinishTask()
}
this.run()
})
}
/**
* 監測當前是否有正在處理任務的工做進程
* @returns {number}
*/
ProcessPool.prototype.hasWorkProcessRunning = function() {
if (!this.processList) return -1
if (this.processList && !this.processList.size) return 1 // 進程池剛啓動,尚無工做進程
const processList = this.processList.values()
for (const p of processList) {
if (p.state === 1) return 2 // 有忙碌的進程
}
return -1
}
/**
* 複用空閒進程
* @param id,工做進程的pid
*/
ProcessPool.prototype.reuseProcess = function(id) {
const workProcess = this.processList.get(id)
if (this.taskParamsTodo.length && workProcess && workProcess.state !== 1) {
const taskParam = this.taskParamsTodo.shift()
workProcess.state = 1 // 設置爲忙碌
workProcess.process.send(taskParam)
}
}
/**
* 關閉工做進程
* @param id
*/
ProcessPool.prototype.removeProcess = function(id) {
let workProcess = this.processList.get(id)
if (workProcess) {
workProcess.terminate()
this.currentProcessNum--
}
}
/**
* 關閉全部工做進程
*/
ProcessPool.prototype.removeAllProcess = function() {
const processItems = this.processList.values()
for (const processItem of processItems) {
processItem.terminate()
}
}
/**
* 關閉進程池
*/
ProcessPool.prototype.closeProcessPool = function() {
this.removeAllProcess()
this.ready = false
this.processList = null
}
module.exports = ProcessPool
### ProcessItem.js
```javascript
const ChildProcess = require('child_process')
/**
* 工做進程類
*/
function ProcessItem({ task = './task.js', workParam = [] }) {
/**
* state 狀態碼
* 1: 忙碌
* 2: 完成任務
* 3: 未完成任務
* 4: 不可用
*/
if (!Array.isArray(workParam)) {
throw new Error('workParam must be a array')
}
if (typeof task !== 'string') {
throw new Error('workParam must be a string')
}
this.process = this.createProcess(task, workParam)
this.state = 1
this.id = this.process.pid
}
ProcessItem.prototype.finishTask = function() {
if (this.state === 1) {
this.state = 2
}
}
ProcessItem.prototype.unFinishTask = function() {
this.state = 3
}
ProcessItem.prototype.terminate = function() {
try {
this.process.kill()
this.state = 4
} catch (e) {
throw new Error(`關閉進程${this.id}失敗`)
}
}
ProcessItem.prototype.createProcess = function (task, workParam) {
let childProcess = ChildProcess.fork(task, workParam)
if (childProcess) {
return childProcess
} else {
throw new Error('create process failed')
}
}
module.exports = ProcessItem
複製代碼
/** * 當進程被子進程建立後,馬上執行工做任務 */
async function firstTask() {
const workParam = process.argv.slice(2)
await task(workParam)
}
/** * 完成任務,提示進程池已完成,工做進程空閒 */
async function finishTask() {
await process.send('finish')
}
/** * 任務失敗,提示進程池未完成,歸還任務 */
async function unFinishTask() {
await process.send('failed')
}
/** * 監聽進程池後續指派的任務 */
process.on('message', async workParam => {
await task(workParam)
try {
await finishTask()
} catch (e) {
await unFinishTask()
}
})
/** * 進程被建立時當即執行進程池指派的任務 * @returns {Promise<void>} */
async function main() {
try {
await firstTask()
await finishTask()
} catch (e) {
await unFinishTask()
}
}
main()
/** * @name 工做進程負責的任務 * @param workParam // 執行任務所需的參數數組 * 動態添加任務腳本到此文件尾部 */
複製代碼
function isCorrectType(name,value, type) {
if (type === 'array') {
if (!Array.isArray(value)) {
throw new Error(`${name} must be a array`)
}
} else {
if (typeof value !== type) {
throw new Error(`${name} must be a ${type}`)
}
}
}
exports.isCorrectType = isCorrectType
複製代碼
npm install node-process-pool
複製代碼
// 進程池使用示例
const ProcessPool = require('../src/ProcessPool')
const taskParams = []
for (let i = 0; i < 100; i++) {
taskParams[i] = [i]
}
// 建立進程池實例
const processPool = new ProcessPool({
maxParallelProcess: 50, // 支持最大進程並行數
timeToClose: 60 * 1000, // 單個任務被執行最大時長
dependency: `const path = require('path')`, // 任務腳本依賴
workDir: __dirname, // 當前目錄
taskName: 'test', // 任務腳本名稱
script: async function task(workParam) {
console.log(workParam)
}, // 任務腳本內容
taskParams // 須要執行的任務參數列表,二維數組
})
// 利用進程池進行處理大規模任務
processPool.run()
複製代碼