Python線程同步機制: Locks, RLocks, Semaphores, Condition

翻譯自Laurent Luce的博客
原文名稱:Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues
原文鏈接:http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/ python

本文詳細地闡述了Python線程同步機制。你將學習到如下有關Python線程同步機制:Lock,RLock,Semaphore,Condition,Event和Queue,還有Python的內部是如何實現這些機制的。 本文給出的程序的源代碼能夠在github上找到。 git

首先讓咱們來看一個沒有使用線程同步的簡單程序。 github

線程(Threading)

咱們但願編程一個從一些URL中得到內容而且將內容寫入文件的程序,完成這個程序能夠不使用線程,爲了加快獲取的速度,咱們使用2個線程,每一個線程處理一半的URL。 編程

注:完成這個程序的最好方式是使用一個URL隊列,可是如下面的例子開始個人講解更加合適。 服務器

類FetchUrls是threading.Thread的子類,他擁有一個URL列表和一個寫URL內容的文件對象。 app

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
class FetchUrls(threading.Thread):  """  下載URL內容的線程  """   def __init__(self, urls, output):  """  構造器   @param urls 須要下載的URL列表  @param output 寫URL內容的輸出文件  """  threading.Thread.__init__(self)  self.urls = urls  self.output = output   def run(self):  """  實現父類Thread的run方法,打開URL,而且一個一個的下載URL的內容  """  while self.urls:  url = self.urls.pop()  req = urllib2.Request(url)  try:  d = urllib2.urlopen(req)  except urllib2.URLError, e:  print 'URL %s failed: %s' % (url, e.reason)  self.output.write(d.read())  print 'write done by %s' % self.name  print 'URL %s fetched by %s' % (url, self.name) 

main函數啓動了兩個線程,以後讓他們下載URL內容。 dom

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
def main():  # URL列表1  urls1 = ['http://www.google.com', 'http://www.facebook.com']  # URL列表2  urls2 = ['http://www.yahoo.com', 'http://www.youtube.com']  f = open('output.txt', 'w+')  t1 = FetchUrls(urls1, f)  t2 = FetchUrls(urls2, f)  t1.start()  t2.start()  t1.join()  t2.join()  f.close()  if __name__ == '__main__':  main() 

上面的程序將出現兩個線程同時寫一個文件的狀況,致使文件一團亂碼。咱們須要找到一種在給定的時間裏只有一個線程寫文件的方法。實現的方法就是使用像鎖(Locks)這樣的線程同步機制。 ide

鎖(Lock)

鎖有兩種狀態:被鎖(locked)和沒有被鎖(unlocked)。擁有acquire()和release()兩種方法,而且遵循一下的規則: 函數

  • 若是一個鎖的狀態是unlocked,調用acquire()方法改變它的狀態爲locked;
  • 若是一個鎖的狀態是locked,acquire()方法將會阻塞,直到另外一個線程調用release()方法釋放了鎖;
  • 若是一個鎖的狀態是unlocked調用release()會拋出RuntimeError異常;
  • 若是一個鎖的狀態是locked,調用release()方法改變它的狀態爲unlocked。

解決上面兩個線程同時寫一個文件的問題的方法就是:咱們給類FetchUrls的構造器中傳入一個鎖(lock),使用這個鎖來保護文件操做,實如今給定的時間只有一個線程寫文件。下面的代碼只顯示了關於lock部分的修改。完整的源碼能夠在threads/lock.py中找到。 post

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
class FetchUrls(threading.Thread):  ...   def __init__(self, urls, output, lock):  ...  self.lock = lock #傳入的lock對象   def run(self):  ...  while self.urls:  ...  self.lock.acquire() #得到lock對象,lock狀態變爲locked,而且阻塞其餘線程獲取lock對象(寫文件的權利)  print 'lock acquired by %s' % self.name  self.output.write(d.read())  print 'write done by %s' % self.name  print 'lock released by %s' % self.name  self.lock.release() #釋放lock對象,lock狀態變爲unlocked,其餘的線程能夠從新獲取lock對象  ...  def main():  ...  lock = threading.Lock()  ...  t1 = FetchUrls(urls1, f, lock)  t2 = FetchUrls(urls2, f, lock)  ... 

