1、生產者消費者程序員
主要是爲解耦(藉助隊列來實現生產者消費者模型)安全
import queue # 不能進行多進程之間的數據傳輸app
(1)from multiprocessing import Queue 藉助Queue解決生產者消費者模型,隊列是安全的。異步
q = Queue(num)async
num :爲隊列的最大長度函數
q.get() # 阻塞等待獲取數據,若是有數據直接獲取,若是沒有數據,阻塞等待spa
q.put() # 阻塞,若是能夠繼續往隊列中放數據,就直接放,不能放就阻塞等待code
q.get_nowait() # 不阻塞,若是有數據直接獲取,沒有數據就報錯對象
q.put_nowait() # 不阻塞,若是能往隊列中放數據直接放,不能夠就報錯blog
(2)from multiprocessing import JoinableQueue # 可鏈接的隊列
JoinableQueue 是繼承Queue ,因此能夠使用Queue中的方法
而且JoinableQueue 又多了兩個方法
q.join() # 用於生產者。等待q.task_done的返回結果,經過返回結果,生產者就能得到消費者當前消費了多少個數據。
q.task_done() # 用於消費者,是指每消費隊列中的一個數據,就給join返回一個標識。
1 from multiprocessing import Queue,Process,Pool,JoinableQueue 2
3
4 def consumer(q,lis): 5 while 1: 6 for i in lis: 7 print(i + '拿走了' + q.get()) 8 q.task_done() # get() 一次就會給生產者的join返回一次數據
9
10
11 def producer(q,name1): 12 for i in range(1,9): 13 q.put(name1 + '第%s號劍'% i) 14 q.join() # 記錄了生產者往隊列中添加了8個數據,此時會阻塞,等待消費返回8次數據,後生產者進程纔會結束
15
16
17 if __name__ == '__main__': 18 q = JoinableQueue() # 實例化一個隊列
19 p = Process(target=consumer,args=(q,['蓋聶','衛莊','高漸離','勝七','掩日'])) 20 p1 = Process(target=producer,args=(q,'越王八劍')) 21 p.daemon = True # 注意是把消費者設置爲守護進程,會隨着主進程的結束而結束。
22 p.start() 23 p1.start() 24 p1.join() # 主進程會等待生產者進程結束後才結束,而生產者進程又會等待消費者進程消費完之後才結束。
2、進程之間的共享內存
from multiprocessing import Manager,Value
m = Manager()
num = m.dict({鍵 :值})
num = m.list([1,2,3])
from multiprocessing import Process,Manager,Value def func(num): for i in num: print(i - 1) # 結果爲:0,1,2
if __name__ == '__main__': m = Manager() # 用來進程之間共享數據的
num = m.list([1,2,3]) p = Process(target=func,args=(num,)) p.start() p.join() # 等待func子進程執行完畢後結束
#################Value################
from multiprocessing import Process,Manager,Value def func1(num): print(num) num.value += 1 # 和Manager用法不同
print(num.value) if __name__ == '__main__': num = Value('i',123) # Manager裏面不須要傳參數
p = Process(target=func1,args=(num,)) p.start() p.join()
3、進程池
進程池:一個池子,裏邊有固定數量的進程。這些進程一直處於待命狀態,一旦有任務來,立刻就去處理。
進程池還會幫程序員去管理池中的進程。
from multiprocessing import Pool
p = Pool(os.cpu_count() + 1)
進程池有三個方法:
map(func,iterable) 有返回值
iterable:可迭代對象,是把可迭代對象中的每一個元素一次傳給任務函數當參數
from multiprocessing import Pool def func(num): num += 1
print(num) return num # 返回給map方法
if __name__ == '__main__': p = Pool() res = p.map(func,[i for i in range(10)]) # 參數爲目標對象和可迭代對象
p.close() p.join() # 等待子進程結束
print('主進程',res) # res是一個列表
apply(func,args=()) :apply的實現是進程之間是同步的,池中的進程一個一個的去執行。
func :進程池中的進程執行的任務函數。
args :可迭代對象型的參數,是傳給任務函數的參數。
同步處理任務時,不須要close和join
同步處理任務時,進程池中的全部進程是普通進程(主進程須要等待其子進程執行結束)
from multiprocessing import Pool def func(num): num += 1
return num if __name__ == '__main__': p = Pool(5) # 實例化5個進程
for i in range(100): res = p.apply(func,args=(i,)) # 這裏傳的參數是元祖,這裏是同步執行
print(res)
apply_async(func,args=(),callback=None) :進城之間是異步的,
func :進程池中的進程執行的任務函數。
args :可迭代對象型的參數,是傳給任務函數的參數
from multiprocessing import Pool def func(num): num += 1 return num if __name__ == '__main__': p = Pool(5) # 實例化5個進程 lis = [] for i in range(100): res = p.apply_async(func,args=(i,)) # 異步執行,5個進程同時去調用func lis.append(res) print(res) # 打印結果爲 <multiprocessing.pool.ApplyResult object at 0x0347F3D0> p.close() # Pool中用apply_async異步執行時必須關閉進程 p.join() # 由於是異步執行因此須要等待子進程結束 print(lis) # 100個<multiprocessing.pool.ApplyResult object at 0x0347F3D0> 這種存放在列表中 [print(i.get()) for i in lis] # 輸出100個數字[1......100]
callback :回調函數,就是說每當進程池中有進程處理完任務,返回的結果能夠交給回調函數,由回調函數進行進一步的處理,回調函數只有異步纔有,同步是沒有的
異步處理任務時,進程池中的全部進程是守護進程(主進程代碼執行完畢守護進程就結束)
異步處理任務時,必需要加上close和join
回調函數的使用:
進程的任務函數的返回值,被當成回調函數的形參接收到,以此進行進一步的處理操做
回調函數是由主進程調用的,而不是子進程,子進程只負責把結果傳遞給回調函數。
from multiprocessing import Pool import requests import os def func(ulr): res = requests.get(ulr) print('func進程的pid:%s' % os.getpid(),'父進程的pid:%s' % os.getppid()) if res.status_code == 200: return ulr,res.text def cal_back(sta): # func中返回的值被自動調用,並當成形參傳進來
ulr,text = sta print('callback回調函數的pid:%s'% os.getpid(),'父進程的pid:%s' % os.getppid()) # 回調函數的pid和父進程的pid同樣
if __name__ == '__main__': p = Pool(5) lis = ['https://www.baidu.com', 'http://www.jd.com', 'http://www.taobao.com', 'http://www.mi.com', 'http://www.cnblogs.com', 'https://www.bilibili.com', ] print('父進程的pid:%s' % os.getpid()) for i in lis: p.apply_async(func,(i,),callback=cal_back) # 異步的執行每個進程,這裏的傳參和Process不一樣,這裏必須這樣寫callback=cal_back
# 異步執行程序func,在每一個任務結束後,在func中return回一個結果,這個結果會自動的被callback函數調用,並當成形參來接收。
p.close() # 進程間異步必須加上close()
p.join() # 等待子進程的結束
4、管道機制
from multiprocessing import Pipe
con1,con2 = Pipe()
管道是不安全的
管道是用於多進程之間通訊的一種方式。
若是在單進程中使用管道,con1發數據,那麼就用con2來收數據
con2發數據,那麼就用con1來收數據
若是在多進程中使用管道,那麼就必須是父進程使用con1收,子進程就必須使用con2發
父進程使用con1發,子進程就必須使用con2收
父進程使用con2收,子進程就必須使用con1發
父進程使用con2發,子進程就必須使用con1收
在管道中有一個著名的錯誤叫作EOFError。是指,父進程若是關閉了發送端,子進程還繼續收數據,那麼就會引起EOFError。
# 單進程中管道的應用
from multiprocessing import Pipe con1,con2 = Pipe() # 管道機制
con1.send('123') # con1發送,須要con2來接收 是固定
print(con2.recv()) con2.send('456') # con2發送,須要con1來接收 是固定
print(con1.recv())
# 多進程中管道的應用
from multiprocessing import Process,Pipe def func(con): con1,con2 = con con1.close() # 由於子進程只用con2與父進程通訊,因此關閉了
while 1: try: print(con2.recv()) # 接收父進程con1發來的數據
except EOFError: # 若是父進程的con1發完數據,並關閉管道,子進程的con2繼續接收數據,就會報錯。
con2.close() # 當接到報錯,此時數據已經接收完畢,關閉con2管道。
break # 退出循環
if __name__ == '__main__': con1,con2 = Pipe() p = Process(target=func,args=(con1,con2)) p.start() con2.close() # 由於父進程是用con1來發數據的,con2提早關閉。
for i in range(10): # 生產數據
con1.send('郭%s' % i) # 給子進程的con2發送數據
con1.close() # 生產完數據,關閉父進程的con1管道