信號量,事件,隊列

一:多進程方法彙總安全

# 多進程代碼
# from multiprocessing import Process
# 方法
    # 進程對象.start()     開啓一個子進程
    # 進程對象.join()      感知一個子進程的結束
    # 進程對象.terminate() 結束一個子進程
    # 進程對象.is_alive()  查看某個子進程是否還在運行
# 屬性
    # 進程對象.name        進程名
    # 進程對象.pid         進程號
    # 進程對象.daemon      值爲True的時候,表示新的子進程是一個守護進程
            # 守護進程 隨着主進程代碼的執行結束而結束
            # 必定在start以前設置


# from multiprocessing import Lock
# l = Lock()
# l.acquire()   # 拿鑰匙
# 會形成數據不安全的操做
# l.release()   # 還鑰匙

二:多進程補充知識:服務器

from multiprocessing import Process
def func():
    num = input('>>>')
    print(num)

if __name__ == '__main__':
    Process(target=func).start()

    # 多進程中server端子進程不能直接input。不然報錯。

 

三:dom

信號量 —— multiprocess.Semaphore
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,
等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,
acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。
信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

理解:就好像四我的一塊兒去ktv,屋裏只能有四人,必須有人出去他才能出去。用代碼怎麼實現呢?函數

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

也能夠簡寫,如import tinme,random這樣  下面也能夠那樣簡寫。ui

def ktv(i,sem):
    sem.acquire()    #獲取鑰匙
    print('%s走進ktv'%i)
    time.sleep(random.randint(1,5))
    print('%s走出ktv'%i)
    sem.release()   #還鑰匙


if __name__ == '__main__' :
    sem = Semaphore(4)  #總共鑰匙總數爲4
    for i in range(20):
        p = Process(target=ktv,args=(i,sem))
        p.start()

解讀:總共 sem = Semaphore(4) 表示總共鑰匙總數爲4,而後把他進程實例化的參數,傳到上面,上面才能用sem.acquire()拿鑰匙這個和 sem.release() 表示還鑰匙。spa

注意:sem = Semaphore() 什麼都不寫,默認一把鑰匙。也就是一我的走進,一個出去才能下一我的,至關於買火車票上鎖了。線程

 

2 什麼是事件?爲何要引入事件。server

當咱們須要經過一個信號來控制多個進程阻塞或者執行就引入事件。對象

代碼表示:from multiprocessing import Event隊列

怎樣建立事件?

e=Event()   當一個事件建立時,默認是阻塞狀態。

怎麼查看是不是阻塞狀態呢?

print(e.is_set())  查看進程的狀態,默認是阻塞狀態。

 

怎麼改變他狀態呢?

e.set()  將事件改變爲非阻塞,也就是True

e.clear() 將事件改變爲阻塞,也就是False

 

e.wait()又是怎麼用的呢?

與e.is_set()連用,當判斷是阻塞狀態時,e.wait()起到阻塞做用,後面的代碼不執行。

也便是這樣:

# set 和 clear
    #  分別用來修改一個事件的狀態 True或者False
# is_set 用來查看一個事件的狀態
# wait 是依據事件的狀態來決定本身是否在wait處阻塞
    #  False阻塞 True不阻塞

例題:紅綠燈

import time
import random
from multiprocessing import Event,Process
def cars(e,i):
    if not e.is_set():
        print('car%i在等待'%i)
        e.wait()    # 阻塞 直到獲得一個 事件狀態變成 True 的信號
    print('\033[0;32;40mcar%i經過\033[0m' % i)

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('\033[31m紅燈亮了\033[0m')
        else:
            e.set()
            print('\033[32m綠燈亮了\033[0m')
        time.sleep(2)

if __name__ == '__main__':
    e = Event()
    traffic = Process(target=light,args=(e,))
    traffic.start()
    for i in range(20):
        car = Process(target=cars, args=(e,i))
        car.start()
        time.sleep(random.random())

解讀:

首先理解,燈和車是同時執行的,兩個進程相互不影響。

而後if e.is_set():

         e.clear()

      不執行紅燈,而是執行綠燈,下一行也是這樣。

打印結果:

綠燈亮了
car0經過
car1經過
car2經過
car3經過
紅燈亮了
car4在等待
car5在等待
car6在等待
car7在等待
car8在等待
綠燈亮了
car4經過
car7經過
car8經過
car5經過
car6經過
car9經過
car10經過
car11經過
紅燈亮了
car12在等待
car13在等待
綠燈亮了
car13經過
car12經過
car14經過
car15經過
car16經過

還在循環。

 

3 什麼是隊列?(先進先出)

實現多進程之間的通訊,也叫 IPC(Inter-Process Communication) 。

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞 。

Queue([maxsize]) 
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

方法介紹:

Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。
另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 
若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。
若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,
將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。
超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,
隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。

例題2:

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello') 把hello放入隊列

def consume(q):
    print(q.get())  返回隊列中的項   也就是得到hello

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume, args=(q,))
    c.start()

打印結果:hello

 

4 生產者模型:使用queue模塊。

# 隊列
# 生產者消費者模型

# 生產者 進程
# 消費者 進程
import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('%s獲取到了一個空'%name)
            break
        print('\033[31m%s消費了%s\033[0m' % (name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生產了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__  == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)

 

升級版本:模塊JoinableQueue

import time
import random
from multiprocessing import Process,JoinableQueue   用的這個
def consumer(q,name):
    while True:
        food = q.get()
        print('\033[31m%s消費了%s\033[0m' % (name,food))
        time.sleep(random.randint(1,3))
        q.task_done()     # count - 1

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生產了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()    # 阻塞  直到一個隊列中的全部數據 所有被處理完畢

if __name__  == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True   # 設置爲守護進程 主進程中的代碼執行完畢以後,子進程自動結束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 感知一個進程的結束

#  在消費者這一端:
    # 每次獲取一個數據
    # 處理一個數據
    # 發送一個記號 : 標誌一個數據被處理成功

# 在生產者這一端:
    # 每一次生產一個數據,
    # 且每一次生產的數據都放在隊列中
    # 在隊列中刻上一個記號
    # 當生產者所有生產完畢以後,
    # join信號 : 已經中止生產數據了
                # 且要等待以前被刻上的記號都被消費完
                # 當數據都被處理完時,join阻塞結束

# consumer 中把全部的任務消耗完
# producer 端 的 join感知到,中止阻塞
# 全部的producer進程結束
# 主進程中的p.join結束
# 主進程中代碼結束
# 守護進程(消費者的進程)結束
相關文章
相關標籤/搜索