如下是程序的輸出:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
$ python locks.py lock acquired by Thread-2 write done by Thread-2 lock released by Thread-2 URL http://www.youtube.com fetched by Thread-2 lock acquired by Thread-1 write done by Thread-1 lock released by Thread-1 URL http://www.facebook.com fetched by Thread-1 lock acquired by Thread-2 write done by Thread-2 lock released by Thread-2 URL http://www.yahoo.com fetched by Thread-2 lock acquired by Thread-1 write done by Thread-1 lock released by Thread-1 URL http://www.google.com fetched by Thread-1 

從上面的輸出咱們能夠看出,寫文件的操做被鎖保護,沒有出現兩個線程同時寫一個文件的現象。

下面咱們看一下Python內部是如何實現鎖(Lock)的。我正在使用的Python版本是Linux操做系統上的Python 2.6.6。

threading模塊的Lock()方法就是thread.allocate_lock,代碼能夠在Lib/threading.py中找到。

1 2
Lock = _allocate_lock _allocate_lock = thread.allocate_lock 

C的實如今Python/thread_pthread.h中。程序假定你的系統支持POSIX信號量(semaphores)。sem_init()初始化鎖(Lock)所在地址的信號量。初始的信號量值是1,意味着鎖沒有被鎖(unlocked)。信號量將在處理器的不一樣線程之間共享。

1 2 3 4 5 6 7 8 9 10 11 12 13
PyThread_type_lock PyThread_allocate_lock(void) {  ...  lock = (sem_t *)malloc(sizeof(sem_t));   if (lock) {  status = sem_init(lock,0,1);  CHECK_STATUS("sem_init");  ....  }  ... } 

當acquire()方法被調用時,下面的C代碼將被執行。默認的waitflag值是1,表示調用將被被阻塞直到鎖被釋放。sem_wait()方法減小信號量的值或者被阻塞直到信號量大於零。

1 2 3 4 5 6 7 8 9 10 11 12
int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) {  ...  do {  if (waitflag)  status = fix_status(sem_wait(thelock));  else  status = fix_status(sem_trywait(thelock));  } while (status == EINTR); /* Retry if interrupted by a signal */  .... } 

當release()方法被調用時,下面的C代碼將被執行。sem_post()方法增長信號量。

1 2 3 4 5 6 7
void PyThread_release_lock(PyThread_type_lock lock) {  ...  status = sem_post(thelock);  ... } 

能夠將鎖(Lock)與「with」語句一塊兒使用,鎖能夠做爲上下文管理器(context manager)。使用「with」語句的好處是:當程序執行到「with」語句時,acquire()方法將被調用,當程序執行完「with」語句時,release()方法會被調用(譯註:這樣咱們就不用顯示地調用acquire()和release()方法,而是由「with」語句根據上下文來管理鎖的獲取和釋放。)下面咱們用「with」語句重寫FetchUrls類。

1 2 3 4 5 6 7 8 9 10 11 12
class FetchUrls(threading.Thread):  ...  def run(self):  ...  while self.urls:  ...  with self.lock: #使用「with」語句管理鎖的獲取和釋放  print 'lock acquired by %s' % self.name  self.output.write(d.read())  print 'write done by %s' % self.name  print 'lock released by %s' % self.name  ... 

可重入鎖(RLock)

RLock是可重入鎖(reentrant lock),acquire()可以不被阻塞的被同一個線程調用屢次。要注意的是release()須要調用與acquire()相同的次數才能釋放鎖。

使用Lock,下面的代碼第二次調用acquire()時將被阻塞:

1 2 3
lock = threading.Lock() lock.acquire() lock.acquire() 

若是你使用的是RLock,下面的代碼第二次調用acquire()不會被阻塞:

1 2 3
rlock = threading.RLock() rlock.acquire() rlock.acquire() 

RLock使用的一樣是thread.allocate_lock(),不一樣的是他跟蹤宿主線程(the owner thread)來實現可重入的特性。下面是RLock的acquire()實現。若是調用acquire()的線程是資源的全部者,記錄調用acquire()次數的計數器就會加1。若是不是,就將試圖去獲取鎖。線程第一次得到鎖時,鎖的擁有者將會被保存,同時計數器初始化爲1。

1 2 3 4 5 6 7 8 9 10 11 12 13
def acquire(self, blocking=1):  me = _get_ident()  if self.__owner == me:  self.__count = self.__count + 1  ...  return 1  rc = self.__block.acquire(blocking)  if rc:  self.__owner = me  self.__count = 1  ...  ...  return rc 

