multiprocessor(中)

 

1、進程同步(鎖)

  • 經過以前的學習,咱們想方設法實現了程序的異步,讓多個任務能夠同時在幾個進程中併發處理,他們之間的運行沒有順序,一旦開啓也不受咱們控制。儘管併發編程讓咱們能更加充分的利用IO資源,可是也給咱們帶來了新的問題:進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。html

  • 案例一、多個進程搶佔輸出資源,致使打印混亂實例node

     
     
     
    x
     
     
     
     
    import os
    import time
    import random
    from multiprocessing import Process
    def work(n):
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(random.random())
        print('%s:%s is done' %(n,os.getpid()))
    if __name__ == '__main__':
        for i in range(5):
            p=Process(target=work,args=(i,))
            p.start()
    # 看結果:經過結果能夠看出兩個問題:】
    #問題一:每一個進程中work函數的第一個打印就不是按照咱們for循環的0-4的順序來打印的
    #問題二:咱們發現,每一個work進程中有兩個打印,可是咱們看到全部進程中第一個打印的順序爲0-2-1-4-3,可是第二個打印沒有按照這個順序,變成了2-1-0-3-4,說明咱們一個進程中的程序的執行順序都混亂了。
    #問題的解決方法,第二個問題加鎖來解決,第一個問題是沒有辦法解決的,由於進程開到了內核,有操做系統來決定進程的調度,咱們本身控制不了
    # 0: 9560 is running
    # 2: 13824 is running
    # 1: 7476 is running
    # 4: 11296 is running
    # 3: 14364 is running
    # 2:13824 is done
    # 1:7476 is done
    # 0:9560 is done
    # 3:14364 is done
    # 4:11296 is done
    #經過加鎖解決第二個問題
    #由併發變成了串行,犧牲了運行效率,但避免了競爭
    def work(n,lock):
        #加鎖,保證每次只有一個進程在執行鎖裏面的程序,這一段程序對於全部寫上這個鎖的進程,你們都變成了串行
        lock.acquire()
        print('%s: %s is running' %(n,os.getpid()))
        time.sleep(1)
        print('%s:%s is done' %(n,os.getpid()))
        #解鎖,解鎖以後其餘進程才能去執行本身的程序
        lock.release()
        
        #注意這裏可使用with上下位處理的形式,而且這裏的with還爲咱們實現了報錯處理機制,若是使用上面的方式,一旦報錯,子進程將稱爲孤兒進程
       # with lock:
            #print('%s: %s is running' %(n,os.getpid()))
       #time.sleep(1)
        #print('%s:%s is done' %(n,os.getpid()))
            
    if __name__ == '__main__':
        lock=Lock()
        for i in range(5):
            p=Process(target=work,args=(i,lock))
            p.start()
    #打印結果:
    # 2: 10968 is running
    # 2:10968 is done
    # 0: 7932 is running
    # 0:7932 is done
    # 4: 4404 is running
    # 4:4404 is done
    # 1: 12852 is running
    # 1:12852 is done
    # 3: 980 is running
    # 3:980 is done
    #結果分析:(本身去屢次運行一下,看看結果,我拿出其中一個結果來看)經過結果咱們能夠看出,多進程剛開始去執行的時候,每次運行,首先打印出來哪一個進程的程序是不固定的,可是咱們解決了上面打印混亂示例代碼的第二個問題,那就是同一個進程中的兩次打印都是先完成的,而後才切換到下一個進程去,打印下一個進程中的兩個打印結果,說明咱們控制住了同一進程中的代碼執行順序,若是涉及到多個進程去操做同一個數據或者文件的時候,就不擔憂數據算錯或者文件中的內容寫入混亂了。
     
  • 鎖的應用場景:當多個進程須要操做同一個文件/數據庫的時候 ,會產生數據不安全,咱們應該使用鎖來避免多個進程同時修改一個文件python

  • 特色:web

    • 1.犧牲了效率 保證了數據的安全 2.用戶就會以爲很慢 體驗不好
  • 案例二、併發運行,效率高,可是競爭同一個文件,致使數據混亂數據庫

     
     
     
    xxxxxxxxxx
     
     
     
     
    #注意:首先在當前文件目錄下建立一個名爲db的文件
    #文件db的內容爲:{"count":1},只有這一行數據,而且注意,每次運行完了以後,文件中的1變成了0,你須要手動將0改成1,而後在去運行代碼。注意必定要用雙引號,否則json沒法識別
    from multiprocessing import Process,Lock
    import time,json,random
    #查看剩餘票數
    def search():
        dic=json.load(open('db')) #打開文件,直接load文件中的內容,拿到文件中的包含剩餘票數的字典
        print('\033[43m剩餘票數%s\033[0m' %dic['count'])
    def get():  #搶票
        dic=json.load(open('db'))
        time.sleep(0.1)       #模擬讀數據的網絡延遲,那麼進程之間的切換,致使全部人拿到的字典都是{"count": 1},也就是每一個人都拿到了這一票。
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2)   #模擬寫數據的網絡延遲
            json.dump(dic,open('db','w'))
            #最終結果致使,每一個人顯示都搶到了票,這就出現了問題~
            print('\033[43m購票成功\033[0m')
    def task():
        search()
        get()
    if __name__ == '__main__':
        for i in range(3): #模擬併發100個客戶端搶票
            p=Process(target=task)
            p.start()
    #看結果分析:因爲網絡延遲等緣由使得進程切換,致使每一個人都搶到了這最後一張票
    # 剩餘票數1
    # 剩餘票數1
    # 剩餘票數1
    # 購票成功
    # 購票成功
    # 購票成功
    #加鎖版本
    def search():
        dic=json.load(open('db')) #打開文件,直接load文件中的內容,拿到文件中的包含剩餘票數的字典
        print('\033[43m剩餘票數%s\033[0m' %dic['count'])
    def get():  #搶票
        dic=json.load(open('db'))
        time.sleep(0.1)       #模擬讀數據的網絡延遲,那麼進程之間的切換,致使全部人拿到的字典都是{"count": 1},也就是每一個人都拿到了這一票。
        if dic['count'] >0:
            dic['count']-=1
            time.sleep(0.2)   #模擬寫數據的網絡延遲
            json.dump(dic,open('db','w'))
            #最終結果致使,每一個人顯示都搶到了票,這就出現了問題~
            print('\033[43m購票成功\033[0m')
        else:
            print('sorry,沒票了親!')
    def task(lock):
        search()
        #由於搶票的時候是發生數據變化的時候,全部咱們將鎖加加到這裏
        lock.acquire()
        get()
        lock.release()
    if __name__ == '__main__':
        lock = Lock() #建立一個鎖
        for i in range(3): #模擬併發100個客戶端搶票
            p=Process(target=task,args=(lock,)) #將鎖做爲參數傳給task函數
            p.start()
    #看結果分析:只有一我的搶到了票
    # 剩餘票數1
    # 剩餘票數1
    # 剩餘票數1
    # 購票成功   #幸運的人兒
    # sorry,沒票了親!
    # sorry,沒票了親!
     

    1555407596609

  • 進程鎖的總結:編程

 
 
 
