九. 併發編程 (進程池)

一 .進程池(multiprocess.Pool)css

1.進程池概念html

爲何要有進程池?進程池的概念。

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先
,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據
任務開啓或者結束進程。那麼咱們要怎麼作呢? 在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將
進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,
池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。
Pool([numprocess  [,initializer [, initargs]]]):建立進程池

1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組

import os
import time
import random
from multiprocessing import Pool
多進程和進程
from
multiprocessing import Pool,Proces def run(n): for i in range(10): print(n+1) if __name__ == '__main__':
進程池 p
=Pool(5) # 參數表示5個進程 (相似於5個cppu) p.map(run, range(50)) # 50任務 # 等同於上面只是時間上的差別 進程池更快 下面這個要慢帶你 多進程 for i in range(50): Process(target=run,args=(i,)).start()
 
 
進程池和多進程效率對比
 注意:map : 這個map在這裏自帶了join方法 不須要人爲添加
def run(n):
    for i in  range(10):
        print(n+1)
if __name__ == '__main__':

進程池 str1
=time.time() p=Pool(5) # 參數表示5個進程 (相似於5個cppu) p.map(run, range(50)) # 50任務 這個map在這裏自帶了join方法 不須要人爲添加 t1=time.time()-str1 # 0.26728272438049316 進程池執行時間 多進程 str2 = time.time() ret=[] for i in range(50): p1=Process(target=run,args=(i,)) ret.append(p1) p1.start() for i in ret: i.join() t2=time.time()-str2 # 2.7745769023895264 沒有使用進程池執行時間 print(t1,t2)



 
  
map進程池傳參數
# map : 這個map在這裏自帶了join方法 不須要人爲添加
def run(n):
    for i in  range(10):
        print(n+1)

def run1(n):
   print(n)     # ('aa', 100)
   print(n[1])  # 100

if __name__ == '__main__':

       p=Pool(5)       # 參數表示5個進程 (相似於5個cppu)
       p.map(run, range(5))  # 50任務          這個map在這裏自帶了join方法   不須要人爲添加

       p.map(run1,[("aa",100)])

2.進程同步(apply)python

# 同步
import os,time
from multiprocessing import Pool

def work(n):
    print("開始 run%s"%n,os.getpid())
    time.sleep(1)
    print("結束 run%s" % n, os.getpid())

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]

    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞
                                    # 但無論該任務是否存在阻塞,同步調用都會在原地等着
    print(res_l, "")
#
# 執行結果:
# 開始 run0 14036
# 結束 run0 14036
# 開始 run1 18312
# 結束 run1 18312
# 開始 run2 12744
# 結束 run2 12744
# 開始 run3 14036
# 結束 run3 14036
# 開始 run4 18312
# 結束 run4 18312
# 開始 run5 12744
# 結束 run5 12744
# 開始 run6 14036
# 結束 run6 14036
# 開始 run7 18312
# 結束 run7 18312
# 開始 run8 12744
# 結束 run8 12744
# 開始 run9 14036
# 結束 run9 14036
# [] 空
# 進程已結束,退出代碼 0

3.進程異步( apply_async)git

# 異步 帶問題的
import os
import time
import random
from multiprocessing import Pool
def work(n):
    print("開始 run%s" % n, os.getpid())
    time.sleep(1)
    print("結束 run%s" % n, os.getpid())
if __name__ == '__main__':
    print("主進程")
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
        # 注意這裏是一個真異步 就是主進程不會等子進程結束 而是主進程執行完了不會管子進程  由於沒有發感知進程池結束
# 執行結果
#     主進程
#     進程已結束,退出代碼 0
# 異步
import os
import time
import random
from multiprocessing import Pool
def work(n):
    print("開始 run%s" % n, os.getpid())
    time.sleep(1)
    print("結束 run%s" % n, os.getpid())
if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務

    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
    p.close()    # 結束進程池接收任務
    p.join()     # 感知進程池中的任務結束

