Python之路(第四十篇)進程池

 

1、進程池

進程池也是經過事先劃分一塊系統資源區域,這組資源區域在服務器啓動時就已經建立和初始化,用戶若是想建立新的進程,能夠直接取得資源,從而避免了動態分配資源(這是很耗時的)。html

線程池內子進程的數目通常在3~10個之間,當有新的任務來到時,主進程將經過某種方式選擇進程池中的某一個子進程來爲之服務。相比於動態建立子進程,選擇一個已經存在的子進程的代價顯得小得多(進程開啓過多,效率反而會降低,開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)。python

Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。正則表達式

 

multiprocess.Pool模塊

  Pool([numprocess  [,initializer [, initargs]]]):建立進程池
  numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值,若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程
  initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
  initargs:是要傳給initializer的參數組

  

主要方法

  p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
  ​
  ​
  p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
  '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,
將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
     
  p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
  ​
  P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
  ​
  map():Pool類中的map方法,與內置的map函數用法行爲基本一致,即針對每一個參數都進行func()處理,它會使進程阻塞直到結果一塊兒總體返回。
  ​
  注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序纔會運行子進程。
  ​
  terminal():結束工做進程,不在處理未處理的任務。
  ​
  join():主進程阻塞等待子進程的退出,join方法必須在close或terminate以後使用。

  

 

例子1數組

  
  from multiprocessing import Pool
  import time
  import os
  ​
  def func1(n):
      print("子進程%s開始%s"%(n,os.getpid()))
      time.sleep(2)
      print("子進程%s結束%s"%(n,os.getpid()))
      return n
  ​
  def func2(n):
      for i in range(10):
          print(n+2)
  ​
  if __name__ == "__main__":
      p = Pool(5) #5個進程,默認是cpu的核心數
      res = p.map(func1,range(10))  #10個任務,參數必須是可迭代的
      print(res)  #返回值是帶全部子進程的結果的列表
  # 默認異步的執行任務,且自帶close和joi
  #
  # p.map(funcname,iterable)     默認異步的執行任務,且自帶close和join

  

 

例子2服務器

  
  from multiprocessing import Pool
  import time
  import os
  ​
  def func(n):
      print("進程池中的進程開始%s"%n,os.getpid())
      time.sleep(2)
      print("進程池中的進程結束%s"%n,os.getpid())
  ​
  ​
  if __name__ == "__main__":
      pool = Pool(5) 
      for i in range(20):
          # pool.apply(func,args=(i,))  #同步的執行
          pool.apply_async(func,args=(i,)) #異步的執行
      pool.close() #結束進程池提交任務
      pool.join() # 感知進程池中的任務執行結束
 

  

進程池的返回值

方法apply_async()和map_async()的返回值是AsyncResul的實例obj對象。網絡

實例具備如下方法app

obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起異常。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。異步

obj.ready():若是調用完成,返回Trueasync

obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常函數

obj.wait([timeout]):等待結果變爲可用。

obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

例子1

  from multiprocessing import Pool
  import time
  ​
  def func(n):
      print("進程%s"%n)
      time.sleep(2)
      return n*n
  ​
  if __name__ == "__main__":
      p = Pool(5)  #5個進程
      res_l = []
      for i in range(10): #
          # res = p.apply(func,args=(i,))  #同步
          # print(res)
          # res_l.append(res)
  ​
          res = p.apply_async(func,args=(i,))  #異步
          # print(res.get()) #注意異步時提交返回值須要用get()方法獲取,get()方法自帶join()效果,
          #若是在這裏打印則會出現和同步時同樣的效果
          res_l.append(res)
      # 同步
      # print(res_l)
  ​
      # 異步
      for i in res_l:
          print(i.get())

  

執行結果:同步時是一個接一個的慢慢返回結果,異步時很快的返回整個結果,可能會出現順序亂的狀況

 

例子2

  
  import time
  from multiprocessing import Pool
  def func(i):
      time.sleep(0.5)
      return i*i
  ​
  if __name__ == '__main__':
      p = Pool(5)
      ret = p.map(func,range(10))
      print(ret)

  

執行結果:總體一塊兒輸出,map()自帶join()效果,所以會阻塞,而後總體一塊兒輸出結果

 

進程池的回調函數

能夠爲進程池或線程池內的每一個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收任務的返回值看成參數,該函數稱爲回調函數。

簡單的說就是進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

 

簡單例子

  
  from multiprocessing import Pool
  import time
  import os
  ​
  def func1(n):
      print("子進程",os.getpid())
      time.sleep(1)
      return n*n
  ​
  ​
  def func2(m):
      print("回調函數",os.getpid())
      print("回調函數",m)
  ​
  if __name__ == "__main__":
      p = Pool(5)
      for i in range(10):
          p.apply_async(func1,args=(i,),callback=func2)
      p.close()
      p.join()
      print("主進程",os.getpid())
      
      # 從執行結果能夠看出執行回調函數的進程是主進程

  

 

回調函數最多的在爬蟲程序過程當中使用的較多,爬取網頁用子進程,處理數據用回調函數。爬蟲:即網絡爬蟲,能夠簡單的理解爲爬取網頁源碼的程序,在python中能夠使用requests模塊實現。

 

 

例子1

  
  import requests
  from multiprocessing import Pool
  ​
  def get(url):
      responds = requests.get(url)
      if responds.status_code == 200:
          return url,responds.content.decode("utf-8")
  ​
  ​
  def call_back(args):
      url,content = args
      print(url,len(content))
  ​
  if __name__ == "__main__":
      url_li = ["https://www.bitmain.com/",
                "https://www.feixiaohao.com/",
                "https://www.jinse.com/",
                "http://www.avalonminer.shop/",
                "http://www.innosilicon.com/html/product/index.html"
                ]
      p = Pool(4)
      for url in url_li:
          p.apply_async(get,args=(url,),callback=call_back)
      p.close()
      p.join()
  ​
  # 在爬蟲爬取網頁過程當中,主要耗時間的是爬取的過程,
  # 假設這裏的call_back()是放在子進程中執行,則耗費了更多的時間,
  # 與此同時主進程一直是空閒的,這裏的call_back()放在主進程執行,節省了程序執行的時間

  

例子2

import re
from urllib.request import urlopen
import requests
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')  #這裏拿到的是有格式的網頁源碼
    return pattern,response   # 正則表達式編譯結果 網頁內容

def get(url,pattern):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                 'Chrome/51.0.2704.63 Safari/537.36'}
    responds = requests.get(url,headers=headers,timeout=30)
    res = responds.content.decode("utf-8") #這裏拿到的是簡化版的無格式的網頁源碼
    return pattern,res


def parse_page(info):
    pattern,page_content=info
    print("網頁內容長度",len(page_content))
    # print(page_content)
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)

if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)
    url_dic={'http://maoyan.com/board/7':pattern1}
    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        # res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res=p.apply_async(get,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()
相關文章
相關標籤/搜索