python multiprocessing深度解析

在寫python多線程代碼的時候,會用到multiprocessing這個包,這篇文章總結了一些這個包在多進程管理方面的一些原理和代碼分析。python

1. 問題一:是否須要顯式調用pool的close和join方法,不調用的話,子進程是否沒法退出?bootstrap

首先初始化Pool的時候,指定processes的個數,就是pool中worker的個數,pool初始化的時候,會把worker以daemon=True的子進程方式啓動起來。api

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

推薦在使用完pool以後,用thread pool的時候調用close()和join()方法,這樣能夠把pool中的worker都釋放掉(等待子任務結束)。可是若是不顯式的調用,在主進程退出的時候,這些子進程也會退出(緣由是設置了daemon這個flag)。多線程

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    if current_process() is not None:
        # NB: we check if the current process is None here because if
        # it's None, any call to ``active_children()`` will throw an
        # AttributeError (active_children winds up trying to get
        # attributes from util._current_process).  This happens in a
        # variety of shutdown circumstances that are not well-understood
        # because module-scope variables are not apparently supposed to
        # be destroyed until after this function is called.  However,
        # they are indeed destroyed before this function is called.  See
        # issues 9775 and 15881.  Also related: 4106, 9205, and 9207.

        for p in active_children():
            if p._daemonic:
                info('calling terminate() for daemon %s', p.name)
                p._popen.terminate()

        for p in active_children():
            info('calling join() for process %s', p.name)
            p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

主進程退出的時候,會調用_exit_function, 若是看到active的children是_daemonic的就會調用其terninate方法,讓子進程退出。exit是經過這個調用註冊的,atexit.register(_exit_function),本質是利用系統的退出hook方法,在退出的時候觸發對應的函數。app

2. 問題二:若是啓動以後,kill -9主進程,子進程會不會沒法退出?函數

以下代碼是pool中worker的主代碼邏輯,若是kill -9主進程,子進程若是沒有在處理做業,由於主進程退出了,get()方法從queue中拿task的時候,就會發生exception,這樣worker會退出。若是子進程正在處理任務,任務結束的時候,須要往queue中扔回結果,由於主進程退出了,因此也會exception,worker同樣會退出。this

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))
        completed += 1
    debug('worker exiting after %d tasks' % completed)

worker退出的時候,看以下代碼spa

## process.py
def _bootstrap(self):
        from . import util
        global _current_process

        try:
            self._children = set()
            self._counter = itertools.count(1)
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (OSError, ValueError):
                pass
            _current_process = self
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()

子進程run()會結束,而後調用_exit_function()清理一些子進程,調用_run_finalizers()結束進程。線程

可是若是子進程在pool的worker中跑的是長時間不退出的task,那這個子進程就會沒法退出,一直在運行。若是task都是短做業,即便主進程被kill -9,子進程也會在做業跑完以後都退出。debug

相關文章
相關標籤/搜索