Node.js:進程、子進程與cluster多核處理模塊

一、process對象

process對象就是處理與進程相關信息的全局對象,不須要require引用,且是EventEmitter的實例。
獲取進程信息
process對象提供了不少的API來獲取當前進程的運行信息,例如進程運行時間、內存佔用、CPU佔用、進程號等,具體使用以下所示:javascript

/**
 * 獲取當前Node.js進程信息
 */
function getProcessInfo(){
    const memUsage = process.memoryUsage();//內存使用
    const cpuUsage = process.cpuUsage();//cpu使用
    const cfg = process.config;//編譯node.js的配置信息
    const env = process.env;//用戶環境
    const pwd = process.cwd();//工做目錄
    const execPath = process.execPath;//node.exe目錄
    const pf = process.platform;//運行nodejs的操做系統平臺
    const release = process.release;//nodejs發行版本
    const pid = process.pid;//nodejs進程號
    const arch = process.arch;//運行nodejs的操做系統架構
    const uptime = process.uptime();//nodejs進程運行時間
    return {
        memUsage,
        cpuUsage,
        cfg,
        env,
        pwd,
        execPath,
        pf,
        release,
        pid,
        arch,
        uptime
    }
}
console.log(getProcessInfo());

process.argv獲取命令行指令參數
使用node命令執行某個腳本時,能夠在指令末尾加上參數,process.argv返回一個數組,第一個元素是process.execPath,第二個元素是被執行腳本的路徑,以下所示:java

var args = process.argv;
if(!args.length){
    process.exit(0);
}else{
    console.log(args.slice(2).join('\n'));
}

執行結果以下:node

E:\developmentdocument\nodejsdemo>node process-example.js a b c
a
b
cmysql

process事件
一、exit事件,當調用process.exit()方法或者事件循環隊列沒有任何工做時便會觸發該事件,監聽的回調函數的邏輯必須是同步的,不然不會執行。以下所示:算法

process.on('exit',(code)=>{
    console.log(code);
    setTimeout(()=>console.log(code),1000);//不會執行
});

二、uncaughtException事件,當一個沒有被捕獲的異常冒泡到事件隊列就會觸發該事件,默認打印錯誤信息並進程退出,當uncaughtException事件有一個以上的 listener 時,會阻止 Node 結束進程。可是這種作法有內存泄露的風險,因此千萬不要這麼作。以下所示:sql

process.on('uncaughtException',(err)=>{
    console.log(err);
});
setTimeout(()=>console.log('nihao'),1000);//1秒後會執行
a();
console.log('hehe');//不會執行

三、message事件,進程間使用childProcess.send()方法進行通訊,就會觸發該事件,使用以下所示:shell

const cp = require('child_process').fork(`${__dirname}/test.js`);
cp.on('message',(message)=>{
    console.log('got the child message:'+message);
});
cp.send('hello child!');
//test.js
process.on('message',(message)=>{
    console.log('got the parent message:'+message);
});
process.send('hello parent');

執行結果以下:npm

E:\developmentdocument\nodejsdemo>node process-example.js
got the child message:hello parent
got the parent message:hello child!canvas

process.nextTick方法
將回調函數添加到下一次事件緩存隊列中,當前事件循環都執行完畢後,全部的回調函數都會被執行,以下所示:數組

console.log('hello world');
setTimeout(()=>console.log('settimeout'),10);
process.nextTick(()=>console.log('nexttick'));
console.log('hello nodejs');

執行結果以下所示:

E:\developmentdocument\nodejsdemo>node process-example.js
hello world
hello nodejs
nexttick
settimeout

二、child_process模塊

