nodejs篇-進程與集羣cluster

咱們啓動一個服務、運行一個實例,就是開一個服務進程,Node.js 裏經過 node app.js 開啓一個服務進程,多進程就是進程的複製(fork),fork 出來的每一個進程都擁有本身的獨立空間地址、數據棧,一個進程沒法訪問另一個進程裏定義的變量、數據結構,只有創建了 IPC 通訊,進程之間纔可數據共享。

child_process

node.js中能夠經過下面四種方式建立子進程:html

  • child_process.spawn(command, args)
  • child_process.exec(command, options)
  • child_process.execFile(file, args[, callback])
  • child_process.fork(modulePath, args)

spawn

const {spawn} = require("child_process");
// 建立 文件
spawn("touch",["index.js"]);

spawn()會返回child-process子進程實例:node

const {spawn} = require("child_process");
// cwd 指定子進程的工做目錄,默認當前目錄
const child = spawn("ls",["-l"],{cwd:__dirname});
// 輸出進程信息
child.stdout.pipe(process.stdout);
console.log(process.pid,child.pid);

子進程一樣基於事件機制(EventEmitter API),提供了一些事件:git

  • exit:子進程退出時觸發,能夠得知進程退出狀態(code和signal)
  • disconnect:父進程調用child.disconnect()時觸發
  • error:子進程建立失敗,或被kill時觸發
  • close:子進程的stdio流(標準輸入輸出流)關閉時觸發
  • message:子進程經過process.send()發送消息時觸發,父子進程消息通訊
close與exit的區別主要體如今多進程共享同一stdio流的場景,某個進程退出了並不意味着stdio流被關閉了

子進程具備可讀流的特性,利用可讀流實現find . -type f | wc -l,遞歸統計當前目錄文件數量:github

const { spawn } = require('child_process');

const find = spawn('find', ['.', '-type', 'f']);
const wc = spawn('wc', ['-l']);

find.stdout.pipe(wc.stdin);

wc.stdout.on('data', (data) => {
  console.log(`Number of files ${data}`);
});

exec

spawn()exec()方法的區別在於,exec()不是基於stream的,exec()會將傳入命令的執行結果暫存到buffer中,再整個傳遞給回調函數。shell

spawn()默認不會建立shell去執行命令(性能上會稍好),而exec()方法執行是會先建立shell,因此能夠在exec()方法中傳入任意shell腳本。segmentfault

const {exec} = require("child_process");

exec("node -v",(error,stdout,stderr)=>{
    if (error) console.log(error);
    console.log(stdout)
})
exec()方法由於能夠傳入任意shell腳本因此存在安全風險。

spawn()方法默認不會建立shell去執行傳入的命令(因此性能上稍微好一點),不過能夠經過參數實現:安全

const { spawn } = require('child_process');
const child = spawn('node -v', {
  shell: true
});
child.stdout.pipe(process.stdout);

這種作法的好處是,既能支持shell語法,也能經過stream IO進行標準輸入輸出。服務器

execFile

const {execFile} = require("child_process");

execFile("node",["-v"],(error,stdout,stderr)=>{
    console.log({ error, stdout, stderr })
    console.log(stdout)
})

經過可執行文件路徑執行:數據結構

const {execFile} = require("child_process");

execFile("/Users/.nvm/versions/node/v12.1.0/bin/node",["-v"],(error,stdout,stderr)=>{
    console.log({ error, stdout, stderr })
    console.log(stdout)
})

fork

fork()方法能夠用來建立Node進程,而且父子進程能夠互相通訊併發

//master.js
const {fork} = require("child_process");
const worker = fork("worker.js");

worker.on("message",(msg)=>{
    console.log(`from worder:${msg}`)
});
worker.send("this is master");

// worker.js
process.on("message",(msg)=>{
    console.log("worker",msg)
});
process.send("this is worker");

利用fork()能夠用來處理計算量大,耗時長的任務:

const longComputation = () => {
  let sum = 0;
  for (let i = 0; i < 1e10; i++) {
    sum += i;
  };
  return sum;
};

longComputation方法拆分到子進程中,這樣主進程的事件循環不會被耗時計算阻塞:

const http = require('http');
const { fork } = require('child_process');

const server = http.createServer();

server.on('request', (req, res) => {
  if (req.url === '/compute') {
    // 將計算量大的任務,拆分到子進程中處理
    const compute = fork('compute.js');
    compute.send('start');
    compute.on('message', sum => {
        // 收到子進程任務後,返回
      res.end(`Sum is ${sum}`);
    });
  } else {
    res.end('Ok')
  }
});

