3七、進程之間的通訊

一、生產者消費者模型Queue:隊列(管道+鎖,雙向通訊)安全

    爲了解決數據供需不平衡的狀況(即生產者生產的多或者消費者消費的多形成的失衡),有一種解決的方法:子進程生產數據,子進程處理數據。app

生產者消費者模型:dom

  一、消費者要處理多少數據是不肯定的ide

  二、因此只能用while循環來處理數據,可是while循環沒法結束ui

  三、須要生產者發送信號spa

  四、有多少個消費者,就須要發送多少個信號操作系統

  五、可是發送的信號數量須要根據生產者和消費者的數量進行計算,因此很是不方便。   code

import time
import random
from multiprocessing import Process
from multiprocessing import Queue

def producer(q,food):
    for i in range(5):
        q.put('%s-%s'%(food,i))
        print('生產了%s'%(food))
        time.sleep(random.random())
    q.put(None)
    q.put(None)
    q.put(None)

def consumer(q,name):
    while True:
        food=q.get()
        if food == None:break
        print('%s吃了%s'%(name,food))


if __name__ =='__main__':
    q=Queue()
    p1=Process(target=producer,args=(q,'骨頭'))
    p1.start()
    p2=Process(target=producer,args=(q,'泔水'))
    p2.start()
    p3 = Process(target=consumer, args=(q, 'alex'))
    p3.start()
    p4 = Process(target=consumer, args=(q, 'egon'))
    p4.start()
    p5 = Process(target=consumer, args=(q, 'jin'))
    p5.start()
    

二、joinableQueue:順序爲:生產者生產的數據所有被消費------->生產者進程結束------->主進程執行代碼結束—-->消費者守護進程結束。blog

  put和get的一個計數機制,每次get數據以後發送task_done,put端接收到計數-1,直到計數爲0就能感知到。隊列

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue
# put   q.join
# get  處理數據 task_done 消費完了
def producer(q,food):
    for i in range(5):
        q.put('%s-%s'%(food,i))
        print('生產了%s'%food)
        # time.sleep(random.randint(1,3))
        time.sleep(random.random())
    q.join()  # 等待 消費者 把全部的數據都處理完

def consumer(q,name):
    while True:
        food = q.get()   # 生產者不生產仍是生產的慢
        print('%s 吃了 %s'%(name,food))
        q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,'泔水'))
    p1.start()
    p2 = Process(target=producer, args=(q, '骨頭'))
    p2.start()
    c1 = Process(target=consumer, args=(q, 'alex'))
    c1.daemon = True
    c1.start()
    c2 = Process(target=consumer, args=(q, 'egon'))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer, args=(q, 'jin'))
    c3.daemon = True
    c3.start()

    p1.join()  # 等待p1執行完畢
    p2.join()  # 等待p2執行完畢

三、ipc(inter-process-commnication內部進程交流)——

  pipe(管道):支持雙向通訊,數據不安全。

  首先來看管道是怎麼用的

  

from multiprocessing import  Pipe
p1,p2=Pipe()
p1.send('hello')        #一個發送,一個接收
print(p2.recv())
print('————————————————————————')
p2.send('hihihi')
p2.close()              #關閉p2發送端
print(p1.recv())
#print(p1.recv())     #放開的話,繼續接收,會報錯,報錯類型:EOFError

管道在多進程中的做用:主進程中發送到管道,而後在子進程中接收。

應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 

  

from multiprocessing import Process
from multiprocessing import Pipe,Lock
def func(p):
    foo,son=p
    foo.close()             #只用son,就把foo給關了
    while True:
        try:
            print(son.recv())
        except EOFError:        #異常處理
            break

if __name__ =='__main__':
    foo,son=Pipe()
    p=Process(target=func,args=((foo,son),))
    p.start()
    son.close()                #只用foo,把son給關了
    foo.send('hello')
    foo.send('hello')
    foo.send('hello')
    foo.send('heo')
    foo.close()

pipe實現消費者生產者模型:

  

  

 1 from multiprocessing import Process,Pipe,Lock
 2 
 3 def consumer(p,name,lock):
 4     produce, consume=p
 5     produce.close()
 6     while True:
 7         # lock.acquire()
 8         baozi=consume.recv()
 9         # lock.release()
10         if baozi:
11             print('%s 收到包子:%s' %(name,baozi))
12         else:
13             consume.close()
14             break
15 
16 def producer(p,n):
17     produce, consume=p
18     consume.close()
19     for i in range(n):
20         produce.send(i)
21     produce.send(None)
22     produce.send(None)
23     produce.close()
24 
25 if __name__ == '__main__':
26     produce,consume=Pipe()
27     lock = Lock()
28     c1=Process(target=consumer,args=((produce,consume),'c1',lock))
29     c2=Process(target=consumer,args=((produce,consume),'c2',lock))
30     p1=Process(target=producer,args=((produce,consume),10))
31     c1.start()
32     c2.start()
33     p1.start()
34 
35     produce.close()
36     consume.close()
37 
38     c1.join()
39     c2.join()
40     p1.join()
41     print('主進程')
pipe實現消費者生產者模型

 四、Manager:是一個類,提供了能夠進行數據共享的一個機制,提供了不少數據類型dict 、list、pipe(這些並不提供數據安全的支持)

from multiprocessing import Process,Manager,Lock
def work(dict,lock):
    lock.acquire()
    dict['count']-=1
    lock.release()

if __name__ =='__main__':
    lock=Lock()
    m=Manager()        #要用Manager定義dict
    dict=m.dict({'count':100})
    l=[]
    for i in range(100):
        p=Process(target=work,args=(dict,lock))
        p.start()
        l.append(p)
    [p.join() for p in l]
    print(dict)

 五、進程池:相似與有一個池子(工做間),把進程(員工)放進去,進行工做。接下來是發任務,而後循環對任務進行工做。(進程池這個功能已經被實現了)。

import os
import time
import random
from multiprocessing import Pool
from multiprocessing import Process
def func(i):
    i+=1

if __name__=='__main__':
    # p=Pool(5)                #一、帶#的是另外一種方法:用進程池
    # start=time.time()
    # p.map(func,range(1000))   #至關於target=func,args=next(iterable)
    # p.close()                 #是不容許再向進程池中添加任務
    # p.join()
    # print(time.time()-start)
    p = Process()                 #二、起進程的辦法,太佔內存,費時間:起進程的時候要分配資源,分配內存,耗費時間
    start=time.time()
    l=[]
    for i in range(100):
        p=Process(target=func,args=(i,))
        p.start()
        l.append(p)
    [i.join() for i in l]
    print(time.time()-start)
相關文章
相關標籤/搜索