xxxxxxxxxx
 
 
 
 
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理
所以咱們最好找尋一種解決方案可以兼顧:
一、效率高(多個進程共享一塊內存的數據)
二、幫咱們處理好鎖問題。
這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道
 

2、隊列和管道

  • 咱們經過上面的學習知道鎖實際上是基於文件的基礎上實現的,也就是當咱們要讀寫一個數據時,須要涉及到使用鎖,來約束多個進程之間的秩序 。可是效率底,因此咱們想改變這種文件類型的操做,那麼咱們將學習使用基於socket和管道實現的高效方式:隊列json

  • 隊列和管道都是將數據存放於內存中。windows

    • 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。安全

      1555408563614

    • IPC通訊機制:Inter Process Communication,進程間通訊或者跨進程通訊,是指兩個進程之間進行數據交換的過程。IPC不是某個系統所獨有的,任何一個操做系統都須要有相應的IPC機制, 好比Windows上能夠經過剪貼板、管道和郵槽等來進行進程間通訊,而Linux上能夠經過命名共享內容、信號量等來進行進程間通訊。Android它也有本身的進程間通訊方式,Android建構在Linux基礎上,繼承了一部分Linux的通訊方式網絡

  • 進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。隊列就像一個特殊的列表,可是能夠設置固定長度,而且從前面插入數據,從後面取出數據,先進先出

  • 隊列的建立方式:

    •  
       
       
      xxxxxxxxxx
       
       
       
       
      Queue([maxsize]) 建立共享的進程隊列。
      參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖實現。
       
  • 隊列須要記住的方法是:

     
     
     
    xxxxxxxxxx
     
     
     
     
    q = Queue([maxsize]) 
    建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
    Queue的實例q具備如下方法:
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。若是沒有值就會等到天荒地老
    q.get_nowait( ) 
    同q.get(False)方法。
    q.put(item [, block [,timeout ] ] ) 
    將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。若是設置了最大的容量,put只能一直等
    q.qsize() 
    返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
    q.empty() 
    若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
    q.full() 
    若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
    q.close() 
    關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
    q.cancel_join_thread() 
    不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
    q.join_thread() 
    鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲
     
  • 隊列代碼實例

     
     
     
    xxxxxxxxxx
     
     
     
     
    from multiprocessing import Queue # 此Queue 是進程隊列,區別於from queue import Queue
    q=Queue(3) #建立一個隊列對象,隊列長度爲3
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)   #往隊列中添加數據
    q.put(2)
    q.put(1)
    # q.put(4)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
                # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。天荒地老
    try:
        q.put_nowait(4) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
    except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
        print('隊列已經滿了')
    # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
    print(q.full()) #查看是否滿了,滿了返回True,不滿返回False
    print(q.get())  #取出數據
    print(q.get())
    print(q.get())
    # print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。天荒地老
    try:
        q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
    except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
        print('隊列已經空了')
    print(q.empty()) #空了
    #看下面的隊列的時候 按照編號看
    import time
    from multiprocessing import Process, Queue
    def f(q):
        # q = Queue() #9. 咱們在主進程中開啓了一個q,若是咱們在子進程中的函數裏面再開一個q,那麼你下面q.put('姑娘,多少錢~')添加到了新建立的這q裏裏面了
        q.put('姑娘,多少錢~')  #4.調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。
        print(q.qsize()) #6.查看隊列中有多少條數據了 # 2 這裏有不肯定性,有時候這裏爲1
    def f2(q):
        print('》》》》》》》》')
        print(q.get())  #5.取數據
    if __name__ == '__main__':
        q = Queue() #1.建立一個Queue對象
        q.put('小鬼')
        p = Process(target=f, args=(q,)) #2.建立一個進程
        p2 = Process(target=f2, args=(q,)) #3.建立一個進程
        p.start()
        p2.start()
        print(q.qsize())
        time.sleep(1) #7.若是阻塞一點時間,就會出現主進程運行太快,致使咱們在子進程中查看qsize爲1個。
        print(q.get()) #結果:姑娘,多少錢~
        p.join() 
     # 以上代碼證實這個隊列是能夠實現進程之間的數據共享的
    #一個複雜一點的例子
    import os
    import time
    import multiprocessing
    # 向queue中輸入數據的函數
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.asctime())
        queue.put(info)
    # 向queue中輸出數據的函數
    def outputQ(queue):
        info = queue.get()
        print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))
    # Main
    if __name__ == '__main__':
        #windows下,若是開啓的進程比較多的話,程序會崩潰,爲了防止這個問題,使用freeze_support()方法來解決。知道就行啦
        multiprocessing.freeze_support()
        record1 = []   # store input processes
        record2 = []   # store output processes
        queue = multiprocessing.Queue(3)
        # 輸入進程
        for i in range(1,10):  
            process = multiprocessing.Process(target=inputQ,args=(queue,))
            process.start()
            record1.append(process)
        # 輸出進程
        for i in range(10):
            process = multiprocessing.Process(target=outputQ,args=(queue,))
            process.start()
            record2.append(process)
        for p in record1:
            p.join()
        for p in record2:
            p.join()
            
      #隊列是進程安全的:同一時間只能一個進程拿到隊列中的一個數據,你拿到了一個數據,這個數據別人就拿不到了。
     

