進程間通訊python
IPC算法
隊列安全
1,正常隊列併發
from queue import Queue
先進先出FIFO ----維護秩序時用的比較多
q = Queue() ##q是一個隊列
print(q)
q.put(12) ##q.put 是往q裏面放東西
print(q.get()) ## q.get() 是拿q裏面的東西
print(q.qsize()) ##隊列的長度
棧:先進後出 ----算法用的比較多
用棧實現三級菜單
計算文件夾的總大小
print(q.get()) ##沒有值得時候回阻塞
print(q.get_nowait()) ##當有值的時候取值
print(q.get_nowait()) #沒有值的時候會報錯
進程中的隊列
from multiprocessing import Queue
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞
q.empty()
判斷隊列是否爲空,返回布爾值
q.full()
判斷隊列是否滿了,返回布爾值
主進程放,子進程取
from multiprocessing import Queue,Process
def con(q):
print(q.get())
if __name__ == '__main__':
q = Queue()
p = Process(target=con,args=(q,))
p.start()
q.put(123)
獲得打印結果 : 123
子進程放,另外一個子進程取
from multiprocessing import Queue,Process
def con(q):
print(q.get())
def pro(q):
q.put(123)
if __name__ =='__main__':
q = Queue()
p = Process(target=con,args=(q,))
p.start()
p= Process(target=pro,args=(q,))
p.start()
生產者消費者模型
生產者消費者模型 --解決創造(生產)數據和處理(消費)數據的效率不平衡問題
把創造數據 和 處理數據放在不一樣的進程中,
根據他們的效率在調整進程的個數
生產數據快,消費數據慢,內存空間的浪費
消費數據快,生產數據慢,效率低下
import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
while True:
food = q.get()
if food == 'stop':break
print('%s 吃了 %s'%(name,food))
time.sleep(random.random())
def producer(q,name,food,n=10):
for i in range(n):
time.sleep(random.random())
fd = food+str(i)
print('%s 生產了 %s' %(name,fd))
q.put(fd)
if __name__ =='__main__':
q = Queue(10)
c1 = Process(target=consumer,args=(q,'alex'))
c1.start()
c2 = Process(target=consumer,args=(q,'alex'))
c2.start()
p1 = Process(target=producer,args=(q,'太白','泔水'))
p1.start()
p2 = Process(target=producer,args=(q,'egon','魚刺'))
p2.start()
p1.join()
p2.join()
q.put('stop')
q.put('stop')
讓consumer停下來的方法
在全部生產者結束生產以後,向隊列中放入一個結束符
有幾個consumer就向隊列中放幾個結束符
在消費者消費的過程當中,接收到結束符,就結束消費的進程
Joinable Queue
建立可鏈接的共享進程隊列,這就像是一個Queue對象,但隊列潤許項目的使用者通知生產者項目已被
成功處理.通知進程是使用共享信號和條件變量來實現的
import time
import random
from multiprocessing import JoinableQueue,Process
def consumer(q,name):
while True:
food = q.get()
print('%s 吃了 %s'%(name,food))
time.sleep(random.random())
q.task_done()
def producer(q,name,food,n=10)
for i in range(n):
time.sleep(random.random())
fd = food+str(i)
print('%s 生產了 %s'%(name,fd))
q.put(fd)
q.join()
if __name__ =='__main__':
q = JoinableQueue()
c1 = Process(target=consumer,args=(q,'alex'))
c1.daemon = True
c1.start()
c2 = Process(target=consumer,args=(q,'alex'))
c2.daemon = True
c2.start()
p1 = Process(target=producer,args=(q,'太白','泔水'))
p1.start()
p2 = Process(target=producer,args=(q,'egon','魚刺'))
p2.start()
p1.join()
p2.join()
只有multiprocessing 中的隊列 才能幫助你 實現 IPC
永遠不可能出現數據不安全的狀況,多個進程不會同時取走同一個數據
提供給的方法
put
get
put_nowwait
get_nowwait
empty --- 在多進程內不可靠
full --- 在多進程內不可能
qsize --- 在多進程內不可靠
有圖先進先出的特色 + 進程通訊的功能 +數據進程安全,常常用它來完成進程之間的通訊
生產者消費者模型
生產者和消費者的效率平衡的問題
內存的控制 - 隊列的長度限制
讓消費者自動停下來
joinableQueue
在消費數據的時候 task_done
在生產端\主進程 join
管道
隊列就是基於管道實現的
隊列 數據安全的
管道 數據不安全的
隊列 = 管道 + 鎖
#管道初使用
from multiprocessing import Pipe
left,right = Pipe()
left.send('aaa')
print(right.recv())
#建立管道的類 pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道 #參數介紹: dumplex:默認管道是全雙工的,若是將duplex射程False,conn1只能用於接收,conn2只能用於發送 #主要方法: conn1.recv():接收conn2.send(obj) 發送的對象.若是沒有消息可接收,recv方法會一直阻塞.若是鏈接的另一端已經關閉,那麼recv方法會跑出EOFError. conn1.send(obj);經過鏈接發送對象.obj是與序列化兼容的任意對象 #其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
##EOFError## #在這一個進程中 若是不在用這個斷電了,應該close #這樣在recv的時候,若是其餘斷電都被關閉了,就可以知道不會再有新的消息傳進來 #此時就不會在這裏阻塞等待,而是跑出一個EOFError # close 並非關閉了整個管道,而是修改了操做系統對管道端點的引用計數的處理 from multiprocessing import Pipe,Process def consumer(left,right): left.close() while True: try: print(right.recv()) except EOFError: break if __name__ == '__main__': left,right = Pipe() p = Process(target=consumer,args=(left,right)) p.start() right.close() for i in range(10): left.send('hello') left.close()
管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常app
from multiprocessing import Process,Pipe def consumer(p,name): produce, consume=p produce.close() while True: try: baozi=consume.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: break def producer(p,seq=10): produce, consume=p consume.close() for i in range(seq): produce.send(i) if __name__ == '__main__': produce,consume=Pipe() for i in range(5): c=Process(target=consumer,args=((produce,consume),'c1')) c.start() for i in range(5): p = Process(target=producer, args=((produce, consume))) p.start() producer((produce,consume)) produce.close() consume.close()
進程之間的數據共享dom
子進程能夠用住進程裏面的東西,想用就用,不想用就不用異步
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不枷鎖的話 容易形成數據錯亂 d['count']-=1 if __name__ == '__main__': lock = Lock() m = Manager() dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
進程池async
進程池的概念:ide
定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿池中的進程來處理任務,等處處理完畢,進程並不關閉函數
,而是將進程再放會進程池中繼續等待任務.若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程
執行完畢,才能繼續執行,就是說,池中的進程數量是固定的,那麼同一時間最多有固定數量的進程在執行,這樣不會增長
系統的調度難度,還節省開閉進程的時間,也必定程度上可以實現併發效果.
何時用進程池?
面向高計算型的場景,採用多進程
若是開啓的進程數超過5個
有幾個CPU就可以同時運行幾個進程
進程的弊端:開啓和關閉以及切換都會帶來很大的時間開銷過多的進程還會形成操做系統調度的壓力
import os import time from multiprocessing import Pool def func(i): time.sleep(0.4) print(os.getpid(),i) if __name__ =='__main__': p = Pool(5) #有5個進程數量 for i in range(20): #20個等待執行 p.apply_async(func,args= (i,)) p.close() p.join()
同步和異步 (進程池)
##同步請求的
# import os
# import time
# from multiprocessing import Pool
# ##同步請求的
# def wahaha():
# time.sleep(1)
# print(os.getpid())
# return True
#
# if __name__ =='__main__':
# p = Pool(5)
# ret_l = []
# for i in range(10):
# ret = p.apply(func = wahaha)
# print(ret)
# ##異步提交,不獲取返回值
# import time
# import os
# from multiprocessing import Process,Pool
# def wahaha():
# time.sleep(1)
# print(os.getpid())
#
# if __name__ =='__main__':
# p = Pool(5) ##cpu的個數 ,或者 +1
# ret_l = []
# for i in range(20):
# ret = p.apply_async(func = wahaha) ##async 異步的
# ret_l.append(ret)
# p.close() ##關閉,進程池中的進程不工做了
# ##而是關閉了進程池,讓人物不能再繼續提交了
# p.join() ##等待這個池中提交的人物都執行完
# ##表示等待全部子進程中的代碼都執行完, 主進程才結束
# ##異步提交,獲取返回值,等待全部任務都執行完畢以後再統一獲取結果(獲取值,所有獲取完之後得到返回值)
# import time
# import os
# from multiprocessing import Process,Pool
# def wahaha():
# time.sleep(1)
# print(os.getpid())
# return True
#
# if __name__=='__main__':
# p = Pool(5) ##cpu的個數,或者+1
# ret_l = []
# for i in range(20):
# ret = p.apply_async(func = wahaha) ##async 異步的
# ret_l.append(ret)
# p.close() ##關閉,進程池中的進程不工做了
# ##而是關閉了進程池,讓人物不能再繼續提交了
# p.join() ## 等待這個池中提交的任務都執行完
# for ret in ret_l:
# print(ret.get())
##異步提交,獲取返回值,一個任務執行完畢以後就能夠獲取到一個結果
import time
import os
from multiprocessing import Process,Pool
def wahaha():
time.sleep(1)
print(os.getpid())
return True
if __name__ == '__main__':
p = Pool(5)
ret_l = []
for i in range(20):
ret = p.apply_async(func=wahaha) #async 異步的
ret_l.append(ret)
for ret in ret_l:
print(ret.get())
##異步的 apply_async
##1.若是是異步的提交任務,那麼任務提交以後進程池和主進程也異步了
# 主進程不會自動等待進程池中的任務執行完畢
##2.若是須要主進程等待,須要p.join
# 可是join的行爲是依賴close
##3.若是這個函數是有返回值的
## 也能夠經過ret.get()來獲取返回值
## 可是若是一邊提交一邊獲取返回值會讓程序變成同步的
## 因此要想保留異步的效果,應該講返回對象保存在列表裏,全部任務提交完成以後再來取結果
## 這種方式也能夠去掉join,來完成主進程的阻塞等待池中的任務執行完畢
###############回調函數##########
##回調函數 _ 在主進程中執行
##在發起任務的時候 指定callback參數
##在每一個進程執行完apply_async任務以後,返回值會直接做爲參數傳遞給callback的函數,執行callback
##函數中的代碼
import os
import time
import random
from multiprocessing import Pool
def wahaha(num):
time.sleep(random.random())
print('pid:',os.getpid(),num)
return num
def back(arg):
print('call_back:',os.getpid(),arg)
if __name__=='__main__':
print('主進程',os.getpid())
p = Pool(5)
for i in range(20):
ret = p.apply_async(func = wahaha,args = (i,),callback=back) ##async 異步的
p.close()
p.join()
總結
進程進程三狀態同步異步阻塞非阻塞 請解釋異步非阻塞 給開發完成的全部裝飾器+log是計算機中最小的資源分配單位進程的建立 Process進程之間的異步 自己子進程主進程之間都是異步的進程之間的同步控制 Lock Semaphore Event進程之間的數據隔離 自己進程與進程之間都是數據隔離的進程之間通訊 IPC 管道 隊列數據共享 Manager進程池 -能夠獲取返回值 同步調用 - 基本不用的 異步調用 - 重要的 apply_async get獲取結果 close join 回調函數 Pool 回調函數在主進程中執行 apply_async(func = wahaha,callback = back)