day034 鎖,信號量,事件,隊列,子進程與子進程通訊,生產者消費者模型,joinableQueue

進程鎖python

#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理
json

異步運行到某段程序時轉換成同步:下降了效率可是確保了數據的安全性(串行),引入multiprocessing中的Lock:安全

 

from multiprocessing import Process,Lock
import time import json import random def get_ticket(i,ticket_lock): print("你們都到齊了") ticket_lock.acquire() #搶鑰匙 如下變成串行 with open("ticket","r") as f: last_ticket_info=json.load(f) last_ticket=last_ticket_info["count"] if last_ticket>0: time.sleep(random.random()) last_ticket=last_ticket-1 last_ticket_info["count"]=last_ticket with open("ticket","w") as f: json.dump(last_ticket_info,f) print("%s號搶到了" % i) else: print("%s號沒搶到了" % i) ticket_lock.release() #交鑰匙給下一我的 if __name__=="__main__": ticket_lock=Lock() #建立鎖得的對象 for i in range(10): p=Process(target=get_ticket,args=(i,ticket_lock)) # 傳參 p.start()

#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
dom

隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
異步

隊列ui

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。隊列就像一個特殊的列表,可是能夠設置固定長度,而且從前面插入數據,從後面取出數據,先進先出。spa

 

from multiprocessing import Queue
q=Queue(3) #建立一個隊列對象,隊列長度爲3

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)   #往隊列中添加數據
q.put(2)
q.put(1)
# q.put(4)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(4) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了')

# 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #查看是否滿了,滿了返回True,不滿返回False

print(q.get())  #取出數據
print(q.get())
print(q.get())
# print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了

隊列的簡單用法

 

隊列是進程安全的:同一時間只能一個進程拿到隊列中的一個數據,你拿到了一個數據,這個數據別人就拿不到了。線程

生產者和消費者模型:code

 

import time
from multiprocessing import Process,Queue
def produce(q):
    for i in range(1,11):
        time.sleep(1)
        print("生產了%s包子"% i)
        q.put(i)
def consumer(q):
    while 1:
        time.sleep(2)
        s=q.get()
        if s==None:    #判斷是否還有數據(接收完成信號)
            break
        else:
            print("消費者吃了%s包子" % s)
if __name__=="__main__":
    q= Queue(20)      #設置隊列長度
    pro_p=Process(target=produce,args=(q,))   #建立進程
    pro_p.start()
    con_p=Process(target=consumer,args=(q,))  #建立進程
    con_p.start()
    pro_p.join()
    q.put(None)      #在主程序中加信號也能夠在生產者中添加完成信號

 

利用JoinableQueue來解決:對象

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
 參數介紹:
 maxsize是隊列中容許最大項數,省略則無大小限制。   
方法介紹:
  JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
  q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
  q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止,也就是隊列中的數據所有被get拿走了。
 
import time
from multiprocessing import Process,JoinableQueue
def produce(q):
    for i in range(1,11):
        time.sleep(1)
        print("生產了%s包子"% i)
        q.put(i)
    q.join()       #接收完成的信號結束進程
def consumer(q):
    while 1:
        time.sleep(2)
        s=q.get()
        print("消費者吃了%s包子" % s)
        q.task_done()#給共享的進程發送完成的信號
if __name__=="__main__":
    q= JoinableQueue(20)   #設置隊列長度
    pro_p=Process(target=produce,args=(q,)) #建立對象
    pro_p.start()
    con_p=Process(target=consumer,args=(q,)) #建立對象
    con_p.daemon=True
    con_p.start()
    pro_p.join()  #等待生產者進程完成
    print("主程序運行完成")

信號量

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。

import time
import random
from multiprocessing import Process,Semaphore
def dbj(i,s):
    s.acquire()   #進程鎖 串行
    print('%s號男主人公來洗腳'%i)
    print('-------------')
    time.sleep(random.randrange(3,6))
    s.release()

if __name__ == '__main__':
    s = Semaphore(4) #建立一個計數器,每次acquire就減1,直到減到0,那麼上面的任務只有4個在同時異步的執行,後面的進程須要等待.
    for i in range(10):
        p1 = Process(target=dbj,args=(i,s,))
        p1.start()

 

 

事件

python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
 事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
clear:將「Flag」設置爲False
set:將「Flag」設置爲True

from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #建立一個事件對象
print(e.is_set())  #is_set()查看一個事件的狀態,默認爲False,可經過set方法改成True
print('look here!')
# e.set()          #將is_set()的狀態改成True。
# print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr
# e.clear()        #將is_set()的狀態改成False
# print(e.is_set())#is_set()查看一個事件的狀態,默認爲False,可經過set方法改成Tr
e.wait()           #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的狀態 set-->True   clear-->False
#is_set     用來查看一個事件的狀態
#wait       依據事件的狀態來決定是否阻塞 False-->阻塞  True-->不阻塞

事件方法的使用
相關文章
相關標籤/搜索