*****
模型: 就是解決某個問題套路
生產者: 指的是產生數據的一方 (一段代碼)
消費者: 指的是處理數據的一方 (一段代碼)python
生活中處處都是這種模型dom
例如:飯店 廚師就是生產者 吃飯的人就是消費者函數
例如: 先爬取網頁數據(生產) 在解析網頁數據 (消費)網站
# 消費任務 def eat(food): for i in range(10): # 要消費 time.sleep(random.randint(0, 2)) print(food,"吃完了!") # 生產任務 def make_rose(): for i in range(10): # 再生產 time.sleep(random.randint(0, 2)) print("第%s盤青椒肉絲製做完成!" % i) rose = "第%s盤青椒肉絲" % i eat(rose) # 直接調用消費任務 # 開啓任務 make_rose()
生產者和消費,處理速度不平衡,一方快一方慢,致使一方須要等待另外一方 總體效率低下線程
本來,雙方是耦合 在一塊兒,消費必須等待生產者生成完畢在開始處理, 反過來翻譯
若是消費消費速度太慢,生產者必須等待其處理完畢才能開始生成下一個數據code
1.將雙方分開來.一方專門負責生成,一方專門負責處理隊列
這樣一來數據就不能直接傳遞了 由於消費者可能尚未處理完成,爲了使生產者能夠不斷的生成,則須要一個共同的容器進程
2.生產者完成後放入容器,消費者從容器中取出數據ip
這樣就解決了雙方能力不平衡的問題,作的快的一方能夠繼續作,不須要等待另外一方
案例:
def eat(q): for i in range(10): # 要消費 rose = q.get() time.sleep(random.randint(0, 2)) print(rose,"吃完了!") # 生產任務 def make_rose(q): for i in range(10): # 再生產 time.sleep(random.randint(0, 2)) print("第%s盤青椒肉絲製做完成!" % i) rose = "第%s盤青椒肉絲" % i # 將生成完成的數據放入隊列中 q.put(rose) if __name__ == '__main__': # 建立一個共享隊列 q = Queue() make_p = Process(target=make_rose,args=(q,)) eat_p = Process(target=eat,args=(q,)) make_p.start() eat_p.start()
可翻譯:爲可join的隊列
該隊列相比普通的Queue的區別在於該對列額外增長的了join函數
該函數爲阻塞函數,會阻塞直到等待隊列中全部數據都被處理完畢。
q = JoinableQueue() q.put(1) q.get() q.join() #阻塞 等待隊列中全部數據都被處理完畢 print("over")
執行以上函數,將致使進程沒法結束,註釋掉join調用就正常,發現join的確有阻塞的效果,
可是隊列中一共就一個數據,明明已經調用get取出了,爲何join依然阻塞呢?
這是由於get僅僅是取出數據,而join是等待數據處理完畢,也就是說:
取出數據還不算完,你處理完之後必須告知隊列處理完畢,經過task_done
q = JoinableQueue() q.put(1) q.get() q.task_done() # 數據處理完畢 q.join() #阻塞 等待隊列中全部數據都被處理完畢 print("over") #輸出: # over
須要注意的時,task_done的調用次數必大於等於隊列中的數據個數,join才能正常結束阻塞
q = JoinableQueue() q.put(1) q.put(1) q.get() q.task_done() # 數據處理完畢 q.join() #阻塞 等待隊列中全部數據都被處理完畢 print("over") #輸出: # over
主進程能夠明確知道隊列中的數據什麼時候被處理完畢
回顧以前的生產者消費者模型中,生產者與消費者都明確要處理的數據數量,可是實際開發中不少狀況是沒法提早明確的,例如:要爬去一個網站上的全部頁面,頁面數量數不固定的
from multiprocessing import Process,JoinableQueue,Queue import time,random def producter(name,q): for i in range(5): time.sleep(random.randint(1,2)) print("\033[46m%s生產了 熱狗%s\033[0m" % (name,i)) q.put("%s的 熱狗%s" % (name,i)) def customer(name,q): while True: time.sleep(random.randint(1, 2)) hot_dog = q.get() print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog)) if __name__ == '__main__': q = Queue() p1 = Process(target=producter,args=("北京🌭店",q)) p2 = Process(target=producter,args=("上海🌭店",q)) p3 = Process(target=producter, args=("深圳🌭店", q)) p1.start() p2.start() p3.start() c1 = Process(target=customer,args=("王思聰",q)) c1.start()
上述代碼沒法正常運行結束,是由於消費者進程中不清楚處理是否處理完成,因此一直在循環等待數據。
此時咱們就可使用joinablequeue隊列來讓主進程獲取生成者進程是否生成完畢的信號從而結束子進程
from multiprocessing import Process,JoinableQueue,Queue # q = JoinableQueue() # # q.put(1) # q.put(1) # # q.get() # q.task_done() # # # q.join() #阻塞 等待隊列中全部數據都被處理完畢 # print("over") import time,random def producter(name,q): for i in range(5): time.sleep(random.randint(1,3)) print("\033[46m%s生產了 熱狗%s\033[0m" % (name,i)) q.put("%s的 熱狗%s" % (name,i)) def customer(name,q): while True: time.sleep(random.randint(1, 2)) hot_dog = q.get() print("\033[47m%s 吃掉了 %s \033[0m" % (name,hot_dog)) # 一個數據處理完畢 q.task_done() if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producter,args=("北京🌭店",q)) p2 = Process(target=producter,args=("上海🌭店",q)) p3 = Process(target=producter, args=("深圳🌭店", q)) p1.start() p2.start() p3.start() c1 = Process(target=customer,args=("王思聰",q)) c1.daemon = True # 使子進程跟隨主進程結束 c1.start() # 等待生產者進程所有生成完畢 p1.join() p2.join() p3.join() # 等待全部數據所有處理完畢 q.join() # 終止子進程 也能夠開啓子進程前將子進程設置爲守護進程來結束子進程 # c1.terminate()
進程池:
進程池與線程池使用方法徹底一致,放到線程池一塊兒講