server.listen(3000);

進程間通訊IPC

每一個進程各自有不一樣的用戶地址空間,任何一個進程的全局變量在另外一個進程中都看不到,因此進程之間要交換數據必須經過內核,在內核中開闢一塊緩衝區,進程1把數據從用戶空間拷到內核緩衝區,進程2再從內核緩衝區把數據讀走,內核提供的這種機制稱爲 進程間通訊(IPC,InterProcess Communication)

進程之間能夠藉助內置的IPC機制通訊

父進程:

  • 接收事件process.on('message')
  • 發送信息給子進程master.send()

子進程:

  • 接收事件process.on('message')
  • 發送信息給父進程process.send()

fork 多進程

nodejs中的多進程是 多進程 + 單線程 的模式
// master.js. 
process.title = 'node-master'
const net = require("net");
const {fork} = require("child_process");

const handle = net._createServerHandle("127.0.0.1",3000);

for(let i=0;i<4;i++){
    fork("./worker.js").send({},handle);
}

// worker.js
process.title = 'worker-master';

const net = require("net");

process.on("message",(msg,handle)=>start(handle));

const buf = "hello nodejs";
const res= ["HTTP/1.1 200 ok","content-length:"+buf.length].join("\r\n")+"\r\n\r\n"+buf;

function start(server){
    server.listen();
    let num=0;
    server.onconnection = function(err,handle){
        num++;
        console.log(`worker ${process.pid} num ${num}`);
        let socket = new net.Socket({handle});
        socket.readable = socket.writable = true
        socket.end(res);
    }
}

運行node master.js,這裏能夠使用測試工具 Siege

siege -c 20 -r 10 http://localhost:3000

-c 併發量,併發數爲20人 -r 是重複次數, 重複10次

這種建立進程的特色是:

  • 在一個服務上同時啓動多個進程
  • 每一個進程運行一樣的代碼(start方法)
  • 多個進程能夠同時監聽一個端口(3000)

不過每次請求過來交給哪一個worker處理,master並不清楚,咱們更但願master可以掌控全局,將請求指定給worker,咱們作下面的改造:

//master.js
process.title = 'node-master'
const net =require("net");
const {fork} = require("child_process");

// 定義workers變量,保存子進程worker
let workers = [];
for(let i=0;i<4;i++){
    workers.push(fork("./worker.js"));
}
const handle = net._createServerHandle("0.0.0.0", 3000)
handle.listen();
// master控制請求
handle.onconnection = function(err,handle){
    let worker = workers.pop();
    // 將請求傳遞給子進程
    worker.send({},handle);
    workers.unshift(worker);
}

// worker.js
process.title = 'worker-master';
const net = require("net")
process.on("message", (msg, handle) => start(handle))

const buf = "hello nodejs"
const res = ["HTTP/1.1 200 ok", "content-length:" + buf.length].join("\r\n") + "\r\n\r\n" + buf

function start(handle) {
  console.log(`get a connection on worker,pid = %d`, process.pid)
  let socket = new net.Socket({ handle })
  socket.readable = socket.writable = true
  socket.end(res)
}

Cluster 多進程

Node.js 官方提供的 Cluster 模塊不只充分利用機器 CPU 內核開箱即用的解決方案,還有助於 Node 進程增長可用性的能力, Cluster模塊是對多進程服務能力的封裝。
// master.js
const cluster = require("cluster");
const numCPUS = require("os").cpus().length;

if(cluster.isMaster){
    console.log(`master start...`)
    for(let i=0;i<numCPUS;i++){
        cluster.fork();
    };

    cluster.on("listening",(worker,address)=>{
        console.log(`master listing worker pid ${worker.process.pid} address port:${address.port}`)
    })

}else if(cluster.isWorker){
    require("./wroker.js")
}
//wroker.js
const http = require("http");
http.createServer((req,res)=>res.end(`hello`)).listen(3000)

進程重啓和守護

進程重啓

爲了增長服務器的可用性,咱們但願實例在出現崩潰或者異常退出時,可以自動重啓。

//master.js
const cluster = require("cluster")
const numCPUS = require("os").cpus().length