下面咱們看一下可重入鎖(RLock)的release()方法。首先它會去確認調用者是不是鎖的擁有者。若是是的話,計數器減1;若是計數器爲0,那麼鎖將會被釋放,這時其餘線程就能夠去獲取鎖了。

1 2 3 4 5 6 7 8 9
def release(self):  if self.__owner != _get_ident():  raise RuntimeError("cannot release un-acquired lock")  self.__count = count = self.__count - 1  if not count:  self.__owner = None  self.__block.release()  ...  ... 

條件(Condition)

條件同步機制是指:一個線程等待特定條件,而另外一個線程發出特定條件知足的信號。 解釋條件同步機制的一個很好的例子就是生產者/消費者(producer/consumer)模型。生產者隨機的往列表中「生產」一個隨機整數,而消費者從列表中「消費」整數。完整的源碼能夠在threads/condition.py中找到

在producer類中,producer得到鎖,生產一個隨機整數,通知消費者有了可用的「商品」,而且釋放鎖。producer無限地向列表中添加整數,同時在兩個添加操做中間隨機的停頓一下子。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
class Producer(threading.Thread):  """  向列表中生產隨機整數  """   def __init__(self, integers, condition):  """  構造器   @param integers 整數列表  @param condition 條件同步對象  """  threading.Thread.__init__(self)  self.integers = integers  self.condition = condition   def run(self):  """  實現Thread的run方法。在隨機時間向列表中添加一個隨機整數  """  while True:  integer = random.randint(0, 256)  self.condition.acquire() #獲取條件鎖  print 'condition acquired by %s' % self.name  self.integers.append(integer)  print '%d appended to list by %s' % (integer, self.name)  print 'condition notified by %s' % self.name  self.condition.notify() #喚醒消費者線程  print 'condition released by %s' % self.name  self.condition.release() #釋放條件鎖  time.sleep(1) #暫停1秒鐘 

下面是消費者(consumer)類。它獲取鎖,檢查列表中是否有整數,若是沒有,等待生產者的通知。當消費者獲取整數以後,釋放鎖。
注意在wait()方法中會釋放鎖,這樣生產者就能得到資源而且生產「商品」。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
class Consumer(threading.Thread):  """  從列表中消費整數  """   def __init__(self, integers, condition):  """  構造器   @param integers 整數列表  @param condition 條件同步對象  """  threading.Thread.__init__(self)  self.integers = integers  self.condition = condition   def run(self):  """  實現Thread的run()方法,從列表中消費整數  """  while True:  self.condition.acquire() #獲取條件鎖  print 'condition acquired by %s' % self.name  while True:  if self.integers: #判斷是否有整數  integer = self.integers.pop()  print '%d popped from list by %s' % (integer, self.name)  break  print 'condition wait by %s' % self.name  self.condition.wait() #等待商品,而且釋放資源  print 'condition released by %s' % self.name  self.condition.release() #最後釋放條件鎖 

下面咱們編寫main方法,建立兩個線程:

1 2 3 4 5 6 7 8 9 10 11 12
def main():  integers = []  condition = threading.Condition()  t1 = Producer(integers, condition)  t2 = Consumer(integers, condition)  t1.start()  t2.start()  t1.join()  t2.join()  if __name__ == '__main__':  main() 

下面是程序的輸出:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
$ python condition.py condition acquired by Thread-1 159 appended to list by Thread-1 condition notified by Thread-1 condition released by Thread-1 condition acquired by Thread-2 159 popped from list by Thread-2 condition released by Thread-2 condition acquired by Thread-2 condition wait by Thread-2 condition acquired by Thread-1 116 appended to list by Thread-1 condition notified by Thread-1 condition released by Thread-1 116 popped from list by Thread-2 condition released by Thread-2 condition acquired by Thread-2 condition wait by Thread-2 

Thread-1添加159到列表中,通知消費者同時釋放鎖,Thread-2得到鎖,取回159,而且釋放鎖。此時由於執行time.sleep(1),生產者正在睡眠,當消費者再次試圖獲取整數時,列表中並無整數,這時消費者進入等待狀態,等待生產者的通知。當wait()被調用時,它會釋放資源,從而生產者可以利用資源生產整數。

