Ocata Neutron代碼分析(三)——oslo_service中的ServiceLauncher和ProcessLauncher(轉載)

1.概述
 
Openstack中有一個叫Launcher的概念,即專門用來啓動服務的,這個類被放在了oslo_service這個包裏面。Launcher分爲兩種,一種是ServiceLauncher,另外一種爲ProcessLauncher。ServiceLauncher用來啓動單進程的服務,而ProcessLauncher用來啓動有多個worker子進程的服務。
 
2.ServiceLauncher
 
ServiceLauncher繼承自Launcher,啓動服務的一個重要成員就是launcher_service,ServiceLauncher沒有對該成員函數進行任何改寫。
def launch_service(self, service, workers=1):
        """Load and start the given service.

        :param service: The service you would like to start, must be an
                        instance of :class:`oslo_service.service.ServiceBase`
        :param workers: This param makes this method compatible with
                        ProcessLauncher.launch_service. It must be None, 1 or
                        omitted.
        :returns: None

        """
        if workers is not None and workers != 1:
            raise ValueError(_("Launcher asked to start multiple workers"))
        _check_service_base(service)
        service.backdoor_port = self.backdoor_port
        self.services.add(service)

 laucher_service就是將服務添加到self.services成員裏面,services成員的類型是class Services,看看它的add方法。併發

class Services(object):

    def __init__(self):
        self.services = []
        self.tg = threadgroup.ThreadGroup()
        self.done = event.Event()

    def add(self, service):
        """Add a service to a list and create a thread to run it.

        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)

    @staticmethod
    def run_service(service, done):
        """Service start wrapper.

        :param service: service to run
        :param done: event to wait on until a shutdown is triggered
        :returns: None

        """
        try:
            service.start()
        except Exception:
            LOG.exception('Error starting thread.')
            raise SystemExit(1)
        else:
            done.wait()
Services這個類的初始化很簡單,即建立一個ThreadGroup,ThreadGroup實際上是eventlet的GreenPool,Openstack利用eventlet實現併發。add方法,將self.run_service這個方法放入pool中,而service就是它的參數。run_service方法很簡單,就是調用service的start方法,這樣就完成了服務的啓動。
 
3.ProcessLauncher
 
ProcessLauncher直接繼承於Object,因此其對launch_service方法進行了實現。
class ProcessLauncher(object):
    def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

       :param service: a service to launch, must be an instance of
              :class:`oslo_service.service.ServiceBase`
       :param workers: a number of processes in which a service
              will be running
        """
        _check_service_base(service)
        wrap = ServiceWrapper(service, workers)

        LOG.info('Starting %d workers', wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)

 lauch_service除了接受service參數之外,還須要接受一個workers參數,即子進程的個數,而後調用_start_child啓動多個子進程。app

    def _start_child(self, wrap):
        if len(wrap.forktimes) > wrap.workers:
            # Limit ourselves to one process a second (over the period of
            # number of workers * 1 second). This will allow workers to
            # start up quickly but ensure we don't fork off children that
            # die instantly too quickly.
            if time.time() - wrap.forktimes[0] < wrap.workers:
                LOG.info('Forking too fast, sleeping')
                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()
        if pid == 0:
            self.launcher = self._child_process(wrap.service)
            while True:
                self._child_process_handle_signal()
                status, signo = self._child_wait_for_exit_or_signal(
                    self.launcher)
                if not _is_sighup_and_daemon(signo):
                    self.launcher.wait()
                    break
                self.launcher.restart()

            os._exit(status)

        LOG.debug('Started child %d', pid)

        wrap.children.add(pid)
        self.children[pid] = wrap

        return pid

_start_child只是簡單的調用了一個os.fork(),而後子進程開始運行,子進程調用_child_process。dom

   def _child_process(self, service):
        self._child_process_handle_signal()

        # Reopen the eventlet hub to make sure we don't share an epoll
        # fd with parent and/or siblings, which would be bad
        eventlet.hubs.use_hub()

        # Close write to ensure only parent has it open
        os.close(self.writepipe)
        # Create greenthread to watch for parent to close pipe
        eventlet.spawn_n(self._pipe_watcher)

        # Reseed random number generator
        random.seed()

        launcher = Launcher(self.conf, restart_method=self.restart_method)
        launcher.launch_service(service)
        return launcher

 _child_process其實很簡單,建立一個Launcher,調用Laucher.launch_service方法。前面介紹過ServiceLauncher繼承自Launcher,也是調用的launch_service方法,將服務啓動,所以接下來的步驟與前面一致,最終都將調用service.start方法啓動服務。函數

相關文章
相關標籤/搜索