# 執行結果
# 開始 run0 12284
# 開始 run1 4864
# 開始 run2 18452
# 結束 run0 12284
# 開始 run3 12284
# 結束 run1 4864
# 開始 run4 4864
# 結束 run2 18452
# 開始 run5 18452
# 結束 run3 12284
# 開始 run6 12284
# 結束 run4 4864
# 開始 run7 4864
# 結束 run5 18452
# 開始 run8 18452
# 結束 run6 12284
# 開始 run9 12284
# 結束 run7 4864
# 結束 run8 18452
# 結束 run9 12284
# 進程已結束,退出代碼 0


import os
import time
import random
from multiprocessing import Pool

def work(n):
    print("開始 run%s" % n, os.getpid())
    time.sleep(1)
    print("結束 run%s" % n, os.getpid())
if __name__ == '__main__':
    print("主進程")
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(5):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
                                          # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。
        res_l.append(res)
    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close()
    p.join()
    # print(res_l,"》》》》》》》》》》")
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

# 執行結果
    主進程
    開始 run0 14684
    開始 run1 15180
    開始 run2 18996
    結束 run0 14684
    開始 run3 14684
    結束 run1 15180
    開始 run4 15180
    結束 run2 18996
    結束 run3 14684
    結束 run4 15180
    None
    None
    None
    None
    None
    進程已結束,退出代碼 0
from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(0.5)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    for aa in  res_l:
        print(aa.get())    # 獲取進程池中全部數據

# 0
# 1
# 4
# 9
# 16
# 25
# 36
# 49
# 64
# 81

 

# 若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數。
from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理

    # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

4.進程池版 socketgithub

server

#
Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) #開啓6個客戶端,會發現2個客戶端處於等待狀態 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
client

from
socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))

 5. 進程池回調函數(主要用於爬蟲)json

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:
我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
# 先執行fun1函數 fun1函數的返回值  做爲回調函數的參數  而後去執行回調函數
import os
import time
import random
from multiprocessing import Pool

# 先執行fun1函數 fun1函數的返回值  做爲回調函數的參數  而後去執行回調函數
def fun1(n):
    print("這是fun1函數",os.getpid())
    return  n*n

def fun2(aa):    #  參數只能回調哪一個參數
    print("這是fun2函數",os.getpid())
    print(aa)

if __name__ == '__main__':
    print("主進程",os.getpid())
    p=Pool(5)
    p.apply_async(fun1,args=(10,),callback=fun2)
    p.close()
    p.join()

# 說明fun2回調函數 回到 主進程中調用

執行結果
    主進程 19268
    這是fun1函數 12740
    這是fun2函數 19268
    100
    進程已結束,退出代碼 0
import os
import time
import random
from multiprocessing import Pool

# 先執行fun1函數 fun1函數的返回值  做爲回調函數的參數  而後去執行回調函數
def fun1(n):
    print("這是fun1函數",os.getpid())
    return  n*n

def fun2(aa):    #  參數只能回調哪一個參數
    print("這是fun2函數",os.getpid())
    print(aa)

if __name__ == '__main__':
    print("主進程",os.getpid())
    p=Pool(5)
    p.apply_async(fun1,args=(5,),callback=fun2)
    p.close()
    p.join()


# 說明fun2回調函數 回到 主進程中調用
執行結果
    主進程 7164
    這是fun1函數 3272
    這是fun2函數 7164
    25
    進程已結束,退出代碼 0

 爬蟲案例數組

import requests
from  multiprocessing import Pool

aa=requests.get("https://www.baidu.com")
print(aa)
print(aa.status_code)
print(aa.text)
print("11111",aa.content)
print(aa.__dict__)   查看裏面屬性
print("**************************************")

