以前咱們已經瞭解了不少進程相關的理論知識,瞭解進程是什麼應該再也不困難了,剛剛咱們已經瞭解了,運行中的程序就是一個進程。全部的進程都是經過它的父進程來建立的。所以,運行起來的python程序也是一個進程,那麼咱們也能夠在程序中再建立進程。多個進程能夠實現併發效果,也就是說,當咱們的程序中存在多個進程的時候,在某些時候,就會讓程序的執行速度變快。以咱們以前所學的知識,並不能實現建立進程這個功能,因此咱們就須要藉助python中強大的模塊。html
仔細說來,multiprocess不是一個模塊而是python中一個操做、管理進程的包。 之因此叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和進程有關的全部子模塊。因爲提供的子模塊很是多,爲了方便你們歸類記憶,我將這部分大體分爲四個部分:建立進程部分,進程同步部分,進程池部分,進程之間數據共享。python
process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。linux
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 參數介紹: 1 group參數未使用,值始終爲None 2 target表示調用對象,即子進程要執行的任務 3 args表示調用對象的位置參數元組,args=(1,2,'egon',) 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 5 name爲子進程的名稱
p.start():啓動進程,並調用該子進程中的p.run() p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 p.is_alive():若是p仍然運行,返回True p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)
在Windows操做系統中因爲沒有fork(linux操做系統中建立進程的機制),在建立子進程的時候會自動 import 啓動它的這個文件,而在 import 的時候又執行了整個文件。所以若是將process()直接寫在文件中就會無限遞歸建立子進程報錯。因此必須把建立子進程的部分使用if __name__ ==‘__main__’ 判斷保護起來,import 的時候 ,就不會遞歸運行了。
在一個python進程中開啓子進程,start方法和併發效果。ios
import time from multiprocessing import Process def f(name): print('hello', name) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() time.sleep(1) print('執行主進程的內容了')5
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() #p.join() print('我是父進程')
import os from multiprocessing import Process def f(x): print('子進程id :',os.getpid(),'父進程id :',os.getppid()) return x*x if __name__ == '__main__': print('主進程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
進程的建立 git
import os import time from multiprocessing import Process # 進程模塊 def func(): time.sleep(2) print('in func',os.getpid(),os.getppid()) #getpid 子進程的pid getppid 父進程的pid if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func) # 進程對象 p1.start() # 向操做系統提交了一個開啓子進程的申請 p2 = Process(target=func) # 進程對象 p2.start() # 向操做系統提交了一個開啓子進程的申請 print('主進程 的 代碼執行結束了') print結果 : 三個程序獨立 誰先執行完畢就先執行誰 # in main 6560 5364 # 主進程 的代碼執行結束了 # in func 6560 6560 子程序1 # in func 6560 6560 子程序2 執行子程序的時候至關於又建立了個空間,而且引入了模塊裏的內容,當執行子程序的時候又建立了個空間,加if __name__ == '__main__':就是爲了終止循環.當在本模塊裏時,__name__ == '__main__',當被引用到另外一個模塊時,__name__ ==被引用模塊的名字,條件改變了,因此終止循環 原理 if __name__ == '__main__': 使用python都是調用操做系統的命令來啓動進程 一樣使用python 不一樣的操做系統的操做是不一樣的 對於windows來講 必要加if __name__ == '__main__': 對於linux ios來講 沒必要要加if __name__ == '__main__':
給子進程傳參數 github
import os import time from multiprocessing import Process # 進程模塊 def func(num): time.sleep(2) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func,args=(1,)) # 進程對象 p1.start() # 向操做系統提交了一個開啓子進程的申請 p2 = Process(target=func,args=(2,)) # 進程對象 p2.start() # 向操做系統提交了一個開啓子進程的申請 print('主進程 的 代碼執行結束了')
開啓多個子進程 數據庫
import os import time from multiprocessing import Process # 進程模塊 def func(num): print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) for i in range(10): p = Process(target=func,args=(i,)) # args表示調用對象的位置參數元組 p.start() # start不是運行一個程序,而是調用操做系統的命令,要建立子進程 print('主進程 的 代碼執行結束了')
join方法: 阻塞,直到p這個子進程執行完畢以後再繼續執行 編程
import os import time from multiprocessing import Process # 進程模塊 def func(num): time.sleep(1) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p = Process(target=func,args=(1,)) p.start() # start不是運行一個程序,而是調用操做系統的命令,要建立子進程 p.join() # 阻塞,直到p這個子進程執行完畢以後再繼續執行 print('主進程 的 代碼執行結束了')
一批任務使用joinjson
import os import time from multiprocessing import Process # 進程模塊 def func(num): print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p_l = [] for i in range(10): p = Process(target=func,args=(i,)) p.start() # start不是運行一個程序,而是調用操做系統的命令,要建立子進程,非阻塞 p_l.append(p) print(p_l) for p in p_l : p.join() # 阻塞,直到p這個子進程執行完畢以後再繼續執行 print('主進程 的 代碼執行結束了')
is_alive(查看子進程是否活着)
terminate(強制結束一個正在運行的進程)--非阻塞 windows
import os import time from multiprocessing import Process # 進程模塊 def func(num): time.sleep(2) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func,args=(1,)) # 進程對象 p1.start() # 向操做系統提交了一個開啓子進程的申請 print(p1.is_alive()) # 檢測進程是否在執行任務 p1.terminate() # 強制結束子進程 - 非阻塞 print(p1.is_alive()) # 檢測進程是否在執行任務 print('主進程 的 代碼執行結束了')
面向對象的方式開啓子進程
import os import time from multiprocessing import Process # 進程模塊 class MyProcess(Process): def __init__(self,num): #若是傳參須要:自定義__init__,須要執行父類的__init__方法 super().__init__() self.num = num def run(self): #重寫 run方法 print('in run ',self.num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main ', os.getpid(), os.getppid()) p = MyProcess(1) p.start()
進階,多個進程同時運行(注意,子進程的執行順序不是根據啓動順序決定的)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父進程在執行')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p) # [p.join() for p in p_lst] print('父進程在執行')
除了上面這些開啓進程的方法,還有一種以繼承Process類的形式開啓進程的方式
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('zhangsan') p2=MyProcess('lisi') p3=MyProcess('wangwu') p1.start() #start會自動調用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主線程')
進程之間的數據隔離問題
from multiprocessing import Process def work(): global n n=0 print('子進程內: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主進程內: ',n)
注意:
進程與進程之間的內存中的數據是隔離的,內存空間是不能共享的
因此要想進行通訊,必須藉助其餘手段,且這兩個進程都是自願的
子進程的執行結果父進程獲取不到
父進程依賴子進程的執行結果呢
父進程如何獲取子進程的執行結果???
父子進程之間經過socket通訊
會隨着主進程的結束而結束。
主進程建立守護進程
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行 p.start() time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id print('主')
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止.
import time from multiprocessing import Process def func1(): print('begin') time.sleep(3) print('wahaha') if __name__ == '__main__': p = Process(target=func1) p.daemon = True # 守護進程的屬性,默認是False,若是設置成True,就表示設置這個子進程爲一個守護進程,設置守護進程的操做應該在開啓子進程以前 p.start() time.sleep(1) print('主進程') print 結果: # begin # 主進程 緣由:由於 p = Process(target=func1),而p.daemon = True 建立了守護進程,主進程執行完,就不執行wahaha,守護進程設置func1,主進程代碼執行完畢後,func1代碼就會結束執行
import time from multiprocessing import Process def func1(): print('begin') time.sleep(3) print('wahaha') def func2(): while True: print('in func2') time.sleep(0.5) if __name__ == '__main__': Process(target=func1).start() p = Process(target=func2) p.daemon = True # 守護進程的屬性,默認是False,若是設置成True,就表示設置這個子進程爲一個守護進程 # 設置守護進程的操做應該在開啓子進程以前 p.start() time.sleep(1) print('主進程') print結果: begin in func2 in func2 主進程 wahaha 緣由: p = Process(target=func2),而p.daemon = True守護進程func2跟func1沒有關係,因此會執行wahaha,守護進程設置誰,主進程代碼執行完畢後,誰就會結束執行
總結:
python - multiprocessing Process 類 - 建立子進程 操做系統的差異 windows 開啓子進程的代碼必須寫在if __name__ == '__main__'下面 start: 只是向操做系統發出指令,建立進程 建立進程有必定的時間開銷 子進程和主進程的執行互不干擾,子進程和主進程是異步的 主進程會等待子進程的結束再結束 若是 主進程須要在子進程都運行結束以後再作某件事情 : join 複雜的計算 主進程如何拿到子進程計算的結果 ????? isalive 查看子進程是否活着 terminate強制結束一個正在運行的進程 - 非阻塞 使用面向對象的方式開啓一個子進程 繼承Process類 重寫run方法 若是要傳遞參數 : 自定義__init__,須要執行父類的__init__方法
多進程啓動tcp協議的socket來完成併發
import socket from multiprocessing import Process def talk(conn): try: while True: conn.send(b'hello') print(conn.recv(1024)) finally: conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9091)) sk.listen() try: while True: conn,addr = sk.accept() Process(target=talk,args=(conn,)).start() finally: sk.close()
import socket import os sk = socket.socket() sk.connect(('127.0.0.1',9091)) while True: print(sk.recv(1024)) sk.send(str(os.getpid()).encode('utf-8'))
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程必定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和網紅臉聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s還在和網紅臉聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活 print(p1.is_alive()) #結果爲True print('開始') print(p1.is_alive()) #結果爲False
1 class Myprocess(Process): 2 def __init__(self,person): 3 self.name=person # name屬性是Process中的屬性,標示進程的名字 4 super().__init__() # 執行父類的初始化方法會覆蓋name屬性 5 #self.name = person # 在這裏設置就能夠修改進程名字了 6 #self.person = person #若是不想覆蓋進程名,就修改屬性名稱就能夠了 7 def run(self): 8 print('%s正在和網紅臉聊天' %self.name) 9 # print('%s正在和網紅臉聊天' %self.person) 10 time.sleep(random.randrange(1,5)) 11 print('%s正在和網紅臉聊天' %self.name) 12 # print('%s正在和網紅臉聊天' %self.person) 13 14 15 p1=Myprocess('哪吒') 16 p1.start() 17 print(p1.pid) #能夠查看子進程的進程id
經過剛剛的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題。
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start()
# 由併發變成了串行,犧牲了運行效率,但避免了競爭 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start()
上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,咱們以模擬搶票爲例,來看看數據安全的重要性。
#文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task) p.start()
#文件db的內容爲:{"count":5} #注意必定要用雙引號,否則json沒法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(random.random()) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(random.random()) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[32m購票成功\033[0m') else: print('\033[31m購票失敗\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()
當多個進程共享一段數據的時候,數據會出現不安全的現象, 須要加鎖來維護數據的安全性 lock = Lock() # 創造了一把鎖 lock.acquire() # 獲取了這把鎖的鑰匙 lock.release() # 歸還這把鎖的鑰匙 保證一段代碼在同一時刻只能被一個進程執行
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 雖然能夠用文件共享數據實現進程間通訊,但問題是: 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 2.須要本身加鎖處理 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 隊列和管道都是將數據存放於內存中 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
搶票的例子 每一個人都能 查看餘票 買票 import json import time from multiprocessing import Lock from multiprocessing import Process def search(i): with open('db','r') as f:count_dic = json.load(f) time.sleep(0.2) print('person %s 餘票 : %s張'%(i,count_dic['count'])) def buy(i): with open('db','r') as f:count_dic = json.load(f) time.sleep(0.2) if count_dic['count'] > 0: count_dic['count'] -= 1 print('person %s 購票成功'%i) time.sleep(0.2) with open('db','w') as f:json.dump(count_dic,f) def task(i,lock): search(i) lock.acquire()# 若是以前已經被acquire了 且 沒有被release 那麼進程會在這裏阻塞 buy(i) lock.release() if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=task,args=(i,lock)) p.start()
IPC(Inter-Process Communication)
隊列主要用於維護秩序的比較多:先進先出,買票,秒殺
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
方法介紹
Queue([maxsize])
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具備如下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
q.empty()
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
q.close()
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
q.join_thread()
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
隊列 先進先出FIFO - 維護秩序的時候用的比較多 買票 秒殺 from queue import Queue q = Queue(2) # 2 參數裏面最多能夠放2個值 print(q.qsize()) # 隊列長度 q.put(12) q.put(3) q.put_nowait(4) # 不等待,沒有滿就放進去,滿了就報錯 print('***',q.qsize()) print(q.get()) print(q.get()) #有值的時候取值 沒有值的時候會阻塞 print(q.get_nowait()) # 當有值的時候取值 沒有值的時候會報錯 print(q.get_nowait()) print(q.full()) #判斷是否滿 返回布爾值 print(q.empty()) #判斷是否滿 返回布爾值 print(q.qsize()) print(q.get()) print(q.qsize())
from multiprocessing import Queue q = Queue() 在多進程中 q.empty() q.full()是不許的 q.empty() q.full() q.put_nowait() q.put() q.get_nowait() q.get()
''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,可是隊列接口 ''' from multiprocessing import Queue q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) # q.put(3) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。 # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。 try: q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。 print('隊列已經滿了') # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。 print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。 try: q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了
上面這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。
from multiprocessing import Queue,Process def con(q): print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=con,args=(q,)) p.start() q.put(123)
from multiprocessing import Queue,Process def con(q): print(q.get()) def pro(q): q.put(123) if __name__ == '__main__': q = Queue() p = Process(target=con,args=(q,)) p.start() p = Process(target=pro, args=(q,)) p.start()
import time from multiprocessing import Process, Queue def f(q): q.put([time.asctime(), 'from Eva', 'hello']) #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。 if __name__ == '__main__': q = Queue() #建立一個Queue對象 p = Process(target=f, args=(q,)) #建立一個進程 p.start() print(q.get()) p.join()
上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最早進入的數據。 接下來看一個稍微複雜一些的例子:
import os import time import multiprocessing # 向queue中輸入數據的函數 def inputQ(queue): info = str(os.getpid()) + '(put):' + str(time.asctime()) queue.put(info) # 向queue中輸出數據的函數 def outputQ(queue): info = queue.get() print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info)) # Main if __name__ == '__main__': multiprocessing.freeze_support() record1 = [] # store input processes record2 = [] # store output processes queue = multiprocessing.Queue(3) # 輸入進程 for i in range(10): process = multiprocessing.Process(target=inputQ,args=(queue,)) process.start() record1.append(process) # 輸出進程 for i in range(10): process = multiprocessing.Process(target=outputQ,args=(queue,)) process.start() record2.append(process) for p in record1: p.join() for p in record2: p.join()
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主')
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.put(None) #發送結束信號 if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主')
注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() p1.join() q.put(None) #發送結束信號 print('主')
但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始 p1.start() p2.start() p3.start() c1.start() p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號 p2.join() p3.join() q.put(None) #有幾個消費者就應該發送幾回結束信號None q.put(None) #發送結束信號 print('主')
生產者消費者模型 - 解決創造(生產)數據和處理(消費)數據的效率不平衡問題
把創造數據 和 處理數據放在不一樣的進程中,
根據他們的效率來調整進程的個數
生產數據快 消費數據慢 內存空間的浪費
消費數據快 生產數據慢 效率低下
生產者消費者模型 會默寫 ***
import time import random from multiprocessing import Process,Queue def consumer(q,name): while True: food = q.get() # 取出生產好的食物 if food == 'stop':break print('%s 吃了 %s'%(name,food)) time.sleep(random.random()) def producer(q,name,food,n=10): for i in range(n): time.sleep(random.random()) fd = food+str(i) print('%s 生產了 %s'%(name,fd)) q.put(fd) #生產好的食物 fd = food+str(i)放進去 if __name__ == '__main__': q = Queue(10) c1 = Process(target=consumer,args=(q,'alex')) c1.start() c2 = Process(target=consumer, args=(q, 'alex')) c2.start() p1 = Process(target=producer,args=(q,'太白','泔水')) p1.start() p2 = Process(target=producer, args=(q, 'egon', '魚刺')) p2.start() p1.join() p2.join() # join 阻塞,p1,p2,運行完畢加入stop,當取到stop時,程序中止 q.put('stop') q.put('stop')
讓consumer停下來的方法
在全部生產者結束生產以後 向隊列中放入一個結束符
有幾個consumer就向隊列中放幾個結束符
在消費者消費的過程當中,接收到結束符,就結束消費的進程
import time import random from multiprocessing import JoinableQueue,Process # join 阻塞 def consumer(q,name): while True: food = q.get() print('%s 吃了 %s'%(name,food)) time.sleep(random.random()) q.task_done() # 每次執行就給隊列發個消息 def producer(q,name,food,n=10): for i in range(n): time.sleep(random.random()) fd = food+str(i) print('%s 生產了 %s'%(name,fd)) q.put(fd) q.join() #q.task_done(),q.join()結合使用,q.task_done()告訴隊列數量達到生產數量,q.join()不阻塞 if __name__ == '__main__': q = JoinableQueue() c1 = Process(target=consumer,args=(q,'alex')) c1.daemon = True c1.start() c2 = Process(target=consumer, args=(q, 'alex')) c2.daemon = True c2.start() p1 = Process(target=producer,args=(q,'太白','泔水')) p1.start() p2 = Process(target=producer, args=(q, 'egon', '魚刺')) p2.start() p1.join() p2.join()
JoinableQueue([maxsize])
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。 if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') #主進程等--->p1,p2,p3等---->c1,c2 #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據 #於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
隊列總結:
只有multiprocessing中的隊列 才能幫助你 實現 IPC 永遠不可能出現數據不安全的狀況,多個進程不會同時取走同一個數據(隊列自帶鎖) 提供給你的方法 put get put_nowait get_nowait empty - 在多進程內不可靠 full - 在多進程內不可靠 qsize - 在多進程內不可靠 爲何多進程不可靠??? (若是隊列裏不是空的,裏面有東西,a進來看見裏面有東西,把消息發送回去,發送途中,b可能進來把東西拿走了,而a把裏面有東西的消息傳到時,隊列裏面已經空了,因此多進程不可用) 因爲先進先出的特色+進程通訊的功能+數據進程安全,常常用它來完成進程之間的通訊 生產者消費者模型 生產者和消費者的效率平衡的問題 內存的控制 - 隊列的長度限制 讓消費者自動停下來 JoinableQueue 在消費數據的時候 task_done 在生產端\主進程 join 管道 : Pipe 隊列就是基於管道實現的 隊列 數據安全的 管道 數據不安全的 隊列 = 管道 + 鎖
隊列就是基於管道實現的 隊列 數據安全的 管道 數據不安全的 隊列 = 管道 + 鎖 什麼是管道: ***************** 管道Pipe,IPC通訊的一種機制,隊列就是基於管道來完成通訊的,可是管道是原生的通訊方式,在進程之間會產生數據不安全的狀況,須要本身手動加鎖來處理,管道在數據傳輸過程當中,還涉及到一個端口管理,這個須要咱們在代碼中作處理才能使代碼更完善
from multiprocessing import Pipe left,right = Pipe() left.send('aaa') print(right.recv()) from multiprocessing import Pipe,Process def consumer(left,right): left.close() while True: try: print(right.recv()) except EOFError: break if __name__ == '__main__': left,right = Pipe() p = Process(target=consumer,args=(left,right)) p.start() right.close() for i in range(10): left.send('hello') left.close()
EOF異常的觸發
在這一個進程中 若是不在用這個端點了,應該close
這一在recv的時候,若是其餘端點都被關閉了,就可以知道不會在有新的消息傳進來
此時就不會在這裏阻塞等待,而是拋出一個EOFError
* close並非關閉了整個管道,而是修改了操做系統對管道端點的引用計數的處理
from multiprocessing import Process,Pipe def consumer(p,name): produce, consume=p produce.close() while True: try: baozi=consume.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: break def producer(p,seq=10): produce, consume=p consume.close() for i in range(seq): produce.send(i) # if __name__ == '__main__': produce,consume=Pipe() for i in range(5): # 5個一塊兒接收 c=Process(target=consumer,args=((produce,consume),'c1')) c.start() for i in range(5): p = Process(target=producer, args=((produce, consume),)) p.start() producer((produce,consume)) produce.close() consume.close()
管道和隊列的區別
管道一般指無名管道 1、它是半雙工的(即數據只能在一個方向上流動),具備固定的讀端和寫端 2、它只能用於具備親緣關係的進程中通訊(也就是父與子進程或者兄弟進程之間) 3、數據不可反覆讀取了,即讀了以後就沒有了 4,它能夠當作是一種特殊的文件,對於它的讀寫也可使用普通的read、write 等函數。可是它不是普通的文件,並不屬於其餘任何文件系統,而且只存在於內存中。 消息隊列 1、消息隊列是面向記錄的,其中的消息具備特定的格式以及特定的優先級 2、消息隊列獨立於發送與接收進程。進程終止時,消息隊列及其內容不會被刪除。 3、消息隊列能夠實現消息隨機查詢。 mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 隊列和管道都是將數據存放於內存中,而隊列又是基於(管道+鎖)實現的, 可讓咱們從複雜的鎖問題中解脫出來,於是隊列纔是進程間通訊的最佳選擇。 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題, 並且在進程數目增多時,每每能夠得到更好的可展性。
信號量: Semaphore
信號量的本質: lock鎖 + count計數
多把鑰匙對應一把鎖
信號量(semaphore)與已經介紹過的 IPC 結構不一樣,它是一個計數器。信號量用於實現進程間的互斥與同步,而不是用於存儲進程間通訊數據。
特色:
信號量用於進程間同步,若要在進程間傳遞數據須要結合共享內存。
信號量基於操做系統的 PV 操做,程序對信號量的操做都是原子操做。
每次對信號量的 PV 操做不只限於對信號量值加 1 或減 1,並且能夠加減任意正整數。
支持信號量組。
信號量是一個計數器,能夠用來控制多個線程對共享資源的訪問.,它不是用於交換大批數據,而用於多線程之間的同步.它常做爲一種鎖機制,防止某進程在訪問資源時其它進程也訪問該資源.
所以,主要做爲進程間以及同一個進程內不一樣線程之間的同步手段.
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process from multiprocessing import Semaphore # 信號量 ktv 4個小房子 10我的站在房子外面要進去玩兒 sem = Semaphore(3) # 參數3 有3把鑰匙 sem.acquire() print(1) sem.acquire() print(2) sem.acquire() print(3) # sem.release() # 還一把鑰匙 sem.acquire() # 阻塞 print(4)
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
import time import random from multiprocessing import Process,Semaphore def ktv(num,sem): sem.acquire() print('person%s進入了ktv' % num) time.sleep(random.randint(1,4)) print('person%s走出了ktv' % num) sem.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(10): p = Process(target=ktv,args=(i,sem)) p.start()
事件: Event
事件 : 經過一個標記來控制對多個進程進行同步控制
在某個地方wait是否阻塞的行爲是根據事件對象內部的一個標記來決定的
在事件對象中提供的方法能夠修改這個標記的狀態
併發的時候
不少模型
事件
from multiprocessing import Event,Process wait() 方法 等待 事件內部的信號變成True 就不阻塞了 阻塞 若是這個標誌是False 那麼就阻塞 非阻塞 若是這個標誌是True 那麼就非阻塞 查看標誌 is_set() 修改標誌 set()將標誌設置爲True clear() 將標誌設置爲False e = Event() print(e.is_set()) # 在事件的建立之初 默認是False e.set() # 將標誌設置爲True print(e.is_set()) e.wait() # 至關於什麼都沒作pass e.clear() # 將標誌設置爲False # e.wait() # 永遠阻塞 e.wait(timeout=10) # 若是信號在阻塞10s以內變爲True,那麼不繼續阻塞直接pass, # 若是就阻塞10s以後狀態仍是沒變,那麼繼續, print(e.is_set()) # 不管前面的wait的timeout是否經過,個人狀態都不會所以改變
import time import random def traffic_light(e): print('\033[1;31m 紅燈亮\033[0m') while True: time.sleep(2) if e.is_set(): print('\033[1;31m 紅燈亮\033[0m') e.clear() else: print('\033[1;32m 綠燈亮\033[0m') e.set() 車 等或者經過 def car(id,e): if not e.is_set(): print('car %s 等待' % id) e.wait() print('car %s 經過'%id) def police_car(id,e): if not e.is_set(): e.wait(timeout = 0.5) print('police car %s 經過' % id) 主進程 啓動交通控制燈 啓動車的進程 if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args=(e,)) p.start() car_lst = [car,police_car] for i in range(20): p = Process(target=random.choice(car_lst), args=(i,e)) p.start() time.sleep(random.randrange(0,3,2))
展望將來,基於消息傳遞的併發編程是大勢所趨
即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。
這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。
但進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。
之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。
manager模塊介紹
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的 雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
進程天生數據隔離,用共享用線程數據共享
Manager會建立一個服務進程,其餘的進程都統一來訪問這個server進程,從而達到多進程之間的數據通訊。
一旦主進程結束,則server進程也將結束
爲何要有進程池?進程池的概念。
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?
在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。
Pool([numprocess [,initializer [, initargs]]]):建立進程池
參數介紹
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
1 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 2 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' 3 4 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' 6 7 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 8 9 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 3 obj.ready():若是調用完成,返回True 4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 5 obj.wait([timeout]):等待結果變爲可用。 6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞 # 但無論該任務是否存在阻塞,同步調用都會在原地等着 print(res_l)
import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行 # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務 # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束 # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。 res_l.append(res) # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果 # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了 p.close() p.join() for res in res_l: print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
import os import time from multiprocessing import Pool def wahaha(): time.sleep(1) print(os.getpid()) return True if __name__ == '__main__': p = Pool(5) # CPU的個數(通常CPU個數是4) 或者 +1 ret_l = [] for i in range(20): ret = p.apply(func = wahaha) # 同步的,不用 print(ret)
import os import time from multiprocessing import Pool def wahaha(): time.sleep(1) print(os.getpid()) if __name__ == '__main__': p = Pool(5) # CPU的個數 或者 +1 ret_l = [] for i in range(20): ret = p.apply_async(func = wahaha) # async 異步的 ret_l.append(ret) p.close() # 關閉 不是進程池中的進程不工做了 # 而是關閉了進程池,讓任務不能再繼續提交了 p.join() # 等待這個池中提交的任務都執行完 # 表示等待全部子進程中的代碼都執行完 主進程才結束
import os import time from multiprocessing import Pool def wahaha(): time.sleep(1) print(os.getpid()) return True if __name__ == '__main__': p = Pool(5) # CPU的個數 或者 +1 ret_l = [] for i in range(20): ret = p.apply_async(func = wahaha) # apply_async 異步的 ret_l.append(ret) p.close() # 關閉 進程池中的進程不工做了 # 而是關閉了進程池,讓任務不能再繼續提交了 p.join() # 等待這個池中提交的任務都執行完 for ret in ret_l: print(ret.get())
import os import time from multiprocessing import Pool def wahaha(): time.sleep(1) print(os.getpid()) return True if __name__ == '__main__': p = Pool(5) # CPU的個數 或者 +1 ret_l = [] for i in range(20): ret = p.apply_async(func = wahaha) # apply_async 異步的 ret_l.append(ret) for ret in ret_l: print(ret.get())
異步的 apply_async 1.若是是異步的提交任務,那麼任務提交以後進程池和主進程也異步了, 主進程不會自動等待進程池中的任務執行完畢 2.若是須要主進程等待,須要p.join 可是join的行爲是依賴close 3.若是這個函數是有返回值的 也能夠經過ret.get()來獲取返回值 可是若是一邊提交一遍獲取返回值會讓程序變成同步的 因此要想保留異步的效果,應該講返回對象保存在列表裏,全部任務提交完成以後再來取結果 這種方式也能夠去掉join,來完成主進程的阻塞等待池中的任務執行完畢
進程池完成socket通訊
#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count()) #開啓6個客戶端,會發現2個客戶端處於等待狀態 #在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程 from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool(4) while True: conn,*_=server.accept() p.apply_async(talk,args=(conn,)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
import socket from multiprocessing import Pool def talk(conn): try: while True: conn.send(b'hello') print(conn.recv(1024)) finally: conn.close() if __name__ == '__main__': p = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1',9090)) sk.listen() while True: conn,addr = sk.accept() p.apply_async(func=talk,args=(conn,))
import socket import os sk = socket.socket() sk.connect(('127.0.0.1',9090)) while True: print(sk.recv(1024)) sk.send(str(os.getpid()).encode('utf-8'))
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
「回調函數就是一個經過函數指針調用的函數。
若是你把函數的指針(地址)做爲參數傳遞給另外一個函數,當這個指針被用來調用其所指向的函數時,咱們就說這是回調函數。」
from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了 ''' 打印結果: <進程3388> get https://www.baidu.com <進程3389> get https://www.python.org <進程3390> get https://www.openstack.org <進程3388> get https://help.github.com/ <進程3387> parse https://www.baidu.com <進程3389> get http://www.sina.com.cn/ <進程3387> parse https://www.python.org <進程3387> parse https://help.github.com/ <進程3387> parse http://www.sina.com.cn/ <進程3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}] '''
import re from urllib.request import urlopen from multiprocessing import Pool def get_page(url,pattern): response=urlopen(url).read().decode('utf-8') return pattern,response def parse_page(info): pattern,page_content=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0].strip(), 'title':item[1].strip(), 'actor':item[2].strip(), 'time':item[3].strip(), } print(dic) if __name__ == '__main__': regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>' pattern1=re.compile(regex,re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get()
若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中全部進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到全部結果 print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理
*** 回調函數 _ 在主進程中執行
在發起任務的時候 指定callback參數
在每一個進程執行完apply_async任務以後,返回值會直接做爲參數傳遞給callback的函數,執行callback函數中的代碼
總結
進程的總結: IPC通訊 隊列 管道+鎖 管道 是隊列的底層 數據共享 _ 進程就是數據隔離的 Manager模塊 數據類型 都可以進行數據共享 一部分都是不加鎖 不支持數據進程安全 不安全的解決辦法 加鎖 進程池 進程不能無限開 會給操做系統調度增長負擔 且真正能被同時執行的進程最多也就和CPU個數相同等 進程的開啓和銷燬都要消耗資源和時間 進程 進程三狀態:就緒,運行,阻塞 同步異步阻塞非阻塞 請解釋異步非阻塞 給開發完成的全部裝飾器+log 是計算機中最小的資源分配單位 進程的建立 Process 進程之間的異步 自己子進程主進程之間都是異步的 進程之間的同步控制 Lock Semaphore Event 進程之間的數據隔離 自己進程與進程之間都是數據隔離的 進程之間通訊 IPC 管道 隊列 數據共享 Manager 進程池 -能夠獲取返回值 同步調用 - 基本不用的 異步調用 - 重要的 apply_async get獲取結果 close join 回調函數 Pool 回調函數在主進程中執行 apply_async(func = wahaha,callback = back) 進程間通訊方式有哪些? 管道、信號量、信號、消息隊列、共享內存、套接字 進程間通訊(IPC) 1)管道 管道分爲有名管道和無名管道 無名管道是一種半雙工的通訊方式,數據只能單向流動,並且只能在具備親緣關係的進程間使用.進程的親緣關係通常指的是父子關係。無明管道通常用於兩個不一樣進程之間的通訊。 當一個進程建立了一個管道,並調用fork建立本身的一個子進程後,父進程關閉讀管道端,子進程關閉寫管道端,這樣提供了兩個進程之間數據流動的一種方式。 有名管道也是一種半雙工的通訊方式,可是它容許無親緣關係進程間的通訊。 2)信號量 信號量是一個計數器,能夠用來控制多個線程對共享資源的訪問.,它不是用於交換大批數據,而用於多線程之間的同步.它常做爲一種鎖機制,防止某進程在訪問資源時其它進程也訪問該資源. 所以,主要做爲進程間以及同一個進程內不一樣線程之間的同步手段. 3)信號 信號是一種比較複雜的通訊方式,用於通知接收進程某個事件已經發生. 4)消息隊列 消息隊列是消息的鏈表,存放在內核中並由消息隊列標識符標識.消息隊列克服了信號傳遞信息少,管道只能承載無格式字節流以及緩衝區大小受限等特色. 消息隊列是UNIX下不一樣進程之間可實現共享資源的一種機制,UNIX容許不一樣進程將格式化的數據流以消息隊列形式發送給任意進程. 對消息隊列具備操做權限的進程均可以使用msget完成對消息隊列的操做控制.經過使用消息類型,進程能夠按任何順序讀信息,或爲消息安排優先級順序. 5)共享內存 共享內存就是映射一段能被其餘進程所訪問的內存,這段共享內存由一個進程建立,但多個進程均可以訪問.共享內存是最快的IPC(進程間通訊)方式, 它是針對其它進程間通訊方式運行效率低而專門設計的.它每每與其餘通訊機制,如信號量,配合使用,來實現進程間的同步與通訊. 6)套接字:可用於不一樣及其間的進程通訊