Python線程池源碼分析

對Python線程池的研究是以前對Apshceduler分析的附加工做。數據庫

在以前對Apshceduler源碼分析的文章中,寫到調度器將任務放入線程池的函數segmentfault

def _do_submit_job(self, job, run_times):
        def callback(f):
            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
                       (f.exception(), getattr(f.exception(), '__traceback__', None)))
            if exc:
                self._run_job_error(job.id, exc, tb)
            else:
                self._run_job_success(job.id, f.result())

        f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name)
        f.add_done_callback(callback)

這裏分析的線程池類是concurrent.futures.ThreadPoolExecutor,也就是上述代碼中self._pool所使用的類。先上self._pool.submit函數的代碼,再作詳細分析函數

def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f

f和w是兩個很是重要的變量,f做爲submit返回的對象,submit函數的調用者能夠對其添加回調,待fn執行完成後,會在當前線程執行,具體是如何實現的,這裏先不說,下面再詳細分析;w則是封裝了線程須要執行的方法和參數,經過self._work_queue.put(w)方法放入一個隊列當中。源碼分析

self._adjust_thread_count()方法則是檢查當前線程池的線程數量,若是小於設定的最大值,就開闢一個線程,代碼就不上了,直接看這些個線程都是幹嗎的線程

def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

這些線程就是一個死循環,不斷的從任務隊列中獲取到_WorkItem,而後經過其封裝方法,執行咱們須要的任務。若是取到的任務爲None,就往隊列中再放入一個None,以通知其它線程結束,而後結束當前循環。code

def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)

若是沒有異常,執行結束後,會執行以前咱們說的回調。在self.future.set_result(result)方法中會執行任務回調,固然了,是在當前線程中。若是須要寫入數據庫之類的操做,不建議在回調中直接寫入。對象

相關文章
相關標籤/搜索