進程同步控制

鎖——multiprocessing.Lock

什麼是鎖?

當多個進程使用同一份數據資源的時候,會引起數據安全或順序混亂問題。這個時候咱們但願進程能夠一個一個的去獲取和修改數據,將幾個併發的進程編程串行,這樣就能夠保證數據的安全。咱們能夠引用Lock模塊來幫咱們來實現將異步執行的程序在加鎖的代碼段中同步去執行。python

mport os
import time
import random
from multiprocessing import Process

def work(n):
    print('%s: %s is running' %(n,os.getpid()))
    time.sleep(random.random())
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work,args=(i,))
        p.start()
多進程搶佔輸出資源
# 由併發變成了串行,犧牲了運行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is running' % (n, os.getpid()))
    time.sleep(random.random())
    print('%s: %s is done' % (n, os.getpid()))
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,i))
        p.start()

使用鎖維護執行順序
使用鎖來維護執行順序

上面這種狀況雖然使用加鎖的形式來實現了順序的執行, 可是程序由併發編程串行了。這樣雖然下降了效率,卻保證了數據的安全。編程

 

模擬搶票,來看看數據安全的重要性。json

#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[43m購票成功\033[0m')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()
多進程同時搶購餘票
#文件db的內容爲:{"count":5}
#注意必定要用雙引號,否則json沒法識別
#併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random()) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random()) #模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('\033[32m購票成功\033[0m')
    else:
        print('\033[31m購票失敗\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()
使用鎖來保證數據安全

總結:

加鎖能夠保證多個進程修改同一塊數據時, 同一時間只能有一個任務能夠進行修改, 即串行的修改。犧牲效率卻保證了數據安全。安全

雖然能夠用文件共享數據實現進程間通訊:服務器

  1. 效率低(共享數據基於穩健, 而文件是硬盤的數據)
  2. 須要本身加鎖處理

所以咱們最好尋找一種解決方案可以兼顧:網絡

  1. 效率高(多個進程共享一塊內存的數據)
  2. 幫咱們處理好鎖問題。

這就是multiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。併發

隊列和管道都是將數據存放在內存中app

隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來。dom

咱們應該儘可能避免使用共享數據, 儘量使用消息傳遞和隊列, 避免處理複雜的同步和鎖問題, 並且在進程數目增多時, 每每能夠得到更好的可拓展性。異步

 

信號量——multiprocessing.Semaphore(瞭解)

信號量概念介紹:

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

代碼示例:
from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人在ktv中待的時間不一樣
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

例子
信號量代碼示例

 

 

事件——multiprocessing.Event(瞭解)

概念及方法介紹:

python進程的事件主要提供了四個方法:

set(), wait(), clear(), is_set()

事件處理的機制: 全局定義了一個"Flag", 若是"Flag"值爲False, 那麼當程序執行event.wait()方法時就會阻塞, 若是"Flag"值爲Ture, 那麼event.wait()方法時便再也不阻塞.

事件的初始"Flag"狀態爲False

clear: 將"Flag"設置爲False

set: 將"Flag"設置爲True

is_set: 查看"Flag"狀態

from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait()    # 阻塞,等待is_set()的值變成True,模擬信號燈爲綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #若是is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():# 進程剛開啓,is_set()的值是Flase,模擬信號燈爲紅色
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(0.1) # 阻塞,等待設置等待時間,等待0.1s以後沒有等到綠燈就闖紅燈走了
            if not e.is_set():
                print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
            else:
                print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
        break



def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設置爲False
        else:
            e.set()    # ---->將is_set()的值設置爲True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 建立是個進程控制10輛車
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))  # 建立5個進程控制5輛警車
        p.start()
    t = Process(target=traffic_lights, args=(e, 10))  # 建立一個進程控制紅綠燈
    t.start()

    print('============》')

紅綠燈實例
代碼示例
相關文章
相關標籤/搜索