經過child_process模塊能夠建立子進程,從而實現多進程模式,更好地利用CPU多核計算資源。該模塊提供了四種方法建立子進程,分別是child_process.spawn()child_process.exec()child_process.execFile()child_process.fork(),這四個方法都返回一個childProcess對象,該對象實現了EventEmitter的接口,帶有stdout,stdin,stderr的對象。
child_process.spawn(command[, args][, options])方法
該方法使用command指令建立一個新進程,參數含義以下:

  • command,帶執行的命令
  • args,命令行參數數組
  • options,可選參數,爲一個對象

options參數主要擁有如下屬性:

  • cwd,當前工做目錄,若沒有指定,則使用當前工做目錄
  • env,命令執行環境,默認爲process.env
  • argv0,若是沒有指定command,該值會被設置爲command
  • stdio,子進程標準IO配置

返回值爲childProcess對象,使用以下所示:

const child_process = require('child_process');
const iconv = require('iconv-lite');
const spawn = child_process.spawn;

const buffArr = [];
let buffLen = 0;

const dirs = spawn('cmd.exe',['/C','dir']);
dirs.stdout.on('data',(data)=>{
    buffArr.push(data);
    buffLen+=data.length;
});
dirs.stderr.on('end',()=>{
    console.log(iconv.decode(Buffer.concat(buffArr,buffLen),'GBK'));
});
dirs.stderr.on('error',(err)=>{
    console.log(err);
});
dirs.on('close',(code)=>{
    console.log(code);
});

執行結果以下:

正在 Ping www.qq.com [14.17.32.211] 具備 32 字節的數據:
來自 14.17.32.211 的回覆: 字節=32 時間=2ms TTL=55
來自 14.17.32.211 的回覆: 字節=32 時間=2ms TTL=55
來自 14.17.32.211 的回覆: 字節=32 時間=3ms TTL=55
來自 14.17.32.211 的回覆: 字節=32 時間=3ms TTL=55
14.17.32.211 的 Ping 統計信息:
數據包: 已發送 = 4,已接收 = 4,丟失 = 0 (0% 丟失),
往返行程的估計時間(以毫秒爲單位):
最短 = 2ms,最長 = 3ms,平均 = 2ms

若是輸出碰到亂碼的時候,能夠藉助iconv-lite進行轉碼便可,使用npm install iconv-lite --save
child_process.exec(command[, options][, callback])方法
新建一個shell執行command指令,並緩存產生的輸出結果,方法參數含義以下:

  • command,待執行的指令,帶獨立的參數
  • options,對象,擁有cwd,env,encoding,shell,maxBuffer等屬性
  • callback,回調函數,參數爲(error,stdout,stderr),若是執行成功,error則爲null,不然爲Error的實例。

返回值也是childProcess對象,該方法與child_process.spawn()方法的區別在於,使用回調函數得到子進程的輸出數據,會先將數據緩存在內存中,等待子進程執行完畢以後,再將全部的數據buffer交給回調函數,若是該數據大小超過了maxBuffer(默認爲200KB),則會拋出錯誤。雖然能夠經過參數maxBuffer來設置子進程的緩存大小,可是不建議這麼作,由於exec()方法不合適建立返回大量數據的進程,應該就返回一些狀態碼。
使用以下所示:

exec('netstat /ano | find /C /I "tcp"',(err,stdout,stderr)=>{
    if(err) throw err;
    console.log(stdout);
    console.log(stderr);
});

child_process.execFile(file[, args][, options][, callback])方法
相似與child_process.exec()方法,不一樣之處是不會建立一個shell,而是直接使用指定的可執行文件建立一個新進程,更有效率,使用以下所示:

execFile('mysql',['--version'],(err,stdout,stderr)=>{
    if(err) throw err;
    console.log(stdout);
    console.log(stderr);
});

child_process.fork(modulePath[, args][, options])方法
建立一個子進程執行module,並與子進程創建IPC通道進行通訊,方法返回一個childProcess對象,做爲子進程的句柄,經過send()方法向子進程發送信息,監聽message事件接收子進程的消息,子進程亦同理。使用以下所示:

const fibonacci = fork('./fibonacci.js');
const n = 10;
fibonacci.on('message',(msg)=>{
    console.log(`fibonacci ${n} is:${msg.result}`);
});
fibonacci.send({n:n});
//fibonacci.js
function fibonacci(n,ac1=1,ac2=1){
    return n<=2?ac2:fibonacci(n-1,ac2,ac1+ac2);
}
process.on('message',(msg)=>{
    process.send({result:fibonacci(msg.n)})
});

child.disconnect()方法
關閉父子進程之間的IPC通道,以後父子進程不能執行通訊,並會當即觸發disconnect事件,使用以下所示:

const fibonacci = fork('./fibonacci.js');
const n = 10;
fibonacci.on('message',(msg)=>{
    console.log(`fibonacci ${n} is:${msg.result}`);
    fibonacci.disconnect();
});
fibonacci.on('disconnect',()=>{
    console.log('與子進程斷開鏈接.');
});
fibonacci.send({n:n});
//fibonacci.js
function fibonacci(n,ac1=1,ac2=1){
    return n<=2?ac2:fibonacci(n-1,ac2,ac1+ac2);
}
process.on('message',(msg)=>{
    process.send({result:fibonacci(msg.n)})
});

執行結果:

fibonacci 10 is:55
與子進程斷開鏈接.

子進程主要用來作CPU密集型的工做,如fibonacci數列的計算,canvas像素處理等。

三、cluster多核處理模塊

Node.js是單線程運行的,無論你的機器有多少個內核,只能用到其中的一個,爲了能利用多核計算資源,須要使用多進程來處理應用。cluster模塊讓咱們能夠很容易地建立一個負載均衡的集羣,自動分配CPU多核資源。
使用以下所示:

const cluster = require('cluster');
const http = require('http');
const cpuNums = require('os').cpus().length;
if(cluster.isMaster){
    for(let i=0;i<cpuNums;i++){
        cluster.fork();
    }
    cluster.on('exit',(worker)=>{
        console.log(`worker${worker.id} exit.`)
    });
    cluster.on('fork',(worker)=>{
        console.log(`fork:worker${worker.id}`)
    });
    cluster.on('listening',(worker,addr)=>{
        console.log(`worker${worker.id} listening on ${addr.address}:${addr.port}`)
    });
    cluster.on('online',(worker)=>{
        console.log(`worker${worker.id} is online now`)
    });
}else{
    http.createServer((req,res)=>{
        console.log(cluster.worker.id);
        res.writeHead(200);
        res.end('hello world');
    }).listen(3000,'127.0.0.1');
}

執行結果:

fork:worker1
fork:worker2
fork:worker3
fork:worker4
worker1 is online now
worker2 is online now
worker3 is online now
worker1 listening on 127.0.0.1:3000
worker4 is online now
worker2 listening on 127.0.0.1:3000
worker3 listening on 127.0.0.1:3000
worker4 listening on 127.0.0.1:3000

cluster工做原理
如上代碼所示,master是控制進程,worker是執行進程,每一個worker都是使用child_process.fork()函數建立的,所以worker與master之間經過IPC進行通訊。
當worker調用用server.listen()方法時會向master進程發送一個消息,讓它建立一個服務器socket,作好監聽並分享給該worker。若是master已經有監聽好的socket,就跳過建立和監聽的過程,直接分享。換句話說,全部的worker監聽的都是同一個socket,當有新鏈接進來的時候,由負載均衡算法選出一個worker進行處理。
cluster對象的屬性和方法
cluster.isMaster:標誌是否master進程,爲true則是
cluster.isWorker:標誌是否worker進程,爲true則是
cluster.worker:得到當前的worker對象,在master進程中使用無效
cluster.workers: 得到集羣中全部存活的worker對象,子啊worker進程使用無效
cluster.fork(): 建立工做進程worker
cluster.disconnect([callback]): 斷開全部worker進程通訊
*cluster對象的事件
Event: 'fork': 監聽建立worker進程事件
Event: 'online': 監聽worker建立成功事件
Event: 'listening': 監聽worker進程進入監聽事件
Event: 'disconnect': 監聽worker斷開事件
Event: 'exit': 監聽worker退出事件
Event: 'message':監聽worker進程發送消息事件
使用以下所示:

