原文連接python
以前咱們使用多線程(threading)和多進程(multiprocessing)完成常規的需求,在啓動的時候start、jon等步驟不能省,複雜的須要還要用1-2個隊列。隨着需求愈來愈複雜,若是沒有良好的設計和抽象這部分的功能層次,代碼量越多調試的難度就越大。有沒有什麼好的方法把這些步驟抽象一下呢,讓咱們不關注這些細節,輕裝上陣呢?設計模式
答案是:有的。多線程
從Python3.2開始一個叫作concurrent.futures被歸入了標準庫,而在Python2它屬於第三方的futures庫,須要手動安裝:併發
pip install futures
```
這個模塊中有2個類:ThreadPoolExecutor和ProcessPoolExecutor,也就是對threading和multiprocessing的進行了高級別的抽象, 暴露出統一的接口,幫助開發者很是方便的實現異步調用: ```python import time from concurrent.futures import ProcessPoolExecutor, as_completed NUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() with ProcessPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)
感覺下是否是很輕便呢?看一下花費的時間:異步
python fib_executor.py fib(25) = 75025 fib(26) = 121393 fib(27) = 196418 fib(28) = 317811 fib(29) = 514229 fib(30) = 832040 fib(31) = 1346269 fib(32) = 2178309 fib(33) = 3524578 fib(34) = 5702887 fib(35) = 9227465 fib(36) = 14930352 fib(37) = 24157817 COST: 10.8920350075
除了用map,另一個經常使用的方法是submit。若是你要提交的任務的函數是同樣的,就能夠簡化成map。可是假如提交的任務函數是不同的,或者執行的過程之可能出現異常(使用map執行過程當中發現問題會直接拋出錯誤)就要用到submit:函數
from concurrent.futures import ThreadPoolExecutor, as_completed NUMBERS = range(30, 35) def fib(n): if n == 34: raise Exception("Don't do this") if n<= 2: return 1 return fib(n-1) + fib(n-2) with ThreadPoolExecutor(max_workers=3) as executor: future_to_num = {executor.submit(fib, num): num for num in NUMBERS} for future in as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print 'raise an exception: {}'.format(e) else: print 'fib({}) = {}'.format(num, result) with ThreadPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)
執一下:tornado
python fib_executor_with_raise.py fib(30) = 832040 fib(31) = 1346269 raise an exception: Don't do this fib(32) = 2178309 fib(33) = 3524578 Traceback (most recent call last): File "fib_executor_with_raise.py", line 28, in <module> for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map yield future.result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result return self.__get_result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result reraise(self._exception, self._traceback) File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise exec('raise exc_type, exc_value, traceback', {}, locals_) File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run result = self.fn(*self.args, **self.kwargs) File "fib_executor_with_raise.py", line 9, in fib raise Exception("Don't do this") Exception: Don't do this
能夠看到,第一次捕捉到了異常,可是第二次執行的時候錯誤直接拋出來了。this
上面說到的map,有些同窗立刻會說,這不是進程(線程)池的效果嗎?看起來確實是的:spa
import time
from multiprocessing.pool import Pool NUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() pool = Pool(3) results = pool.map(fib, NUMBERS) for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)
好像代碼量更小喲。好吧,看一下花費的時間:線程
python fib_pool.py fib(25) = 75025 fib(26) = 121393 fib(27) = 196418 fib(28) = 317811 fib(29) = 514229 fib(30) = 832040 fib(31) = 1346269 fib(32) = 2178309 fib(33) = 3524578 fib(34) = 5702887 fib(35) = 9227465 fib(36) = 14930352 fib(37) = 24157817 COST: 17.1342718601 |
|
WhatTF居然花費了1.7倍的時間。爲何?
BTW,有興趣的同窗能夠對比下ThreadPool和ThreadPoolExecutor,因爲GIL的緣故,對比的差距必定會更多。
咱們就拿ProcessPoolExecutor介紹下它的原理,引用官方代碼註釋中的流程圖:
|======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------