Ocata Neutron Code Analysis (Part 2): How Neutron RPC Starts Up

RPC is started in the same function as the Neutron API: eventlet_wsgi_server in the neutron.server.wsgi_eventlet module (neutron/server/wsgi_eventlet.py).

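import eventlet
from oslo_log import log

from neutron._i18n import _LI
from neutron import service

LOG = log.getLogger(__name__)

def eventlet_wsgi_server():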
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)

def start_api_and_rpc_workers(neutron_api):
    try:
        worker_launcher = service.start_all_workers()

        pool = eventlet.GreenPool()
        api_thread = pool.spawn(neutron_api.wait)
        plugin_workers_thread = pool.spawn(worker_launcher.wait)

        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())

        pool.waitall()
    except NotImplementedError:
        LOG.info(_LI("RPC was already started in parent process by "
                     "plugin."))

        neutron_api.wait()
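start_api_and_rpc_workers calls start_all_workers to start the RPC-related workers and gets back a worker_launcher object (normally an instance of oslo_service.service.ProcessLauncher). The main process then uses a GreenPool to spawn the wait methods of neutron_api and worker_launcher as two green threads, and calls waitall to wait for both to finish. The two link calls make the services die together: when either green thread exits, the other one is killed. If start_all_workers raises NotImplementedError (the core plugin does not implement start_rpc_listeners), the except branch logs that RPC was already started by the plugin in the parent process and waits on the API service alone.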
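The "die together" behaviour can be tried out without eventlet. The sketch below is an analogy built on the standard library's asyncio instead of eventlet (an assumption made so it runs anywhere): add_done_callback plays the role of GreenThread.link, task cancellation plays the role of kill, and gather stands in for GreenPool.waitall. The service coroutine and its timings are hypothetical stand-ins for neutron_api.wait and worker_launcher.wait.

```python
import asyncio


async def service(name, seconds, events):
    """Hypothetical stand-in for a service's wait(): returns after `seconds`."""
    try:
        await asyncio.sleep(seconds)
        events.append(f"{name} exited")
    except asyncio.CancelledError:
        events.append(f"{name} killed")
        raise


async def run_together(events):
    api = asyncio.create_task(service("api", 0.01, events))
    workers = asyncio.create_task(service("workers", 10, events))
    # Counterpart of the two link() calls: when one task finishes,
    # cancel ("kill") the other one.
    api.add_done_callback(lambda task: workers.cancel())
    workers.add_done_callback(lambda task: api.cancel())
    # Counterpart of pool.waitall(): wait for both; return_exceptions=True
    # keeps the killed task's CancelledError from propagating.
    await asyncio.gather(api, workers, return_exceptions=True)


events = []
asyncio.run(run_together(events))
print(events)  # ['api exited', 'workers killed']
```

The long-running "workers" task would otherwise sleep for 10 seconds; because the "api" task exits first, its callback cancels the other task right away, which is the same guarantee the link calls give in Neutron.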
Next, let's focus on the start_all_workers function in neutron.service:
def start_all_workers():
    workers = _get_rpc_workers() + _get_plugins_workers()
    return _start_workers(workers)
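start_all_workers collects the workers and starts them. The workers fall into two groups:
  • _get_rpc_workers returns the RPC workers that every plugin (the core plugin and the service plugins) requires;
  • _get_plugins_workers returns, for each unique plugin object (as opposed to each registered plugin entry in the first point), the additional workers that the plugin uses for its own special needs, if it has any.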
Let's start with _get_rpc_workers:
def _get_rpc_workers():
    plugin = directory.get_plugin()                         # get the core plugin
    service_plugins = directory.get_plugins().values()      # get all plugins: the core plugin and the service plugins

    if cfg.CONF.rpc_workers < 1:                            # rpc_workers from the config file must be >= 1; force it to 1 otherwise
        cfg.CONF.set_override('rpc_workers', 1)

    # If 0 < rpc_workers then start_rpc_listeners would be called in a
    # subprocess and we cannot simply catch the NotImplementedError.  It is
    # simpler to check this up front by testing whether the plugin supports
    # multiple RPC workers.
    if not plugin.rpc_workers_supported():
        LOG.debug("Active plugin doesn't implement start_rpc_listeners")
        if 0 < cfg.CONF.rpc_workers:
            LOG.error(_LE("'rpc_workers = %d' ignored because "
                          "start_rpc_listeners is not implemented."),
                      cfg.CONF.rpc_workers)
        raise NotImplementedError()

    # passing service plugins only, because core plugin is among them
    rpc_workers = [RpcWorker(service_plugins,
                             worker_process_count=cfg.CONF.rpc_workers)]

    if (cfg.CONF.rpc_state_report_workers > 0 and
            plugin.rpc_state_report_workers_supported()):
        rpc_workers.append(
            RpcReportsWorker(
                [plugin],
                worker_process_count=cfg.CONF.rpc_state_report_workers
            )
        )
    return rpc_workers
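plugin.rpc_workers_supported actually checks whether the core plugin implements the start_rpc_listeners method. If it does not, the function raises NotImplementedError.
Next, the function instantiates RpcWorker, which is the key step of this function. Like the WorkerService class from the Neutron API part, RpcWorker is a subclass of neutron_worker.NeutronWorker and likewise implements a start method.
Finally, the function checks the rpc_state_report_workers config option and whether the core plugin supports RPC state reports. If both conditions hold, it also starts rpc_state_report_workers child processes running RpcReportsWorker. This worker is a subclass of RpcWorker that differs only in its start_listeners_method: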
class RpcReportsWorker(RpcWorker):
    start_listeners_method = 'start_rpc_state_reports_listener'
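To make the branching in _get_rpc_workers concrete, here is a small sketch that replaces the config options and plugin capability checks with plain arguments. plan_rpc_workers and its parameters are hypothetical; it returns (class name, process count) pairs instead of constructing real RpcWorker objects.

```python
def plan_rpc_workers(rpc_workers, rpc_state_report_workers,
                     supports_rpc_workers=True, supports_state_reports=True):
    """Hypothetical helper mirroring the decisions made in _get_rpc_workers.

    Returns (worker class name, worker_process_count) pairs instead of
    real worker objects.
    """
    if rpc_workers < 1:
        rpc_workers = 1  # like cfg.CONF.set_override('rpc_workers', 1)
    if not supports_rpc_workers:
        # The core plugin does not implement start_rpc_listeners.
        raise NotImplementedError()
    plan = [("RpcWorker", rpc_workers)]
    if rpc_state_report_workers > 0 and supports_state_reports:
        plan.append(("RpcReportsWorker", rpc_state_report_workers))
    return plan


print(plan_rpc_workers(0, 1))  # [('RpcWorker', 1), ('RpcReportsWorker', 1)]
print(plan_rpc_workers(4, 0))  # [('RpcWorker', 4)]
```

Because of the clamp, a non-empty RPC worker list always contains at least one worker with a process count of 1 or more.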

Now let's look at RpcWorker's constructor and its start method:

class RpcWorker(neutron_worker.NeutronWorker):
    """Wraps a worker to be handled by ProcessLauncher"""
    start_listeners_method = 'start_rpc_listeners'

    def __init__(self, plugins, worker_process_count=1):  # the constructor just stores its arguments
        super(RpcWorker, self).__init__(
            worker_process_count=worker_process_count
        )

        self._plugins = plugins
        self._servers = []

    def start(self):
        super(RpcWorker, self).start()
        for plugin in self._plugins:
            if hasattr(plugin, self.start_listeners_method):
                try:
                    servers = getattr(plugin, self.start_listeners_method)()
                except NotImplementedError:
                    continue
                self._servers.extend(servers)
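start iterates over all the plugins (the core plugin and the service plugins) and checks whether each one implements the method named by start_listeners_method (start_rpc_listeners for RpcWorker). If it does, start calls it and collects the returned servers into self._servers; if the method is missing or raises NotImplementedError, the plugin is skipped.
That is what RpcWorker does. The actual RPC functionality lives in each plugin's start_rpc_listeners method, which mainly provides its service by consuming messages from MQ queues with specific names.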
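The dispatch in start, and the way RpcReportsWorker reuses it just by overriding a class attribute, can be exercised with stub plugins. All classes below are hypothetical stand-ins: the real RpcWorker also inherits from NeutronWorker (whose start is omitted here), and the real plugin methods return the server objects they started rather than strings.

```python
class PluginWithoutRpc:
    """Hypothetical plugin with no start_rpc_listeners: skipped by hasattr()."""


class PluginNotImplemented:
    """Hypothetical plugin whose method exists but raises NotImplementedError."""

    def start_rpc_listeners(self):
        raise NotImplementedError()


class CorePluginStub:
    """Hypothetical core plugin implementing both listener methods."""

    def start_rpc_listeners(self):
        return ["core-rpc-server"]

    def start_rpc_state_reports_listener(self):
        return ["core-report-server"]


class SketchRpcWorker:
    # The subclass below only overrides this class attribute.
    start_listeners_method = "start_rpc_listeners"

    def __init__(self, plugins):
        self._plugins = plugins
        self._servers = []

    def start(self):
        for plugin in self._plugins:
            if hasattr(plugin, self.start_listeners_method):
                try:
                    servers = getattr(plugin, self.start_listeners_method)()
                except NotImplementedError:
                    continue
                self._servers.extend(servers)


class SketchRpcReportsWorker(SketchRpcWorker):
    start_listeners_method = "start_rpc_state_reports_listener"


plugins = [PluginWithoutRpc(), PluginNotImplemented(), CorePluginStub()]
rpc_worker = SketchRpcWorker(plugins)
rpc_worker.start()
reports_worker = SketchRpcReportsWorker(plugins)
reports_worker.start()
print(rpc_worker._servers)      # ['core-rpc-server']
print(reports_worker._servers)  # ['core-report-server']
```

Looking the method up by name with getattr is what lets a subclass change which listener gets started without duplicating the loop.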
Next, the _get_plugins_workers function:
def _get_plugins_workers():
    # NOTE(twilson) get_plugins also returns the core plugin
    plugins = directory.get_unique_plugins()

    # TODO(twilson) Instead of defaulting here, come up with a good way to
    # share a common get_workers default between NeutronPluginBaseV2 and
    # ServicePluginBase
    return [
        plugin_worker
        for plugin in plugins if hasattr(plugin, 'get_workers')
        for plugin_worker in plugin.get_workers()
    ]
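_get_plugins_workers checks each unique plugin (each distinct plugin object, not each registered plugin entry) for a get_workers method and collects every worker those methods return. A plugin's get_workers returns the workers it uses to meet its own special needs or to provide custom services.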
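Deduplication matters because one plugin object can be registered under several keys; iterating over every entry would collect its workers more than once. In the sketch below the registry dict, its keys, and the plugin classes are hypothetical, and unique_plugins is a hypothetical helper standing in for directory.get_unique_plugins (whose own ordering may differ).

```python
class PluginWithWorkers:
    """Hypothetical plugin that needs two extra workers of its own."""

    def get_workers(self):
        return ["worker-a", "worker-b"]


class PluginWithoutWorkers:
    """Hypothetical plugin with no get_workers method: contributes nothing."""


def unique_plugins(registry):
    """Distinct plugin objects in first-seen order (identity, not equality)."""
    seen = []
    for plugin in registry.values():
        if not any(plugin is known for known in seen):
            seen.append(plugin)
    return seen


def plugins_workers(registry):
    # Same nested-comprehension shape as _get_plugins_workers.
    return [
        plugin_worker
        for plugin in unique_plugins(registry) if hasattr(plugin, "get_workers")
        for plugin_worker in plugin.get_workers()
    ]


core = PluginWithWorkers()
# The same object registered under two service types.
registry = {"CORE": core, "SERVICE_X": core, "SERVICE_Y": PluginWithoutWorkers()}
print(plugins_workers(registry))  # ['worker-a', 'worker-b']
```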
Once both collection functions have returned, start_all_workers hands the combined list to _start_workers, which starts the workers:
def _start_workers(workers):
    process_workers = [
        plugin_worker for plugin_worker in workers
        if plugin_worker.worker_process_count > 0
    ]  # keep only the workers whose worker_process_count > 0

    try:
        if process_workers:  # at least one worker has worker_process_count > 0
            worker_launcher = common_service.ProcessLauncher(
                cfg.CONF, wait_interval=1.0
            )

            # add extra process worker and spawn there all workers with
            # worker_process_count == 0
            thread_workers = [
                plugin_worker for plugin_worker in workers
                if plugin_worker.worker_process_count < 1
            ]
            if thread_workers:
                process_workers.append(
                    AllServicesNeutronWorker(thread_workers)
                )

            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            session.context_manager.dispose_pool()

            for worker in process_workers:
                worker_launcher.launch_service(worker,
                                               worker.worker_process_count)
        else:  # every worker has worker_process_count == 0
            worker_launcher = common_service.ServiceLauncher(cfg.CONF)
            for worker in workers:
                worker_launcher.launch_service(worker)
        return worker_launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log for '
                              'details.'))
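_start_workers first checks whether any worker has a worker_process_count greater than 0.
  • If none does (the else branch): it instantiates ServiceLauncher and calls its launch_service method to start each worker in turn inside the current process;
  • If at least one does (the if branch):
    • it first instantiates ProcessLauncher;
    • it then handles any workers whose worker_process_count is 0: these are gathered into a thread_workers list and passed to a single AllServicesNeutronWorker (also a NeutronWorker subclass, whose overridden start method simply calls Launcher.launch_service on each wrapped service), and that AllServicesNeutronWorker instance is appended to process_workers;
    • before forking, it calls session.context_manager.dispose_pool() so that child processes do not inherit shared DB connections, which could cause DB errors;
    • finally, it calls ProcessLauncher.launch_service for each worker in process_workers, passing worker.worker_process_count as the number of processes (the same way the Neutron API is started).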
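The partitioning above can be summarised as a pure function. This is a sketch, not Neutron code: Worker and plan_launch are hypothetical, and the wrapper that bundles the zero-count workers is assumed here to occupy exactly one process.

```python
from dataclasses import dataclass


@dataclass
class Worker:
    """Hypothetical stand-in: only the attribute _start_workers looks at."""
    name: str
    worker_process_count: int


def plan_launch(workers):
    """Return (launcher name, [(worker label, process count), ...])."""
    process_workers = [w for w in workers if w.worker_process_count > 0]
    if not process_workers:
        # else branch: ServiceLauncher runs everything in this process.
        return "ServiceLauncher", [(w.name, 0) for w in workers]
    thread_workers = [w for w in workers if w.worker_process_count < 1]
    plan = [(w.name, w.worker_process_count) for w in process_workers]
    if thread_workers:
        # All zero-count workers share one extra process
        # (assumed count of 1 for the AllServicesNeutronWorker wrapper).
        label = "AllServices[" + ", ".join(w.name for w in thread_workers) + "]"
        plan.append((label, 1))
    return "ProcessLauncher", plan


workers = [Worker("rpc", 2), Worker("rpc_reports", 1), Worker("plugin_x", 0)]
print(plan_launch(workers))
# ('ProcessLauncher', [('rpc', 2), ('rpc_reports', 1), ('AllServices[plugin_x]', 1)])
print(plan_launch([Worker("plugin_x", 0)]))
# ('ServiceLauncher', [('plugin_x', 0)])
```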
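Both ServiceLauncher and ProcessLauncher implement launch_service. ServiceLauncher, however, is a subclass of Launcher, while ProcessLauncher derives directly from object. Their launch_service methods differ as follows:
  • ServiceLauncher runs the service in green threads;
  • ProcessLauncher runs the service in child processes created with os.fork.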
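The practical difference shows up in process IDs. The sketch below is a standard-library analogy, not oslo.service code: plain threads stand in for green threads (an assumption; ServiceLauncher actually uses eventlet), and os.fork (Unix only) is called directly, in the way ProcessLauncher forks its children. The function names are hypothetical.

```python
import os
import threading


def run_in_threads(count):
    """ServiceLauncher-like: every copy runs inside the current process."""
    pids = []

    def body():
        pids.append(os.getpid())

    threads = [threading.Thread(target=body) for _ in range(count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return pids


def run_in_forked_children(count):
    """ProcessLauncher-like: every copy runs in a child made by os.fork()."""
    read_fd, write_fd = os.pipe()
    children = []
    for _ in range(count):
        pid = os.fork()
        if pid == 0:  # in the child
            try:
                os.close(read_fd)
                os.write(write_fd, f"{os.getpid()}\n".encode())
            finally:
                os._exit(0)  # never run the parent's remaining code
        children.append(pid)
    os.close(write_fd)  # the parent keeps only the read end
    for pid in children:
        os.waitpid(pid, 0)
    with os.fdopen(read_fd) as reader:
        return [int(line) for line in reader.read().split()]


parent = os.getpid()
thread_pids = run_in_threads(2)
child_pids = run_in_forked_children(2)
print(thread_pids == [parent, parent])             # True
print(parent in child_pids, len(set(child_pids)))  # False 2
```

Every forked child starts with a copy of whatever the parent had open at fork time, which is why _start_workers disposes the DB connection pool before launching the process workers.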