from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() 上鎖 print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() 解鎖 if __name__ == '__main__': lock=Lock() 造鎖(實例化一個鎖) for i in range(3): p=Process(target=work,args=(lock,)) p.start()
三臺電腦同一時刻共同調用打印機完成打印任務,即三個進程同一時刻共搶同一個資源(輸出平臺)json
多個進程共搶一個資源,應結果第一位,效率第二位,因此應該犧牲效率,保求結果(串行)安全
以上的代碼雖然完成串行結果,但沒有實現公平,執行的順序都是人爲寫好的,應該作到公平的搶佔資源,誰先搶到就執行誰網絡
from multiprocessing import Process from multiprocessing import Lock import time import random def task1(lock): print('task1') # 驗證cpu遇到io切換了 lock.acquire() print('task1: 開始打印') time.sleep(random.randint(1, 3)) print('task1: 打印完成') lock.release() def task2(lock): print('task2') # 驗證cpu遇到io切換了 lock.acquire() print('task2: 開始打印') time.sleep(random.randint(1, 3)) print('task2: 打印完成') lock.release() def task3(lock): print('task3') # 驗證cpu遇到io切換了 lock.acquire() print('task3: 開始打印') time.sleep(random.randint(1, 3)) print('task3: 打印完成') lock.release() if __name__ == '__main__': lock = Lock() p1 = Process(target=task1, args=(lock,)) p2 = Process(target=task2, args=(lock,)) p3 = Process(target=task3, args=(lock,)) p1.start() p2.start() p3.start()
上鎖時必定要給進程上同一把鎖,並且上鎖一次,就必定要解鎖一次併發
都是完成了進程之間的串行dom
join是經過人爲控制使進程串行ide
互斥鎖是隨機搶佔資源,保證了公平性ui
業務需求分析: 買票以前先要查票,必須經歷的流程: 你在查票的同時,100我的也在查本此列票. 買票時,你要先從服務端獲取到票數,票數>0 ,買票,而後服務端票數減一. 中間確定有網絡延遲. from multiprocessing import Process from multiprocessing import Lock import time import json import os import random 多進程原則上是不能互相通訊的,它們在內存級別數據隔離的.不表明磁盤上數據隔離. 它們能夠共同操做一個文件. def search(): time.sleep(random.random()) with open('db.json',encoding='utf-8') as f1: dic = json.load(f1) print(f'剩餘票數{dic["count"]}') def get(): with open('db.json',encoding='utf-8') as f1: dic = json.load(f1) time.sleep(random.randint(1,3)) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', encoding='utf-8', mode='w') as f1: json.dump(dic,f1) print(f'{os.getpid()}用戶購買成功') else: print('沒票了.....') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task,args=(lock,)) p.start()
進程之間的通訊最好的方式是基於隊列spa
from multiprocessing import Process from multiprocessing import Queue import time import os import random # 多進程原則上是不能互相通訊的,它們在內存級別數據隔離的.不表明磁盤上數據隔離. # 它們能夠共同操做一個文件. def search(ticket): time.sleep(random.random()) print(f'剩餘票數{ticket}') def _get(q): dic = q.get() time.sleep(random.randint(1,3)) if dic['count'] > 0: dic['count'] -= 1 print(f'{os.getpid()}用戶購買成功') else: print('沒票了.....') q.put(dic) def task(ticket,q): search(ticket) _get(q) if __name__ == '__main__': q = Queue(1) q.put({'count': 1}) ticket = 1 for i in range(5): p = Process(target=task,args=(ticket,q)) p.start()
隊列是存在與內存中的一個容器,最大的特色:先進先出(FIFO)3d
利用隊列進行進程之間的通訊,簡單,方便,不用本身手動加鎖,隊列自帶優質阻塞,可持續化取數據code
from multiprocessing import Queue q = Queue(3) # 能夠設置元素個數 def func(): print('in func') q.put('alex') q.put({'count': 1}) q.put(func) q.put(666) # 當隊列數據已經達到上限,在插入數據的時候,程序就會夯住. 當你將數據所有取完繼續再取值時,程序也會夯住 put中有block參數 默認爲True 當你插入的數據超過最大限度,默認阻塞.改爲False 數據超過最大限度,不阻塞了直接報錯. put中有timeout參數 延時報錯,好比q.put(3,timeout = 3) 超過三秒再put不進,程序就會報錯 get也有這兩個參數
# 小米:搶手環4.預期發售10個. # 有100我的去搶. import os from multiprocessing import Queue from multiprocessing import Process def task(q): try: q.put(f'{os.getpid()}',block=False) except Exception: return if __name__ == '__main__': q = Queue(10) for i in range(100): p = Process(target=task,args=(q,)) p.start() for i in range(1,11): print(f'排名第{i}的用戶: {q.get()}',)
棧與隊列性質相似,特色與隊列相反:先進後出(FILO)
import queue q = queue.LifoQueue() q.put(1) q.put(3) q.put('barry') print(q.get()) print(q.get()) print(q.get()) print(q.get())
須要經過元組的形式放入,(int,數據) int 表明優先級,數字越低,優先級越高
import queue q = queue.PriorityQueue(3) q.put((10, '垃圾消息')) q.put((-9, '緊急消息')) q.put((3, '通常消息')) print(q.get()) print(q.get()) print(q.get())
生產者:生產數據進程
消費者:對生產者生產出來的數據作進一步處理進程
from multiprocessing import Process from multiprocessing import Queue import time import random def producer(name,q): for i in range(1,6): time.sleep(random.randint(1,3)) res = f'{i}號包子' q.put(res) print(f'\033[0;32m 生產者{name}: 生產了{res}\033[0m') def consumer(name,q): while 1: try: time.sleep(random.randint(1,3)) ret = q.get(timeout=5) print(f'消費者{name}: 吃了{ret}') except Exception: return if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=('alex',q)) p2 = Process(target=consumer, args=('barry',q)) p1.start() p2.start()
模型中有三個主體:生產者,消費者,容器隊列
合理的調控多個進程去生成數據以及提取數據,中間有個必不可少的環節就是容器隊列
若是沒有容器,生產者與消費者造成強耦合性,不合理,因此要有一個緩衝區(容器),平衡了生產力與消費力
基於文件 + 鎖 效率低,麻煩
基於隊列 推薦
基於管道 管道本身加鎖,底層可能會出現數據丟失損壞
多個進程搶佔一個資源
串行,有序以及數據安全
多個進程實現併發效果
生產者消費者模型