const cluster = require('cluster');
const http = require('http');
const cpuNums = require('os').cpus().length;
/*process.env.NODE_DEBUG='net';*/
if(cluster.isMaster){
    for(let i=0;i<cpuNums;i++){
        cluster.fork();
    }
    cluster.on('exit',(worker)=>{
        console.log(`worker${worker.id} exit.`)
    });
    cluster.on('fork',(worker)=>{
        console.log(`fork:worker${worker.id}`)
    });

    cluster.on('disconnect',(worker)=>{
        console.log(`worker${worker.id} is disconnected.`)
    });
    cluster.on('listening',(worker,addr)=>{
        console.log(`worker${worker.id} listening on ${addr.address}:${addr.port}`)
    });
    cluster.on('online',(worker)=>{
        console.log(`worker${worker.id} is online now`)
    });

    cluster.on('message',(worker,msg)=>{
        console.log(`got the worker${worker.id}'s msg:${msg}`);
    });

    Object.keys(cluster.workers).forEach((id)=>{
        cluster.workers[id].send(`hello worker${id}`);
    });
}else{
    process.on('message',(msg)=>{
        console.log('worker'+cluster.worker.id+' got the master msg:'+msg);
    });
    process.send('hello master, I am worker'+cluster.worker.id);
    http.createServer((req,res)=>{
        res.writeHead(200);
        res.end('hello world'+cluster.worker.id);
    }).listen(3000,'127.0.0.1');
}

執行結果以下:

fork:worker1
fork:worker2
fork:worker3
fork:worker4
worker1 is online now
worker2 is online now
got the worker1's msg:hello master, I am worker1
worker1 got the master msg:hello worker1
worker1 listening on 127.0.0.1:3000
worker4 is online now
got the worker2's msg:hello master, I am worker2
worker2 got the master msg:hello worker2
worker3 is online now
worker2 listening on 127.0.0.1:3000
got the worker4's msg:hello master, I am worker4
worker4 got the master msg:hello worker4
worker4 listening on 127.0.0.1:3000
got the worker3's msg:hello master, I am worker3
worker3 got the master msg:hello worker3
worker3 listening on 127.0.0.1:3000

在win7環境下,cluster負載均衡狀況,以下所示:
服務端代碼:

const cluster = require('cluster');
const http = require('http');
const cpuNums = require('os').cpus().length;

if(cluster.isMaster){
    var i = 0;
    const widArr = [];
    for(let i=0;i<cpuNums;i++){
        cluster.fork();
    }
    cluster.on('message',(worker,msg)=>{
        if(msg === 'ex'){
            i++;
            widArr.push(worker.id);
            (i>=80)&&(process.exit(0));
        }
    });
    process.on('exit', (code) => {
        console.log(analyzeArr(widArr));
    });
    //統計每一個worker被調用的次數
    function analyzeArr(arr) {
        let obj = {};
        arr.forEach((id, idx, arr) => {
            obj['work' + id] = obj['work' + id] !== void 0 ? obj['work' + id] + 1 : 1;
        });
        return obj;
    }

}else{
    http.createServer((req,res)=>{
        console.log(`worker${cluster.worker.id}`);
        process.send('ex');
        res.writeHead(200);
        res.end('hello world'+cluster.worker.id);
    }).listen(3000,'127.0.0.1');
}

使用Apache的AB命令進行測試,併發40,總共80:C:\Users\learn>ab -c 40 -n 80 http://127.0.0.1:3000/
測試結果:

{ work4: 19, work3: 20, work1: 19, work2: 22 }

相關文章
相關標籤/搜索