Python之路(第三十八篇) 併發編程:進程同步鎖/互斥鎖、信號量、事件、隊列、生產者消費者模型

1、進程鎖(同步鎖/互斥鎖)

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,python

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。數據庫

例子編程

  #併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
  from multiprocessing import Process
  import os,time
  def work():
      print('%s is running' %os.getpid())
      time.sleep(2)
      print('%s is done' %os.getpid())
  ​
  if __name__ == '__main__':
      for i in range(3):
          p=Process(target=work)
          p.start()

  

 

加鎖後json

  
  #加鎖後由併發變成了串行,犧牲了運行效率,但避免了競爭
  ​
  from multiprocessing import Process,Lock
  import os,time
  def work(mutex):
      mutex.acquire() #開始加鎖
      print('%s is running' %os.getpid())
      time.sleep(2)
      print('%s is done' %os.getpid())
      mutex.release() #釋放鎖,在加鎖期間別的進程都要等
  ​
  if __name__ == '__main__':
      mutex = Lock()
      for i in range(3):
          p=Process(target=work,args=(mutex,))
          p.start()

  

 

例子2安全

多個進程共享同一文件服務器

文件當數據庫,模擬搶票網絡

 

未加鎖版併發

  
  #文件db.txt的內容爲:{"count":1}
  #注意必定要用雙引號,否則json沒法識別
  ​
  # 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
  from multiprocessing import Process,Lock
  import time,json,random,os
  def search():
      dic=json.load(open('db.txt'))
      print('\033[43m剩餘票數%s\033[0m' %dic['count'])
  ​
  def get():
      dic=json.load(open('db.txt'))
      time.sleep(0.1) #模擬讀數據的網絡延遲
      if dic['count'] >0:
          dic['count']-=1
          time.sleep(0.2) #模擬寫數據的網絡延遲
          json.dump(dic,open('db.txt','w'))
          print('%s\033[43m購票成功\033[0m'%(os.getpid()))
  ​
  def task(lock):
      search()
      get()
  if __name__ == '__main__':
      lock=Lock()
      for i in range(10): #模擬併發10個客戶端搶票
          p=Process(target=task,args=(lock,))
          p.start()

  

輸出結果app

  
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  剩餘票數1
  4120購票成功
  2692購票成功
  7328購票成功
  13444購票成功
  13632購票成功
  13560購票成功
  13752購票成功
  12564購票成功
  13720購票成功
  13488購票成功

  

加鎖版dom

 

  
  import multiprocessing,time,json,random
  ​
  def search(name):
      with open("db.txt","r",encoding="utf-8") as f:
          data_dic = json.load(f)
          time.sleep(random.uniform(0,2))
          if data_dic["count"] >= 1 :
              print("已查詢到票還有%s張,當前系統時間 %s"%(data_dic["count"],time.asctime()))
          else:
              print("系統票源不足!當前系統時間 %s"%time.asctime())
  ​
  def buy(name):
      with open("db.txt","r+",encoding="utf-8") as f:
          data_dic = json.load(f)
      if data_dic["count"] > 0 :
          with open("db.txt", "w", encoding="utf-8") as g:
              new_ticket_count = data_dic["count"] - 1
              data_dic.update({"count":new_ticket_count})
              json.dump(data_dic,g)
          print("%s購票成功!"%name)
      else:
          print("%s購票失敗!"%name)
  ​
  def task(name,mutex):
      search(name)  # 查詢無需加鎖
      mutex.acquire()
      buy(name)  #針對修改文件的關鍵操做加鎖
      mutex.release()
  ​
  ​
  if __name__ == "__main__":
      mutex = multiprocessing.Lock()
      for i in range(10):
          p = multiprocessing.Process(target=task,args=("乘客%s"%i,mutex))
          p.start()
 

  

分析

  
  #加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
  雖然能夠用文件共享數據實現進程間通訊,但問題是:
  1.效率低(共享數據基於文件,而文件是硬盤上的數據)
  2.須要本身加鎖處理
  ​
  ​
  ​
  #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
  1 隊列和管道都是將數據存放於內存中
  2 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
  咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

  

 

2、信號量(multiprocess.Semaphore)

互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。實現:信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念。

例子

  
  # 多進程中的組件
  # ktv
  # 4個
  # 一套資源  同一時間 只能被n我的訪問
  # 某一段代碼 同一時間 只能被n個進程執行
  import time
  import random
  from multiprocessing import Process
  from multiprocessing import Semaphore
  ​
  # sem = Semaphore(4)
  # sem.acquire()
  # print('拿到第一把鑰匙')
  # sem.acquire()
  # print('拿到第二把鑰匙')
  # sem.acquire()
  # print('拿到第三把鑰匙')
  # sem.acquire()
  # print('拿到第四把鑰匙')
  # sem.acquire()
  # print('拿到第五把鑰匙')
  def ktv(i,sem):
      sem.acquire()    #獲取鑰匙
      print('%s走進ktv'%i)
      time.sleep(random.randint(1,5))
      print('%s走出ktv'%i)
      sem.release()
  ​
  ​
  if __name__ == '__main__' :
      sem = Semaphore(4)
      for i in range(20):
          p = Process(target=ktv,args=(i,sem))
          p.start()

  

 

