咱們常常會採用生產者/消費者關係的兩個線程來處理一個共享緩衝區的數據。例如一 個生產者線程接受用戶數據放入一個共享緩衝區裏,等待一個消費者線程對數據取出處理。可是若是緩衝區的過小而生產者和消費者兩個異步線程的速度不一樣時,容 易出現一個線程等待另外一個狀況。爲了儘量的縮短共享資源並以相同速度工做的各線程的等待時間,咱們能夠使用一個「隊列」來提供額外的緩衝區。
python
建立一個「隊列」對象編程
import Queue
myqueue = Queue.Queue(maxsize = 10)
Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。app
將一個值放入隊列中異步
myqueue.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數, 默認爲1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異 常。函數
將一個值從隊列中取出google
myqueue.get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲1。若是隊列爲空且block爲1,get()就使調用線程暫停,直至有項目可用。若是block爲0,隊列將引起Empty異常。url
咱們用一個例子來展現如何使用Queuespa
#!/usr/bin/env python import Queueimport threadingimport urllib2import time hosts = ["http://yahoo.com", "http://google.com.hk", "http://amazon.com", "http://ibm.com", "http://apple.com"] queue = Queue.Queue() class ThreadUrl(threading.Thread): """Threaded Url Grab""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #grabs host from queue host = self.queue.get() #grabs urls of hosts and prints first 1024 bytes of page url = urllib2.urlopen(host) print url.read(1024) #signals to queue job is done self.queue.task_done() start = time.time()def main(): #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadUrl(queue) t.setDaemon(True) t.start() #populate queue with data for host in hosts: queue.put(host) #wait on the queue until everything has been processed queue.join() main()print "Elapsed Time: %s" % (time.time() - start)
在 Python 中使用線程時,這個模式是一種很常見的而且推薦使用的方式。具體工做步驟描述以下:線程
在使用這個模式時須要注意一點:經過將守護線程設置爲 true,將容許主線程或者程序僅在守護線程處於活動狀態時纔可以退出。這種方式建立了一種簡單的方式以控制程序流程,由於在退出以前,您能夠對隊列執行 join 操做、或者等到隊列爲空。code
join()
保持阻塞狀態,直處處理了隊列中的全部項目爲止。在將一個項目添加到該隊列時,未完成的任務的總數就會增長。當使用者線程調用 task_done() 以表示檢索了該項目、並完成了全部的工做時,那麼未完成的任務的總數就會減小。當未完成的任務的總數減小到零時,join() 就會結束阻塞狀態。