進程鎖python
#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理json
異步運行到某段程序時轉換成同步:下降了效率可是確保了數據的安全性(串行),引入multiprocessing中的Lock:安全
from multiprocessing import Process,Lock
import time import json import random def get_ticket(i,ticket_lock): print("你們都到齊了") ticket_lock.acquire() #搶鑰匙 如下變成串行 with open("ticket","r") as f: last_ticket_info=json.load(f) last_ticket=last_ticket_info["count"] if last_ticket>0: time.sleep(random.random()) last_ticket=last_ticket-1 last_ticket_info["count"]=last_ticket with open("ticket","w") as f: json.dump(last_ticket_info,f) print("%s號搶到了" % i) else: print("%s號沒搶到了" % i) ticket_lock.release() #交鑰匙給下一我的 if __name__=="__main__": ticket_lock=Lock() #建立鎖得的對象 for i in range(10): p=Process(target=get_ticket,args=(i,ticket_lock)) # 傳參 p.start()
#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
dom
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。異步
隊列ui
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。隊列就像一個特殊的列表,可是能夠設置固定長度,而且從前面插入數據,從後面取出數據,先進先出。spa
from multiprocessing import Queue q=Queue(3) #建立一個隊列對象,隊列長度爲3 #put ,get ,put_nowait,get_nowait,full,empty q.put(3) #往隊列中添加數據 q.put(2) q.put(1) # q.put(4) # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。 # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。 try: q.put_nowait(4) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。 print('隊列已經滿了') # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。 print(q.full()) #查看是否滿了,滿了返回True,不滿返回False print(q.get()) #取出數據 print(q.get()) print(q.get()) # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。 try: q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。 print('隊列已經空了') print(q.empty()) #空了 隊列的簡單用法
隊列是進程安全的:同一時間只能一個進程拿到隊列中的一個數據,你拿到了一個數據,這個數據別人就拿不到了。線程
生產者和消費者模型:code
import time from multiprocessing import Process,Queue def produce(q): for i in range(1,11): time.sleep(1) print("生產了%s包子"% i) q.put(i) def consumer(q): while 1: time.sleep(2) s=q.get() if s==None: #判斷是否還有數據(接收完成信號) break else: print("消費者吃了%s包子" % s) if __name__=="__main__": q= Queue(20) #設置隊列長度 pro_p=Process(target=produce,args=(q,)) #建立進程 pro_p.start() con_p=Process(target=consumer,args=(q,)) #建立進程 con_p.start() pro_p.join() q.put(None) #在主程序中加信號也能夠在生產者中添加完成信號
利用JoinableQueue來解決:對象
import time from multiprocessing import Process,JoinableQueue def produce(q): for i in range(1,11): time.sleep(1) print("生產了%s包子"% i) q.put(i) q.join() #接收完成的信號結束進程 def consumer(q): while 1: time.sleep(2) s=q.get() print("消費者吃了%s包子" % s) q.task_done()#給共享的進程發送完成的信號 if __name__=="__main__": q= JoinableQueue(20) #設置隊列長度 pro_p=Process(target=produce,args=(q,)) #建立對象 pro_p.start() con_p=Process(target=consumer,args=(q,)) #建立對象 con_p.daemon=True con_p.start() pro_p.join() #等待生產者進程完成 print("主程序運行完成")
信號量
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
import time import random from multiprocessing import Process,Semaphore def dbj(i,s): s.acquire() #進程鎖 串行 print('%s號男主人公來洗腳'%i) print('-------------') time.sleep(random.randrange(3,6)) s.release() if __name__ == '__main__': s = Semaphore(4) #建立一個計數器,每次acquire就減1,直到減到0,那麼上面的任務只有4個在同時異步的執行,後面的進程須要等待. for i in range(10): p1 = Process(target=dbj,args=(i,s,)) p1.start()
事件
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True
from multiprocessing import Process,Semaphore,Event import time,random e = Event() #建立一個事件對象 print(e.is_set()) #is_set()查看一個事件的狀態,默認爲False,可經過set方法改成True print('look here!') # e.set() #將is_set()的狀態改成True。 # print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr # e.clear() #將is_set()的狀態改成False # print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr e.wait() #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞 print('give me!!') #set和clear 修改事件的狀態 set-->True clear-->False #is_set 用來查看一個事件的狀態 #wait 依據事件的狀態來決定是否阻塞 False-->阻塞 True-->不阻塞 事件方法的使用