Translated from Laurent Luce's blog
Original title: Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues
Original link: http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/
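This article explains Python's thread synchronization mechanisms in detail: Lock, RLock, Semaphore, Condition, Event and Queue, and also how Python implements them internally. The source code of the programs in this article is available on GitHub.
Let's start with a simple program that does not use any thread synchronization.
We want to write a program that fetches the contents of some URLs and writes them to a file. This could be done without threads, but to speed up the fetching we use 2 threads, each handling half of the URLs.
Note: the best way to write this program is with a queue of URLs, but the example below is a better starting point for my explanation.
The FetchUrls class is a subclass of threading.Thread. It holds a list of URLs and a file object to which the URL contents are written.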
```python
import threading
import urllib2

class FetchUrls(threading.Thread):
    """
    Thread that downloads the contents of URLs
    """

    def __init__(self, urls, output):
        """
        Constructor.

        @param urls list of URLs to download
        @param output output file to which the URL contents are written
        """
        threading.Thread.__init__(self)
        self.urls = urls
        self.output = output

    def run(self):
        """
        Implements Thread.run(): open the URLs and download their contents one by one
        """
        while self.urls:
            url = self.urls.pop()
            req = urllib2.Request(url)
            try:
                d = urllib2.urlopen(req)
            except urllib2.URLError, e:
                print 'URL %s failed: %s' % (url, e.reason)
                continue  # skip the write: d is undefined when urlopen() fails
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'URL %s fetched by %s' % (url, self.name)
```
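The main function starts the two threads, which then download the URL contents; it waits for both of them with join() before closing the file.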
```python
def main():
    # URL list 1
    urls1 = ['http://www.google.com', 'http://www.facebook.com']
    # URL list 2
    urls2 = ['http://www.yahoo.com', 'http://www.youtube.com']
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f)
    t2 = FetchUrls(urls2, f)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()
```
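In the program above, both threads can end up writing to the same file at the same time, leaving the file a mess. We need a way to make sure that only one thread writes to the file at any given time. The way to do this is to use a thread synchronization mechanism such as a lock.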
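A lock has two states, locked and unlocked, and two methods, acquire() and release(), which follow these rules:

- If the state is unlocked, a call to acquire() changes the state to locked.
- If the state is locked, a call to acquire() blocks until another thread calls release().
- If the state is unlocked, a call to release() raises an exception.
- If the state is locked, a call to release() changes the state to unlocked.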
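The acquire()/release() rules can be observed directly. The following sketch is written for Python 3 (where releasing an unlocked lock raises RuntimeError) and uses acquire(blocking=False), so the attempt to take an already locked lock reports failure instead of blocking:

```python
import threading

lock = threading.Lock()

# Unlocked: acquire() succeeds and the state becomes locked.
first = lock.acquire()
print("first acquire:", first, "| locked:", lock.locked())  # True | True

# Locked: a blocking acquire() would wait; a non-blocking one returns False at once.
second = lock.acquire(blocking=False)
print("second acquire:", second)  # False

# Locked: release() makes the lock unlocked again.
lock.release()
print("after release, locked:", lock.locked())  # False

# Unlocked: release() raises RuntimeError (Python 3).
try:
    lock.release()
    release_error = None
except RuntimeError as exc:
    release_error = exc
print("second release raised:", type(release_error).__name__)  # RuntimeError
```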
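To solve the problem of two threads writing to the same file at once, we pass a lock to the FetchUrls constructor and use it to protect the file write, so that only one thread writes at any given time. The code below shows only the lock-related changes; the full source is in threads/lock.py.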
```python
class FetchUrls(threading.Thread):
    ...

    def __init__(self, urls, output, lock):
        ...
        self.lock = lock  # the lock object passed in

    def run(self):
        ...
        while self.urls:
            ...
            self.lock.acquire()  # lock becomes locked; other threads now block until it is released
            print 'lock acquired by %s' % self.name
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'lock released by %s' % self.name
            self.lock.release()  # lock becomes unlocked; another thread may now acquire it
            ...

def main():
    ...
    lock = threading.Lock()
    ...
    t1 = FetchUrls(urls1, f, lock)
    t2 = FetchUrls(urls2, f, lock)
    ...
```
```
$ python locks.py
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.youtube.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.facebook.com fetched by Thread-1
lock acquired by Thread-2
write done by Thread-2
lock released by Thread-2
URL http://www.yahoo.com fetched by Thread-2
lock acquired by Thread-1
write done by Thread-1
lock released by Thread-1
URL http://www.google.com fetched by Thread-1
```
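For comparison, here is a minimal Python 3 sketch of the same lock-protected writer that runs offline. The fetch() method is a hypothetical stand-in for the real download, and an io.StringIO object replaces output.txt; both are assumptions of this sketch, not part of the original program. The slow fetch happens outside the lock, so only the write is serialized:

```python
import io
import threading


class FetchUrls(threading.Thread):
    """Python 3 sketch of the lock-protected writer; downloads are simulated."""

    def __init__(self, urls, output, lock):
        super().__init__()
        self.urls = list(urls)  # copy, so the caller's list is left untouched
        self.output = output
        self.lock = lock

    def fetch(self, url):
        # Hypothetical stand-in for urllib.request.urlopen(url).read()
        return "<content of %s>\n" % url

    def run(self):
        while self.urls:
            url = self.urls.pop()
            data = self.fetch(url)  # slow work runs outside the lock
            self.lock.acquire()     # only the write to the shared output is serialized
            try:
                self.output.write(data)
            finally:
                self.lock.release()


def main():
    output = io.StringIO()  # stands in for output.txt
    lock = threading.Lock()
    t1 = FetchUrls(["http://www.google.com", "http://www.facebook.com"], output, lock)
    t2 = FetchUrls(["http://www.yahoo.com", "http://www.youtube.com"], output, lock)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return output.getvalue()


result = main()
print(result)
```

Because every write happens while the lock is held, each chunk lands in the output whole, although the order of the four chunks depends on thread scheduling.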
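Now let's look at how Lock is implemented inside Python. I'm using Python 2.6.6 on Linux. In threading.py, Lock is simply an alias for thread.allocate_lock, which is implemented in C; the excerpts below show that on this platform a lock is a POSIX semaphore (sem_t) with an initial value of 1.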
```python
# threading.py (Python 2.6)
_allocate_lock = thread.allocate_lock
Lock = _allocate_lock
```
```c
PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    lock = (sem_t *)malloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock, 0, 1);  /* initial count 1: the lock starts unlocked */
        CHECK_STATUS("sem_init");
        ...
    }
    ...
}
```
```c
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    ...
    do {
        if (waitflag)
            status = fix_status(sem_wait(thelock));     /* block until the count is > 0 */
        else
            status = fix_status(sem_trywait(thelock));  /* fail at once if the count is 0 */
    } while (status == EINTR); /* Retry if interrupted by a signal */
    ...
}
```
```c
void
PyThread_release_lock(PyThread_type_lock lock)
{
    ...
    status = sem_post(thelock);  /* increment the count: the lock becomes unlocked */
    ...
}
```
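The C excerpts treat a lock as a semaphore whose count is 1 when unlocked and 0 when locked. A rough Python 3 analogue, using threading.Semaphore in place of sem_t (the class name SemaphoreLock is hypothetical), makes that mapping explicit:

```python
import threading


class SemaphoreLock:
    """Lock-like object built on a semaphore whose initial count is 1."""

    def __init__(self):
        self._sem = threading.Semaphore(1)  # like sem_init(lock, 0, 1): starts unlocked

    def acquire(self, waitflag=True):
        # waitflag true  -> like sem_wait(): block until the count is > 0, then decrement it
        # waitflag false -> like sem_trywait(): return False at once if the count is 0
        return self._sem.acquire(blocking=waitflag)

    def release(self):
        self._sem.release()  # like sem_post(): increment the count, waking one waiter


lock = SemaphoreLock()
first = lock.acquire()          # count 1 -> 0
second = lock.acquire(False)    # count is 0, so the non-blocking attempt fails
lock.release()                  # count 0 -> 1
third = lock.acquire(False)     # succeeds again
print(first, second, third)     # True False True
```

One difference from threading.Lock: this sketch does not complain when release() is called on an unlocked lock, because a plain semaphore simply lets its count grow past 1.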
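A lock can also be used with the "with" statement, because a lock is a context manager. The advantage of the "with" statement is that acquire() is called when execution enters the "with" block and release() is called when execution leaves it, even if an exception is raised inside the block. (Translator's note: this way we don't have to call acquire() and release() explicitly; the "with" statement manages acquiring and releasing the lock for us.) Let's rewrite the FetchUrls class using the "with" statement.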
```python
class FetchUrls(threading.Thread):
    ...

    def run(self):
        ...
        while self.urls:
            ...
            with self.lock:  # the with statement acquires and releases the lock
                print 'lock acquired by %s' % self.name
                self.output.write(d.read())
                print 'write done by %s' % self.name
                print 'lock released by %s' % self.name
            ...
```
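The "with" statement behaves like an acquire() followed by a try/finally that calls release(). The Python 3 sketch below (the function names are illustrative) raises an exception inside both forms and checks that the lock is unlocked afterwards:

```python
import threading

lock = threading.Lock()


def write_with_statement():
    with lock:  # acquire() on entry, release() on exit, even when an exception escapes
        raise ValueError("simulated write failure")


def write_with_try_finally():
    lock.acquire()  # roughly what "with lock:" does for you
    try:
        raise ValueError("simulated write failure")
    finally:
        lock.release()


states = []
for func in (write_with_statement, write_with_try_finally):
    try:
        func()
    except ValueError:
        pass
    states.append(lock.locked())
    print(func.__name__, "-> still locked after the exception:", lock.locked())
```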
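RLock is a reentrant lock: acquire() can be called multiple times by the same thread without blocking. Note that release() must be called as many times as acquire() before the lock is actually released.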
```python
lock = threading.Lock()
lock.acquire()
lock.acquire()  # blocks forever: the lock is already held, even though by this same thread
```
```python
rlock = threading.RLock()
rlock.acquire()
rlock.acquire()  # does not block: the owning thread may acquire an RLock again
```
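The difference between the two snippets can be checked without actually deadlocking by passing a timeout to acquire() (available since Python 3.2). The sketch also shows that an RLock acquired twice needs two release() calls before another thread can take it:

```python
import threading

# Plain Lock: a second acquire() by the same thread would block forever,
# so a timeout is used here to make the failure visible.
lock = threading.Lock()
lock.acquire()
lock_twice = lock.acquire(timeout=0.1)
lock.release()
print("Lock acquired twice:", lock_twice)    # False

# RLock: the owning thread can acquire it again without blocking.
rlock = threading.RLock()
rlock.acquire()
rlock_twice = rlock.acquire(timeout=0.1)
print("RLock acquired twice:", rlock_twice)  # True

results = []


def try_from_other_thread():
    got = rlock.acquire(timeout=0.1)
    results.append(got)
    if got:
        rlock.release()


def run_other_thread():
    t = threading.Thread(target=try_from_other_thread)
    t.start()
    t.join()


rlock.release()       # 1 of 2 releases: the RLock is still held
run_other_thread()
rlock.release()       # 2 of 2 releases: now it is really free
run_other_thread()
print("other thread could acquire:", results)  # [False, True]
```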
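RLock also uses thread.allocate_lock(), but additionally tracks the owner thread to support reentrancy. Below is RLock's acquire() implementation. If the thread calling acquire() already owns the lock, the counter of acquire() calls is incremented. Otherwise, it tries to acquire the underlying lock. When a thread acquires the lock for the first time, it is recorded as the owner and the counter is initialized to 1.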
```python
def acquire(self, blocking=1):
    me = _get_ident()
    if self.__owner == me:
        self.__count = self.__count + 1
        ...
        return 1
    rc = self.__block.acquire(blocking)
    if rc:
        self.__owner = me
        self.__count = 1
        ...
    ...
    return rc
```
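release() first checks that the caller is the owner, then decrements the counter; only when the counter reaches 0 is the owner cleared and the underlying lock released:
```python
def release(self):
    if self.__owner != _get_ident():
        raise RuntimeError("cannot release un-acquired lock")
    self.__count = count = self.__count - 1
    if not count:
        self.__owner = None
        self.__block.release()
        ...
    ...
```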
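To see the owner-plus-counter idea in isolation, here is a minimal Python 3 sketch of a reentrant lock built on threading.Lock and threading.get_ident(). SimpleRLock is a hypothetical name; this illustrates the technique and is not the standard library's RLock:

```python
import threading


class SimpleRLock:
    """Reentrant lock: a plain Lock plus an owner thread id and a recursion count."""

    def __init__(self):
        self._block = threading.Lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True):
        me = threading.get_ident()
        if self._owner == me:            # the owner re-enters: just bump the counter
            self._count += 1
            return True
        if self._block.acquire(blocking):
            self._owner = me             # first acquisition: remember who owns it
            self._count = 1
            return True
        return False

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:             # last matching release frees the underlying lock
            self._owner = None
            self._block.release()


lock = SimpleRLock()
lock.acquire()
lock.acquire()                           # does not block: same thread
lock.release()
still_held = lock._block.locked()        # one acquire() is still outstanding
lock.release()
print(still_held, lock._block.locked())  # True False
```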
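Condition synchronization means that one thread waits for a particular condition while another thread signals that the condition has been met. A good example is the producer/consumer model: the producer appends a random integer to a list once per second, and the consumer removes ("consumes") integers from the list. The full source is in threads/condition.py.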
```python
import random
import threading
import time

class Producer(threading.Thread):
    """
    Produces random integers and appends them to a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """
        Implements Thread.run(): append a random integer to the list once per second
        """
        while True:
            integer = random.randint(0, 256)
            self.condition.acquire()  # acquire the condition's lock
            print 'condition acquired by %s' % self.name
            self.integers.append(integer)
            print '%d appended to list by %s' % (integer, self.name)
            print 'condition notified by %s' % self.name
            self.condition.notify()  # wake up the consumer thread
            print 'condition released by %s' % self.name
            self.condition.release()  # release the condition's lock
            time.sleep(1)  # pause for 1 second
```
```python
class Consumer(threading.Thread):
    """
    Consumes integers from a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """
        Implements Thread.run(): consume integers from the list
        """
        while True:
            self.condition.acquire()  # acquire the condition's lock
            print 'condition acquired by %s' % self.name
            while True:
                if self.integers:  # is there an integer available?
                    integer = self.integers.pop()
                    print '%d popped from list by %s' % (integer, self.name)
                    break
                print 'condition wait by %s' % self.name
                self.condition.wait()  # release the lock and wait for notify(); re-acquires it before returning
            print 'condition released by %s' % self.name
            self.condition.release()  # finally release the condition's lock
```
```python
def main():
    integers = []
    condition = threading.Condition()
    t1 = Producer(integers, condition)
    t2 = Consumer(integers, condition)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()
```
```
$ python condition.py
condition acquired by Thread-1
159 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
159 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
116 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
116 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
```
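Internally, a Condition wraps a lock; if none is passed to the constructor, an RLock is created:
```python
class _Condition(_Verbose):
    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
```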
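wait() allocates a new lock, acquires it and appends it to the list of waiters. It then releases the condition's underlying lock and blocks by acquiring the waiter lock a second time; that call only returns once notify() releases the waiter lock. Finally, the underlying lock is re-acquired:
```python
def wait(self, timeout=None):
    ...
    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)
    saved_state = self._release_save()
    try:    # restore state no matter what (e.g., KeyboardInterrupt)
        if timeout is None:
            waiter.acquire()
            ...
        ...
    finally:
        self._acquire_restore(saved_state)
```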
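notify() releases the waiter locks of up to n waiting threads and removes them from the list of waiters:
```python
def notify(self, n=1):
    ...
    __waiters = self.__waiters
    waiters = __waiters[:n]
    ...
    for waiter in waiters:
        waiter.release()
        try:
            __waiters.remove(waiter)
        except ValueError:
            pass
```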
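The waiter-lock technique used by wait() and notify() can be reproduced in a few lines. This Python 3 sketch (SimpleCondition is a hypothetical name) uses a plain Lock instead of an RLock, so saving and restoring the lock state is just release() and acquire(); it supports only wait() without a timeout and notify():

```python
import threading
from collections import deque


class SimpleCondition:
    """Condition variable built from one pre-acquired lock per waiting thread."""

    def __init__(self):
        self._lock = threading.Lock()   # the condition's underlying lock
        self._waiters = deque()         # one pre-acquired lock per waiting thread

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self._lock.release()

    def wait(self):
        # Caller must hold self._lock.
        waiter = threading.Lock()
        waiter.acquire()                # taken once, so the next acquire() blocks
        self._waiters.append(waiter)
        self._lock.release()            # let other threads change the shared state
        try:
            waiter.acquire()            # blocks until notify() releases this lock
        finally:
            self._lock.acquire()        # always hold the lock again on return

    def notify(self, n=1):
        # Caller must hold self._lock.
        for _ in range(min(n, len(self._waiters))):
            self._waiters.popleft().release()


items = []
consumed = []
cond = SimpleCondition()


def consumer(count):
    for _ in range(count):
        with cond:
            while not items:            # re-check the condition after every wake-up
                cond.wait()
            consumed.append(items.pop(0))


def producer(count):
    for i in range(count):
        with cond:
            items.append(i)
            cond.notify()


c = threading.Thread(target=consumer, args=(5,))
p = threading.Thread(target=producer, args=(5,))
c.start()
p.start()
c.join()
p.join()
print(consumed)  # [0, 1, 2, 3, 4]
```

Allocating a fresh, pre-acquired lock per waiter avoids lost wake-ups: if notify() releases the waiter lock before the waiting thread reaches its second acquire(), that acquire() simply succeeds at once.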
Like a lock, a Condition can be used with the "with" statement. Here are the producer and consumer rewritten that way:
```python
class Producer(threading.Thread):
    ...

    def run(self):
        while True:
            integer = random.randint(0, 256)
            with self.condition:
                print 'condition acquired by %s' % self.name
                self.integers.append(integer)
                print '%d appended to list by %s' % (integer, self.name)
                print 'condition notified by %s' % self.name
                self.condition.notify()
                print 'condition released by %s' % self.name
            time.sleep(1)

class Consumer(threading.Thread):
    ...

    def run(self):
        while True:
            with self.condition:
                print 'condition acquired by %s' % self.name
                while True:
                    if self.integers:
                        integer = self.integers.pop()
                        print '%d popped from list by %s' % (integer, self.name)
                        break
                    print 'condition wait by %s' % self.name
                    self.condition.wait()
                print 'condition released by %s' % self.name
```
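Python 3.2 added Condition.wait_for(predicate, timeout=None), which performs the consumer's "check, wait, check again" loop for you. A minimal Python 3 sketch with one consumer and one item:

```python
import threading

integers = []
popped = []
condition = threading.Condition()


def consumer():
    with condition:
        # wait_for() checks the predicate first, waits while it is false, and
        # re-checks after every wake-up: the same job as the inner while loop above.
        condition.wait_for(lambda: integers)
        popped.append(integers.pop())


def producer():
    with condition:
        integers.append(42)
        condition.notify()


t = threading.Thread(target=consumer)
t.start()
producer()
t.join()
print(popped)  # [42]
```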
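A semaphore manages an internal counter: acquire() decrements it and blocks while it is 0, and release() increments it. threading.Semaphore() starts the counter at 1 by default. Typical usage:
```python
semaphore = threading.Semaphore()
semaphore.acquire()
# use the shared resource
...
semaphore.release()
```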
Internally, Semaphore is built on a Condition that wraps a Lock, plus the counter:
```python
class _Semaphore(_Verbose):
    ...
    def __init__(self, value=1, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value
        ...
```
acquire() waits on the condition while the counter is 0, then decrements it:
```python
def acquire(self, blocking=1):
    rc = False
    self.__cond.acquire()
    while self.__value == 0:
        ...
        self.__cond.wait()
    else:
        self.__value = self.__value - 1
        rc = True
    self.__cond.release()
    return rc
```
release() increments the counter and notifies one waiting thread:
```python
def release(self):
    self.__cond.acquire()
    self.__value = self.__value + 1
    self.__cond.notify()
    self.__cond.release()
```
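Putting these excerpts together, a counting semaphore needs only a Condition and a counter. Here is a minimal Python 3 sketch (SimpleSemaphore is a hypothetical name; timeouts are left out):

```python
import threading


class SimpleSemaphore:
    """Counting semaphore built from Condition(Lock()) and an integer counter."""

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("initial value must be >= 0")
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, blocking=True):
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False      # counter is 0 and the caller does not want to wait
                self._cond.wait()     # releases the lock while waiting, re-acquires on wake-up
            self._value -= 1
            return True

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()       # wake one thread blocked in acquire(), if any


sem = SimpleSemaphore(2)
taken = [sem.acquire(), sem.acquire(), sem.acquire(blocking=False)]
sem.release()
taken.append(sem.acquire(blocking=False))
print(taken)  # [True, True, False, True]
```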
BoundedSemaphore is a subclass that raises ValueError if release() would push the counter above its initial value:
```python
class _BoundedSemaphore(_Semaphore):
    """Checks that release() is not called more times than acquire()"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        if self._Semaphore__value >= self._initial_value:
            raise ValueError, "Semaphore released too many times"
        return _Semaphore.release(self)
```
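The difference between the two classes shows up as soon as release() is called one time too many. A short Python 3 check:

```python
import threading

bounded = threading.BoundedSemaphore(1)
bounded.acquire()
bounded.release()             # back at the initial value: fine
try:
    bounded.release()         # one release too many
    extra_release_error = None
except ValueError as exc:
    extra_release_error = exc
print("BoundedSemaphore:", repr(extra_release_error))

plain = threading.Semaphore(1)
plain.release()               # no error: the counter silently grows to 2
both = (plain.acquire(blocking=False), plain.acquire(blocking=False))
print("Semaphore acquired twice:", both)  # (True, True)
```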
A semaphore can also be used with the "with" statement:
```python
semaphore = threading.Semaphore()
with semaphore:
    # use the shared resource
    ...
```
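A common use of a counting semaphore is to cap how many threads use a resource at once. In this Python 3 sketch the limit of 2 and the 0.05-second sleep are arbitrary assumptions standing in for real work; the recorded peak never exceeds the limit:

```python
import threading
import time

MAX_CONCURRENT = 2                          # assumed limit, e.g. simultaneous downloads
semaphore = threading.Semaphore(MAX_CONCURRENT)
stats_lock = threading.Lock()               # protects the counters below
stats = {"active": 0, "peak": 0}


def worker():
    with semaphore:                         # at most MAX_CONCURRENT threads run this block at once
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)                    # stands in for using the shared resource
        with stats_lock:
            stats["active"] -= 1


threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", stats["peak"])
```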
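Event-based synchronization means that one thread signals an event while other threads wait for it to be triggered. Let's go back to the producer/consumer example and convert it from condition synchronization to event synchronization. The full source is in threads/event.py.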
```python
class Producer(threading.Thread):
    """
    Produces random integers and appends them to a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event

    def run(self):
        """
        Implements Thread.run(): append a random integer to the list once per second
        """
        while True:
            integer = random.randint(0, 256)
            self.integers.append(integer)
            print '%d appended to list by %s' % (integer, self.name)
            print 'event set by %s' % self.name
            self.event.set()    # set the flag: wakes up the waiting consumer
            self.event.clear()  # reset the flag so the next wait() blocks again
            print 'event cleared by %s' % self.name
            time.sleep(1)
```
```python
class Consumer(threading.Thread):
    """
    Consumes integers from a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event

    def run(self):
        """
        Implements Thread.run(): consume integers from the list
        """
        while True:
            self.event.wait()  # wait until the event is set
            try:
                integer = self.integers.pop()
                print '%d popped from list by %s' % (integer, self.name)
            except IndexError:
                # catch pop on empty list
                time.sleep(1)
```
```
$ python event.py
124 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
124 popped from list by Thread-2
223 appended to list by Thread-1
event set by Thread-1
event cleared by Thread-1
223 popped from list by Thread-2
```
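Event.wait() also accepts a timeout and, since Python 2.7, returns False when the timeout expires without the flag being set, so an Event works well as a one-shot start signal for several threads. A Python 3 sketch:

```python
import threading

start = threading.Event()
ready = []
ready_lock = threading.Lock()


def worker(n):
    start.wait()            # blocks until the main thread calls start.set()
    with ready_lock:
        ready.append(n)


timed_out = start.wait(timeout=0.05)       # nobody has set the flag yet
print("wait() before set():", timed_out)   # False

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
start.set()                 # wakes every waiting thread at once
for t in threads:
    t.join()
print(sorted(ready), start.is_set())       # [0, 1, 2] True
```

Unlike the set()/clear() pair in the producer above, the flag here stays set, so a thread that calls wait() late still sees it; a set() immediately followed by clear() only wakes the threads that are already waiting at that moment.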
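Now let's look at how Event is implemented inside Python, starting with its constructor. The constructor creates a Condition (wrapping a Lock) that protects the event flag and is used to wake up waiting threads when the event is set.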
```python
class _Event(_Verbose):
    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False
```
```python
def set(self):
    self.__cond.acquire()
    try:
        self.__flag = True
        self.__cond.notify_all()
    finally:
        self.__cond.release()
```
```python
def clear(self):
    self.__cond.acquire()
    try:
        self.__flag = False
    finally:
        self.__cond.release()
```
```python
def wait(self, timeout=None):
    self.__cond.acquire()
    try:
        if not self.__flag:  # if the flag is not set
            self.__cond.wait(timeout)
    finally:
        self.__cond.release()
```
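The constructor and the three methods fit into a small class. This Python 3 sketch (SimpleEvent is a hypothetical name) follows the same structure, uses "with" blocks in place of the explicit try/finally, and returns the flag from wait() so callers can tell a timeout apart:

```python
import threading


class SimpleEvent:
    """Boolean flag guarded by Condition(Lock()), modelled on the excerpts above."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._flag = False

    def is_set(self):
        return self._flag

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()   # wake every thread blocked in wait()

    def clear(self):
        with self._cond:
            self._flag = False

    def wait(self, timeout=None):
        with self._cond:
            if not self._flag:
                self._cond.wait(timeout)
            return self._flag         # value of the flag when wait() returns


event = SimpleEvent()
before = event.wait(timeout=0.05)          # times out: the flag was never set
timer = threading.Timer(0.05, event.set)   # set the flag from another thread shortly
timer.start()
after = event.wait(timeout=5)
timer.join()
print(before, after)                       # False True
```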
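A queue is an excellent thread synchronization mechanism: with a queue we don't have to manage locks ourselves, because the queue does the locking for us. Queue has the following 4 methods of interest to users:

- put(): puts an item into the queue.
- get(): removes and returns an item from the queue.
- task_done(): must be called each time an item has been processed.
- join(): blocks until all items in the queue have been processed.

Here is the producer/consumer example rewritten to use a Queue: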
```python
class Producer(threading.Thread):
    """
    Produces random integers and puts them into a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Implements Thread.run(): put a random integer into the queue once per second
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer)  # add the generated integer to the queue
            print '%d put to queue by %s' % (integer, self.name)
            time.sleep(1)
```
```python
class Consumer(threading.Thread):
    """
    Consumes integers from a queue
    """

    def __init__(self, queue):
        """
        Constructor.

        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Implements Thread.run(): consume integers from the queue
        """
        while True:
            integer = self.queue.get()  # blocks until an item is available
            print '%d popped from list by %s' % (integer, self.name)
            self.queue.task_done()  # mark the item as processed
```
```
$ python queue.py
61 put to queue by Thread-1
61 popped from list by Thread-2
6 put to queue by Thread-1
6 popped from list by Thread-2
```
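The example above never uses task_done() together with join(), and its threads never stop. Here is a Python 3 sketch (the module is called queue in Python 3 and Queue in Python 2) in which the main thread waits until every item has been processed; using None as a stop sentinel is an assumption of this sketch, not something the Queue API requires:

```python
import queue
import threading

q = queue.Queue()
results = []


def consumer():
    while True:
        item = q.get()            # blocks until an item is available
        if item is None:          # stop sentinel (an assumption of this sketch)
            q.task_done()
            break
        results.append(item * 2)  # "process" the item
        q.task_done()             # one task_done() per successful get()


t = threading.Thread(target=consumer)
t.start()
for i in range(5):
    q.put(i)
q.put(None)
q.join()                          # returns once every put() item has been marked done
t.join()
print(results)                    # [0, 2, 4, 6, 8]
```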
Internally, Queue uses a single mutex shared by three Conditions: not_empty, not_full and all_tasks_done:
```python
class Queue:
    def __init__(self, maxsize=0):
        ...
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
```
```python
def put(self, item, block=True, timeout=None):
    ...
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            ...
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()
```
```python
def get(self, block=True, timeout=None):
    ...
    self.not_empty.acquire()
    try:
        ...
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()
```
```python
def task_done(self):
    self.all_tasks_done.acquire()
    try:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
    finally:
        self.all_tasks_done.release()

def join(self):
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()
```
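Stripped of timeouts and the non-blocking options, this design fits in a short class. This Python 3 sketch (MiniQueue is a hypothetical name) shares one mutex among three Conditions in the same way; maxsize <= 0 means unbounded:

```python
import threading
from collections import deque


class MiniQueue:
    """One mutex shared by not_empty, not_full and all_tasks_done."""

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._items = deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def put(self, item):
        with self.not_full:                       # acquires the shared mutex
            while 0 < self.maxsize <= len(self._items):
                self.not_full.wait()              # block while the queue is full
            self._items.append(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()               # wake one blocked get()

    def get(self):
        with self.not_empty:
            while not self._items:
                self.not_empty.wait()             # block while the queue is empty
            item = self._items.popleft()
            self.not_full.notify()                # wake one blocked put()
            return item

    def task_done(self):
        with self.all_tasks_done:
            if self.unfinished_tasks <= 0:
                raise ValueError("task_done() called too many times")
            self.unfinished_tasks -= 1
            if self.unfinished_tasks == 0:
                self.all_tasks_done.notify_all()  # release every join()

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()


q = MiniQueue(maxsize=2)
out = []


def consumer():
    for _ in range(5):
        out.append(q.get())
        q.task_done()


t = threading.Thread(target=consumer)
t.start()
for i in range(5):
    q.put(i)        # blocks whenever two items are already waiting
q.join()
t.join()
print(out)          # [0, 1, 2, 3, 4]
```

Sharing one mutex means a thread holding not_full can safely notify not_empty (and vice versa), since both Conditions protect the same deque.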