理解Python併發編程-PoolExecutor篇

原文連接python

 

以前咱們使用多線程(threading)和多進程(multiprocessing)完成常規的需求,在啓動的時候start、jon等步驟不能省,複雜的須要還要用1-2個隊列。隨着需求愈來愈複雜,若是沒有良好的設計和抽象這部分的功能層次,代碼量越多調試的難度就越大。有沒有什麼好的方法把這些步驟抽象一下呢,讓咱們不關注這些細節,輕裝上陣呢?設計模式

答案是:有的。多線程

從Python3.2開始一個叫作concurrent.futures被歸入了標準庫,而在Python2它屬於第三方的futures庫,須要手動安裝:併發

 

pip install futures

```                                             


這個模塊中有2個類:ThreadPoolExecutor和ProcessPoolExecutor,也就是對threading和multiprocessing的進行了高級別的抽象, 暴露出統一的接口,幫助開發者很是方便的實現異步調用: ```python import time from concurrent.futures import ProcessPoolExecutor, as_completed NUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() with ProcessPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)



感覺下是否是很輕便呢?看一下花費的時間:異步

 

python fib_executor.py fib(25) = 75025 fib(26) = 121393 fib(27) = 196418 fib(28) = 317811 fib(29) = 514229 fib(30) = 832040 fib(31) = 1346269 fib(32) = 2178309 fib(33) = 3524578 fib(34) = 5702887 fib(35) = 9227465 fib(36) = 14930352 fib(37) = 24157817 COST: 10.8920350075



除了用map,另一個經常使用的方法是submit。若是你要提交的任務的函數是同樣的,就能夠簡化成map。可是假如提交的任務函數是不同的,或者執行的過程之可能出現異常(使用map執行過程當中發現問題會直接拋出錯誤)就要用到submit:函數

 

from concurrent.futures import ThreadPoolExecutor, as_completed NUMBERS = range(30, 35) def fib(n): if n == 34: raise Exception("Don't do this") if n<= 2: return 1 return fib(n-1) + fib(n-2) with ThreadPoolExecutor(max_workers=3) as executor: future_to_num = {executor.submit(fib, num): num for num in NUMBERS} for future in as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print 'raise an exception: {}'.format(e) else: print 'fib({}) = {}'.format(num, result) with ThreadPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)



執一下:tornado

 

python fib_executor_with_raise.py fib(30) = 832040 fib(31) = 1346269 raise an exception: Don't do this fib(32) = 2178309 fib(33) = 3524578 Traceback (most recent call last): File "fib_executor_with_raise.py", line 28, in <module> for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map yield future.result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result return self.__get_result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result reraise(self._exception, self._traceback) File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise exec('raise exc_type, exc_value, traceback', {}, locals_) File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run result = self.fn(*self.args, **self.kwargs) File "fib_executor_with_raise.py", line 9, in fib raise Exception("Don't do this") Exception: Don't do this



能夠看到,第一次捕捉到了異常,可是第二次執行的時候錯誤直接拋出來了。this

上面說到的map,有些同窗立刻會說,這不是進程(線程)池的效果嗎?看起來確實是的:spa

 

import time

from multiprocessing.pool import Pool NUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() pool = Pool(3) results = pool.map(fib, NUMBERS) for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)



好像代碼量更小喲。好吧,看一下花費的時間:線程

 
    
 

python fib_pool.py

fib(25) = 75025

fib(26) = 121393

fib(27) = 196418

fib(28) = 317811

fib(29) = 514229

fib(30) = 832040

fib(31) = 1346269

fib(32) = 2178309

fib(33) = 3524578

fib(34) = 5702887

fib(35) = 9227465

fib(36) = 14930352

fib(37) = 24157817

COST: 17.1342718601
 
    
 

WhatTF居然花費了1.7倍的時間。爲何?

BTW,有興趣的同窗能夠對比下ThreadPool和ThreadPoolExecutor,因爲GIL的緣故,對比的差距必定會更多。

原理

咱們就拿ProcessPoolExecutor介紹下它的原理,引用官方代碼註釋中的流程圖:

 

|======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+



咱們結合源碼和上面的數據流分析一下:

  1. executor.map會建立多個_WorkItem對象,每一個對象都傳入了新建立的一個Future對象。
  2. 把每一個_WorkItem對象而後放進一個叫作「Work Items」的dict中,鍵是不一樣的「Work Ids」。
  3. 建立一個管理「Work Ids」隊列的線程「Local worker thread」,它能作2件事:
    1. 從「Work Ids」隊列中獲取Work Id, 經過「Work Items」找到對應的_WorkItem。若是這個Item被取消了,就從「Work Items」裏面把它刪掉,不然從新打包成一個_CallItem放入「Call Q」這個隊列。executor的那些進程會從隊列中取_CallItem執行,並把結果封裝成_ResultItems放入「Result Q」隊列中。
    2. 從「Result Q」隊列中獲取_ResultItems,而後從「Work Items」更新對應的Future對象並刪掉入口。

看起來就是一個「生產者/消費者」模型罷了,錯了。咱們要注意,整個過程並非多個進程與任務+結果-2個隊列直接通訊的,而是經過一箇中間的「Local worker thread」,它就是讓效率提高的重要緣由之一!!!

設想,當某一段程序提交了一個請求,指望獲得一個答覆。但服務程序對這個請求可能很慢,在傳統的單線程環境下,調用函數是同步的,也就是說它必須等到服務程序返回結果後,才能進行其餘處理。而在Future模式下,調用方式改成異步,而原先等待返回的時間段,在主調用函數中,則可用於處理其餘事物。

Future

Future是常見的一種併發設計模式,在多個其餘語言中均可以見到這種解決方案。

一個Future對象表明了一些還沒有就緒(完成)的結果,在「未來」的某個時間就緒了以後就能夠獲取到這個結果。好比上面的例子,咱們指望併發的執行一些參數不一樣的fib函數,獲取所有的結果。傳統模式就是在等待queue.get返回結果,這個是同步模式,而在Future模式下,調用方式改成異步,而原先等待返回的時間段,因爲「Local worker thread」的存在,這個時候能夠完成其餘工做

在tornado中也有對應的實現。2013年的時候,我曾經寫過一篇博客使用tornado讓你的請求異步非阻塞,最後也提到了用concurrent.futures實現異步非阻塞的完成耗時任務。

 

原文連接

相關文章
相關標籤/搜索