執行結果
<Response [200]>
200
<!DOCTYPE html>
<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css><title>百度一下,你就知道</title></head> <body link=#0000cc> <div id=wrapper> <div id=head> <div class="head_wrapper"> <div class="s_form"> <div class="s_form_wrapper"> <div id=lg> <img hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baidu.com/s class="fm"> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidden name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden name=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class="s_ipt" value maxlength=255 autocomplete=off autofocus=autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=百度一下 class="bg s_btn" autofocus></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trnews class="mnav">新闻</a> <a href=https://www.hao123.com name=tj_trhao123 class="mnav">hao123</a> <a href=http://map.baidu.com name=tj_trmap class="mnav">地图</a> <a href=http://v.baidu.com name=tj_trvideo class="mnav">视频</a> <a href=http://tieba.baidu.com name=tj_trtieba class="mnav">贴吧</a> <noscript> <a href=http://www.baidu.com/bdorz/login.gif?login&amp;tpl=mn&amp;u=http%3A%2F%2Fwww.baidu.com%2f%3fbdorz_come%3d1 name=tj_login class="lb">登录</a> </noscript> <script>document.write('<a href="http://www.baidu.com/bdorz/login.gif?login&tpl=mn&u='+ encodeURIComponent(window.location.href+ (window.location.search === "" ? "?" : "&")+ "bdorz_come=1")+ '" name="tj_login" class="lb">登录</a>');
</script> <a href=//www.baidu.com/more/ name=tj_briicon class=bri style="display: block;">更多产品</a> </div> </div> </div> <div id=ftCon> <div id=ftConw> <p id=lh> <a href=http://home.baidu.com>关于百度</a> <a href=http://ir.baidu.com>About Baidu</a> </p> <p id=cp>&copy;2017&nbsp;Baidu&nbsp;<a href=http://www.baidu.com/duty/>使用百度前必读</a>&nbsp; <a href=http://jianyi.baidu.com/ class=cp-feedback>意见反馈</a>&nbsp;京ICP证030173号&nbsp; <img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>

11111 b'<!DOCTYPE html>\r\n<!--STATUS OK--><html> <head><meta http-equiv=content-type content=text/html;charset=utf-8><meta' \
b' http-equiv=X-UA-Compatible content=IE=Edge><meta content=always name=referrer><link rel=stylesheet type=text/css ' \
b'href=https://ss1.bdstatic.com/5eN1bjq8AAUYm2zgoY3K/r/www/cache/bdorz/baidu.min.css><title>\xe7\x99\xbe\xe5\xba\xa6\
xe4\xb8\x80\xe4\xb8\x8b\xef\xbc\x8c\xe4\xbd\xa0\xe5\xb0\xb1\xe7\x9f\xa5\xe9\x81\x93</title></head> <body link=#0000cc> ' \
b'<div id=wrapper> <div id=head> <div class="head_wrapper"> <div class="s_form"> <div class="s_form_wrapper"> <div id=lg> <img' \
b'hidefocus=true src=//www.baidu.com/img/bd_logo1.png width=270 height=129> </div> <form id=form name=f action=//www.baid' \
b'u.com/s class="fm"> <input type=hidden name=bdorz_come value=1> <input type=hidden name=ie value=utf-8> <input type=hidde' \
b'n name=f value=8> <input type=hidden name=rsv_bp value=1> <input type=hidden name=rsv_idx value=1> <input type=hidden n' \
b'ame=tn value=baidu><span class="bg s_ipt_wr"><input id=kw name=wd class="s_ipt" value maxlength=255 autocomplete=off auto' \
b'focus=autofocus></span><span class="bg s_btn_wr"><input type=submit id=su value=\xe7\x99\xbe\xe5\xba\xa6\xe4\xb8\x80\xe' \
b'4\xb8\x8b class="bg s_btn" autofocus></span> </form> </div> </div> <div id=u1> <a href=http://news.baidu.com name=tj_trn' \
b'ews class="mnav">\xe6\x96\xb0\xe9\x97\xbb</a> <a href=https://www.hao123.com name=tj_trhao123 class="mnav">hao123</a> <a hre' \
b'f=http://map.baidu.com name=tj_trmap class="mnav">\xe5\x9c\xb0\xe5\x9b\xbe</a> <a href=http://v.baidu.com name=tj_trvideo ' \

\xe9\xa6\x88</a>&nbsp;\xe4\xba\xacICP\xe8\xaf\x81030173\xe5\x8f\xb7&nbsp; ' \
b'<img src=//www.baidu.com/img/gs.gif> </p> </div> </div> </div> </body> </html>\r\n'

def
get_ali(url): # print(url) # res=requests.get(url) # print(res) # print(res.status_code) # print(res.text) res=requests.get(url) if res.status_code==200: return res.content.decode("utf-8"),url,res.text def show(args): cont,url,text=args print(cont,text) print(url,len(cont)) if __name__=="__main__": ret=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(5) for url in ret: p.apply_async(get_ali,args=(url,),callback=show) p.close() p.join()
 
 
使用多進程請求多個url來減小網絡等待浪費的時間

