前言:
最近由於項目須要,開始開始接觸Node.js這塊相關的內容,其中就包括pm2這個東東。爲了更加深刻的瞭解pm2的原理,因此抽空簡單的看了一下pm2的代碼。在這裏也跟你們分享一下看完的收穫,水平有限,歡迎指正和探討。node
本文使用的pm2版本是4.2.3
git
下載完pm2代碼以後,簡單快速啓動demogithub
pm2 start ./examples/api-pm2/http.js -i 2
執行完這個命令以後,咱們就能順利的利用咱們的pm2啓動到咱們的nodejs腳本,打開頁面 http://127.0.0.1:8000 ,就能夠看到頁面上的 ‘hey’ 了。shell
以上就是最最最簡單的啓動方式,後面咱們也會根據這個簡單的命令來分析其中的一些流程和步驟json
首先咱們啓動pm2,就會執行bin目錄下的pm2這個文件,隨後require到lib/binaries/CLI.js
這個文件,並開始執行。
打開這個文件,咱們會看到這個文件確實有點長,因此咱們簡化一下,把這個文件分爲3個步驟:1)初始化;2)connect;3)處理相關的命令;windows
初始化很好理解,就是對用戶的參數作一些規範化的處理以保證後續的流程,其次是實例化了pm2對象,這個對象由lib/API.js
中的API這個類提供。api
Common.determineSilentCLI(); Common.printVersion(); var pm2 = new PM2(); PM2ioHandler.usePM2Client(pm2)
在實例化pm2的過程當中,也在這個實例上掛載了client屬性session
this.Client = new Client({ pm2_home: that.pm2_home, conf: this._conf, secret_key: this.secret_key, public_key: this.public_key, daemon_mode: this.daemon_mode, machine_name: this.machine_name });
這裏先不用疑惑這些實例和屬性是作什麼的,後續用到了就會作進一步分析
初始化的大概流程就差很少是這樣,平平無奇,由於複雜的在後面(手動狗頭)app
接下來就是connect
過程:
這個過程我理解爲就是建立通訊系統的過程。
先上個connect流程圖:
入口就是在lib/binaries/CLI.js
中執行的pm2.connect方法框架
pm2.connect(function() { debug('Now connected to daemon'); if (process.argv.slice(2)[0] === 'completion') { checkCompletion(); //Close client if completion related installation var third = process.argv.slice(3)[0]; if ( third == null || third === 'install' || third === 'uninstall') pm2.disconnect(); } else { beginCommandProcessing(); }
隨後咱們就進入了pm2實例的connect方法,咱們發現這裏面最重要的就是調用了client的start方法,因此咱們進入到lib/client.js
文件康康start方法到底作了什麼事情,進來發現這個方法啥也沒幹,就僅僅是執行了pingDeamon這個方法,因此咱們從這個client實例的pingDeamon方法開始追蹤流程
Client.prototype.pingDaemon = function pingDaemon(cb) { var req = axon.socket('req'); var client = new rpc.Client(req); var that = this; client.sock.once('reconnect attempt', function() { client.sock.close(); process.nextTick(function() { return cb(false); }); }); client.sock.once('error', function(e) { if (e.code === 'EACCES') { fs.stat(that.conf.DAEMON_RPC_PORT, function(e, stats) { if (stats.uid === 0) { console.error(that.conf.PREFIX_MSG_ERR + 'Permission denied, to give access to current user:'); } else console.error(that.conf.PREFIX_MSG_ERR + 'Permission denied, check permissions on ' + that.conf.DAEMON_RPC_PORT); process.exit(1); }); } else console.error(e.message || e); }); client.sock.once('connect', function() { client.sock.once('close', function() { return cb(true); }); client.sock.close(); }); req.connect(this.rpc_socket_file); };
從這裏面咱們發現,原來使用到了TJ大神的axon的socket庫來進行通訊。咱們看到這裏使用到的是req/rep模式,建立一個rpc客戶端並嘗試鏈接到server。沒有成功,這個時候接受到close消息以後會進入到'reconnect attempt'的事件回調。在上面的代碼中咱們也看到,在執行connect以前,client.sock監聽了了各類事件'reconnect attempt','error','connect'(ps: pm2又對axon作了一個簡單的封裝: pm2-axon-rpc
,但對主流程並沒有太大幹擾)。
進到'reconnect attempt'的回調後,關閉sock,並在下個tick中執行pingDaemon函數傳入的callback(這裏的cb)。。。慢着,這個回調是什麼來着????
如今回看這個pingDaemon函數的調用代碼
this.pingDaemon(function(daemonAlive) { if (daemonAlive === true) // 已經被我刪除,由於這裏不講這個邏輯 /** * No Daemon mode */ if (that.daemon_mode === false) { // 已經被我刪除,由於這裏不講這個邏輯 ... } /** * Daemon mode */ that.launchDaemon(function(err, child) { if (err) { Common.printError(err); return cb ? cb(err) : process.exit(that.conf.ERROR_EXIT); } if (!process.env.PM2_DISCRETE_MODE) Common.printOut(that.conf.PREFIX_MSG + 'PM2 Successfully daemonized'); that.launchRPC(function(err, meta) { return cb(null, { daemon_mode : that.conf.daemon_mode, new_pm2_instance : true, rpc_socket_file : that.rpc_socket_file, pub_socket_file : that.pub_socket_file, pm2_home : that.pm2_home }); }); }); });
因此如今就開始執行這個function(daemonAlive){...}的函數了。這裏講Daemon mode
這個邏輯分支。通過一番猛如虎的判斷(手動刪除代碼🐶)以後,直接執行到this.launchDaemon
這個方法。
Client.prototype.launchDaemon = function(opts, cb) { if (typeof(opts) == 'function') { cb = opts; opts = { interactor : true }; } var that = this var ClientJS = path.resolve(path.dirname(module.filename), 'Daemon.js'); var node_args = []; var out, err; out = fs.openSync(that.conf.PM2_LOG_FILE_PATH, 'a'), err = fs.openSync(that.conf.PM2_LOG_FILE_PATH, 'a'); if (this.conf.LOW_MEMORY_ENVIRONMENT) { var os = require('os'); node_args.push('--gc-global'); // Does full GC (smaller memory footprint) node_args.push('--max-old-space-size=' + Math.floor(os.totalmem() / 1024 / 1024)); } if (process.env.PM2_NODE_OPTIONS) node_args = node_args.concat(process.env.PM2_NODE_OPTIONS.split(' ')); node_args.push(ClientJS); if (!process.env.PM2_DISCRETE_MODE) Common.printOut(that.conf.PREFIX_MSG + 'Spawning PM2 daemon with pm2_home=' + this.pm2_home); var interpreter = 'node'; if (require('shelljs').which('node') == null) interpreter = process.execPath; var child = require('child_process').spawn(interpreter, node_args, { detached : true, cwd : that.conf.cwd || process.cwd(), env : util._extend({ 'SILENT' : that.conf.DEBUG ? !that.conf.DEBUG : true, 'PM2_HOME' : that.pm2_home }, process.env), stdio : ['ipc', out, err] }); function onError(e) { console.error(e.message || e); return cb ? cb(e.message || e) : false; } child.once('error', onError); child.unref(); child.once('message', function(msg) { debug('PM2 daemon launched with return message: ', msg); child.removeListener('error', onError); child.disconnect(); if (opts && opts.interactor == false) return cb(null, child); if (process.env.PM2_NO_INTERACTION == 'true') return cb(null, child); /** * Here the Keymetrics agent is launched automaticcaly if * it has been already configured before (via pm2 link) */ KMDaemon.launchAndInteract(that.conf, { machine_name : that.machine_name, public_key : that.public_key, secret_key : that.secret_key, pm2_version : pkg.version }, function(err, data, interactor_proc) { that.interactor_process = interactor_proc; return cb(null, child); }); }); };
又是一輪參數字段整合以後,經過node原生的child_process模塊的spawn方法,在子進程中執行lib/Daemon.js
,而且返回子進程句柄child。
那麼邏輯又進到這個lib/Daemon.js
中,咱們看到這個文件作的事情,就是,初始化這個Daemon對象並執行start方法。在start方法中,經過domain建立一個context,並在這個context中去執行innerStart這個方法。
Daemon.prototype.innerStart = function(cb) { var that = this; if (!cb) cb = function() { fmt.sep(); fmt.title('New PM2 Daemon started'); fmt.field('Time', new Date()); fmt.field('PM2 version', pkg.version); fmt.field('Node.js version', process.versions.node); fmt.field('Current arch', process.arch); fmt.field('PM2 home', cst.PM2_HOME); fmt.field('PM2 PID file', that.pid_path); fmt.field('RPC socket file', that.rpc_socket_file); fmt.field('BUS socket file', that.pub_socket_file); fmt.field('Application log path', cst.DEFAULT_LOG_PATH); fmt.field('Worker Interval', cst.WORKER_INTERVAL); fmt.field('Process dump file', cst.DUMP_FILE_PATH); fmt.field('Concurrent actions', cst.CONCURRENT_ACTIONS); fmt.field('SIGTERM timeout', cst.KILL_TIMEOUT); fmt.sep(); }; // Write Daemon PID into file try { fs.writeFileSync(that.pid_path, process.pid); } catch (e) { console.error(e.stack || e); } if (this.ignore_signals != true) this.handleSignals(); /** * Pub system for real time notifications */ this.pub = axon.socket('pub-emitter'); this.pub_socket = this.pub.bind(this.pub_socket_file); this.pub_socket.once('bind', function() { fs.chmod(that.pub_socket_file, '775', function(e) { if (e) console.error(e); try { if (process.env.PM2_SOCKET_USER && process.env.PM2_SOCKET_GROUP) fs.chown(that.pub_socket_file, parseInt(process.env.PM2_SOCKET_USER), parseInt(process.env.PM2_SOCKET_GROUP), function(e) { if (e) console.error(e); }); } catch(e) { console.error(e); } }); that.pub_socket_ready = true; that.sendReady(cb); }); /** * Rep/Req - RPC system to interact with God */ this.rep = axon.socket('rep'); var server = new rpc.Server(this.rep); this.rpc_socket = this.rep.bind(this.rpc_socket_file); this.rpc_socket.once('bind', function() { fs.chmod(that.rpc_socket_file, '775', function(e) { if (e) console.error(e); try { if (process.env.PM2_SOCKET_USER && process.env.PM2_SOCKET_GROUP) fs.chown(that.rpc_socket_file, parseInt(process.env.PM2_SOCKET_USER), parseInt(process.env.PM2_SOCKET_GROUP), function(e) { if (e) console.error(e); }); } catch(e) { console.error(e); } }); that.rpc_socket_ready = true; that.sendReady(cb); }); /** * Memory Snapshot */ function profile(type, msg, cb) { if (semver.satisfies(process.version, '< 8')) return cb(null, { error: 'Node.js is not on right version' }) var cmd if (type === 'cpu') { cmd = { enable: 'Profiler.enable', start: 'Profiler.start', stop: 'Profiler.stop', disable: 'Profiler.disable' } } if (type == 'mem') { cmd = { enable: 'HeapProfiler.enable', start: 'HeapProfiler.startSampling', stop: 'HeapProfiler.stopSampling', disable: 'HeapProfiler.disable' } } const inspector = require('inspector') var session = new inspector.Session() session.connect() var timeout = msg.timeout || 5000 session.post(cmd.enable, (err, data) => { if (err) return cb(null, { error: err.message || err }) console.log(`Starting ${cmd.start}`) session.post(cmd.start, (err, data) => { if (err) return cb(null, { error: err.message || err }) setTimeout(() => { session.post(cmd.stop, (err, data) => { if (err) return cb(null, { error: err.message || err }) const profile = data.profile console.log(`Stopping ${cmd.stop}`) session.post(cmd.disable) fs.writeFile(msg.pwd, JSON.stringify(profile), (err) => { if (err) return cb(null, { error: err.message || err }) return cb(null, { file : msg.pwd }) }) }) }, timeout) }) }) } server.expose({ killMe : that.close.bind(this), profileCPU : profile.bind(this, 'cpu'), profileMEM : profile.bind(this, 'mem'), prepare : God.prepare, launchSysMonitoring : God.launchSysMonitoring, getMonitorData : God.getMonitorData, getSystemData : God.getSystemData, startProcessId : God.startProcessId, stopProcessId : God.stopProcessId, restartProcessId : God.restartProcessId, deleteProcessId : God.deleteProcessId, sendLineToStdin : God.sendLineToStdin, softReloadProcessId : God.softReloadProcessId, reloadProcessId : God.reloadProcessId, duplicateProcessId : God.duplicateProcessId, resetMetaProcessId : God.resetMetaProcessId, stopWatch : God.stopWatch, startWatch : God.startWatch, toggleWatch : God.toggleWatch, notifyByProcessId : God.notifyByProcessId, notifyKillPM2 : God.notifyKillPM2, monitor : God.monitor, unmonitor : God.unmonitor, msgProcess : God.msgProcess, sendDataToProcessId : God.sendDataToProcessId, sendSignalToProcessId : God.sendSignalToProcessId, sendSignalToProcessName : God.sendSignalToProcessName, ping : God.ping, getVersion : God.getVersion, getReport : God.getReport, reloadLogs : God.reloadLogs }); this.startLogic(); }
這個代碼有點長,一看就頭暈眼花,因此咱們只抽去關鍵代碼看看,其餘的能夠暫時忽略。
前面依然是參數的處理。後面咱們看到又是這個axon。首先是用axon的PubEmitter / SubEmitter
模式建立一個實時通知服務,賦值到daemon實例的pub_socket
屬性上面。而後再使用Rep/Req
模式的rep
建立與God的通訊服務。最後把這個服務經過expose的方式,將服務與God的方法進行關聯。舉個例子:客戶端調用this.client.executeRemote
方法,那麼就會經過axon的req
客戶端去調用到God的prepare方法,執行完後客戶端拿到最後的結果。最後就是執行startLogic
方法,主要是作一些God的事件監聽。至此呢,launchDaemon
方法就完事了。
後續就是執行launchDaemon的callback了。
返回以前的代碼,看看launchDaemon
的callback作了什麼事情,發現主要的就是執行了launchRPC
這個方法
Client.prototype.launchRPC = function launchRPC(cb) { var self = this; debug('Launching RPC client on socket file %s', this.rpc_socket_file); var req = axon.socket('req'); this.client = new rpc.Client(req); var connectHandler = function() { self.client.sock.removeListener('error', errorHandler); debug('RPC Connected to Daemon'); if (cb) { setTimeout(function() { cb(null); }, 4); } }; var errorHandler = function(e) { self.client.sock.removeListener('connect', connectHandler); if (cb) { return cb(e); } }; this.client.sock.once('connect', connectHandler); this.client.sock.once('error', errorHandler); this.client_sock = req.connect(this.rpc_socket_file); };
這個方法相對沒那麼複雜,就是設置this.client的屬性,這裏用到的是就是以前說過的axon的req
客戶端,有沒有印象以前的服務端已經在lib/daemon.js
中建立了,因此這裏最後就能夠connect到相應的server了(提示:RPC Connected to Daemon
)。
至此,整個client.start
的流程就走完了,最後執行回調方法,調用that.launchAll
去launch全部pm2的module,關於module這塊也挺有意思,下篇文章會分享module模塊的流程,敬請期待🐶。
隨着client.start
的流程結束了,整個connect的流程也就差很少結束了。
接下來就開始分析命令處理的內容。
命令處理部分主要就是處理用戶在輸入pm2以後的參數。回頭看看咱們啓動的demo,這裏面的參數就是 start
& ./examples/api-pm2/http.js
& -i
& 2
大體的流程以下:
入口文件依然是lib/binaries/CLI.js
咱們能夠看到pm2和其餘不少框架的CLI同樣,利用commander
來處理cli的輸入,這裏不作過多解釋。直接看處處理start的地方
// // Start command // commander.command('start [name|namespace|file|ecosystem|id...]') .action(function(cmd, opts) { // 已經被我刪除 if (cmd == "-") { // 已經被我刪除 } else { // 已經被我刪除 forEachLimit(cmd, 1, function(script, next) { pm2.start(script, commander, next); }, function(err) { if (err && err.message && (err.message.includes('Script not found') === true || err.message.includes('NOT AVAILABLE IN PATH') === true)) { pm2.exitCli(1) } else pm2.speedList(err ? 1 : 0); }); } });
好了,改忽(刪)略(除)已經忽(刪)略(除)了,咱們能夠看到裏面最核心的就是pm2.start
的方法。看到lib/API.js
中的start
方法,通過又一輪操做入參以後,最後是
執行_startScript
方法,這個方法纔是真正幹活的方法,這個方法也比較複雜:
_startScript (script, opts, cb) { if (typeof opts == "function") { cb = opts; opts = {}; } var that = this; /** * Commander.js tricks */ var app_conf = Config.filterOptions(opts); var appConf = {}; if (typeof app_conf.name == 'function') delete app_conf.name; delete app_conf.args; // Retrieve arguments via -- <args> var argsIndex; if (opts.rawArgs && (argsIndex = opts.rawArgs.indexOf('--')) >= 0) app_conf.args = opts.rawArgs.slice(argsIndex + 1); else if (opts.scriptArgs) app_conf.args = opts.scriptArgs; app_conf.script = script; if(!app_conf.namespace) app_conf.namespace = 'default'; if ((appConf = Common.verifyConfs(app_conf)) instanceof Error) { Common.err(appConf) return cb ? cb(Common.retErr(appConf)) : that.exitCli(conf.ERROR_EXIT); } app_conf = appConf[0]; if (opts.watchDelay) { if (typeof opts.watchDelay === "string" && opts.watchDelay.indexOf("ms") !== -1) app_conf.watch_delay = parseInt(opts.watchDelay); else { app_conf.watch_delay = parseFloat(opts.watchDelay) * 1000; } } var mas = []; if(typeof opts.ext != 'undefined') hf.make_available_extension(opts, mas); // for -e flag mas.length > 0 ? app_conf.ignore_watch = mas : 0; /** * If -w option, write configuration to configuration.json file */ if (app_conf.write) { var dst_path = path.join(process.env.PWD || process.cwd(), app_conf.name + '-pm2.json'); Common.printOut(conf.PREFIX_MSG + 'Writing configuration to', chalk.blue(dst_path)); // pretty JSON try { fs.writeFileSync(dst_path, JSON.stringify(app_conf, null, 2)); } catch (e) { console.error(e.stack || e); } } series([ restartExistingProcessName, restartExistingNameSpace, restartExistingProcessId, restartExistingProcessPathOrStartNew ], function(err, data) { if (err instanceof Error) return cb ? cb(err) : that.exitCli(conf.ERROR_EXIT); var ret = {}; data.forEach(function(_dt) { if (_dt !== undefined) ret = _dt; }); return cb ? cb(null, ret) : that.speedList(); }); /** * If start <app_name> start/restart application */ function restartExistingProcessName(cb) { if (!isNaN(script) || (typeof script === 'string' && script.indexOf('/') != -1) || (typeof script === 'string' && path.extname(script) !== '')) return cb(null); that.Client.getProcessIdByName(script, function(err, ids) { if (err && cb) return cb(err); if (ids.length > 0) { that._operate('restartProcessId', script, opts, function(err, list) { if (err) return cb(err); Common.printOut(conf.PREFIX_MSG + 'Process successfully started'); return cb(true, list); }); } else return cb(null); }); } /** * If start <namespace> start/restart namespace */ function restartExistingNameSpace(cb) { if (!isNaN(script) || (typeof script === 'string' && script.indexOf('/') != -1) || (typeof script === 'string' && path.extname(script) !== '')) return cb(null); if (script !== 'all') { that.Client.getProcessIdsByNamespace(script, function (err, ids) { if (err && cb) return cb(err); if (ids.length > 0) { that._operate('restartProcessId', script, opts, function (err, list) { if (err) return cb(err); Common.printOut(conf.PREFIX_MSG + 'Process successfully started'); return cb(true, list); }); } else return cb(null); }); } else { that._operate('restartProcessId', 'all', function(err, list) { if (err) return cb(err); Common.printOut(conf.PREFIX_MSG + 'Process successfully started'); return cb(true, list); }); } } function restartExistingProcessId(cb) { if (isNaN(script)) return cb(null); that._operate('restartProcessId', script, opts, function(err, list) { if (err) return cb(err); Common.printOut(conf.PREFIX_MSG + 'Process successfully started'); return cb(true, list); }); } /** * Restart a process with the same full path * Or start it */ function restartExistingProcessPathOrStartNew(cb) { that.Client.executeRemote('getMonitorData', {}, function(err, procs) { if (err) return cb ? cb(new Error(err)) : that.exitCli(conf.ERROR_EXIT); var full_path = path.resolve(that.cwd, script); var managed_script = null; procs.forEach(function(proc) { if (proc.pm2_env.pm_exec_path == full_path && proc.pm2_env.name == app_conf.name) managed_script = proc; }); if (managed_script && (managed_script.pm2_env.status == conf.STOPPED_STATUS || managed_script.pm2_env.status == conf.STOPPING_STATUS || managed_script.pm2_env.status == conf.ERRORED_STATUS)) { // Restart process if stopped var app_name = managed_script.pm2_env.name; that._operate('restartProcessId', app_name, opts, function(err, list) { if (err) return cb ? cb(new Error(err)) : that.exitCli(conf.ERROR_EXIT); Common.printOut(conf.PREFIX_MSG + 'Process successfully started'); return cb(true, list); }); return false; } else if (managed_script && !opts.force) { Common.err('Script already launched, add -f option to force re-execution'); return cb(new Error('Script already launched')); } var resolved_paths = null; try { resolved_paths = Common.resolveAppAttributes({ cwd : that.cwd, pm2_home : that.pm2_home }, app_conf); } catch(e) { Common.err(e.message); return cb(Common.retErr(e)); } Common.printOut(conf.PREFIX_MSG + 'Starting %s in %s (%d instance' + (resolved_paths.instances > 1 ? 's' : '') + ')', resolved_paths.pm_exec_path, resolved_paths.exec_mode, resolved_paths.instances); if (!resolved_paths.env) resolved_paths.env = {}; // Set PM2 HOME in case of child process using PM2 API resolved_paths.env['PM2_HOME'] = that.pm2_home; var additional_env = Modularizer.getAdditionalConf(resolved_paths.name); util._extend(resolved_paths.env, additional_env); // Is KM linked? resolved_paths.km_link = that.gl_is_km_linked; that.Client.executeRemote('prepare', resolved_paths, function(err, data) { if (err) { Common.printError(conf.PREFIX_MSG_ERR + 'Error while launching application', err.stack || err); return cb(Common.retErr(err)); } Common.printOut(conf.PREFIX_MSG + 'Done.'); return cb(true, data); }); return false; }); } }
忽略前面的各類處理,直接看到series
這個方法的調用,分別執行到下面定義的四個方法restartExistingProcessName
, restartExistingNameSpace
,restartExistingProcessId
,restartExistingProcessPathOrStartNew
沒錯,從名字和做者的註釋咱們大概可以猜到(事實也是🐶),最後一個方法restartExistingProcessPathOrStartNew
對咱們的這個流程影響最大。
那咱們重點看看這個方法到底作了什麼:
上來就先用rpc的方式獲取監控數據getMonitorData
,這個也是經過God去處理,而後最後返回各類配置&默認數據。在回調函數中對這些數據進行各類處理,這裏先忽略。最後又經過rpc的方式調用到God的prepare
方法。那麼咱們順藤摸瓜,打開lib/God.js
文件,看到prepare
方法裏面又是一輪騷操做以後調用executeApp
方法,看到這個方法名,咱們大概能夠猜出來,咱們的應用,也許就是在這裏被執行了。
咱們驗證一下:方法中經過env_copy.exec_mode === 'cluster_mode'
判斷,肯定當前的模式,因此流程進到cluster_mode
中,執行God.nodeApp
方法。可能最後的邏輯在這個方法中,咱們打開lib/God/ClusterMode.js
God.nodeApp = function nodeApp(env_copy, cb){ var clu = null; console.log(`App [${env_copy.name}:${env_copy.pm_id}] starting in -cluster mode-`) if (env_copy.node_args && Array.isArray(env_copy.node_args)) { cluster.settings.execArgv = env_copy.node_args; } env_copy._pm2_version = pkg.version; try { // node.js cluster clients can not receive deep-level objects or arrays in the forked process, e.g.: // { "args": ["foo", "bar"], "env": { "foo1": "bar1" }} will be parsed to // { "args": "foo, bar", "env": "[object Object]"} // So we passing a stringified JSON here. clu = cluster.fork({pm2_env: JSON.stringify(env_copy), windowsHide: true}); } catch(e) { God.logAndGenerateError(e); return cb(e); } clu.pm2_env = env_copy; /** * Broadcast message to God */ clu.on('message', function cluMessage(msg) { /********************************* * If you edit this function * Do the same in ForkMode.js ! *********************************/ if (msg.data && msg.type) { return God.bus.emit(msg.type ? msg.type : 'process:msg', { at : Utility.getDate(), data : msg.data, process : { pm_id : clu.pm2_env.pm_id, name : clu.pm2_env.name, rev : (clu.pm2_env.versioning && clu.pm2_env.versioning.revision) ? clu.pm2_env.versioning.revision : null } }); } else { if (typeof msg == 'object' && 'node_version' in msg) { clu.pm2_env.node_version = msg.node_version; return false; } else if (typeof msg == 'object' && 'cron_restart' in msg) { return God.restartProcessId({ id : clu.pm2_env.pm_id }, function() { console.log('Application %s has been restarted via CRON', clu.pm2_env.name); }); } return God.bus.emit('process:msg', { at : Utility.getDate(), raw : msg, process : { pm_id : clu.pm2_env.pm_id, name : clu.pm2_env.name } }); } }); return cb(null, clu); };
果真不出所料,這裏面用到node.js的原生cluster模塊根據傳入的i
參數,fork出相應數量的子進程,並監聽message
事件,經過God.bus.emit('process:msg',{})
的方式跟God進行通訊,最後把子進程的應用傳到回調函數中,穩穩的。
可是咱們fork出哪一個js文件呢,這裏沒有指明,因此又往上層的lib/God.js
找到,在文件的一開始,已經作了關於cluster
的設置,代碼以下
if (semver.lt(process.version, '10.0.0')) { cluster.setupMaster({ windowsHide: true, exec : path.resolve(path.dirname(module.filename), 'ProcessContainerLegacy.js') }); } else { cluster.setupMaster({ windowsHide: true, exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js') }); }
咱們看到這裏經過原生cluster的api:setupMaster
來設置app的啓動文件ProcessContainer.js
那咱們就看看這個啓動文件,這個文件裏面是一個自執行函數,通過前面的各類處理以後,最後執行的是exec
這個方法。這個方法先是對腳本的後綴判斷完以後,再作一些進程對'message'事件的監聽,開啓日誌服務,捕獲進程錯誤等一系列操做以後,經過import
或者require
的方式來加載咱們的業務腳本,也就是咱們以前輸入的./examples/api-pm2/http.js
。終於在這個時候,咱們的業務代碼就開始跑了。
業務代碼跑起來以後呢,就會在回調函數裏經過God.notify
方法去通知到God。自此,大體的pm2 start流程就算是跑通裏,業務代碼也能夠正常運行了。
以上就是這整個pm2例子的大體流程,寫得比較粗,不少細節沒有講到,之後會繼續完善這篇文章,歡迎關注,歡迎拍磚。蟹蟹你們,你們加油武漢加油,中國加油!!!