PM2不2:源碼分析

pm2
前言:
最近由於項目須要,開始開始接觸Node.js這塊相關的內容,其中就包括pm2這個東東。爲了更加深刻的瞭解pm2的原理,因此抽空簡單的看了一下pm2的代碼。在這裏也跟你們分享一下看完的收穫,水平有限,歡迎指正和探討。node

本文使用的pm2版本是4.2.3git


下載完pm2代碼以後,簡單快速啓動demogithub

pm2 start ./examples/api-pm2/http.js -i 2

執行完這個命令以後,咱們就能順利的利用咱們的pm2啓動到咱們的nodejs腳本,打開頁面 http://127.0.0.1:8000 ,就能夠看到頁面上的 ‘hey’ 了。shell

以上就是最最最簡單的啓動方式,後面咱們也會根據這個簡單的命令來分析其中的一些流程和步驟json


首先咱們啓動pm2,就會執行bin目錄下的pm2這個文件,隨後require到lib/binaries/CLI.js這個文件,並開始執行。
打開這個文件,咱們會看到這個文件確實有點長,因此咱們簡化一下,把這個文件分爲3個步驟:1)初始化;2)connect;3)處理相關的命令;windows

初始化很好理解,就是對用戶的參數作一些規範化的處理以保證後續的流程,其次是實例化了pm2對象,這個對象由lib/API.js中的API這個類提供。api

Common.determineSilentCLI();
Common.printVersion();

var pm2 = new PM2();

PM2ioHandler.usePM2Client(pm2)

在實例化pm2的過程當中,也在這個實例上掛載了client屬性session

this.Client = new Client({
      pm2_home: that.pm2_home,
      conf: this._conf,
      secret_key: this.secret_key,
      public_key: this.public_key,
      daemon_mode: this.daemon_mode,
      machine_name: this.machine_name
    });

這裏先不用疑惑這些實例和屬性是作什麼的,後續用到了就會作進一步分析
初始化的大概流程就差很少是這樣,平平無奇,由於複雜的在後面(手動狗頭)app


接下來就是connect過程:
這個過程我理解爲就是建立通訊系統的過程。
先上個connect流程圖:
PM2-flow-connect.png
入口就是在lib/binaries/CLI.js中執行的pm2.connect方法框架

