python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。
html
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。python
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。linux
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。數據庫
1.建立進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
2. 參數介紹:
1 group參數未使用,值始終爲None
3 target表示調用對象,即子進程要執行的任務
5 args表示調用對象的位置參數元組,args=(1,2,'jame',)
7 kwargs表示調用對象的字典,kwargs={'name':'jame','age':18}
9 name爲子進程的名稱編程
3.方法介紹:
p.start():啓動進程,並調用該子進程中的p.run()
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 .
p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死 鎖。
p.is_alive():若是p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。json
timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程 windows
4.屬性介紹:
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。數組
這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)安全
1.建立並開啓子進程的兩種方法: 網絡
from multiprocessing import Process import os import time def task1(name): print('%s: %s is start,父進程pid:[%s]'%(name,os.getpid(),os.getppid())) time.sleep(1) print('%s :%s is stop,父進程Pid:[%s]'%(name,os.getpid(),os.getppid())) def task2(name): print('%s: %s is start,父進程pid:[%s]'%(name,os.getpid(),os.getppid())) time.sleep(1) print('%s :%s is stop,父進程Pid:[%s]'%(name,os.getpid(),os.getppid())) if __name__ == '__main__': p1=Process(name='001子進程',target=task1,args=('子進程1',)) p2=Process(target=task2,kwargs={'name':'子進程2'}) print(p1.name) print(p2.name) p2.start() print('若是有join則最後打印父進程.pid:%s,父進程的父進程:%s'%(os.getpid(),os.getppid()))
#開進程的方法二:#在linux下運行,windows下會提示要在main方法下運行 from multiprocessing import Process import time class Myp(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s is running'%self.name) time.sleep(1) print('%s is stop '%self.name) p1=Myp('egon') p1.start() print('主進程')
2.進程之間的內存是隔離的
from multiprocessing import Process n=100 #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了 y=111 def work(): global n n=0 print('子進程內n,y: ',n,y) if __name__ == '__main__': p=Process(target=work) p.start() y=222 print('主進程內n,y:',n,y) ''' 主進程內n,y: 100 222 子進程內n,y: 0 111 總結:子進程是複製父進程的變量, 子進程改更本身的變量後,不影響父進程的變量的值。 '''
3.用多進程方式使socket有併發的做用?
#Author http://www.cnblogs.com/Jame-mei from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg: break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': while True: conn,client_addr=server.accept() print(conn,client_addr) p=Process(target=talk,kwargs={'conn':conn,'client_addr':client_addr}) p.start()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf
問題:每來一個客戶端,都在服務端開啓一個進程,若是併發來一個萬個客戶端,要開啓一萬個進程嗎,你本身嘗試着在你本身的機器上開啓一萬個,10萬個進程試一試。
解決方法:進程池!
4.Process對象的join方法
#Author http://www.cnblogs.com/Jame-mei from multiprocessing import Process import time import random def task(name): print('%s is start' %name) time.sleep(random.randrange(1,3)) print('%s is stop' %name) if __name__ == '__main__': p=Process(target=task,args=('jame',)) p.start() p.join() #p.join(0.0001) #等待p中止,等0.0001秒就再也不等了 print('開始') ''' jame is start jame is stop 開始 #因此是主進程等待子進程結束,並非p等着p結束噢 '''
#Author http://www.cnblogs.com/Jame-mei from multiprocessing import Process import time import random def piao(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) p1=Process(target=piao,args=('egon',)) p2=Process(target=piao,args=('alex',)) p3=Process(target=piao,args=('yuanhao',)) p4=Process(target=piao,args=('wupeiqi',)) p1.start() p2.start() p3.start() p4.start() #有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎? #固然不是了,必須明確:p.join()是讓誰等? #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p, #詳細解析以下: #進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了 #而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 #join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待 # 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 p1.join() p2.join() p3.join() p4.join() print('主線程') #上述啓動進程與join進程能夠簡寫爲 # p_l=[p1,p2,p3,p4] # # for p in p_l: # p.start() # # for p in p_l: # p.join() #有了join,程序不就是串行了嗎???
5.Process對象的其餘方法或屬性(瞭解)
from multiprocessing import Process import time #terminate與is_alive def task(name): print('%s is start'%name) time.sleep(1) print('%s is start'%name) if __name__ == '__main__': p1=Process(target=task,args=('子進程',)) p1.start() p1.terminate() #關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活 print(p1.is_alive()) #True time.sleep(2) #先先等2秒,看看p1是否關閉 print(p1.is_alive()) #False
#Author http://www.cnblogs.com/Jame-mei from multiprocessing import Process import time,os #name pid def task(name): print('%s is start pid:%s,父進程:%s'%(name,os.getpid(),os.getppid())) time.sleep(1) def task2(name): print('%s is start pid:%s,父進程:%s'%(name,os.getpid(),os.getppid())) time.sleep(1) if __name__ == '__main__': p1=Process(target=task,args=('子進程task',)) p2=Process(name='子進程2',target=task,args=('子進程task2',)) p1.start() p2.start() print('p1進程的名字:',p1.name) print('p2進程的名字:',p2.name) ''' p1進程的名字: Process-1 p2進程的名字: 子進程2 子進程task is start pid:2540,父進程:9524 子進程task2 is start pid:10332,父進程:9524 '''
注意:在windows中Process()必須放到# if __name__ == '__main__':下建立與執行進程.....
if __name__ == "__main__" since statements inside this if-statement will not get called upon import. 因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
主進程建立守護進程:
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
# @Time : 2018/9/4 9:39 # @Author : Jame from multiprocessing import Process import time,random def task(name): print('%s is start'%name) time.sleep(random.randrange(1,3)) print('%s is stop'%name) if __name__ == '__main__': p=Process(target=task,args=('jame',)) p.daemon=True #要在p.start()以前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p也要終止運行。 p.start() print('主進程')
# @Time : 2018/9/4 9:45 # @Author : Jame #主進程代碼運行結束,守護進程就會結束! from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print('end123') def bar(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print('main----------------------') # 打印該行則主進程代碼結束,則守護進程p1應該被終止。 # 可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止!
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件或者同一個打印終端是沒問題的.
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理!
1.多進程共享同一個打印終端
#併發運行,效率高,可是競爭同一個打印終端,帶來了打印錯亂
# @Time : 2018/9/4 9:57 # @Author : Jame #併發運行,效率高,可是競爭同一個打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work(): print('%s is running'%os.getpid()) time.sleep(2) print('%s is done'%os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start() ''' 4896 is running 7876 is running 7712 is running 4896 is done 7876 is done 7712 is done '''
#由併發變成了串行,犧牲了運行效率,可是避免了競爭
# @Time : 2018/9/4 10:05 # @Author : Jame #由併發變成了串行,犧牲了運行效率,可是避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() #加鎖 print('%s is running'%os.getpid()) time.sleep(1) print('%s is done'%os.getpid()) lock.release() #釋放鎖 if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start() ''' 4928 is running 4928 is done 9284 is running 9284 is done 4412 is running 4412 is done '''
2.多進程共享同一文件
文件當數據庫,模擬搶票
1):併發運行,併發效率高,可是競爭寫入統一文件,數據寫入錯亂
# @Time : 2018/9/4 10:13 # @Author : Jame from multiprocessing import Process,Lock import time,json,random #文件db的內容:{"count":1} #注意必定要用雙引號,否則json沒法識別 def search(): dic=json.load(open('db.txt')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀取數據的網絡延遲 if dic['count']>0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db.txt','w')) print('\003[43m購票成功\003[0m') def task(lock): search() get() if __name__ == '__main__': lock=Lock() for i in range(100): #模擬100個併發的客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
2):加鎖:購票行爲變成了串行,犧牲了運行效率,可是保證了數據的安全。
#文件db的內容爲:{"count":1} #注意json的格式必定是雙引號,否則沒法識別. from multiprocessing import Process,Lock import time,json,random #1.查詢剩餘票數 def search(): dic=json.load(open('db2.txt')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) #2.模擬搶票 def get(): dic=json.load(open('db2.txt')) time.sleep(0.1) #模擬讀取數據的網絡延遲 if dic['count']>0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db2.txt','w')) print('\003[43m購票成功\003[0m') def task(lock): search() lock.acquire() #加鎖處理 get() lock.release() #釋放鎖處理 if __name__ == '__main__': lock=Lock() for i in range(100): #模擬100個併發的客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
總結:
1.加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,便是串行的修改,速度慢了,可是犧牲了速度保證了數據的安全性和一致性。
雖然能夠用文件共享數據實現進程間通訊,可是問題是:
*效率低(共享數據基於文件,而文件是硬盤上的數據)
*須要本身加鎖處理
2.所以,multiprocessing爲咱們提供了基於消息的IPC通訊機制:隊列和管道。
我應該儘可能避免使用共享數據,儘量的使用消息傳遞和隊列,避免處理複雜的同步和問題,並且在進程數目增多時,每每能夠得到更好的可擴展性。
*效率高(隊列和管道都是將數據放在內存中的)
*幫咱們處理好鎖的問題(隊列又是基於管道+鎖實現的,可讓咱們從複雜的鎖問題中解放出來)
進程間是彼此隔離的,要實現進程間通訊(IPC),multiprocessing模塊支持2種形式:隊列和管道,這2中方式都是使用消息傳遞的。
1.建立隊列的類(底層就是以管道和鎖定的方式實現的):
1 Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
2.參數介紹
maxsize 是隊列中容許最大項數,省略則無大小限制。
3.方法介紹
3.1 主要方法:
def put(self, obj, block=True, timeout=None): ''' 做用:能夠插入數據隊列中。 參數及做用:2個可選參數:blocked和timeout。 若是block=True(默認參數),而且timeout爲正值,則該方法會阻塞timeout指定的時間,直到該隊列有剩餘空間。 若是超時會拋出Queue.Full異常。 若是block=False,可是該Queue已滿,會馬上拋出Queue.Full異常。 ''' def get(self, block=True, timeout=None): ''' 做用:從隊列讀取並刪除一個元素。 參數及做用:2個可選參數:blocked和timeout。 若是block=True(默認參數),而且timeout爲正值,那麼在等待時間沒有取到任何元素,會拋出Queue.Full異常。 若是block=False,有兩種狀況存在,若是Queue有一個值可取,則當即返回該值,不然,若是隊列爲空,會馬上拋出Queue.Full異常。 ''' def put_nowait(self, obj): ''' 同put(False) 做用:能夠插入數據隊列中。 ''' def get_nowait(self): ''' 同get(False) 做用:從隊列讀取並刪除一個元素。 ''' def empty(self): ''' 做用:調用此方法爲空時,返回True。 能否可靠:否! 好比在返回True的過程當中,若是隊列中又加入了項目。 ''' def full(self): ''' 做用:調用此方法已滿,返回True。 能否可靠:否! 好比在返回True的過程當中,若是隊列中的項目被取走。 ''' def qsize(self): ''' 做用:返回隊列中目前的正確數量。 能否可靠:否! 好比在返回True的過程當中,若是隊列中又加入了項目,或者項目被取走等。 '''
3.2 其餘方法(瞭解)
def cancel_join_thread(self): ''' 做用:不會在進程退出時,自動鏈接後臺線程。 能夠防止join_thread()方法阻塞。 ''' def close(self): ''' 做用:關閉隊列,防止隊列中加入更多數據。 調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入數據,而後將在此方法完成後立刻關閉。 若是q被垃圾收集,將調用此方法。 關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或者異常。 例如:若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤操做。 ''' def join_thread(self): ''' 做用:鏈接隊列的後臺線程。 '''
4.隊列應用
4.1 簡單實例
# @Time : 2018/9/4 11:24 # @Author : Jame from multiprocessing import Queue ''' multiprocessing 模塊支持進程間通訊的兩種形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 ''' q=Queue(3) #put get put_nowait get_nowait full empty q.put(3) q.put(3) q.put(3) print(q.full()) #True print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #True
4.2.生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大數併發問題。該模式經過平衡 生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
4.2.1.爲何要生產者 和 消費者?
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。
在線程開發過程當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等消費者處理完,才能繼續生產數據。
同理,若是消費者的處理能力大於生產者,那麼消費者必須等待生產者。爲了解決這個問題因而就引入了生者產和消費者模式。
4.2.2 什麼是生產者消費者模式?
生產者消費者模式就是經過一個容器來解決生產和消費者的強耦合問題。
生產者和消費者彼此之間不直接通信,而是經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列
裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
1):基於隊列實現生產者消費者模型!
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消費者 def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生產者 def producer(q): for i in range(5): time.sleep(random.randint(1,3)) res='包子%s'%i q.put(res) print('\033[44m%s 生產了 %s\033[0m'%(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:廚師 p1=Process(target=producer,args=(q,)) #消費者:吃貨顧客 c1=Process(target=consumer,args=(q,)) #開始運轉 p1.start() c1.start() print('主進程.....')
可是這裏的主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。如何解決,請看下一步。
2):讓生產者在生產完後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消費者 def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生產者 def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s'%i q.put(res) print('\033[44m%s 生產了 %s\033[0m'%(os.getpid(),res)) q.put(None) if __name__ == '__main__': q=Queue() #生產者們:廚師 p1=Process(target=producer,args=(q,)) #消費者:吃貨顧客 c1=Process(target=consumer,args=(q,)) #開始運轉 p1.start() c1.start() print('主進程.....')
注意:這裏的結束信號None,不必定要由生產者發,主進程裏一樣能夠發,可是主進程須要等生產者結束後才應該發送該信號。
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消費者 def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生產者 def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s'%i q.put(res) print('\033[44m%s 生產了 %s\033[0m'%(os.getpid(),res)) #q.put(None) if __name__ == '__main__': q=Queue() #生產者們:廚師 p1=Process(target=producer,args=(q,)) #消費者:吃貨顧客 c1=Process(target=consumer,args=(q,)) #開始運轉 p1.start() c1.start() p1.join() q.put(None) #發送結束信號 print('主進程.....')
注意:若是有多個生產者和消費者的時候,咱們則須要發送多個結束信號None給消費者。(方式比較Low,瞭解一下)
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue import time,random,os #1消費者 def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) #2 生產者 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s %s'%(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m'%(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:廚師們 p1=Process(target=producer,args=('包子',q,)) p2=Process(target=producer,args=('骨頭',q)) #消費者:吃貨顧客們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始運轉 p1.start() p2.start() c1.start() c2.start() p1.join() #必須保證生產者所有生產完,才能發送結束信號 p2.join() q.put(None) #有幾個消費者就應該發送幾個結束信號None q.put(None) #發送結束信號 print('主進程.....')
最後,咱們無非就是發送結束信號而已,有另一種隊列提供了這種機制。
JoinableQueue([maxsize]) :這就像一個Queue對象,但隊列容許項目的使用者 ,通知生成者 項目已經被成功處理。
通知進程是使用共享的信息和條件變量來實現的。
參數:maxsize 是隊列中容許最大項數,省略則無大小限制。
方法:
JoinableQueue 的實例p 除了與Queue對象相同的方法以外,還具備:
q.task_done() :使用者使用此方法發出信號,表示q.get() 的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目的數量,將引起ValueError異常。
q.join() :生產者調用此方法進行阻塞,知道隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目調用q.task_done()方法爲止。
應用:
#主進程等--->p1,p2等---->c1,c2 #p1,p2結束了,證實c1,c2確定全都收完了p1,p2發到隊列的數據
#於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程。
# @Time : 2018/9/4 14:45 # @Author : Jame from multiprocessing import Process,Queue,JoinableQueue import time,random,os #1消費者 def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m'%(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了! #2 生產者 def producer(name,q): for i in range(5): time.sleep(random.randint(1,3)) res='%s %s'%(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m'%(os.getpid(),res)) q.join() if __name__ == '__main__': #q=Queue() q=JoinableQueue() #生產者們:廚師們 p1=Process(target=producer,args=('包子',q,)) p2=Process(target=producer,args=('骨頭',q)) #消費者:吃貨顧客們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始運轉 ''' p1.start() p2.start() c1.start() c2.start() ''' p_l=[p1,p2,c1,c2] for p in p_l: p.start() p1.join() p2.join() print('主進程.....')
總結:生產者消費者模型
#程序中有兩類角色
一類負責生產數據(生產者)
一類負責處理數據(消費者)
#引入生產者消費者模型爲了解決的問題是
平衡生產者和消費者之間的工做能力,從而提升程序總體處理數據的速度。
#如何實現:
生產者<------>隊列<------>消費者
生產者消費者模型實現類程序的解耦合
進程間通訊IPC方式二:管道(不推薦使用,瞭解便可)
1.管道的類,參數,方法介紹
1.建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象。
強調一點:必須在產生Process對象以前產生管道。
2.參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
3.主要方法:
conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。
若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
conn1.send(obj):經過鏈接發送對象。
obj是與序列化兼容的任意對象
#其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。
若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。
若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):
經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。
結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收
conn1.recv_bytes_into(buffer [, offset]):
接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
2.基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)
# @Time : 2018/9/4 17:55 # @Author : Jame from multiprocessing import Process,Pipe import time,os def consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s'%(name,baozi)) except EOFError: right.close() break def producer(seq,p): left,right=p right.close() for i in seq: left.send(i) else: left.close() if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主進程...')
注意:
生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。
若是忘記了執行這些步驟,程序可能會再消費中的recv()操做上掛起。
管道是由操做系統進行引用計數的,必須再全部進程中關閉管道後才能生產EOFError異常。
所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。
管道能夠用於雙向通訊,利用一般在客戶端 // 服務端中使用情的請求 // 響應模型或者遠程調用,就可使用管道編寫與進程交互的程序:
# @Time : 2018/9/5 9:19 # @Author : Jame from multiprocessing import Process,Pipe import time,os def adder(p,name): server,client=p client.close() while True: try: x,y=server.recv() except EOFError: server.close() break res=x+y server.send(res) print('server done') if __name__ == '__main__': server,client=Pipe() c1=Process(target=adder,args=((server,client),'c1')) c1.start() server.close() client.send((10,20)) print(client.recv()) client.close() c1.join() print('主進程')
將來基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦的作法也是將程序設計爲大量獨立的線程集合。
經過消息隊列交換數據,這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。
進程間通訊應該儘可能避免使用本節所講的共享數據的方式。
# @Time : 2018/9/5 14:21 # @Author : Jame from multiprocessing import Manager,Process,Lock import os def work(dic,lock): dic['count']-=1 print(dic['count']) if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':10}) p_l=[] for i in range(10): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,
互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去,若是指定信號量爲3,那麼來一我的得到一把鎖,計數加1,當計數等於3時,後面的人均須要等待。
一旦釋放,就有人能夠得到一把鎖 信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念。
# @Time : 2018/9/5 14:49 # @Author : Jame from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() start = time.time() print('%s 佔到一個坑位...'%user) time.sleep(random.randint(0,3)) #模擬每一個人蹲坑的速度,因人而異... stop=time.time() print('=========================%s 佔用時間:%s'%(user,(stop-start))) sem.release() if __name__ == '__main__': sem=Semaphore(2) p_l=[] for i in range(10): p=Process(target=go_wc,args=(sem,'user:%s'%i,)) p.start() p_l.append(p) for i in p_l: i.join() print('=================>>>>>>>>>>>>') ''' user:1 佔到一個坑位... user:4 佔到一個坑位... =========================user:4 佔用時間:0.0 user:2 佔到一個坑位... =========================user:2 佔用時間:1.0000574588775635 user:3 佔到一個坑位... =========================user:3 佔用時間:0.0 user:0 佔到一個坑位... =========================user:1 佔用時間:2.0001144409179688 user:5 佔到一個坑位... =========================user:0 佔用時間:1.0000569820404053 user:7 佔到一個坑位... =========================user:5 佔用時間:1.0000574588775635 user:8 佔到一個坑位... =========================user:8 佔用時間:0.0 user:6 佔到一個坑位... =========================user:6 佔用時間:2.0001144409179688 user:9 佔到一個坑位... =========================user:7 佔用時間:3.000171422958374 =========================user:9 佔用時間:1.0000569820404053 =================>>>>>>>>>>>> '''
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False set:將「Flag」設置爲True
# @Time : 2018/9/5 15:08 # @Author : Jame from multiprocessing import Process,Event import time,random def car(e,n): while True: if not e.is_set(): #False print('\033[31m紅燈亮\033[0m,car%s等着'%n) e.wait() print('\033[32m車%s 看見綠燈亮了\033[0m'%n) time.sleep(random.randint(1,3)) if not e.is_set(): continue print('走你,car',n) break def police_car(e,n): while True: if not e.is_set(): print('\033[31m紅燈亮\033[0m,car%s等着'%n) e.wait() print('燈的是 %s,警車走了,car%s'%(e.is_set(),n)) break def traffic_lights(e,inverval): while True: time.sleep(inverval) if e.is_set(): e.clear() #e.is_set()--->False else: e.set() if __name__ == '__main__': e=Event() for i in range(5): p=Process(target=police_car,args=(e,i,)) p.start() t=Process(target=traffic_lights,args=(e,3)) t.start() print('主======================>>>>')
在利用python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,而且操做能夠節約大量時間。多進程是實現併發的手段之一,須要注意的問題:
0.1很明顯須要併發執行的任務通暢要遠大於核數;
0.2一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程;
0.3進程開啓過多,效率反而會降低(開啓進程後須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行);
例如:當咱們操做對象的數目不太對的時候,能夠直接利用multiprocessing中的Process動態生成多個進程,十幾個還好,可是若是成百上千個....手動的去限制進程數量卻又太過繁瑣,
所以能夠發揮進程池的功效。
咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數。
注意:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時候,若是池子還沒滿,那麼就應該建立一個新的進程
用來執行該請求;但若是池中的進程數量已經達到最大值了,那麼該請求會等待,知道池中有進程結束,就會重用進程池中的進程。
1.建立進程池的類
若是指定numprocess 爲3,則進程池會從無到有建立3個進程,而後自始自終的使用這個3個進程去執行全部任務,不會開啓其餘進程。
Pool(numprocess, processes=None, initializer=None, initargs(),maxtasksperchild=None, context=None)
2.參數介紹
numprocess #要建立的進程數,若是省略,則默認使用cpu_count()的值。
initializer #是每一個工做進程啓動時要執行的可調用對象,默認None。
initargs #是要傳給initializer的參數組
3.方法介紹
3.1主要方法
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。
若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做 中的結果。
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
3.2其餘方法
apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
4.進程池應用
練習1:
# @Time : 2018/9/5 17:36 # @Author : Jame from multiprocessing import Pool import os,time def work(n): print('%s run'%os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池建立3個進程,之後都是這3個進程執行任務。 res_l=[] for i in range(5): res=p.apply(work,args=(i,)) #同步調用,知道本次任務執行完畢拿到res。 # 等待任務work任務的過程當中可能有阻塞也可能沒有阻塞。 # 可是無論任務是否存在阻塞,同步調用都會在原地等待,只是等待中如有任務發生了阻塞就會被奪去cpu的執行權限。 res_l.append(res) print(res_l)
# @Time : 2018/9/5 17:36 # @Author : Jame from multiprocessing import Pool import os,time def work(n): print('%s run'%os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池建立3個進程,之後都是這3個進程執行任務。 res_l=[] for i in range(5): res=p.apply_async(work,args=(i,)) #若是使用異步提交任務,主進程須要使用join,等待進程池任務都處理完了,而後能夠用get收集結果。 #不然,主進程結束,進程池還沒來得及執行,也就跟着一塊兒結束了。 res_l.append(res) p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_async的結果,若是是apply,則沒有get方法,由於apply時同步執行,馬上後去結果,無需get
練習2:使用進程池維護固定數目的進程(重寫練習1)
# @Time : 2018/9/6 9:18 # @Author : Jame import socket from multiprocessing import Pool import os server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): print('進程pid:%s'%os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(processes=2) while True: conn,client_addr=server.accept() p.apply_async(talk,args=(conn,client_addr)) #p.apply(talk,args=(conn,client_addr)) #通過測試,同步的時候同一時間只有一個客戶端能夠收到服務端的回覆 #按照前後順序,斷開client1後,client2收到了服務端回覆。 ''' 進程pid:8488 進程pid:9160 進程pid:8876 進程pid:9364 #Pool 內的進程數量默認是cpu 的核數,假設爲4(查看方法os.cpu_count()) #開啓6個客戶端,會發現2個客戶端處理等待狀態 #每一個進程內查看Pid,會發現pid使用爲4個,即多個客戶端公用4個進程. 當把Pool中調整到p=Pool(processes=2),會發現客戶端同一時間只有2個進程進來。 '''
# @Time : 2018/9/6 9:18 # @Author : Jame import socket client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>>:').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
# @Time : 2018/9/6 10:57 # @Author : Jame from multiprocessing import Process,Pool import time def func(msg): print('msg:',msg) time.sleep(1) return msg if __name__ == '__main__': pool=Pool(processes=3) #同一時間進程是數量是3個 res_l=[] for i in range(10): msg='hello %d'%i res=pool.apply_async(func,(msg,)) #異步執行 res_l.append(res) print('=====================>>>>>>>>>>>') pool.close() pool.join() # 調用join以前,先調用close函數,不然會出錯,執行完close後不會有新的進程加入到pool中, # join函數等待全部子進程結束。 print(res_l) #這裏和同步調用結果不同,看到的是一組對象列表。。。 #而非跟同步調用同樣的結果列表,但這一步是join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法獲取結果! for i in res_l: print(i.get()) #使用get來獲取apply_async的結果,若是是apply,則沒有get方法,由於apply時同步執行的,馬上獲取結果根本不須要get。
# @Time : 2018/9/6 10:57 # @Author : Jame from multiprocessing import Process,Pool import time def func(msg): print('msg:',msg) time.sleep(1) return msg if __name__ == '__main__': pool=Pool(processes=3) res_l=[] for i in range(10): msg='hello %d'%i res=pool.apply(func,(msg,)) #同步執行,便是執行完一個拿到結果,再去執行另一個。 res_l.append(res) print('=====================>>>>>>>>>>>') pool.close() pool.join() #調用join以前,先調用close函數,不然會出錯,執行完close後不會有新的進程加入到pool中, #join函數等待全部子進程結束。 print(res_l)#看到的就是最終的結果組成的列表。 #['hello 0', 'hello 1', 'hello 2', 'hello 3', 'hello 4', 'hello 5', 'hello 6', 'hello 7', 'hello 8', 'hello 9'] for i in res_l: #apply是同步的,全部就直接獲得結果,沒有get()方法!!! print(i)
5.回調函數
回調函數的場景:進程池中任何一個任務一旦處理完成,就當即告知主進程,我好了哦,你能夠處理結果了。
主進程則調用一個函數去處理結果,該函數即回調函數。
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時候就省去了I/O的過程,直接拿到的是任務的結果。
# @Time : 2018/9/6 14:00 # @Author : Jame from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程:%s> get %s'%(os.getpid(),url)) respone=requests.get(url) if respone.status_code==200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程:%s> parse %s '%(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n'%(res['url'],len(res['text'])) with open('db3.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.taobao.com', 'http://www.jd.com', 'http://www.sina.com.cn' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() #print([res.get() for res in res_l]) for res in res_l: print(res.get()) #這裏拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給了回調函數處理了。 ''' <進程:9488> get https://www.baidu.com <進程:8188> get https://www.python.org <進程:9440> get https://www.taobao.com <進程:9440> get http://www.jd.com <進程:6880> parse https://www.taobao.com <進程:9488> get http://www.sina.com.cn <進程:6880> parse https://www.baidu.com <進程:6880> parse https://www.python.org <進程:6880> parse http://www.jd.com <進程:6880> parse http://www.sina.com.cn {'url': 'https://www.baidu.com', {'url': 'https://www.python.org', {'url': 'https://www.taobao.com', {'url': 'http://www.jd.com', {'url': 'http://www.sina.com.cn', '''
# @Time : 2018/9/6 14:00 # @Author : Jame from multiprocessing import Pool import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code==200: return (response.text,pattern) def pasrse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1 = re.compile( r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<', re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, 'http://finance.sina.com.cn/stock/usstock/sector.shtml':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=pasrse_page) res_l.append(res) for i in res_l: print(i.get())
*若是在主進程中等待進程池全部任務執行完畢,再統一處理結果,則無序回調函數。
# @Time : 2018/9/6 15:32 # @Author : Jame from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() nums=[] for res in res_l: nums.append(res.get()) print(nums) #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
6.進程池的其餘實現方法