Python 學習筆記 併發 future

concurrent.futures模塊

該模塊主要特點在於ThreadPoolExecutor 和 ProcessPoolExecutor 類,這兩個類都繼承自concurrent.futures._base.Executor類,它們實現的接口能分別在不一樣的線程或進程中執行可調用的對象,它們都在內部維護着一個工做線程或者進程池。python

ThreadPoolExecutor 和 ProcessPoolExecutor 類是高級類,大部分狀況下只要學會使用便可,無需關注其實現細節。async

####ProcessPoolExecutor 類函數

>class ThreadPoolExecutor(concurrent.futures._base.Executor)

>|  This is an abstract base class for concrete asynchronous executors.

>|  Method resolution order:

>|      ThreadPoolExecutor

 |      concurrent.futures._base.Executor

 |      builtins.object

 |

 |  Methods defined here:

 |

 |  init(self, max_workers=None, thread_name_prefix='')

 |      Initializes a new ThreadPoolExecutor instance.

 |

 |      Args:

 |          max_workers: The maximum number of threads that can be used to

 |              execute the given calls.

 |          thread_name_prefix: An optional name prefix to give our threads.

 |

 |  shutdown(self, wait=True)

 |      Clean-up the resources associated with the Executor.

 |

 |      It is safe to call this method several times. Otherwise, no other

 |      methods can be called after this one.

 |

 |      Args:

 |          wait: If True then shutdown will not return until all running

 |              futures have finished executing and the resources used by the

 |              executor have been reclaimed.

 |

 |  submit(self, fn, *args, **kwargs)

 |      Submits a callable to be executed with the given arguments.

 |

 |      Schedules the callable to be executed as fn(*args, **kwargs) and returns

 |      a Future instance representing the execution of the callable.

 |

 |      Returns:

 |          A Future representing the given call.

 |

 |  ----------------------------------------------------------------------

 |  Methods inherited from concurrent.futures._base.Executor:

 |

 |  enter(self)

 |

 |  exit(self, exc_type, exc_val, exc_tb)

 |

 |  map(self, fn, *iterables, timeout=None, chunksize=1)

 |      Returns an iterator equivalent to map(fn, iter).

 |

 |      Args:

 |          fn: A callable that will take as many arguments as there are

 |              passed iterables.

 |          timeout: The maximum number of seconds to wait. If None, then there

 |              is no limit on the wait time.

 |          chunksize: The size of the chunks the iterable will be broken into

 |              before being passed to a child process. This argument is only

 |              used by ProcessPoolExecutor; it is ignored by

 |              ThreadPoolExecutor.

 |

 |      Returns:

 |          An iterator equivalent to: map(func, *iterables) but the calls may

 |          be evaluated out-of-order.

 |

 |      Raises:

 |          TimeoutError: If the entire result iterator could not be generated

 |              before the given timeout.

 |          Exception: If fn(*args) raises for any values.

初始化能夠指定一個最大進程數做爲其參數 max_workers 的值,該值通常無需指定,默認爲當前運行機器的核心數,能夠由os.cpu_count()獲取;類中含有方法:ui

  1. map()方法,與python內置方法map() 功能相似,也就是映射,參數爲:
  • 一個可調用函數 fn
  • 一個迭代器 iterables
  • 超時時長 timeout
  • 塊數chuncksize 若是大於1, 迭代器會被分塊處理

---->> 該函數有一個特性:其返回結果與調用開始的順序是一致的;在調用過程當中不會產生阻塞,也就是說可能前者被調用執行結束以前,後者被調用已經執行結束了。this

若是必定要獲取到全部結果後再處理,能夠選擇採用submit()方法和futures.as_completed函數結合使用。lua

  1. shutdown()方法,清理全部與當前執行器(executor)相關的資源
  2. submit() 方法,提交一個可調用對象給fn使用
  3. 從concurrent.futures._base.Executor繼承了__enter__() 和 __exit__()方法,這意味着ProcessPoolExecutor 對象能夠用於with 語句。
from concurrent import futures
with futures.ProcessPoolExecutor(max_works=3) as executor:
     executor.map()

