Python線程編程—同步隊列

Python線程編程—同步隊列

咱們常常會採用生產者/消費者關係的兩個線程來處理一個共享緩衝區的數據。例如一 個生產者線程接受用戶數據放入一個共享緩衝區裏,等待一個消費者線程對數據取出處理。可是若是緩衝區的過小而生產者和消費者兩個異步線程的速度不一樣時,容 易出現一個線程等待另外一個狀況。爲了儘量的縮短共享資源並以相同速度工做的各線程的等待時間,咱們能夠使用一個「隊列」來提供額外的緩衝區。
python

建立一個「隊列」對象編程

import Queue
myqueue = Queue.Queue(maxsize = 10)

Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。app

將一個值放入隊列中異步

myqueue.put(10)

調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數, 默認爲1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異 常。函數

將一個值從隊列中取出google

myqueue.get()

調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲1。若是隊列爲空且block爲1,get()就使調用線程暫停,直至有項目可用。若是block爲0,隊列將引起Empty異常。url

咱們用一個例子來展現如何使用Queuespa

複製代碼
#!/usr/bin/env python import Queueimport threadingimport urllib2import time
          
hosts = ["http://yahoo.com", "http://google.com.hk", "http://amazon.com",
          "http://ibm.com", "http://apple.com"]
          
queue = Queue.Queue()
          class ThreadUrl(threading.Thread):
      """Threaded Url Grab"""        def __init__(self, queue):
              threading.Thread.__init__(self)
              self.queue = queue
          
       def run(self):
           while True:
                #grabs host from queue                 host = self.queue.get()     
                #grabs urls of hosts and prints first 1024 bytes of page                 url = urllib2.urlopen(host)
                print url.read(1024)        
                #signals to queue job is done                 self.queue.task_done()
   
 start = time.time()def main():
     #spawn a pool of threads, and pass them queue instance       for i in range(5):
            t = ThreadUrl(queue)
            t.setDaemon(True)
            t.start()
              
      #populate queue with data        for host in hosts:
            queue.put(host)
       #wait on the queue until everything has been processed         queue.join()
          
main()print "Elapsed Time: %s" % (time.time() - start)
複製代碼

 

在 Python 中使用線程時,這個模式是一種很常見的而且推薦使用的方式。具體工做步驟描述以下:線程

  1. 建立一個 Queue.Queue() 的實例,而後使用數據對它進行填充。
  2. 將通過填充數據的實例傳遞給線程類,後者是經過繼承 threading.Thread 的方式建立的。
  3. 生成守護線程池。
  4. 每次從隊列中取出一個項目,並使用該線程中的數據和 run 方法以執行相應的工做。
  5. 在完成這項工做以後,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號。
  6. 對隊列執行 join 操做,實際上意味着等到隊列爲空,再退出主程序。

在使用這個模式時須要注意一點:經過將守護線程設置爲 true,將容許主線程或者程序僅在守護線程處於活動狀態時纔可以退出。這種方式建立了一種簡單的方式以控制程序流程,由於在退出以前,您能夠對隊列執行 join 操做、或者等到隊列爲空。code


join()

保持阻塞狀態,直處處理了隊列中的全部項目爲止。在將一個項目添加到該隊列時,未完成的任務的總數就會增長。當使用者線程調用 task_done() 以表示檢索了該項目、並完成了全部的工做時,那麼未完成的任務的總數就會減小。當未完成的任務的總數減小到零時,join() 就會結束阻塞狀態。

相關文章
相關標籤/搜索