3、生產者消費者模型

  • 在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度,來是整個生產和消費達到最大化和平衡。

  • 爲何要使用生產者和消費者模型:

    • 在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
  • 什麼是生產者消費者模型:

    • 生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力,而且我能夠根據生產速度和消費速度來均衡一下多少個生產者能夠爲多少個消費者提供足夠的服務,就能夠開多進程等等,而這些進程都是到阻塞隊列或者說是緩衝區中去獲取或者添加數據。

1555412939184

img

  • 基於隊列來實現一個生產者消費者模型:

     
     
     
    xxxxxxxxxx
     
     
     
     
    #生產者消費者模型總結
        #程序中有兩類角色
            一類負責生產數據(生產者)
            一類負責處理數據(消費者)  
        #引入生產者消費者模型爲了解決的問題是:
            平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度   
        #如何實現:
            生產者<-->隊列<——>消費者
        #生產者消費者模型實現類程序的解耦和
    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            time.sleep(random.randint(1,3))
            print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
    def producer(q):
        for i in range(1,10):
            time.sleep(random.randint(1,3))
            res='包子%s' %i
            q.put(res)
            print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    if __name__ == '__main__':
        q=Queue()
        #生產者們:即廚師們
        p1=Process(target=producer,args=(q,))
        #消費者們:即吃貨們
        c1=Process(target=consumer,args=(q,))
        p1.start() #開始
        c1.start()
        print('主')  #可是這裏有個問題,while True的子進程並未結束,經過上面基於隊列的生產者消費者代碼示例,咱們發現一個問題:主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
        
     #改進版本,解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環:子進程生產者在生產完畢後發送結束信號None。
    from multiprocessing import Process,Queue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break #收到結束信號則結束
            time.sleep(random.randint(1,3))
            print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
    def producer(q):
        for i in range(1,5):
            time.sleep(random.randint(1,3))
            res='包子%s' %i
            q.put(res)
            print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
        q.put(None) #在本身的子進程的最後加入一個結束信號
    if __name__ == '__main__':
        q=Queue()
        #生產者們:即廚師們
        p1=Process(target=producer,args=(q,))
        #消費者們:即吃貨們
        c1=Process(target=consumer,args=(q,))
        #開始
        p1.start()
        c1.start()
        print('主')
    #注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號
    import time
    import random
    from multiprocessing import Process,Queue
    def producer(q):
        for i in range(1,10): #不能是 put(0) 就會直接結束一個進程由於if not food
            time.sleep(random.random())
            food = '泔水%s'%i
            print('%s生產了%s'%('taibai',food))
            q.put(food)
    def consumer(q,name):
        while True:
            food = q.get()   # food = 食物/None
            if not food : break
            time.sleep(random.uniform(1,2))
            print('%s 吃了 %s'%(name,food))
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,))
        p1.start()
        c1 = Process(target=consumer,args=(q,'alex'))
        c1.start()
        c2 = Process(target=consumer,args=(q,'wusir'))
        c2.start()
        p1.join()  #必須等待p1執行完
        q.put(None) # 這裏須要注意的是,又有幾個消費者(while True)就要發幾回None,不然會出現孤兒進程
        q.put(None)
    #但上述解決方式,在有多個生產者和多個消費者時,因爲隊列咱們說了是進程安全的,我一個進程拿走告終束信號,另一個進程就拿不到了,還須要多發送一個結束信號,有幾個取數據的進程就要發送幾個結束信號,咱們則須要用一個很low的方式去解決。
    from multiprocessing import Process,Queue
    import random
    import time
    def producer(name,q):
        for i in range(1,7):
            # 處理數據延遲
         &nbnbsp;  time.sleep(random.random())
            data= 'data %s'%i
            q.put(data)
            print('生產者 %s 生產了數據: %s'%(name,data))
    def consumer(name,q):
        while 1:
            data= q.get()
            if data is None:  #當接受到None是跳出循環
                print('\033[31;1m %s 消費完了 \033[0m'%name)
                break
            print('消費者 %s 收到數據: %s'%(name,data))
    if __name__ =='__main__':
        q=Queue()
        pro_name=['alex','wusir','wang']
        con_name=['aaa','bb','c','ddd']
        pro_lis=[]
        for i in pro_name:  # 將全部的生產者寫入列表,而且啓動
            p=Process(target=producer,args=(i,q))
            p.start()
            pro_lis.append(p)
        for i in con_name:  # 啓動消費者的進程
            p=Process(target=consumer,args=(i,q))
            p.start()
        for i in pro_lis:  # 等待每一個進程阻塞,知道結束完
            i.join()
        for i in range(len(con_name)):  #發送四個None結束相應的消費者
            q.put(None)
     

     

