先看個例子:
import time
from concurrent.futures import ThreadPoolExecutor


def foo():
    """Print an entry timestamp, sleep 5 seconds, print an exit timestamp."""
    print('enter at {} ...'.format(time.strftime('%X')))
    time.sleep(5)
    print('exit at {} ...'.format(time.strftime('%X')))


# Using the executor as a context manager: __exit__ calls
# shutdown(wait=True), which blocks until the submitted task finishes.
with ThreadPoolExecutor() as executor:
    executor.submit(foo)
執行結果:
enter at 16:20:31 ... exit at 16:20:36 ...
shutdown(wait=True) 方法默認阻塞當前線程,等待子線程執行完畢。即便 shutdown(wait=False) 也只是非阻塞的關閉線程池,線程池中正在執行任務的子線程並不會被立刻中止,而是會繼續執行直到執行完畢。嘗試在源碼中給新開啓的子線程調用 t.join(0) 來立馬強制中止子線程 t,也不行,究竟是什麼緣由保證了線程池中的線程在關閉線程池時,線程池中正在執行任務的子線程們不會被關閉呢?
看一下ThreadPoolExecutor源碼:
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our
                threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = thread_name_prefix

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            # Wrap the target callable fn into a work item; calling
            # w.run() later will invoke fn(*args, **kwargs).
            w = _WorkItem(f, fn, args, kwargs)

            # Hand the work item to the worker threads via the queue.
            self._work_queue.put(w)
            # Possibly start a new thread that keeps pulling work items
            # off the queue and running them.
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # Callback fired when the executor object is garbage collected
        # (e.g. after `del executor`): wakes a worker with the None
        # sentinel so it can notice the executor is gone.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads)
            # The module-level _worker function is the thread target; it
            # loops over the queue executing work items.
            t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Crucial step: registering the thread in the module-level
            # _threads_queues mapping is what guarantees it cannot be cut
            # short by a t.join(0) — see the _python_exit function, which
            # re-joins every registered thread without a timeout at
            # interpreter exit.
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Enqueue one None sentinel; the worker that receives it will
            # propagate it to the others (see _worker).
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
submit(func) 幹了兩件事:
_adjust_thread_count()幹了兩件事:
開啓一個新線程執行_worker函數,這個函數的做用就是不斷去queue中取出worker,執行worker.run(),即執行func()。
把新線程跟隊列queue綁定,防止線程被join(0)強制中斷。
來看一下_worker函數源碼:
def _worker(executor_reference, work_queue):
    """Worker-thread loop: pull work items off the queue and run them.

    executor_reference is a weakref to the owning executor; work_queue is
    the executor's shared queue. A None item is the shutdown sentinel.
    """
    try:
        while True:
            # Block until a work item (or the None sentinel) is available.
            work_item = work_queue.get(block=True)
            if work_item is not None:
                # Execute the user callable wrapped in this work item.
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            # Got the None sentinel: dereference the weakref to see
            # whether the executor still exists.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            # When shutdown() runs, it sets executor._shutdown to True and
            # puts a single None on the queue. A worker that finishes its
            # current work_item.run() loops back, receives that None,
            # lands in this branch, and before returning puts another
            # None on the queue to notify one more worker. That chain
            # reaction lets every worker finish its in-flight task and
            # then exit — which is why running tasks are never aborted.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
能夠看出,這個 _worker方法的做用就是在新線程中不斷得到queue中的worker對象,執行worker.run()方法,執行完畢後經過放入None到queue隊列的方式來通知其餘線程結束。
再來看看_adjust_thread_count()方法中的_threads_queues[t] = self._work_queue這個操做是如何實現防止join(0)的操做強制中止正在執行的線程的。
import atexit

# Module-level registry mapping each worker thread to its work queue.
# Weak keys let entries disappear automatically once a thread object dies.
_threads_queues = weakref.WeakKeyDictionary()
# Flag checked by _worker: set to True when the interpreter is exiting.
_shutdown = False


def _python_exit():
    """atexit hook: wake every worker and wait for all of them to finish."""
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    # Push one None sentinel per registered thread so each worker loop
    # can observe _shutdown and exit.
    for t, q in items:
        q.put(None)
    # Join each registered thread WITHOUT a timeout: this is what makes
    # earlier t.join(0)/t.join(n) calls ineffective at "killing" workers —
    # the interpreter still waits here for them to finish.
    for t, q in items:
        t.join()

atexit.register(_python_exit)
這個atexit模塊的做用是用來註冊一個函數,當MainThread中的邏輯執行完畢時,會執行註冊的這個_python_exit函數。而後執行_python_exit中的邏輯,也就是說t.join()會被執行,強制阻塞。這裏好奇,既然是在MainThread結束後執行,那這個t.join()是在什麼線程中被執行的呢。實際上是一個叫_DummyThread線程的虛擬線程中執行的。
import atexit
import threading
import weakref
import time

# Mirrors concurrent.futures.thread._threads_queues: weak keys so entries
# vanish when a thread object is garbage collected.
threads_queues = weakref.WeakKeyDictionary()


def foo():
    """Print an entry timestamp, sleep 5 seconds, print an exit timestamp."""
    print('enter at {} ...'.format(time.strftime('%X')))
    time.sleep(5)
    print('exit at {} ...'.format(time.strftime('%X')))


def _python_exit():
    """atexit hook: join every registered thread without a timeout.

    Runs after MainThread's logic completes; threading reports the caller
    as a _DummyThread at that point.
    """
    items = list(threads_queues.items())
    print('current thread in _python_exit --> ', threading.current_thread())
    for t, _ in items:
        t.join()

atexit.register(_python_exit)

if __name__ == '__main__':
    t = threading.Thread(target=foo)
    # Fix: t.setDaemon(True) is deprecated (since Python 3.10); assign the
    # daemon attribute directly instead.
    t.daemon = True
    t.start()
    threads_queues[t] = foo
    print(time.strftime('%X'))
    # join(timeout=2) waits at most 2 seconds; it never terminates the
    # thread — foo keeps running after the timeout expires.
    t.join(timeout=2)
    print(time.strftime('%X'))
    t.join(timeout=2)
    print(time.strftime('%X'))
    print('current thread in main -->', threading.current_thread())
    print(threading.current_thread(), 'end')
執行結果:
enter at 17:13:44 ... 17:13:44 17:13:46 17:13:48 current thread in main --> <_MainThread(MainThread, started 12688)> <_MainThread(MainThread, started 12688)> end current thread in _python_exit --> <_DummyThread(Dummy-2, started daemon 12688)> exit at 17:13:49 ...
從這個例子能夠看到,當線程t開啓時foo函數阻塞5秒,在MainThread中2次調用t.join(timeout=2),分別的等待了2秒,總等待時間是4秒,可是當執行第二個t.join(timeout=2)後,線程t依然沒有被強制中止,而後主線程執行完畢,而後_python_exit方法被調用,在_DummyThread線程中再調用t.join(),繼續等待子線程t的執行完畢,直到線程t打印exit at 17:13:49 ...
才執行完畢。
總結:
join()是能夠被一個線程屢次調用的,至關是屢次等待的疊加。把_python_exit函數註冊到atexit模塊後,其餘線程即便企圖調用t.join(n)來終止線程t也不起做用,由於_python_exit老是在最後執行時調用t.join()來保證讓線程t執行完畢,而不是被中途強制中止。