python由於其全局解釋器鎖GIL而沒法經過線程實現真正的平行計算。這個論斷咱們不展開,可是有個概念咱們要說明,IO密集型 vs. 計算密集型。html
IO密集型:讀取文件,讀取網絡套接字頻繁。python
計算密集型:大量消耗CPU的數學與邏輯運算,也就是咱們這裏說的平行計算。網絡
而concurrent.futures模塊,能夠利用multiprocessing實現真正的平行計算。多線程
核心原理是:concurrent.futures會以子進程的形式,平行的運行多個python解釋器,從而令python程序能夠利用多核CPU來提高執行速度。因爲子進程與主解釋器相分離,因此他們的全局解釋器鎖也是相互獨立的。每一個子進程都可以完整的使用一個CPU內核。併發
第一章 concurrent.futures性能闡述app
- 最大公約數
這個函數是一個計算密集型的函數。異步
# -*- coding:utf-8 -*- # 求最大公約數 def gcd(pair): a, b = pair low = min(a, b) for i in range(low, 0, -1): if a % i == 0 and b % i == 0: return inumbers = [
(1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
(1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]
socket
- 不使用多線程/多進程
import timestart = time.time()
results = list(map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)asyncTook 2.507 seconds.函數
消耗時間是:2.507。
- 多線程ThreadPoolExecutor
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executorstart = time.time()
pool = ThreadPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)Took 2.840 seconds.
消耗時間是:2.840。
上面說過gcd是一個計算密集型函數,由於GIL的緣由,多線程是沒法提高效率的。同時,線程啓動的時候,有必定的開銷,與線程池進行通訊,也會有開銷,因此這個程序使用了多線程反而更慢了。
- 多進程ProcessPoolExecutor
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executorstart = time.time()
pool = ProcessPoolExecutor(max_workers=2)
results = list(pool.map(gcd, numbers))
end = time.time()
print 'Took %.3f seconds.' % (end - start)Took 1.861 seconds.
消耗時間:1.861。
在兩個CPU核心的機器上運行多進程程序,比其餘兩個版本都快。這是由於,ProcessPoolExecutor類會利用multiprocessing模塊所提供的底層機制,完成下列操做:
1)把numbers列表中的每一項輸入數據都傳給map。
2)用pickle模塊對數據進行序列化,將其變成二進制形式。
3)經過本地套接字,將序列化以後的數據從煮解釋器所在的進程,發送到子解釋器所在的進程。
4)在子進程中,用pickle對二進制數據進行反序列化,將其還原成python對象。
5)引入包含gcd函數的python模塊。
6)各個子進程並行的對各自的輸入數據進行計算。
7)對運行的結果進行序列化操做,將其轉變成字節。
8)將這些字節經過socket複製到主進程之中。
9)主進程對這些字節執行反序列化操做,將其還原成python對象。
10)最後,把每一個子進程所求出的計算結果合併到一份列表之中,並返回給調用者。
multiprocessing開銷比較大,緣由就在於:主進程和子進程之間通訊,必須進行序列化和反序列化的操做。
第二章 concurrent.futures源碼分析
- Executor
能夠任務Executor是一個抽象類,提供了以下抽象方法submit,map(上面已經使用過),shutdown。值得一提的是Executor實現了__enter__和__exit__使得其對象能夠使用with操做符。關於上下文管理和with操做符詳細請參看這篇博客http://www.cnblogs.com/kangoroo/p/7627167.html
ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來建立線程池和進程池的代碼。
class Executor(object): """This is an abstract base class for concrete asynchronous executors."""def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.Returns:
A Future representing the given call.
"""
raise NotImplementedError()def map(self, fn, *iterables, **kwargs):
"""Returns a iterator equivalent to map(fn, iter).Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
timeout = kwargs.get('timeout')
if timeout is not None:
end_time = timeout + time.time()fs = [self.submit(fn, args) for args in itertools.izip(iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.It is safe to call this method several times. Otherwise, no other
methods can be called after this one.Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
"""
passdef enter(self):
return selfdef exit(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
下面咱們以線程ProcessPoolExecutor的方式說明其中的各個方法。
- map
map(self, fn, *iterables, **kwargs)
map方法的實例咱們上面已經實現過,值得注意的是,返回的results列表是有序的,順序和*iterables迭代器的順序一致。
這裏咱們使用with操做符,使得當任務執行完成以後,自動執行shutdown函數,而無需編寫相關釋放代碼。
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executorstart = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
results = list(pool.map(gcd, numbers))
print 'results: %s' % results
end = time.time()
print 'Took %.3f seconds.' % (end - start)
產出結果是:
results: [1, 5, 1, 5, 2, 3]
Took 1.617 seconds.
- submit
submit(self, fn, *args, **kwargs)
submit方法用於提交一個可並行的方法,submit方法同時返回一個future實例。
future對象標識這個線程/進程異步進行,並在將來的某個時間執行完成。future實例表示線程/進程狀態的回調。
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executorstart = time.time()
futures = list()
with ProcessPoolExecutor(max_workers=2) as pool:
for pair in numbers:
future = pool.submit(gcd, pair)
futures.append(future)
print 'results: %s' % [future.result() for future in futures]
end = time.time()
print 'Took %.3f seconds.' % (end - start)
產出結果是:
results: [1, 5, 1, 5, 2, 3]
Took 2.289 seconds.
- future
submit函數返回future對象,future提供了跟蹤任務執行狀態的方法。好比判斷任務是否執行中future.running(),判斷任務是否執行完成future.done()等等。
as_completed方法傳入futures迭代器和timeout兩個參數
默認timeout=None,阻塞等待任務執行完成,並返回執行完成的future對象迭代器,迭代器是經過yield實現的。
timeout>0,等待timeout時間,若是timeout時間到仍有任務未能完成,再也不執行並拋出異常TimeoutError
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completedstart = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
futures = [ pool.submit(gcd, pair) for pair in numbers]
for future in futures:
print '執行中:%s, 已完成:%s' % (future.running(), future.done())
print '#### 分界線 ####'
for future in as_completed(futures, timeout=2):
print '執行中:%s, 已完成:%s' % (future.running(), future.done())
end = time.time()
print 'Took %.3f seconds.' % (end - start)
- wait
wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另一個是uncompleted(未完成的)。
使用wait方法的一個優點就是得到更大的自由度,它接收三個參數FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE,默認設置爲ALL_COMPLETED。
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTIONstart = time.time()
with ProcessPoolExecutor(max_workers=2) as pool:
futures = [ pool.submit(gcd, pair) for pair in numbers]
for future in futures:
print '執行中:%s, 已完成:%s' % (future.running(), future.done())
print '#### 分界線 ####'
done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED)
for d in done:
print '執行中:%s, 已完成:%s' % (d.running(), d.done())
print d.result()
end = time.time()
print 'Took %.3f seconds.' % (end - start)
因爲設置了ALL_COMPLETED,因此wait等待全部的task執行完成,能夠看到6個任務都執行完成了。
執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:False, 已完成:False 執行中:False, 已完成:False #### 分界線 #### 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True Took 1.518 seconds.
若是咱們將配置改成FIRST_COMPLETED,wait會等待直到第一個任務執行完成,返回當時全部執行成功的任務。這裏並無作併發控制。
重跑,結構以下,能夠看到執行了2個任務。
執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:False, 已完成:False 執行中:False, 已完成:False #### 分界線 #### 執行中:False, 已完成:True 執行中:False, 已完成:True Took 1.517 seconds.