進程池也是經過事先劃分一塊系統資源區域,這組資源區域在服務器啓動時就已經建立和初始化,用戶若是想建立新的進程,能夠直接取得資源,從而避免了動態分配資源(這是很耗時的)。html
線程池內子進程的數目通常在3~10個之間,當有新的任務來到時,主進程將經過某種方式選擇進程池中的某一個子進程來爲之服務。相比於動態建立子進程,選擇一個已經存在的子進程的代價顯得小得多(進程開啓過多,效率反而會降低,開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)。python
Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。正則表達式
Pool([numprocess [,initializer [, initargs]]]):建立進程池 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值,若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None initargs:是要傳給initializer的參數組
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時, 將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用 map():Pool類中的map方法,與內置的map函數用法行爲基本一致,即針對每一個參數都進行func()處理,它會使進程阻塞直到結果一塊兒總體返回。 注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序纔會運行子進程。 terminal():結束工做進程,不在處理未處理的任務。 join():主進程阻塞等待子進程的退出,join方法必須在close或terminate以後使用。
例子1數組
from multiprocessing import Pool import time import os def func1(n): print("子進程%s開始%s"%(n,os.getpid())) time.sleep(2) print("子進程%s結束%s"%(n,os.getpid())) return n def func2(n): for i in range(10): print(n+2) if __name__ == "__main__": p = Pool(5) #5個進程,默認是cpu的核心數 res = p.map(func1,range(10)) #10個任務,參數必須是可迭代的 print(res) #返回值是帶全部子進程的結果的列表 # 默認異步的執行任務,且自帶close和joi # # p.map(funcname,iterable) 默認異步的執行任務,且自帶close和join
例子2服務器
from multiprocessing import Pool import time import os def func(n): print("進程池中的進程開始%s"%n,os.getpid()) time.sleep(2) print("進程池中的進程結束%s"%n,os.getpid()) if __name__ == "__main__": pool = Pool(5) for i in range(20): # pool.apply(func,args=(i,)) #同步的執行 pool.apply_async(func,args=(i,)) #異步的執行 pool.close() #結束進程池提交任務 pool.join() # 感知進程池中的任務執行結束
方法apply_async()和map_async()的返回值是AsyncResul的實例obj對象。網絡
實例具備如下方法app
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起異常。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。異步
obj.ready():若是調用完成,返回Trueasync
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常函數
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
例子1
from multiprocessing import Pool import time def func(n): print("進程%s"%n) time.sleep(2) return n*n if __name__ == "__main__": p = Pool(5) #5個進程 res_l = [] for i in range(10): # # res = p.apply(func,args=(i,)) #同步 # print(res) # res_l.append(res) res = p.apply_async(func,args=(i,)) #異步 # print(res.get()) #注意異步時提交返回值須要用get()方法獲取,get()方法自帶join()效果, #若是在這裏打印則會出現和同步時同樣的效果 res_l.append(res) # 同步 # print(res_l) # 異步 for i in res_l: print(i.get())
執行結果:同步時是一個接一個的慢慢返回結果,異步時很快的返回整個結果,可能會出現順序亂的狀況
例子2
import time from multiprocessing import Pool def func(i): time.sleep(0.5) return i*i if __name__ == '__main__': p = Pool(5) ret = p.map(func,range(10)) print(ret)
執行結果:總體一塊兒輸出,map()自帶join()效果,所以會阻塞,而後總體一塊兒輸出結果
能夠爲進程池或線程池內的每一個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收任務的返回值看成參數,該函數稱爲回調函數。
簡單的說就是進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
簡單例子
from multiprocessing import Pool import time import os def func1(n): print("子進程",os.getpid()) time.sleep(1) return n*n def func2(m): print("回調函數",os.getpid()) print("回調函數",m) if __name__ == "__main__": p = Pool(5) for i in range(10): p.apply_async(func1,args=(i,),callback=func2) p.close() p.join() print("主進程",os.getpid()) # 從執行結果能夠看出執行回調函數的進程是主進程
回調函數最多的在爬蟲程序過程當中使用的較多,爬取網頁用子進程,處理數據用回調函數。爬蟲:即網絡爬蟲,能夠簡單的理解爲爬取網頁源碼的程序,在python中能夠使用requests模塊實現。
例子1
import requests from multiprocessing import Pool def get(url): responds = requests.get(url) if responds.status_code == 200: return url,responds.content.decode("utf-8") def call_back(args): url,content = args print(url,len(content)) if __name__ == "__main__": url_li = ["https://www.bitmain.com/", "https://www.feixiaohao.com/", "https://www.jinse.com/", "http://www.avalonminer.shop/", "http://www.innosilicon.com/html/product/index.html" ] p = Pool(4) for url in url_li: p.apply_async(get,args=(url,),callback=call_back) p.close() p.join() # 在爬蟲爬取網頁過程當中,主要耗時間的是爬取的過程, # 假設這裏的call_back()是放在子進程中執行,則耗費了更多的時間, # 與此同時主進程一直是空閒的,這裏的call_back()放在主進程執行,節省了程序執行的時間
例子2
import re from urllib.request import urlopen import requests from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') #這裏拿到的是有格式的網頁源碼 return pattern,response # 正則表達式編譯結果 網頁內容 def get(url,pattern): headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/51.0.2704.63 Safari/537.36'} responds = requests.get(url,headers=headers,timeout=30) res = responds.content.decode("utf-8") #這裏拿到的是簡化版的無格式的網頁源碼 return pattern,res def parse_page(info): pattern,page_content=info print("網頁內容長度",len(page_content)) # print(page_content) res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={'http://maoyan.com/board/7':pattern1} p=Pool() res_l=[] for url,pattern in url_dic.items(): # res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res=p.apply_async(get,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()