concurrent.futures
模塊提供了高度封裝的異步調用接口python
ThreadPoolExecutor:
線程池,提供異步調用編程
ProcessPoolExecutor:
進程池,提供異步調用服務器
ProcessPoolExecutor 和 ThreadPoolExecutor:
二者都實現相同的接口,該接口由抽象Executor類定義。併發
submit(fn, *args, **kwargs):
異步提交任務app
map(func, *iterables, timeout=None, chunksize=1)
:取代for循環submit的操做dom
shutdown(wait=True)
:至關於進程池的pool.close()+pool.join()
操做異步
result(timeout=None)
:取得結果函數
add_done_callback(fn)
:回調函數操作系統
池的功能:限制進程數或線程數.線程
何時限制: 當併發的任務數量遠遠大於計算機所能承受的範圍,即沒法一次性開啓過多的任務數量 我就應該考慮去限制我進程數或線程數,從保證服務器不崩.
from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process,current_process import time def task(i): print(f'{current_process().name} 在執行任務{i}') time.sleep(1) if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 進程池裏又4個進程 for i in range(20): # 20個任務 pool.submit(task,i)# 進程池裏當前執行的任務i,池子裏的4個進程一次一次執行任務
from concurrent.futures import ThreadPoolExecutor from threading import Thread,currentThread import time def task(i): print(f'{currentThread().name} 在執行任務{i}') time.sleep(1) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 進程池裏又4個線程 for i in range(20): # 20個任務 pool.submit(task,i)# 線程池裏當前執行的任務i,池子裏的4個線程一次一次執行任務
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(20): # future=executor.submit(task,i) executor.map(task,range(1,21)) #map取代了for+submit
理解爲提交任務的兩種方式
同步: 提交了一個任務,必須等任務執行完了(拿到返回值),才能執行下一行代碼
異步: 提交了一個任務,不要等執行完了,能夠直接執行下一行代碼.
同步:至關於執行任務的串行執行
異步
from concurrent.futures import ProcessPoolExecutor from multiprocessing import Process,current_process import time n = 1 def task(i): global n print(f'{current_process().name} 在執行任務{i}') time.sleep(1) n += i return n if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 進程池裏又4個線程 pool_lis = [] for i in range(20): # 20個任務 future = pool.submit(task,i)# 進程池裏當前執行的任務i,池子裏的4個線程一次一次執行任務 # print(future.result()) # 這是在等待我執行任務獲得的結果,若是一直沒有結果,這裏會致使咱們全部任務編程了串行 # 在這裏就引出了下面的pool.shutdown()方法 pool_lis.append(future) pool.shutdown(wait=True) # 關閉了池的入口,不容許在往裏面添加任務了,會等帶全部的任務執行完,結束阻塞 for p in pool_lis: print(p.result()) print(n)# 這裏一開始確定是拿到0的,由於我只是去告訴操做系統執行子進程的任務,代碼依然會繼續往下執行 # 能夠用join去解決,等待每個進程結束後,拿到他的結果
import time from threading import Thread,currentThread from concurrent.futures import ThreadPoolExecutor def task(i): print(f'{currentThread().name} 在執行{i}') time.sleep(1) return i**2 # parse 就是一個回調函數 def parse(future): # 處理拿到的結果 print(f'{currentThread().name} 結束了當前任務') print(future.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) for i in range(20): future = pool.submit(task,i) ''' 給當前執行的任務綁定了一個函數,在當前任務結束的時候就會觸發這個函數(稱之爲回調函數) 會把future對象做爲參數傳給函數 注:這個稱爲回調函數,當前任務處理結束了,就回來調parse這個函數 ''' future.add_done_callback(parse) # add_done_callback (parse) parse是一個回調函數 # add_done_callback () 是對象的一個綁定方法,他的參數就是一個函數