from
multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了 ''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
import re
from urllib.request import urlopen
from multiprocessing import Pool

def get_page(url,pattern):
    response=urlopen(url).read().decode('utf-8')
    return pattern,response

def parse_page(info):
    pattern,page_content=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0].strip(),
            'title':item[1].strip(),
            'actor':item[2].strip(),
            'time':item[3].strip(),
        }
        print(dic)
if __name__ == '__main__':
    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
    pattern1=re.compile(regex,re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }
    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)
    for i in res_l:
        i.get()

{'index': '1', 'title': '絕殺慕尼黑', 'actor': '主演:弗拉基米爾·馬什科夫,約翰·薩維奇,伊萬·科列斯尼科夫', 'time': '上映時間:2019-06-13'}
{'index': '2', 'title': '千與千尋', 'actor': '主演:柊瑠美,周冬雨,入野自由', 'time': '上映時間:2019-06-21'}
{'index': '3', 'title': '命運之夜——天之杯II :迷失之蝶', 'actor': '主演:杉山紀彰,下屋則子,神谷浩史', 'time': '上映時間:2019-07-12'}
{'index': '4', 'title': '玩具總動員4', 'actor': '主演:湯姆·漢克斯,蒂姆·艾倫,安妮·波茨', 'time': '上映時間:2019-06-21'}
{'index': '5', 'title': '掃毒2天地對決', 'actor': '主演:劉德華,古天樂,苗僑偉', 'time': '上映時間:2019-07-05'}
{'index': '6', 'title': '蜘蛛俠:英雄遠征', 'actor': '主演:湯姆·赫蘭德,傑克·吉倫哈爾,塞繆爾·傑克遜', 'time': '上映時間:2019-06-28'}
{'index': '7', 'title': '愛寵大機密2', 'actor': '主演:帕頓·奧斯瓦爾特,馮紹峯,凱文·哈特', 'time': '上映時間:2019-07-05'}
{'index': '8', 'title': '獅子王', 'actor': '主演:唐納德·格洛弗,塞斯·羅根,詹姆斯·厄爾·瓊斯', 'time': '上映時間:2019-07-12'}
{'index': '9', 'title': '機動戰士高達NT', 'actor': '主演:榎木淳彌,村中知,松浦愛弓', 'time': '上映時間:2019-07-12'}
{'index': '10', 'title': '最好的咱們', 'actor': '主演:陳飛宇,何藍逗,惠英紅', 'time': '上映時間:2019-06-06'}

 6. 進程池返回值網絡

 
 
異步獲取返回值

def
fun (i): # time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) for i in range(6): res=p.apply_async(fun,args=(i,)) # # print(res) print(res.get()) #等待計算結果 阻塞獲取結果 等待下次循環 這裏至關於join # print(res.get())獲取到值可是和同步同樣 可是咱們這裏是異步 因此異步get阻塞

解決方案

import
os import time import random from multiprocessing import Pool # 異步獲取返回值 表示五個五個返回數據 def fun (i): time.sleep(1) return i*i if __name__ == '__main__': p=Pool(5) ret=[] for i in range(22): res=p.apply_async(fun,args=(i,)) # ret.append(res) #等待計算結果 阻塞獲取結果 等待下次循環 這裏至關於join for s in ret: print( s.get()) # 異步獲取返回值 表示五個五個返回數據
 
 
 
  


異步獲取返回值 map併發

# map 自帶join方法 close 方法    map 會先把每個計算結果 添加到列表中   在返回 列表
def fun (i):
    time.sleep(1)
    return  i*i
if __name__ == '__main__':
    p=Pool(5)
    res=p.map(fun,range(5)) #
    print(res)
# [0, 1, 4, 9, 16]
 
 
 
同步獲取返回值
def fun (i):
    return  i*i
if __name__ == '__main__':
    p=Pool(5)
    for i in  range(5):
         res=p.apply(fun,args=(i,))  #  apply結果就是fun的返回值
         print(res)

# 0
# 1
# 4
# 9
# 16
相關文章
相關標籤/搜索