對Python線程池的研究是以前對Apshceduler分析的附加工做。數據庫
在以前對Apshceduler源碼分析的文章中,寫到調度器將任務放入線程池的函數segmentfault
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else (f.exception(), getattr(f.exception(), '__traceback__', None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
這裏分析的線程池類是concurrent.futures.ThreadPoolExecutor,也就是上述代碼中self._pool所使用的類。先上self._pool.submit函數的代碼,再作詳細分析函數
def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f
f和w是兩個很是重要的變量,f做爲submit返回的對象,submit函數的調用者能夠對其添加回調,待fn執行完成後,會在當前線程執行,具體是如何實現的,這裏先不說,下面再詳細分析;w則是封裝了線程須要執行的方法和參數,經過self._work_queue.put(w)方法放入一個隊列當中。源碼分析
self._adjust_thread_count()方法則是檢查當前線程池的線程數量,若是小於設定的最大值,就開闢一個線程,代碼就不上了,直接看這些個線程都是幹嗎的線程
def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True)
這些線程就是一個死循環,不斷的從任務隊列中獲取到_WorkItem,而後經過其封裝方法,執行咱們須要的任務。若是取到的任務爲None,就往隊列中再放入一個None,以通知其它線程結束,而後結束當前循環。code
def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: self.future.set_exception(e) else: self.future.set_result(result)
若是沒有異常,執行結束後,會執行以前咱們說的回調。在self.future.set_result(result)方法中會執行任務回調,固然了,是在當前線程中。若是須要寫入數據庫之類的操做,不建議在回調中直接寫入。對象