一:多進程方法彙總安全
# 多進程代碼 # from multiprocessing import Process # 方法 # 進程對象.start() 開啓一個子進程 # 進程對象.join() 感知一個子進程的結束 # 進程對象.terminate() 結束一個子進程 # 進程對象.is_alive() 查看某個子進程是否還在運行 # 屬性 # 進程對象.name 進程名 # 進程對象.pid 進程號 # 進程對象.daemon 值爲True的時候,表示新的子進程是一個守護進程 # 守護進程 隨着主進程代碼的執行結束而結束 # 必定在start以前設置 # from multiprocessing import Lock # l = Lock() # l.acquire() # 拿鑰匙 # 會形成數據不安全的操做 # l.release() # 還鑰匙
二:多進程補充知識:服務器
from multiprocessing import Process def func(): num = input('>>>') print(num) if __name__ == '__main__': Process(target=func).start() # 多進程中server端子進程不能直接input。不然報錯。
三:dom
信號量 —— multiprocess.Semaphore
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。 假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待, 等到有人出來才能再進去玩。 實現: 信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時, acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。 信號量同步機制適用於訪問像服務器這樣的有限資源。 信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
理解:就好像四我的一塊兒去ktv,屋裏只能有四人,必須有人出去他才能出去。用代碼怎麼實現呢?函數
import time import random from multiprocessing import Process from multiprocessing import Semaphore
也能夠簡寫,如import tinme,random這樣 下面也能夠那樣簡寫。ui
def ktv(i,sem): sem.acquire() #獲取鑰匙 print('%s走進ktv'%i) time.sleep(random.randint(1,5)) print('%s走出ktv'%i) sem.release() #還鑰匙 if __name__ == '__main__' : sem = Semaphore(4) #總共鑰匙總數爲4 for i in range(20): p = Process(target=ktv,args=(i,sem)) p.start()
解讀:總共 sem = Semaphore(4) 表示總共鑰匙總數爲4,而後把他進程實例化的參數,傳到上面,上面才能用sem.acquire()拿鑰匙這個和 sem.release() 表示還鑰匙。spa
注意:sem = Semaphore() 什麼都不寫,默認一把鑰匙。也就是一我的走進,一個出去才能下一我的,至關於買火車票上鎖了。線程
2 什麼是事件?爲何要引入事件。server
當咱們須要經過一個信號來控制多個進程阻塞或者執行就引入事件。對象
代碼表示:from multiprocessing import Event隊列
怎樣建立事件?
e=Event() 當一個事件建立時,默認是阻塞狀態。
怎麼查看是不是阻塞狀態呢?
print(e.is_set()) 查看進程的狀態,默認是阻塞狀態。
怎麼改變他狀態呢?
e.set() 將事件改變爲非阻塞,也就是True
e.clear() 將事件改變爲阻塞,也就是False
e.wait()又是怎麼用的呢?
與e.is_set()連用,當判斷是阻塞狀態時,e.wait()起到阻塞做用,後面的代碼不執行。
也便是這樣:
# set 和 clear # 分別用來修改一個事件的狀態 True或者False # is_set 用來查看一個事件的狀態 # wait 是依據事件的狀態來決定本身是否在wait處阻塞 # False阻塞 True不阻塞
例題:紅綠燈
import time import random from multiprocessing import Event,Process def cars(e,i): if not e.is_set(): print('car%i在等待'%i) e.wait() # 阻塞 直到獲得一個 事件狀態變成 True 的信號 print('\033[0;32;40mcar%i經過\033[0m' % i) def light(e): while True: if e.is_set(): e.clear() print('\033[31m紅燈亮了\033[0m') else: e.set() print('\033[32m綠燈亮了\033[0m') time.sleep(2) if __name__ == '__main__': e = Event() traffic = Process(target=light,args=(e,)) traffic.start() for i in range(20): car = Process(target=cars, args=(e,i)) car.start() time.sleep(random.random())
解讀:
首先理解,燈和車是同時執行的,兩個進程相互不影響。
而後if e.is_set():
e.clear()
不執行紅燈,而是執行綠燈,下一行也是這樣。
打印結果:
綠燈亮了
car0經過
car1經過
car2經過
car3經過
紅燈亮了
car4在等待
car5在等待
car6在等待
car7在等待
car8在等待
綠燈亮了
car4經過
car7經過
car8經過
car5經過
car6經過
car9經過
car10經過
car11經過
紅燈亮了
car12在等待
car13在等待
綠燈亮了
car13經過
car12經過
car14經過
car15經過
car16經過
還在循環。
3 什麼是隊列?(先進先出)
實現多進程之間的通訊,也叫 IPC(Inter-Process Communication) 。
建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞 。
Queue([maxsize]) 建立共享的進程隊列。 參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。
方法介紹:
Queue([maxsize]) 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。 另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具備如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。 若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False, 將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。 超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間, 隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。
例題2:
from multiprocessing import Queue,Process def produce(q): q.put('hello') 把hello放入隊列 def consume(q): print(q.get()) 返回隊列中的項 也就是得到hello if __name__ == '__main__': q = Queue() p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume, args=(q,)) c.start()
打印結果:hello
4 生產者模型:使用queue模塊。
# 隊列 # 生產者消費者模型 # 生產者 進程 # 消費者 進程 import time import random from multiprocessing import Process,Queue def consumer(q,name): while True: food = q.get() if food is None: print('%s獲取到了一個空'%name) break print('\033[31m%s消費了%s\033[0m' % (name,food)) time.sleep(random.randint(1,3)) def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生產了%s%s'%(name,food,i) print(f) q.put(f) if __name__ == '__main__': q = Queue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer, args=('wusir','泔水', q)) c1 = Process(target=consumer, args=(q,'alex')) c2 = Process(target=consumer, args=(q,'jinboss')) p1.start() p2.start() c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None)
升級版本:模塊JoinableQueue
import time import random from multiprocessing import Process,JoinableQueue 用的這個 def consumer(q,name): while True: food = q.get() print('\033[31m%s消費了%s\033[0m' % (name,food)) time.sleep(random.randint(1,3)) q.task_done() # count - 1 def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) f = '%s生產了%s%s'%(name,food,i) print(f) q.put(f) q.join() # 阻塞 直到一個隊列中的全部數據 所有被處理完畢 if __name__ == '__main__': q = JoinableQueue(20) p1 = Process(target=producer,args=('Egon','包子',q)) p2 = Process(target=producer, args=('wusir','泔水', q)) c1 = Process(target=consumer, args=(q,'alex')) c2 = Process(target=consumer, args=(q,'jinboss')) p1.start() p2.start() c1.daemon = True # 設置爲守護進程 主進程中的代碼執行完畢以後,子進程自動結束 c2.daemon = True c1.start() c2.start() p1.join() p2.join() # 感知一個進程的結束 # 在消費者這一端: # 每次獲取一個數據 # 處理一個數據 # 發送一個記號 : 標誌一個數據被處理成功 # 在生產者這一端: # 每一次生產一個數據, # 且每一次生產的數據都放在隊列中 # 在隊列中刻上一個記號 # 當生產者所有生產完畢以後, # join信號 : 已經中止生產數據了 # 且要等待以前被刻上的記號都被消費完 # 當數據都被處理完時,join阻塞結束 # consumer 中把全部的任務消耗完 # producer 端 的 join感知到,中止阻塞 # 全部的producer進程結束 # 主進程中的p.join結束 # 主進程中代碼結束 # 守護進程(消費者的進程)結束