3、事件(multiprocess.Event)

python進程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

clear:將「Flag」設置爲False,set:將「Flag」設置爲True.

 

例子

  
  from multiprocessing import Event
  ​
  e = Event()
  print(e.is_set()) #初始設置爲False
  print("數據111")
  e.set()  #設置以後爲True
  print("數據222")
  print(e.is_set()) #打印設置以後的狀態
  e.wait()  #當值爲False會阻塞,當值爲Ture是,不會阻塞
  print("數據333")
  e.clear() #清除事件狀態,設置爲False
  print(e.is_set())  #打印清除以後的狀態
  print("數據444")
  e.wait()  #此時值爲False,程序會一直阻塞
  print("數據555")

  

 

輸出結果

  
  False
  數據111
  數據222
  True
  數據333
  False
  數據444

  

 

例子

簡單的紅綠燈事件

  
  from multiprocessing import Event,Process
  import time
  import random
  ​
  ​
  def cars(e,num):
      if not e.is_set(): # 進程剛開啓,is_set()的值是False,模擬信號燈爲紅色
          print("%s車正在等待通行"%num)
          e.wait() # 阻塞,等待信號燈切換
      print("%s車已經經過" % num) #打印已經經過的進程
  ​
  ​
  def light(e):
  ​
      #模擬定時切換紅綠燈
      while True:
          if e.is_set():
              e.clear() #>將is_set()的值設置爲False
              print("\033[31m紅燈亮了\033[0m")
          else:
              e.set() #>將is_set()的值設置爲True
              print("\033[32m綠燈亮了\033[0m")
          time.sleep(2)
  ​
  if __name__ == "__main__":
      e = Event()
      traffic = Process(target=light,args=(e,))
      traffic.start() #啓動紅綠燈進程
      for i in range(20):
          car = Process(target=cars,args=(e,"布加迪%s"%i))
          car.start()
          time.sleep(random.random())

  

 

4、進程間通訊——隊列和管道

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

隊列

 隊列就至關於一個容器,裏面能夠放數據,特色是先放進去先拿出來,即先進先出。

建立隊列的類(底層就是以管道和鎖定的方式實現)

  
  Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 

  

參數

  
  maxsize是隊列中容許最大項數,省略則無大小限制。  

  

  方法介紹:

  
  q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
  q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
   
  q.get_nowait():同q.get(False)
  q.put_nowait():同q.put(False)
  ​
  q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
  q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
  q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

  

其餘方法(瞭解):

  
  q.close() 
  關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
  ​
  q.cancel_join_thread() 
  不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。
  ​
  q.join_thread() 
  鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。

  

例子

  from multiprocessing import Queue
  ​
  q = Queue(3)  # 建立一個隊列對象,並給他設置容器大小,即能放幾個數據
  q.put(1)  # put()方法是往容器裏放數據
  q.put([2,3])
  q.put({"k1":4})
  # q.put("mi") # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
  try:
      q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
  except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
      print('隊列已經滿了')
  ​
  # 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
  print(q.full()) #返回True ,滿了
  print(q.get())  #get()方法是從容器裏拿數據
  print(q.get())
  print(q.get())
  # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
  try:
      q.get_nowait() # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
  except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
      print('隊列已經空了')
  ​
  print(q.empty()) #空了

  

例子

  
  import time
  from multiprocessing import Queue, Process
  ​
  ​
  def task(q):
      q.put(" hello! 時間%s"%time.asctime())  # 調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。
  ​
  ​
  if __name__ == '__main__':
      q = Queue(3)#建立一個Queue對象
      p = Process(target=task, args=(q,)) #建立一個子進程
      p.start()
      print(q.get()) #在主進程打印從子進程獲取的數據

  


   

 

生產者消費者模型

生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者消費者模型

生產者指的是生產數據的任務,消費者指的是處理數據的任務,在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者和消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

 

基於隊列實現生產者消費者模型

  
  from multiprocessing import Process, Queue
  import time, random, os
  ​
  ​
  def consumer(q):
      while True:
          res = q.get()
          time.sleep(random.randint(1, 3))
          print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
  ​
  ​
  def producer(q):
      for i in range(10):
          time.sleep(random.randint(1, 3))
          res = '包子%s' % i
          q.put(res)
          print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
  ​
  ​
  if __name__ == '__main__':
      q = Queue()
      # 生產者們:即廚師們
      p1 = Process(target=producer, args=(q,))
  ​
      # 消費者們:即吃貨們
      c1 = Process(target=consumer, args=(q,))
  ​
      # 開始
      p1.start()
      c1.start()
      print('主')
 

  

