Python標準庫爲咱們提供了threading和multiprocessing模塊編寫相應的異步多線程/多進程代碼。從Python3.2開始,標準庫爲咱們提供了concurrent.futures
模塊,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
兩個類ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來建立線程池和進程池的代碼。實現了對threading
和multiprocessing
的更高級的抽象,對編寫線程池/進程池提供了直接的支持。
concurrent.futures基礎模塊是executor和future。java
concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。可是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor倒是很是有用,顧名思義二者分別被用來建立線程池和進程池的代碼。咱們能夠將相應的tasks直接放入線程池/進程池,不須要維護Queue來操心死鎖的問題,線程池/進程池會自動幫咱們調度。node
Future這個概念相信有java和nodejs下編程經驗的朋友確定不陌生了,你能夠把它理解爲一個在將來完成的操做,這是異步編程的基礎,傳統編程模式下好比咱們操做queue.get的時候,在等待返回結果以前會產生阻塞,cpu不能讓出來作其餘事情,而Future的引入幫助咱們在等待的這段時間能夠完成其餘的操做。python
Executor中定義了submit()
方法,這個方法的做用是提交一個可執行的回調task
,並返回一個future實例。future對象表明的就是給定的調用。 編程
submit()
方法實現進程池/線程池from concurrent.futures import ProcessPoolExecutor import os,time,random def task(n): print('%s is running' %os.getpid()) time.sleep(2) return n**2 if __name__ == '__main__': p=ProcessPoolExecutor() #不填則默認爲cpu的個數 l=[] start=time.time() for i in range(10): obj=p.submit(task,i) #submit()方法返回的是一個future實例,要獲得結果須要用obj.result() l.append(obj) p.shutdown() #相似用from multiprocessing import Pool實現進程池中的close及join一塊兒的做用 print('='*30) # print([obj for obj in l]) print([obj.result() for obj in l]) print(time.time()-start) #上面方法也可寫成下面的方法 # start = time.time() # with ProcessPoolExecutor() as p: #相似打開文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print('=' * 30) # print([obj.result() for obj in future_tasks]) # print(time.time() - start)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import threading import os,time,random def task(n): print('%s:%s is running' %(threading.currentThread().getName(),os.getpid())) time.sleep(2) return n**2 if __name__ == '__main__': p=ThreadPoolExecutor() #不填則默認爲cpu的個數*5 l=[] start=time.time() for i in range(10): obj=p.submit(task,i) l.append(obj) p.shutdown() print('='*30) print([obj.result() for obj in l]) print(time.time()-start) #上面方法也可寫成下面的方法 # start = time.time() # with ThreadPoolExecutor() as p: #相似打開文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print('=' * 30) # print([obj.result() for obj in future_tasks]) # print(time.time() - start)
默認爲異步執行多線程
#p.submit(task,i).result()即同步執行 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random def task(n): print('%s is running' %os.getpid()) time.sleep(2) return n**2 if __name__ == '__main__': p=ProcessPoolExecutor() start=time.time() for i in range(10): res=p.submit(task,i).result() print(res) print('='*30) print(time.time()-start)
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import requests import os import time from threading import currentThread def get_page(url): print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url)) response=requests.get(url) time.sleep(2) return {'url':url,'text':response.text} def parse_page(res): #此處的res是一個p.submit得到的一個future對象,不是結果 res=res.result() #res.result()拿到的纔是對應的結果 print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url'])) with open('db.txt','a') as f: parse_res='url:%s size:%s\n' %(res['url'],len(res['text'])) f.write(parse_res) if __name__ == '__main__': # p=ProcessPoolExecutor() p=ThreadPoolExecutor() urls = [ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] for url in urls: # multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page) p.submit(get_page, url).add_done_callback(parse_page) #與以前的回調函數拿到的結果不一樣,這裏拿到的是前面submit方法執行完後返回的對象,要.result才能拿到對應的結果 p.shutdown() print('主',os.getpid())
和內置函數map差很少的用法,這個方法返回一個map(func, *iterables)迭代器,迭代器中的回調執行返回的結果有序的。app
如下是經過concurrent.futures模塊下類ThreadPoolExecutor和ProcessPoolExecutor實例化的對象的map方法實現進程池、線程池dom
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time def task(n): print('%s is running' %os.getpid()) time.sleep(2) return n**2 if __name__ == '__main__': # p=ProcessPoolExecutor() p=ThreadPoolExecutor() start = time.time() obj=p.map(task,range(10)) p.shutdown() print('='*30) print(list(obj)) print(time.time() - start)