pm2.connect(function() {
    debug('Now connected to daemon');
    if (process.argv.slice(2)[0] === 'completion') {
      checkCompletion();
      //Close client if completion related installation
      var third = process.argv.slice(3)[0];
      if ( third == null || third === 'install' || third === 'uninstall')
        pm2.disconnect();
    }
    else {
      beginCommandProcessing();
    }

隨後咱們就進入了pm2實例的connect方法,咱們發現這裏面最重要的就是調用了client的start方法,因此咱們進入到lib/client.js文件康康start方法到底作了什麼事情,進來發現這個方法啥也沒幹,就僅僅是執行了pingDeamon這個方法,因此咱們從這個client實例的pingDeamon方法開始追蹤流程

Client.prototype.pingDaemon = function pingDaemon(cb) {
  var req    = axon.socket('req');
  var client = new rpc.Client(req);
  var that = this;

  client.sock.once('reconnect attempt', function() {
    client.sock.close();
    process.nextTick(function() {
      return cb(false);
    });
  });

  client.sock.once('error', function(e) {
    if (e.code === 'EACCES') {
      fs.stat(that.conf.DAEMON_RPC_PORT, function(e, stats) {
        if (stats.uid === 0) {
          console.error(that.conf.PREFIX_MSG_ERR + 'Permission denied, to give access to current user:');
        }
        else
          console.error(that.conf.PREFIX_MSG_ERR + 'Permission denied, check permissions on ' + that.conf.DAEMON_RPC_PORT);

        process.exit(1);
      });
    }
    else
      console.error(e.message || e);
  });

  client.sock.once('connect', function() {
    client.sock.once('close', function() {
      return cb(true);
    });
    client.sock.close();
  });

  req.connect(this.rpc_socket_file);
};

從這裏面咱們發現,原來使用到了TJ大神的axon的socket庫來進行通訊。咱們看到這裏使用到的是req/rep模式,建立一個rpc客戶端並嘗試鏈接到server。沒有成功,這個時候接受到close消息以後會進入到'reconnect attempt'的事件回調。在上面的代碼中咱們也看到,在執行connect以前,client.sock監聽了了各類事件'reconnect attempt','error','connect'(ps: pm2又對axon作了一個簡單的封裝: pm2-axon-rpc,但對主流程並沒有太大幹擾)。
進到'reconnect attempt'的回調後,關閉sock,並在下個tick中執行pingDaemon函數傳入的callback(這裏的cb)。。。慢着,這個回調是什麼來着????
如今回看這個pingDaemon函數的調用代碼

this.pingDaemon(function(daemonAlive) {
    if (daemonAlive === true)
      // 已經被我刪除,由於這裏不講這個邏輯

    /**
     * No Daemon mode
     */
    if (that.daemon_mode === false) {
      // 已經被我刪除,由於這裏不講這個邏輯  
      ...
    }

    /**
     * Daemon mode
     */
    that.launchDaemon(function(err, child) {
      if (err) {
        Common.printError(err);
        return cb ? cb(err) : process.exit(that.conf.ERROR_EXIT);
      }

      if (!process.env.PM2_DISCRETE_MODE)
        Common.printOut(that.conf.PREFIX_MSG + 'PM2 Successfully daemonized');

      that.launchRPC(function(err, meta) {
        return cb(null, {
          daemon_mode      : that.conf.daemon_mode,
          new_pm2_instance : true,
          rpc_socket_file  : that.rpc_socket_file,
          pub_socket_file  : that.pub_socket_file,
          pm2_home         : that.pm2_home
        });
      });
    });
  });

因此如今就開始執行這個function(daemonAlive){...}的函數了。這裏講Daemon mode這個邏輯分支。通過一番猛如虎的判斷(手動刪除代碼🐶)以後,直接執行到this.launchDaemon這個方法。

Client.prototype.launchDaemon = function(opts, cb) {
  if (typeof(opts) == 'function') {
    cb = opts;
    opts = {
      interactor : true
    };
  }

  var that = this
  var ClientJS = path.resolve(path.dirname(module.filename), 'Daemon.js');
  var node_args = [];
  var out, err;

    
  out = fs.openSync(that.conf.PM2_LOG_FILE_PATH, 'a'),
  err = fs.openSync(that.conf.PM2_LOG_FILE_PATH, 'a');

  if (this.conf.LOW_MEMORY_ENVIRONMENT) {
    var os = require('os');
    node_args.push('--gc-global'); // Does full GC (smaller memory footprint)
    node_args.push('--max-old-space-size=' + Math.floor(os.totalmem() / 1024 / 1024));
  }


  if (process.env.PM2_NODE_OPTIONS)
    node_args = node_args.concat(process.env.PM2_NODE_OPTIONS.split(' '));
  node_args.push(ClientJS);

  if (!process.env.PM2_DISCRETE_MODE)
    Common.printOut(that.conf.PREFIX_MSG + 'Spawning PM2 daemon with pm2_home=' + this.pm2_home);

  var interpreter = 'node';

  if (require('shelljs').which('node') == null)
    interpreter = process.execPath;

  var child = require('child_process').spawn(interpreter, node_args, {
    detached   : true,
    cwd        : that.conf.cwd || process.cwd(),
    env        : util._extend({
      'SILENT'      : that.conf.DEBUG ? !that.conf.DEBUG : true,
      'PM2_HOME'   : that.pm2_home
    }, process.env),
    stdio      : ['ipc', out, err]
  });

  function onError(e) {
    console.error(e.message || e);
    return cb ? cb(e.message || e) : false;
  }

  child.once('error', onError);

  child.unref();

  child.once('message', function(msg) {
    debug('PM2 daemon launched with return message: ', msg);
    child.removeListener('error', onError);
    child.disconnect();

    if (opts && opts.interactor == false)
      return cb(null, child);

    if (process.env.PM2_NO_INTERACTION == 'true')
      return cb(null, child);

    /**
     * Here the Keymetrics agent is launched automaticcaly if
     * it has been already configured before (via pm2 link)
     */
    KMDaemon.launchAndInteract(that.conf, {
      machine_name : that.machine_name,
      public_key   : that.public_key,
      secret_key   : that.secret_key,
      pm2_version  : pkg.version
    }, function(err, data, interactor_proc) {
      that.interactor_process = interactor_proc;
      return cb(null, child);
    });
  });
};

又是一輪參數字段整合以後,經過node原生的child_process模塊的spawn方法,在子進程中執行lib/Daemon.js,而且返回子進程句柄child。
那麼邏輯又進到這個lib/Daemon.js中,咱們看到這個文件作的事情,就是,初始化這個Daemon對象並執行start方法。在start方法中,經過domain建立一個context,並在這個context中去執行innerStart這個方法。

Daemon.prototype.innerStart = function(cb) {
  var that = this;

  if (!cb) cb = function() {
    fmt.sep();
    fmt.title('New PM2 Daemon started');
    fmt.field('Time', new Date());
    fmt.field('PM2 version', pkg.version);
    fmt.field('Node.js version', process.versions.node);
    fmt.field('Current arch', process.arch);
    fmt.field('PM2 home', cst.PM2_HOME);
    fmt.field('PM2 PID file', that.pid_path);
    fmt.field('RPC socket file', that.rpc_socket_file);
    fmt.field('BUS socket file', that.pub_socket_file);
    fmt.field('Application log path', cst.DEFAULT_LOG_PATH);
    fmt.field('Worker Interval', cst.WORKER_INTERVAL);
    fmt.field('Process dump file', cst.DUMP_FILE_PATH);
    fmt.field('Concurrent actions', cst.CONCURRENT_ACTIONS);
    fmt.field('SIGTERM timeout', cst.KILL_TIMEOUT);
    fmt.sep();
  };

  // Write Daemon PID into file
  try {
    fs.writeFileSync(that.pid_path, process.pid);
  } catch (e) {
    console.error(e.stack || e);
  }

  if (this.ignore_signals != true)
    this.handleSignals();

  /**
   * Pub system for real time notifications
   */
  this.pub    = axon.socket('pub-emitter');

  this.pub_socket = this.pub.bind(this.pub_socket_file);

  this.pub_socket.once('bind', function() {
    fs.chmod(that.pub_socket_file, '775', function(e) {
      if (e) console.error(e);

      try {
        if (process.env.PM2_SOCKET_USER && process.env.PM2_SOCKET_GROUP)
          fs.chown(that.pub_socket_file,
                   parseInt(process.env.PM2_SOCKET_USER),
                   parseInt(process.env.PM2_SOCKET_GROUP), function(e) {
                     if (e) console.error(e);
                   });
      } catch(e) {
        console.error(e);
      }
    });

    that.pub_socket_ready = true;
    that.sendReady(cb);
  });

  /**
   * Rep/Req - RPC system to interact with God
   */
  this.rep    = axon.socket('rep');

  var server = new rpc.Server(this.rep);

  this.rpc_socket = this.rep.bind(this.rpc_socket_file);

  this.rpc_socket.once('bind', function() {
    fs.chmod(that.rpc_socket_file, '775', function(e) {
      if (e) console.error(e);

      try {
        if (process.env.PM2_SOCKET_USER && process.env.PM2_SOCKET_GROUP)
          fs.chown(that.rpc_socket_file,
                   parseInt(process.env.PM2_SOCKET_USER),
                   parseInt(process.env.PM2_SOCKET_GROUP), function(e) {
                     if (e) console.error(e);
                   });
      } catch(e) {
        console.error(e);
      }
    });


    that.rpc_socket_ready = true;
    that.sendReady(cb);
  });


  /**
   * Memory Snapshot
   */
  function profile(type, msg, cb) {
    if (semver.satisfies(process.version, '< 8'))
      return cb(null, { error: 'Node.js is not on right version' })

    var cmd

    if (type === 'cpu') {
      cmd = {
        enable: 'Profiler.enable',
        start: 'Profiler.start',
        stop: 'Profiler.stop',
        disable: 'Profiler.disable'
      }
    }
    if (type == 'mem') {
      cmd = {
        enable: 'HeapProfiler.enable',
        start: 'HeapProfiler.startSampling',
        stop: 'HeapProfiler.stopSampling',
        disable: 'HeapProfiler.disable'
      }
    }

    const inspector = require('inspector')
    var session = new inspector.Session()

    session.connect()

    var timeout = msg.timeout || 5000

    session.post(cmd.enable, (err, data) => {
      if (err) return cb(null, { error: err.message || err })

      console.log(`Starting ${cmd.start}`)
      session.post(cmd.start, (err, data) => {
        if (err) return cb(null, { error: err.message || err })

        setTimeout(() => {
          session.post(cmd.stop, (err, data) => {
            if (err) return cb(null, { error: err.message || err })
            const profile = data.profile

            console.log(`Stopping ${cmd.stop}`)
            session.post(cmd.disable)

            fs.writeFile(msg.pwd, JSON.stringify(profile), (err) => {
              if (err) return cb(null, { error: err.message || err })
              return cb(null, { file : msg.pwd })
            })
          })
        }, timeout)
      })
    })
  }

  server.expose({
    killMe                  : that.close.bind(this),
    profileCPU              : profile.bind(this, 'cpu'),
    profileMEM              : profile.bind(this, 'mem'),
    prepare                 : God.prepare,
    launchSysMonitoring     : God.launchSysMonitoring,
    getMonitorData          : God.getMonitorData,
    getSystemData           : God.getSystemData,

    startProcessId          : God.startProcessId,
    stopProcessId           : God.stopProcessId,
    restartProcessId        : God.restartProcessId,
    deleteProcessId         : God.deleteProcessId,

    sendLineToStdin         : God.sendLineToStdin,
    softReloadProcessId     : God.softReloadProcessId,
    reloadProcessId         : God.reloadProcessId,
    duplicateProcessId      : God.duplicateProcessId,
    resetMetaProcessId      : God.resetMetaProcessId,
    stopWatch               : God.stopWatch,
    startWatch              : God.startWatch,
    toggleWatch             : God.toggleWatch,
    notifyByProcessId       : God.notifyByProcessId,

    notifyKillPM2           : God.notifyKillPM2,
    monitor                 : God.monitor,
    unmonitor               : God.unmonitor,

    msgProcess              : God.msgProcess,
    sendDataToProcessId     : God.sendDataToProcessId,
    sendSignalToProcessId   : God.sendSignalToProcessId,
    sendSignalToProcessName : God.sendSignalToProcessName,

    ping                    : God.ping,
    getVersion              : God.getVersion,
    getReport               : God.getReport,
    reloadLogs              : God.reloadLogs
  });

  this.startLogic();
}

這個代碼有點長,一看就頭暈眼花,因此咱們只抽去關鍵代碼看看,其餘的能夠暫時忽略。
前面依然是參數的處理。後面咱們看到又是這個axon。首先是用axon的PubEmitter / SubEmitter模式建立一個實時通知服務,賦值到daemon實例的pub_socket屬性上面。而後再使用Rep/Req模式的rep建立與God的通訊服務。最後把這個服務經過expose的方式,將服務與God的方法進行關聯。舉個例子:客戶端調用this.client.executeRemote方法,那麼就會經過axon的req客戶端去調用到God的prepare方法,執行完後客戶端拿到最後的結果。最後就是執行startLogic方法,主要是作一些God的事件監聽。至此呢,launchDaemon方法就完事了。
後續就是執行launchDaemon的callback了。

返回以前的代碼,看看launchDaemon的callback作了什麼事情,發現主要的就是執行了launchRPC這個方法

Client.prototype.launchRPC = function launchRPC(cb) {
  var self = this;
  debug('Launching RPC client on socket file %s', this.rpc_socket_file);
  var req      = axon.socket('req');
  this.client  = new rpc.Client(req);

  var connectHandler = function() {
    self.client.sock.removeListener('error', errorHandler);
    debug('RPC Connected to Daemon');
    if (cb) {
      setTimeout(function() {
        cb(null);
      }, 4);
    }
  };

  var errorHandler = function(e) {
    self.client.sock.removeListener('connect', connectHandler);
    if (cb) {
      return cb(e);
    }
  };

  this.client.sock.once('connect', connectHandler);
  this.client.sock.once('error', errorHandler);
  this.client_sock = req.connect(this.rpc_socket_file);
};

這個方法相對沒那麼複雜,就是設置this.client的屬性,這裏用到的是就是以前說過的axon的req客戶端,有沒有印象以前的服務端已經在lib/daemon.js中建立了,因此這裏最後就能夠connect到相應的server了(提示:RPC Connected to Daemon)。

至此,整個client.start的流程就走完了,最後執行回調方法,調用that.launchAll去launch全部pm2的module,關於module這塊也挺有意思,下篇文章會分享module模塊的流程,敬請期待🐶。

隨着client.start的流程結束了,整個connect的流程也就差很少結束了。
接下來就開始分析命令處理的內容。


命令處理部分主要就是處理用戶在輸入pm2以後的參數。回頭看看咱們啓動的demo,這裏面的參數就是 start & ./examples/api-pm2/http.js & -i & 2

大體的流程以下:
pm2-commander-flow.png

入口文件依然是lib/binaries/CLI.js
咱們能夠看到pm2和其餘不少框架的CLI同樣,利用commander來處理cli的輸入,這裏不作過多解釋。直接看處處理start的地方

//
// Start command
//
commander.command('start [name|namespace|file|ecosystem|id...]')
  .action(function(cmd, opts) {
          // 已經被我刪除
    if (cmd == "-") {
        // 已經被我刪除
    }
    else {
      // 已經被我刪除
      forEachLimit(cmd, 1, function(script, next) {
        pm2.start(script, commander, next);
      }, function(err) {
        if (err && err.message &&
            (err.message.includes('Script not found') === true ||
             err.message.includes('NOT AVAILABLE IN PATH') === true)) {
          pm2.exitCli(1)
        }
        else
          pm2.speedList(err ? 1 : 0);
      });
    }
  });

好了,改忽(刪)略(除)已經忽(刪)略(除)了,咱們能夠看到裏面最核心的就是pm2.start的方法。看到lib/API.js中的start方法,通過又一輪操做入參以後,最後是
執行_startScript方法,這個方法纔是真正幹活的方法,這個方法也比較複雜:

_startScript (script, opts, cb) {
    if (typeof opts == "function") {
      cb = opts;
      opts = {};
    }
    var that = this;

    /**
     * Commander.js tricks
     */
    var app_conf = Config.filterOptions(opts);
    var appConf = {};

    if (typeof app_conf.name == 'function')
      delete app_conf.name;

    delete app_conf.args;

    // Retrieve arguments via -- <args>
    var argsIndex;

    if (opts.rawArgs && (argsIndex = opts.rawArgs.indexOf('--')) >= 0)
      app_conf.args = opts.rawArgs.slice(argsIndex + 1);
    else if (opts.scriptArgs)
      app_conf.args = opts.scriptArgs;

    app_conf.script = script;
    if(!app_conf.namespace)
      app_conf.namespace = 'default';

    if ((appConf = Common.verifyConfs(app_conf)) instanceof Error) {
      Common.err(appConf)
      return cb ? cb(Common.retErr(appConf)) : that.exitCli(conf.ERROR_EXIT);
    }

    app_conf = appConf[0];

    if (opts.watchDelay) {
      if (typeof opts.watchDelay === "string" && opts.watchDelay.indexOf("ms") !== -1)
        app_conf.watch_delay = parseInt(opts.watchDelay);
      else {
        app_conf.watch_delay = parseFloat(opts.watchDelay) * 1000;
      }
    }

    var mas = [];
    if(typeof opts.ext != 'undefined')
      hf.make_available_extension(opts, mas); // for -e flag
    mas.length > 0 ? app_conf.ignore_watch = mas : 0;

    /**
     * If -w option, write configuration to configuration.json file
     */
    if (app_conf.write) {
      var dst_path = path.join(process.env.PWD || process.cwd(), app_conf.name + '-pm2.json');
      Common.printOut(conf.PREFIX_MSG + 'Writing configuration to', chalk.blue(dst_path));
      // pretty JSON
      try {
        fs.writeFileSync(dst_path, JSON.stringify(app_conf, null, 2));
      } catch (e) {
        console.error(e.stack || e);
      }
    }

    series([
      restartExistingProcessName,
      restartExistingNameSpace,
      restartExistingProcessId,
      restartExistingProcessPathOrStartNew
    ], function(err, data) {
      if (err instanceof Error)
        return cb ? cb(err) : that.exitCli(conf.ERROR_EXIT);

      var ret = {};

      data.forEach(function(_dt) {
        if (_dt !== undefined)
          ret = _dt;
      });

      return cb ? cb(null, ret) : that.speedList();
    });

    /**
     * If start <app_name> start/restart application
     */
    function restartExistingProcessName(cb) {
      if (!isNaN(script) ||
        (typeof script === 'string' && script.indexOf('/') != -1) ||
        (typeof script === 'string' && path.extname(script) !== ''))
        return cb(null);

        that.Client.getProcessIdByName(script, function(err, ids) {
          if (err && cb) return cb(err);
          if (ids.length > 0) {
            that._operate('restartProcessId', script, opts, function(err, list) {
              if (err) return cb(err);
              Common.printOut(conf.PREFIX_MSG + 'Process successfully started');
              return cb(true, list);
            });
          }
          else return cb(null);
        });
    }

    /**
     * If start <namespace> start/restart namespace
     */
    function restartExistingNameSpace(cb) {
      if (!isNaN(script) ||
        (typeof script === 'string' && script.indexOf('/') != -1) ||
        (typeof script === 'string' && path.extname(script) !== ''))
        return cb(null);

      if (script !== 'all') {
        that.Client.getProcessIdsByNamespace(script, function (err, ids) {
          if (err && cb) return cb(err);
          if (ids.length > 0) {
            that._operate('restartProcessId', script, opts, function (err, list) {
              if (err) return cb(err);
              Common.printOut(conf.PREFIX_MSG + 'Process successfully started');
              return cb(true, list);
            });
          }
          else return cb(null);
        });
      }
      else {
        that._operate('restartProcessId', 'all', function(err, list) {
          if (err) return cb(err);
          Common.printOut(conf.PREFIX_MSG + 'Process successfully started');
          return cb(true, list);
        });
      }
    }

    function restartExistingProcessId(cb) {
      if (isNaN(script)) return cb(null);

      that._operate('restartProcessId', script, opts, function(err, list) {
        if (err) return cb(err);
        Common.printOut(conf.PREFIX_MSG + 'Process successfully started');
        return cb(true, list);
      });
    }

    /**
     * Restart a process with the same full path
     * Or start it
     */
    function restartExistingProcessPathOrStartNew(cb) {
      that.Client.executeRemote('getMonitorData', {}, function(err, procs) {
        if (err) return cb ? cb(new Error(err)) : that.exitCli(conf.ERROR_EXIT);

        var full_path = path.resolve(that.cwd, script);
        var managed_script = null;

        procs.forEach(function(proc) {
          if (proc.pm2_env.pm_exec_path == full_path &&
              proc.pm2_env.name == app_conf.name)
            managed_script = proc;
        });

        if (managed_script &&
          (managed_script.pm2_env.status == conf.STOPPED_STATUS ||
            managed_script.pm2_env.status == conf.STOPPING_STATUS ||
            managed_script.pm2_env.status == conf.ERRORED_STATUS)) {
          // Restart process if stopped
          var app_name = managed_script.pm2_env.name;

          that._operate('restartProcessId', app_name, opts, function(err, list) {
            if (err) return cb ? cb(new Error(err)) : that.exitCli(conf.ERROR_EXIT);
            Common.printOut(conf.PREFIX_MSG + 'Process successfully started');
            return cb(true, list);
          });
          return false;
        }
        else if (managed_script && !opts.force) {
          Common.err('Script already launched, add -f option to force re-execution');
          return cb(new Error('Script already launched'));
        }

        var resolved_paths = null;

        try {
          resolved_paths = Common.resolveAppAttributes({
            cwd      : that.cwd,
            pm2_home : that.pm2_home
          }, app_conf);
        } catch(e) {
          Common.err(e.message);
          return cb(Common.retErr(e));
        }

        Common.printOut(conf.PREFIX_MSG + 'Starting %s in %s (%d instance' + (resolved_paths.instances > 1 ? 's' : '') + ')',
          resolved_paths.pm_exec_path, resolved_paths.exec_mode, resolved_paths.instances);

        if (!resolved_paths.env) resolved_paths.env = {};

        // Set PM2 HOME in case of child process using PM2 API
        resolved_paths.env['PM2_HOME'] = that.pm2_home;

        var additional_env = Modularizer.getAdditionalConf(resolved_paths.name);
        util._extend(resolved_paths.env, additional_env);

        // Is KM linked?
        resolved_paths.km_link = that.gl_is_km_linked;

        that.Client.executeRemote('prepare', resolved_paths, function(err, data) {
          if (err) {
            Common.printError(conf.PREFIX_MSG_ERR + 'Error while launching application', err.stack || err);
            return cb(Common.retErr(err));
          }

          Common.printOut(conf.PREFIX_MSG + 'Done.');
          return cb(true, data);
        });
        return false; 
      });
    }
  }

忽略前面的各類處理,直接看到series這個方法的調用,分別執行到下面定義的四個方法
restartExistingProcessName, restartExistingNameSpace,
restartExistingProcessId,restartExistingProcessPathOrStartNew
沒錯,從名字和做者的註釋咱們大概可以猜到(事實也是🐶),最後一個方法restartExistingProcessPathOrStartNew對咱們的這個流程影響最大。

那咱們重點看看這個方法到底作了什麼:
上來就先用rpc的方式獲取監控數據getMonitorData,這個也是經過God去處理,而後最後返回各類配置&默認數據。在回調函數中對這些數據進行各類處理,這裏先忽略。最後又經過rpc的方式調用到God的prepare方法。那麼咱們順藤摸瓜,打開lib/God.js文件,看到prepare方法裏面又是一輪騷操做以後調用executeApp方法,看到這個方法名,咱們大概能夠猜出來,咱們的應用,也許就是在這裏被執行了。
咱們驗證一下:方法中經過env_copy.exec_mode === 'cluster_mode'判斷,肯定當前的模式,因此流程進到cluster_mode中,執行God.nodeApp方法。可能最後的邏輯在這個方法中,咱們打開lib/God/ClusterMode.js

God.nodeApp = function nodeApp(env_copy, cb){
    var clu = null;

    console.log(`App [${env_copy.name}:${env_copy.pm_id}] starting in -cluster mode-`)
    if (env_copy.node_args && Array.isArray(env_copy.node_args)) {
      cluster.settings.execArgv = env_copy.node_args;
    }

    env_copy._pm2_version = pkg.version;

    try {
      // node.js cluster clients can not receive deep-level objects or arrays in the forked process, e.g.:
      // { "args": ["foo", "bar"], "env": { "foo1": "bar1" }} will be parsed to
      // { "args": "foo, bar", "env": "[object Object]"}
      // So we passing a stringified JSON here.
      clu = cluster.fork({pm2_env: JSON.stringify(env_copy), windowsHide: true});
    } catch(e) {
      God.logAndGenerateError(e);
      return cb(e);
    }

    clu.pm2_env = env_copy;

    /**
     * Broadcast message to God
     */
    clu.on('message', function cluMessage(msg) {
      /*********************************
       * If you edit this function
       * Do the same in ForkMode.js !
       *********************************/
      if (msg.data && msg.type) {
        return God.bus.emit(msg.type ? msg.type : 'process:msg', {
          at      : Utility.getDate(),
          data    : msg.data,
          process :  {
            pm_id      : clu.pm2_env.pm_id,
            name       : clu.pm2_env.name,
            rev        : (clu.pm2_env.versioning && clu.pm2_env.versioning.revision) ? clu.pm2_env.versioning.revision : null
          }
        });
      }
      else {

        if (typeof msg == 'object' && 'node_version' in msg) {
          clu.pm2_env.node_version = msg.node_version;
          return false;
        } else if (typeof msg == 'object' && 'cron_restart' in msg) {
          return God.restartProcessId({
            id : clu.pm2_env.pm_id
          }, function() {
            console.log('Application %s has been restarted via CRON', clu.pm2_env.name);
          });
        }

        return God.bus.emit('process:msg', {
          at      : Utility.getDate(),
          raw     : msg,
          process :  {
            pm_id      : clu.pm2_env.pm_id,
            name       : clu.pm2_env.name
          }
        });
      }
    });

    return cb(null, clu);
  };

果真不出所料,這裏面用到node.js的原生cluster模塊根據傳入的i參數,fork出相應數量的子進程,並監聽message事件,經過God.bus.emit('process:msg',{})的方式跟God進行通訊,最後把子進程的應用傳到回調函數中,穩穩的。
可是咱們fork出哪一個js文件呢,這裏沒有指明,因此又往上層的lib/God.js找到,在文件的一開始,已經作了關於cluster的設置,代碼以下

if (semver.lt(process.version, '10.0.0')) {
  cluster.setupMaster({
    windowsHide: true,
    exec : path.resolve(path.dirname(module.filename), 'ProcessContainerLegacy.js')
  });
}
else {
  cluster.setupMaster({
    windowsHide: true,
    exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
  });
}

咱們看到這裏經過原生cluster的api:setupMaster來設置app的啓動文件ProcessContainer.js
那咱們就看看這個啓動文件,這個文件裏面是一個自執行函數,通過前面的各類處理以後,最後執行的是exec這個方法。這個方法先是對腳本的後綴判斷完以後,再作一些進程對'message'事件的監聽,開啓日誌服務,捕獲進程錯誤等一系列操做以後,經過import或者require的方式來加載咱們的業務腳本,也就是咱們以前輸入的./examples/api-pm2/http.js。終於在這個時候,咱們的業務代碼就開始跑了。
業務代碼跑起來以後呢,就會在回調函數裏經過God.notify方法去通知到God。自此,大體的pm2 start流程就算是跑通裏,業務代碼也能夠正常運行了。


以上就是這整個pm2例子的大體流程,寫得比較粗,不少細節沒有講到,之後會繼續完善這篇文章,歡迎關注,歡迎拍磚。蟹蟹你們,你們加油
武漢加油,中國加油!!!

相關文章
相關標籤/搜索