生產者消費者模型總結

  
  #程序中有兩類角色
      一類負責生產數據(生產者)
      一類負責處理數據(消費者)
      
  #引入生產者消費者模型爲了解決的問題是:
      平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度
      
  #如何實現:
      生產者<-->隊列<——>消費者
  #生產者消費者模型實現類程序的解耦和

  

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

  import time, random, os
  from multiprocessing import Process, Queue
  ​
  ​
  def consumer(q):
      while True:
          res = q.get()
          if res is None: break  # 收到結束信號則結束
          time.sleep(random.randint(1, 3))
          print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
  ​
  ​
  def producer(q):
      for i in range(10):
          time.sleep(random.randint(1, 3))
          res = '包子%s' % i
          q.put(res)
          print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
      q.put(None)  # 發送結束信號,生產者在生產完畢後發送結束信號None
  ​
  ​
  if __name__ == '__main__':
      q = Queue()
      # 生產者們:即廚師們
      p1 = Process(target=producer, args=(q,))
  ​
      # 消費者們:即吃貨們
      c1 = Process(target=consumer, args=(q,))
  ​
      # 開始
      p1.start()
      c1.start()
      print('主')

  

  

 

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號。但上述解決方式,在有多個生產者和多個消費者時,須要屢次發送None信號。

  import multiprocessing
  import time
  import random
  ​
  ​
  def producer(name, q):
      for i in range(2):
          res = "包子%s" % i
          time.sleep(random.randint(0, 1))
          print("%s生產了%s" % (name, res))
          q.put(res)
  ​
  ​
  def consumer(name, q):
      while True:
          res = q.get()
          if q.get() is None:  # 收到結束信號則結束
              print("沒包子吃了")
              break
          print("%s吃了%s" % (name, res))
  ​
  ​
  if __name__ == "__main__":
      q = multiprocessing.Queue()
      p1 = multiprocessing.Process(target=producer, args=("jack", q))
      p2 = multiprocessing.Process(target=producer, args=("charles", q))
      p3 = multiprocessing.Process(target=producer, args=("pony", q))
      c1 = multiprocessing.Process(target=consumer, args=("nick", q))
      c2 = multiprocessing.Process(target=consumer, args=("nicholas", q))
      p_list = []
      p_list.append(p1)
      p_list.append(p2)
      p_list.append(p3)
      for p in p_list:
          p.start()
      c1.start()
      c2.start()
      p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
      p2.join()
      p3.join()
      q.put(None)  # 發送結束信號,有幾個消費者就應該發送幾回結束信號None
      q.put(None)  # 發送結束信號
      print("end........")
 

  

  

這裏有另一種隊列提供了這種機制,JoinableQueue。

JoinableQueue([maxsize])

其實就是一種隊列,但又比隊列要多兩種方法,task_done()和join()方法,正是有這兩種方法就能夠解決上面的問題。

建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

 

方法介紹

  
  JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:
  ​
  q.task_done() 
  使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。
  ​
  q.join() 
  生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 
  下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。

  

例子

import multiprocessing
import time
import random


def producer(name, q):
    for i in range(2):
        res = "包子%s" % i
        time.sleep(random.randint(0, 1))
        print("%s生產了%s" % (name, res))
        q.put(res)
    q.join()  # 只有顧客把隊列的包子所有拿走後,三個生產者進程才能所有結束


def consumer(name, q):
    while True:
        res = q.get()
        print("%s吃了%s" % (name, res))
        q.task_done()  # 發信號告訴隊列,又吃完了一個,從隊列中取走一個數據並處理完成


if __name__ == "__main__":
    # q = multiprocessing.Queue()
    q = multiprocessing.JoinableQueue()
    p1 = multiprocessing.Process(target=producer, args=("jack", q))
    p2 = multiprocessing.Process(target=producer, args=("charles", q))
    p3 = multiprocessing.Process(target=producer, args=("pony", q))
    c1 = multiprocessing.Process(target=consumer, args=("nick", q))
    c2 = multiprocessing.Process(target=consumer, args=("nicholas", q))
    p_list = []
    p_list.append(p1)
    p_list.append(p2)
    p_list.append(p3)
    for p in p_list:
        p.start()
    c1.daemon = True  # 將c1\c2設置成守護進程,只要主進程結束了,那麼顧客就收到了全部的數據
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()

    print("end........")
# 主進程等--->p1,p2,p3等---->c1,c2
# p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
# 於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。
# 應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
相關文章
相關標籤/搜索