下面咱們看一下Python內部是如何實現條件同步機制的。若是用戶沒有傳入鎖(lock)對象,condition類的構造器建立一個可重入鎖(RLock),這個鎖將會在調用acquire()和release()時使用。

1 2 3 4 5 6 7
class _Condition(_Verbose):   def __init__(self, lock=None, verbose=None):  _Verbose.__init__(self, verbose)  if lock is None:  lock = RLock()  self.__lock = lock 

接下來是wait()方法。爲了簡化說明,咱們假定在調用wait()方法時不使用timeout參數。wait()方法建立了一個名爲waiter的鎖,而且設置鎖的狀態爲locked。這個waiter鎖用於線程間的通信,這樣生產者(在生產完整數以後)就能夠通知消費者釋放waiter()鎖。鎖對象將會被添加到等待者列表,而且在調用waiter.acquire()時被阻塞。一開始condition鎖的狀態被保存,而且在wait()結束時被恢復。

1 2 3 4 5 6 7 8 9 10 11 12 13
def wait(self, timeout=None):  ...  waiter = _allocate_lock()  waiter.acquire()  self.__waiters.append(waiter)  saved_state = self._release_save()  try: # 不管如何恢復狀態 (例如, KeyboardInterrupt)  if timeout is None:  waiter.acquire()  ...  ...  finally:  self._acquire_restore(saved_state) 

當生產者調用notify()方法時,notify()釋放waiter鎖,喚醒被阻塞的消費者。

1 2 3 4 5 6 7 8 9 10 11
def notify(self, n=1):  ...  __waiters = self.__waiters  waiters = __waiters[:n]  ...  for waiter in waiters:  waiter.release()  try:  __waiters.remove(waiter)  except ValueError:  pass 

一樣Condition對象也能夠和「with」語句一塊兒使用,這樣「with」語句上下文會幫咱們調用acquire()和release()方法。下面的代碼使用「with」語句改寫了生產者和消費者類。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
class Producer(threading.Thread):  ...  def run(self):  while True:  integer = random.randint(0, 256)  with self.condition:  print 'condition acquired by %s' % self.name  self.integers.append(integer)  print '%d appended to list by %s' % (integer, self.name)  print 'condition notified by %s' % self.name  self.condition.notify()  print 'condition released by %s' % self.name  time.sleep(1)  class Consumer(threading.Thread):  ...  def run(self):  while True:  with self.condition:  print 'condition acquired by %s' % self.name  while True:  if self.integers:  integer = self.integers.pop()  print '%d popped from list by %s' % (integer, self.name)  break  print 'condition wait by %s' % self.name  self.condition.wait()  print 'condition released by %s' % self.name 

信號量(Semaphore)

信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。

信號量同步的例子:

1 2 3 4 5
semaphore = threading.Semaphore() semaphore.acquire()  # 使用共享資源 ... semaphore.release() 

讓咱們看一下信號量同步在Python內部是如何實現的。構造器使用參數value來表示計數器的初始值,默認值爲1。一個條件鎖實例用於保護計數器,同時當信號量被釋放時通知其餘線程。

1 2 3 4 5 6 7
class _Semaphore(_Verbose):  ...  def __init__(self, value=1, verbose=None):  _Verbose.__init__(self, verbose)  self.__cond = Condition(Lock())  self.__value = value  ... 

acquire()方法。若是信號量爲0,線程被條件鎖的wait()方法阻塞,直到被其餘線程喚醒;若是計數器大於0,調用acquire()使計數器減1。

1 2 3 4 5 6 7 8 9 10 11
def acquire(self, blocking=1):  rc = False  self.__cond.acquire()  while self.__value == 0:  ...  self.__cond.wait()  else:  self.__value = self.__value - 1  rc = True  self.__cond.release()  return rc 

信號量類的release()方法增長計數器的值而且喚醒其餘線程。

1 2 3 4 5
def release(self):  self.__cond.acquire()  self.__value = self.__value + 1  self.__cond.notify()  self.__cond.release() 

還有一個「有限」(bounded)信號量類,能夠確保release()方法的調用次數不能超過給定的初始信號量數值(value參數),下面是「有限」信號量類的Python代碼:

1 2 3 4 5 6 7 8 9 10
class _BoundedSemaphore(_Semaphore):  """檢查release()的調用次數是否小於等於acquire()次數"""  def __init__(self, value=1, verbose=None):  _Semaphore.__init__(self, value, verbose)  self._initial_value = value   def release(self):  if self._Semaphore__value >= self._initial_value:  raise ValueError, "Semaphore released too many times"  return _Semaphore.release(self) 