if (cluster.isMaster) {
  console.log("master start..")
  for (let i = 0; i < numCPUS; i++) {
      cluster.fork()
    }
  cluster.on("listening", (worker, address) => {
    console.log("listening worker pid " + worker.process.pid)
  })
  cluster.on("exit", (worker, code, signal) => {
      // 子進程出現異常或者奔潰退出
    if (code !== 0 && !worker.exitedAfterDisconnect) {
      console.log(`工做進程 ${worker.id} 崩潰了,正在開始一個新的工做進程`)
      // 從新開啓子進程
      cluster.fork()
    }
  })
} else if (cluster.isWorker) {
  require("./server")
}
const http = require("http")
const server = http.createServer((req, res) => {
    // 隨機觸發錯誤
  if (Math.random() > 0.5) {
      throw new Error(`worker error pid=${process.pid}`)
  }
  res.end(`worker pid:${process.pid} num:${num}`)
}).listen(3000)

若是請求拋出異常而結束子進程,主進程可以監聽到結束事件,重啓開啓子進程。

上面的重啓只是簡單處理,真正項目中要考慮到的就不少了,這裏能夠參考egg的多進程模型和進程間通信

下面是來自文章Node.js進階之進程與線程更全面的例子:

// master.js
const {fork} = require("child_process");
const numCPUS = require("os").cpus().length;

const server = require("net").createServer();
server.listen(3000);
process.title="node-master";

const workers = {};
const createWorker = ()=>{
    const worker = fork("worker.js");
    worker.on("message",message=>{
        if(message.act==="suicide"){
            createWorker();
        }
    })

    worker.on("exit",(code,signal)=>{
        console.log('worker process exited,code %s signal:%s',code,signal);
        delete workers[worker.pid];
    });

    worker.send("server",server);
    workers[worker.pid] = worker;
    console.log("worker process created,pid %s ppid:%s", worker.pid, process.ppid)
}

for (let i = 0; i < numCPUS; i++) {
  createWorker()
}

process.once("SIGINT",close.bind(this,"SIGINT")); // kill(2) Ctrl+C
process.once("SIGQUIT", close.bind(this, "SIGQUIT")) // kill(3) Ctrl+l
process.once("SIGTERM", close.bind(this, "SIGTERM")) // kill(15) default
process.once("exit", close.bind(this))

function close(code){
    console.log('process exit',code);
    if(code!=0){
        for(let pid in workers){
            console.log('master process exit,kill worker pid:',pid);
            workers[pid].kill("SIGINT");
        }
    };
    process.exit(0);
}
//worker.js
const http=require("http");
const server = http.createServer((req,res)=>{
    res.writeHead(200,{"Content-Type":"text/plain"});
    res.end(`worker pid:${process.pid},ppid:${process.ppid}`)
    throw new Error("worker process exception!");
});

let worker;
process.title = "node-worker";
process.on("message",(message,handle)=>{
    if(message==="server"){
        worker = handle;
        worker.on("connection",socket=>{
            server.emit("connection",socket)
        })
    }
})
process.on("uncaughtException",(error)=>{
    console.log('some error')
    process.send({act:"suicide"});
    worker.close(()=>{
        console.log(process.pid+" close")
        process.exit(1);
    })
})

這個例子考慮更加周到,經過uncaughtException捕獲子進程異常後,發送信息給主進程重啓,並在連接關閉後退出。

進程守護

pm2能夠使服務在後臺運行不受終端的影響,這裏主要經過兩步處理:

  • options.detached:爲true時運行子進程在父進程退出後繼續運行
  • unref() 方法能夠斷絕跟父進程的關係,使父進程退出後子進程不會跟着退出
const { spawn } = require("child_process")

function startDaemon() {
  const daemon = spawn("node", ["daemon.js"], {
    // 當前工做目錄
    cwd: __dirname,
    // 做爲獨立進程存在
    detached: true,
    // 忽視輸入輸出流
    stdio: "ignore",
  })
  console.log(`守護進程 ppid:%s pid:%s`, process.pid, daemon.pid)
  // 斷絕父子進程關係
  daemon.unref()
}

startDaemon()
// daemon.js
const fs = require("fs")
const {Console} = require("console");
// 輸出日誌
const logger = new Console(fs.createWriteStream("./stdout.log"),fs.createWriteStream("./stderr.log"));
// 保持進程一直在後臺運行
setInterval(()=>{
    logger.log("daemon pid:",process.pid,"ppid:",process.ppid)
},1000*10);

// 生成關閉文件
fs.writeFileSync("./stop.js", `process.kill(${process.pid}, "SIGTERM")`)

參考連接

相關文章
相關標籤/搜索