4、JoinableQueue 隊列

  •  
     
     
    xxxxxxxxxx
     
     
     
     
    JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
    #參數介紹:
        maxsize是隊列中容許最大項數,省略則無大小限制。    
      #方法介紹:
        JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
        q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
        q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止,也就是隊列中的數據所有被get拿走了。
     

1555420266275

  • 使用JoinableQueue實現生產消費者模型

    from multiprocessing import Process,JoinableQueue
    import time,random,os
    def consumer(q):
        while True:
            res=q.get()
            # time.sleep(random.randint(1,3))
            time.sleep(random.random())
            print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
            q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走並執行完了,減一
    
    def producer(name,q):
        for i in range(10):
            # time.sleep(random.randint(1,3))
            time.sleep(random.random())
            res='%s%s' %(name,i)
            q.put(res)
            print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
        print('%s生產結束'%name)
        q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。
        print('%s生產結束~~~~~~'%name)
    
    if __name__ == '__main__':
        q=JoinableQueue()
        #生產者們:即廚師們
        p1=Process(target=producer,args=('包子',q))
        p2=Process(target=producer,args=('骨頭',q))
        p3=Process(target=producer,args=('泔水',q))
    
        #消費者們:即吃貨們
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
        c1.daemon=True #若是不加守護,那麼主進程結束不了,可是加了守護以後,必須確保生產者的內容生產完而且被處理完了,全部必須還要在主進程給生產者設置join,才能確保生產者生產的任務被執行完了,而且可以確保守護進程在全部任務執行完成以後才隨着主進程的結束而結束。
        c2.daemon=True
    
        #開始
        p_l=[p1,p2,p3,c1,c2]
        for p in p_l:
            p.start()
    
        p1.join() #我要確保你的生產者進程結束了,生產者進程的結束標誌着你生產的全部的人任務都已經被處理完了
        p2.join()
        p3.join()
        print('主')
        
        # 主進程等--->p1,p2,p3等---->c1,c2
        # p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
        # 於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
    

    1555421072044