一樣信號量(Semaphore)對象能夠和「with」一塊兒使用:

1 2 3 4
semaphore = threading.Semaphore() with semaphore:  # 使用共享資源  ... 

事件(Event)

基於事件的同步是指:一個線程發送/傳遞事件,另外的線程等待事件的觸發。 讓咱們再來看看前面的生產者和消費者的例子,如今咱們把它轉換成使用事件同步而不是條件同步。完整的源碼能夠在threads/event.py裏面找到。

首先是生產者類,咱們傳入一個Event實例給構造器而不是Condition實例。一旦整數被添加進列表,事件(event)被設置和發送去喚醒消費者。注意事件(event)實例默認是被髮送的。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
class Producer(threading.Thread):  """  向列表中生產隨機整數  """   def __init__(self, integers, event):  """  構造器   @param integers 整數列表  @param event 事件同步對象  """  threading.Thread.__init__(self)  self.integers = integers  self.event = event   def run(self):  """  實現Thread的run方法。在隨機時間向列表中添加一個隨機整數  """  while True:  integer = random.randint(0, 256)  self.integers.append(integer)  print '%d appended to list by %s' % (integer, self.name)  print 'event set by %s' % self.name  self.event.set() #設置事件   self.event.clear() #發送事件  print 'event cleared by %s' % self.name  time.sleep(1) 

一樣咱們傳入一個Event實例給消費者的構造器,消費者阻塞在wait()方法,等待事件被觸發,即有可供消費的整數。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
class Consumer(threading.Thread):  """  從列表中消費整數  """   def __init__(self, integers, event):  """  構造器   @param integers 整數列表  @param event 事件同步對象  """  threading.Thread.__init__(self)  self.integers = integers  self.event = event   def run(self):  """  實現Thread的run()方法,從列表中消費整數  """  while True:  self.event.wait() #等待事件被觸發  try:  integer = self.integers.pop()  print '%d popped from list by %s' % (integer, self.name)  except IndexError:  # catch pop on empty list  time.sleep(1) 

下面是程序的輸出,Thread-1添加124到整數列表中,而後設置事件而且喚醒消費者。消費者從wait()方法中喚醒,在列表中獲取到整數。

1 2 3 4 5 6 7 8 9
$ python event.py 124 appended to list by Thread-1 event set by Thread-1 event cleared by Thread-1 124 popped from list by Thread-2 223 appended to list by Thread-1 event set by Thread-1 event cleared by Thread-1 223 popped from list by Thread-2 

事件鎖的Python內部實現,首先是Event鎖的構造器。構造器中建立了一個條件(Condition)鎖,來保護事件標誌(event flag),同事喚醒其餘線程當事件被設置時。

1 2 3 4 5
class _Event(_Verbose):  def __init__(self, verbose=None):  _Verbose.__init__(self, verbose)  self.__cond = Condition(Lock())  self.__flag = False 

接下來是set()方法,它設置事件標誌爲True,而且喚醒其餘線程。條件鎖對象保護程序修改事件標誌狀態的關鍵部分。

1 2 3 4 5 6 7
def set(self):  self.__cond.acquire()  try:  self.__flag = True  self.__cond.notify_all()  finally:  self.__cond.release() 

而clear()方法正好相反,它設置時間標誌爲False。

1 2 3 4 5 6
def clear(self):  self.__cond.acquire()  try:  self.__flag = False  finally:  self.__cond.release() 

最後,wait()方法將阻塞直到調用了set()方法,當事件標誌爲True時,wait()方法就什麼也不作。

1 2 3 4 5 6 7
def wait(self, timeout=None):  self.__cond.acquire()  try:  if not self.__flag: #若是flag不爲真  self.__cond.wait(timeout)  finally:  self.__cond.release() 

隊列(Queue)

隊列是一個很是好的線程同步機制,使用隊列咱們不用關心鎖,隊列會爲咱們處理鎖的問題。 隊列(Queue)有如下4個用戶感興趣的方法:

  • put: 向隊列中添加一個項;
  • get: 從隊列中刪除並返回一個項;
  • task_done: 當某一項任務完成時調用;
  • join: 阻塞知道全部的項目都被處理完。