ThreadPoolExecutor類

class ThreadPoolExecutor(concurrent.futures._base.Executor)

 |  This is an abstract base class for concrete asynchronous executors.

 |

 |  Method resolution order:

 |      ThreadPoolExecutor

 |      concurrent.futures._base.Executor

 |      builtins.object

 |

 |  Methods defined here:

 |

 |  init(self, max_workers=None, thread_name_prefix='')

 |      Initializes a new ThreadPoolExecutor instance.

 |

 |      Args:

 |          max_workers: The maximum number of threads that can be used to

 |              execute the given calls.

 |          thread_name_prefix: An optional name prefix to give our threads.

 |

 |  shutdown(self, wait=True)

 |      Clean-up the resources associated with the Executor.

 |

 |      It is safe to call this method several times. Otherwise, no other

 |      methods can be called after this one.

 |

 |      Args:

 |          wait: If True then shutdown will not return until all running

 |              futures have finished executing and the resources used by the

 |              executor have been reclaimed.

 |

 |  submit(self, fn, *args, **kwargs)

 |      Submits a callable to be executed with the given arguments.

 |

 |      Schedules the callable to be executed as fn(*args, **kwargs) and returns

 |      a Future instance representing the execution of the callable.

 |

 |      Returns:

 |          A Future representing the given call.

 |

 |  ----------------------------------------------------------------------

 |  Methods inherited from concurrent.futures._base.Executor:

 |

 |  enter(self)

 |

 |  exit(self, exc_type, exc_val, exc_tb)

 |

 |  map(self, fn, *iterables, timeout=None, chunksize=1)

 |      Returns an iterator equivalent to map(fn, iter).

 |

 |      Args:

 |          fn: A callable that will take as many arguments as there are

 |              passed iterables.

 |          timeout: The maximum number of seconds to wait. If None, then there

 |              is no limit on the wait time.

 |          chunksize: The size of the chunks the iterable will be broken into

 |              before being passed to a child process. This argument is only

 |              used by ProcessPoolExecutor; it is ignored by

 |              ThreadPoolExecutor.

 |

 |      Returns:

 |          An iterator equivalent to: map(func, *iterables) but the calls may

 |          be evaluated out-of-order.

 |

 |      Raises:

 |          TimeoutError: If the entire result iterator could not be generated

 |              before the given timeout.

 |          Exception: If fn(*args) raises for any values.

與ProcessPoolExecutor 類十分類似,只不過一個是處理進程,一個是處理線程,可根據實際須要選擇。線程

示例

from time import sleep, strftime
from concurrent import futures


def display(*args):
    print(strftime('[%H:%M:%S]'), end="")
    print(*args)


def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n*10


def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display('result {} : {}'.format(i, result))


if __name__ == '__main__':
    main()

運行結果:code

[20:32:12]Script starting
[20:32:12]loiter(0): doing nothing for 0s
[20:32:12]loiter(0): done.
[20:32:12]      loiter(1): doing nothing for 1s
[20:32:12]              loiter(2): doing nothing for 2s
[20:32:12]results: <generator object Executor.map.<locals>.result_iterator at 0x00000246DB21BC50>
[20:32:12]Waiting for individual results:
[20:32:12]                      loiter(3): doing nothing for 3s
[20:32:12]result 0 : 0
[20:32:13]      loiter(1): done.
[20:32:13]                              loiter(4): doing nothing for 4s
[20:32:13]result 1 : 10
[20:32:14]              loiter(2): done.
[20:32:14]result 2 : 20
[20:32:15]                      loiter(3): done.
[20:32:15]result 3 : 30
[20:32:17]                              loiter(4): done.
[20:32:17]result 4 : 40

不一樣機器運行結果可能不一樣。orm

示例中設置max_workers=3,因此代碼一開始運行,則有三個對象(0,1,2)被執行loiter() 操做; 三秒後,對象0運行結束,獲得結果result 0以後,對象3纔開始被執行,同理,對象4的執行時間在對象1執行結果result 1打印結束以後。對象

相關文章
相關標籤/搜索