5、管道

  • 進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可),會致使數據不安全的狀況出現,後面咱們會說到爲何會帶來數據 不安全的問題。

    • 管道介紹:

      #建立管道的類:
      Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
      #參數介紹:
      dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
      #主要方法:
          conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
          conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
       #其餘方法:
      conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
      conn1.fileno():返回鏈接使用的整數文件描述符
      conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
       
      conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
      conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
       
      conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
      
    • 管道的使用

      from multiprocessing import Process, Pipe
      def f(conn):
          conn.send("Hello 妹妹") #子進程發送了消息
          conn.close()
      
      if __name__ == '__main__':
          parent_conn, child_conn = Pipe() #創建管道,拿到管道的兩端,雙工通訊方式,兩端均可以收發消息
          p = Process(target=f, args=(child_conn,)) #將管道的一段給子進程
          p.start() #開啓子進程
          print(parent_conn.recv()) #主進程接受了消息
          p.join()
      

      img

    • 應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起(就是阻塞)。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道的相同一端就會能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。

      • 如下會觸發EOFError報錯

        from multiprocessing import Process, Pipe
        
        def f(parent_conn,child_conn):
            #parent_conn.close() #不寫close將不會引起EOFError
            while True:
                try:
                    print(child_conn.recv())
                except EOFError:
                    child_conn.close()
                    break
        
        if __name__ == '__main__':
            parent_conn, child_conn = Pipe()
            p = Process(target=f, args=(parent_conn,child_conn,))
            p.start()
            child_conn.close()
            parent_conn.send('hello')
            parent_conn.close()
            p.join()    
            
         #主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:
            1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;
            2.若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError;
            因此你關閉管道的時候,就容易出現問題,須要將全部只用這個管道的進程中的兩端所有關閉才行。固然也能夠經過異常捕獲(try:except EOFerror)來處理。
            雖然咱們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個再也不同一個地址,可是子進程中的管道和主進程中的管道仍是能夠通訊的,由於管道是同一套,系統可以記錄
        
相關文章
相關標籤/搜索