下面咱們將上面的生產者/消費者的例子轉換成使用隊列。源代碼能夠在threads/queue.py中找到。

首先是生產者類,咱們不須要傳入一個整數列表,由於咱們使用隊列就能夠存儲生成的整數。生產者線程在一個無限循環中生成整數並將生成的整數添加到隊列中。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
class Producer(threading.Thread):  """  向隊列中生產隨機整數  """   def __init__(self, queue):  """  構造器   @param integers 整數列表 #譯註:不須要這個參數  @param queue 隊列同步對象  """  threading.Thread.__init__(self)  self.queue = queue   def run(self):  """  實現Thread的run方法。在隨機時間向隊列中添加一個隨機整數  """  while True:  integer = random.randint(0, 256)  self.queue.put(integer) #將生成的整數添加到隊列  print '%d put to queue by %s' % (integer, self.name)  time.sleep(1) 

下面是消費者類。線程從隊列中獲取整數,而且在任務完成時調用task_done()方法。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
class Consumer(threading.Thread):  """  從隊列中消費整數  """   def __init__(self, queue):  """  構造器   @param integers 整數列表 #譯註:不須要這個參數  @param queue 隊列同步對象  """  threading.Thread.__init__(self)  self.queue = queue   def run(self):  """  實現Thread的run()方法,從隊列中消費整數  """  while True:  integer = self.queue.get()  print '%d popped from list by %s' % (integer, self.name)  self.queue.task_done() 

如下是程序的輸出:

1 2 3 4 5
$ python queue.py 61 put to queue by Thread-1 61 popped from list by Thread-2 6 put to queue by Thread-1 6 popped from list by Thread-2 

隊列同步的最大好處就是隊列幫咱們處理了鎖。如今讓咱們去看看在Python內部是如何實現隊列同步機制的。

隊列(Queue)構造器建立一個鎖,保護隊列元素的添加和刪除操做。同時建立了一些條件鎖對象處理隊列事件,好比隊列不空事件(削除get()的阻塞),隊列不滿事件(削除put()的阻塞)和全部項目都被處理完事件(削除join()的阻塞)。

1 2 3 4 5 6 7 8
class Queue:  def __init__(self, maxsize=0):  ...  self.mutex = threading.Lock()  self.not_empty = threading.Condition(self.mutex)  self.not_full = threading.Condition(self.mutex)  self.all_tasks_done = threading.Condition(self.mutex)  self.unfinished_tasks = 0 

put()方法向隊列中添加一個項,或者阻塞若是隊列已滿。這時隊列非空,它喚醒阻塞在get()方法中的線程。更多關於Condition鎖的內容請查看上面的講解。

1 2 3 4 5 6 7 8 9 10 11 12 13 14
def put(self, item, block=True, timeout=None):  ...  self.not_full.acquire()  try:  if self.maxsize > 0:  ...  elif timeout is None:  while self._qsize() == self.maxsize:  self.not_full.wait()  self._put(item)  self.unfinished_tasks += 1  self.not_empty.notify()  finally:  self.not_full.release() 

get()方法從隊列中得到並刪除一個項,或者阻塞當隊列爲空時。這時隊列不滿,他喚醒阻塞在put()方法中的線程。

1 2 3 4 5 6 7 8 9 10 11 12 13
def get(self, block=True, timeout=None):  ...  self.not_empty.acquire()  try:  ...  elif timeout is None:  while not self._qsize():  self.not_empty.wait()  item = self._get()  self.not_full.notify()  return item  finally:  self.not_empty.release() 

當調用task_done()方法時,未完成任務的數量減1。若是未完成任務的數量爲0,線程等待隊列完成join()方法。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
def task_done(self):  self.all_tasks_done.acquire()  try:  unfinished = self.unfinished_tasks - 1  if unfinished <= 0:  if unfinished < 0:  raise ValueError('task_done() called too many times')  self.all_tasks_done.notify_all()  self.unfinished_tasks = unfinished  finally:  self.all_tasks_done.release()  def join(self):  self.all_tasks_done.acquire()  try:  while self.unfinished_tasks:  self.all_tasks_done.wait()  finally:  self.all_tasks_done.release() 

本文到此結束,但願您喜歡這篇文章。歡迎您的留言和反饋。

---EOF---

相關文章
相關標籤/搜索