生產者消費者問題

問題描述:併發

生產者在生產產品,這些產品將提供給若干個消費者去消費,爲了使生產者和消費者能併發執行,在二者之間設置一個具備多個緩衝區的緩衝池,生產者將它生產的產品放入一個緩衝區中,消費者能夠從緩衝區中取走產品進行消費,顯然生產者和消費者之間必須保持同步,即不容許消費者到一個空的緩衝區中取產品,也不容許生產者向一個已經放入產品的緩衝區中再次投放產品。app

條件變量解決方案:dom

  • 基於隊列構建一個緩衝區,生產者在隊尾填充,消費者在隊頭獲取。隊列緩衝區做爲多個線程的共享資源。
  • 因爲多個消費者和生產者線程能夠併發訪問緩衝區,須要互斥鎖來控制對緩衝區的互斥訪問。
  • 隊列空時消費者線程須要等到隊列中存在資源、隊列滿時生產者線程須要等到隊列中有資源被消費。經過使用條件變量來實現線程的阻塞、通知以達到生產、消費線程的同步。
from threading import Lock
from threading import Condition
import threading

class myQueue:
    def __init__(self, size):
        self.size = size
        self.list = list()
        self.lock = Lock()
        self.notFullCond = Condition(self.lock)
        self.notEmptyCond = Condition(self.lock)

    def isFull(self):
        if self.size == len(self.list):
            return True
        return False

    def isEmpty(self):
        if 0 == len(self.list):
            return True
        return False
    
    def enQueue(self, elem):
        self.lock.acquire()
        while self.isFull(): #隊列滿時觸發等待notFullCond條件,線程阻塞同時釋放互斥鎖
            print('queue is full, waiting...')
            self.notFullCond.wait()   
        print(threading.current_thread().getName() + ' product ' + str(elem))
        self.list.append(elem)
#當有資源進入隊列通知全部等待notEmptyCond條件的線程,等釋放互斥鎖後,等待notEmptyCond條件的線程獲取鎖,再次判斷條件
        self.notEmptyCond.notify_all()
        self.lock.release()

    def deQueue(self):
        self.lock.acquire()
        while self.isEmpty(): #隊列空時觸發等待notEmptyCond條件,線程阻塞同時釋放互斥鎖
            print('queue is empty, waiting...')
            self.notEmptyCond.wait()
        elem = self.list[0]
        del(self.list[0])
        print(threading.current_thread().getName() + ' consume ' + str(elem))
#當有資源出隊列通知全部等待notFullCond條件的線程,等釋放互斥鎖後,等待notFullCond條件的線程獲取鎖,再次判斷條件
        self.notFullCond.notify_all()
        self.lock.release()

        return elem

信號量解決方案:測試

  • 經過信號量來控制線程的同步,信號量管理能夠得到資源的個數,初始隊列爲空,寫信號量資源個數爲隊列長度,讀信號量資源個數爲0
from threading import Lock
from threading import Semaphore
import threading

class mySemQueue:
    def __init__(self, size):
        self.size = size
        self.list = list()
        self.lock = Lock()
        self.writeSem = Semaphore(size)#初始化寫信號量
        self.readSem  = Semaphore(0)   #初始化讀信號量
    
    def enQueue(self, elem):
        self.writeSem.acquire()        #資源入隊申請寫信號量,若是爲0則阻塞
        
        self.lock.acquire()        #互斥鎖來保證資源的互斥訪問
        self.list.append(elem)
        print(threading.current_thread().getName() + ' product ' + str(elem))
        self.lock.release()

        self.readSem.release()         #資源入隊後釋放一個讀信號量,若是其它線程阻塞在這個信號量上,喚醒該線程

    def deQueue(self):
        self.readSem.acquire()         #資源出隊申請讀信號量,若是爲0則阻塞

        self.lock.acquire()
        elem = self.list[0]
        del(self.list[0])
        print(threading.current_thread().getName() + ' consume ' + str(elem))
        self.lock.release()
        
        self.writeSem.release()        #資源出隊後釋放一個寫信號量,若是其它線程阻塞在這個信號量上,喚醒該線程
        
        return elem
  • 測試
from threading import Thread
import sys
import threading

class myThread(Thread):
    def __init__(self, func):
        Thread.__init__(self)
        self.func = func

    def run(self):
        print(threading.current_thread().getName() + ' start')
        self.func()

from myThread import myThread
from myQueue import myQueue
import random
import sys

def producter():
    while True:
        elem =random.randint(1, 100)
        que.enQueue(elem)

def consumer():
    while True:
        que.deQueue()

fp = open('log.txt','w')
sys.stdout = fp

que = myQueue(10)
t1 = myThread(producter)
t2 = myThread(consumer)
t3 = myThread(consumer)
t1.start()
t2.start()
t3.start()
相關文章
相關標籤/搜索