3一、互斥鎖與進程間通訊(隊列)

咱們以前作了多進程併發,那麼大家有沒有發現問題。若是說多個進程共享同一個數據,好比搶火車票你們同時在客戶端查看同時購買會出現什麼問題呢?今天咱們將講述進程鎖還有進程間通訊,進程之間彼此隔離,他們須要一個第三方聯繫起來。編程

 

本篇導航:json

 

1、互斥鎖安全

進程之間數據隔離,可是共享一套文件系統,於是能夠經過文件來實現進程直接的通訊,但問題是必須本身加鎖處理多線程

注意:加鎖的目的是爲了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。併發

一、上廁所dom

先舉個通俗易懂的例子,家裏的廁所,你要上廁所進去後會先鎖門,廁所門就至關於一個互斥鎖,當你在裏面的時候別人過來上廁所就只能在門口等。ide

from multiprocessing import Process,Lock
import os
import time
def work(mutex):
    mutex.acquire()  #上鎖
    print('task[%s] 上廁所' %os.getpid())
    time.sleep(3)
    print('task[%s] 上完廁所' %os.getpid())
    mutex.release()  #開鎖

if __name__ == '__main__':
    mutex=Lock()  #實例化(互斥鎖)
    p1=Process(target=work,args=(mutex,))
    p2=Process(target=work,args=(mutex,))
    p3=Process(target=work,args=(mutex,))
    p1.start()
    p2.start()
    p3.start()

    print('start...')

二、模擬搶票學習

#文件db的內容爲:{"count":1}  #票數能夠本身定義
#注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search() :  #查看票數
    dic=json.load(open('db.txt',))
    print('剩餘票數%s' %dic['count'])
def get_ticket() :  #購票
    dic = json.load(open('db.txt',))
    if dic['count'] > 0 :
        dic['count'] -= 1
        json.dump(dic,open('db.txt','w'))
        print('%s 購票成功' %os.getpid())
def task(mutex) :  #購票流程
    search()
    time.sleep(random.randint(1, 3)) #模擬購票一系列繁瑣的過程所花費的時間
    mutex.acquire()
    get_ticket()
    mutex.release()
if __name__ == '__main__' :
    mutex = Lock()
    for i in range(50):
        p = Process(target=task,args=(mutex,))
        p.start()

 

2、Process對象其餘屬性使用案例補充ui

一、deamon守護進程spa

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

ps:

from multiprocessing import Process
import os
import time
def work():
    print('%s is working' %os.getpid())
    time.sleep(10)
    print('%s is ending' % os.getpid())
if __name__ == '__main__':
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start()
    p2.start()
    p3.start()
    time.sleep(2)
    print('start。。。')

二、join等待子進程

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

ps:

from multiprocessing import Process
import os
import time
def work():
    print('%s is working' %os.getpid())
    time.sleep(3)
if __name__ == '__main__':
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.daemon=True
    p2.daemon=True
    p3.daemon=True
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p3.join()
    p1.join()
    p2.join()
    print('基於初始化的結果來繼續運行')

三、terminate,is_alive,name,pid

p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True

p.name:進程的名稱
p.pid:進程的pid

ps:

from multiprocessing import Process
import os
import time
def work():
    print('%s is working' %os.getpid())
    time.sleep(3)
if __name__ == '__main__':
    p1=Process(target=work)
    p2=Process(target=work)
    p3=Process(target=work)
    p1.start() #初始化1
    p2.start() #初始化2
    p3.start() #初始化3

    p1.terminate()  #不建議使用
    print(p1.is_alive())
    #雖然已經強制終止進程了可是操做系統終止進程也須要時間因此此時仍是True
    print(p1.name)  #若是沒有起名默認Process-1後面的數字按子進程順序排
    print(p2.name)
    print(p1.pid)  # p1.pid == os.getpid()
    print('基於初始化的結果來繼續運行')

 

3、進程間通訊

咱們學習了經過使用共享的文件的方式,實現進程直接的共享,即共享數據的方式,這種方式必須考慮周全同步、鎖等問題。並且文件是操做系統提供的抽象,能夠做爲進程直接通訊的介質,與mutiprocess模塊無關

但其實mutiprocessing模塊爲咱們提供了基於消息的IPC通訊機制:隊列和管道。IPC機制中的隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

一、進程間通訊(IPC)方式一:隊列(推薦使用)

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

能夠往隊列裏聽任意類型的數據

隊列:先進先出

1)導入

from multiprocessing import Queue

2)實例化

q=Queue(3)  #3是隊列中規定容許的最大項數,省略即不限大小

3)主要方法

q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
 
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

4)其餘方法(瞭解)

q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

5)應用

from multiprocessing import Process,Queue
#1:能夠往隊列裏聽任意類型的數據 2 隊列:先進先出
q=Queue(3)
q.put('first')
q.put('second')
q.put('third')
# q.put('fourht')
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.empty()) #空了

# q=Queue(3)
# q.put('first',block=False)
# q.put('second',block=False)
# q.put('third',block=False)
# q.put('fourth',block=True,timeout=3)

# q.get(block=False)
# q.get(block=True,timeout=3)

# q.get_nowait() #q.get(block=False)

6)生產消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print('%s 吃了 %s' % (os.getpid(), res))
def producer(q):
    for i in range(5):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('%s 製造了 %s' %(os.getpid(),res))
    q.put(None)
if __name__ == '__main__':
    q=Queue()
    #生產者們:廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:吃貨們
    p2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('')
生產者消費者模型

7)建立隊列的另一個類

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

 maxsize是隊列中容許最大項數,省略則無大小限制。 

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃了 %s' % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(5):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('%s 製造了 %s' %(os.getpid(),res))
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:廚師們
    p1=Process(target=product_baozi,args=(q,))

    #消費者們:吃貨們
    p4=Process(target=consumer,args=(q,))
    p4.daemon=True

    p1.start()
    p4.start()

    p1.join()
    print('')
    #p2結束了
生產者消費者模型2
from multiprocessing import Process,JoinableQueue
import time
import random
import os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃了 %s' % (os.getpid(), res))
        q.task_done()

def product_baozi(q):
    for i in range(3):
        time.sleep(2)
        res='包子%s' %i
        q.put(res)
        print('%s 製造了 %s' %(os.getpid(),res))
    q.join()

def product_gutou(q):
    for i in range(3):
        time.sleep(2)
        res='骨頭%s' %i
        q.put(res)
        print('%s 製造了 %s' %(os.getpid(),res))
    q.join()

def product_ganshui(q):
    for i in range(3):
        time.sleep(2)
        res='泔水%s' %i
        q.put(res)
        print('%s 製造了 %s' %(os.getpid(),res))
    q.join()
if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:廚師們
    p1=Process(target=product_baozi,args=(q,))
    p2=Process(target=product_gutou,args=(q,))
    p3=Process(target=product_ganshui,args=(q,))

    #消費者們:吃貨們
    p4=Process(target=consumer,args=(q,))
    p5=Process(target=consumer,args=(q,))
    p4.daemon=True
    p5.daemon=True
    #設置爲守護進程,在主線程中止時p也中止,可是不用擔憂,producer內調用q.join保證了consumer已經處理完隊列中的全部元素
    p_l=[p1,p2,p3,p4,p5]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()

    print('')
生產者消費者模型3

二、進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可)

三、 進程間通訊方式三:共享數據(不推薦使用,瞭解便可)

(對這兩種方式瞭解不深,有興趣的能夠本身搜索相關文章)

相關文章
相關標籤/搜索