Python -- Queue模塊

學習契機

最近的一個項目中在使用grpc時遇到一個問題,因爲client端可多達200,每一個端口每10s向grpc server發送一次請求,server端接受client的請求後根據request信息更新數據庫,再將數據庫和配置文件的某些數據封裝後返回給client。原代碼的性能是0.26s/request,遠遠達不到所需性能,其中數據庫更新操做耗時達到80%,其中一個優化點就是將數據庫更新操做放在獨立的線程中。
在次以前沒有使用過線程編碼,學以至用後本着加深理解的想法,將這個過程記錄下來,這裏先記下用於線程間通訊的隊列Queue的相關知識。數據庫

概念

Python2中隊列庫名稱爲Queue,Python3中已更名爲queue,項目使用Python2.7.5版本,天然是使用Queue。
Queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語,可在多線程通訊中直接使用。安全

Queue模塊定義瞭如下類及異常,在隊列類中,maxsize限制可入隊列數據的數量,值小於等於0時表明不限制:多線程

  • Queue.Queue(maxsize=0) FIFO隊列
  • Queue.LifoQueue(maxsize=0) LIFO隊列
  • Queue.PriorityQueue(maxsize=0) 優先級隊列
  • Queue.Empty TODO
  • Queue.Full

Queue(Queue、LifoQueue、PriorityQueue)對象提供如下方法:性能

  • Queue.qsize()
    返回隊列大小,可是不保證qsize() > 0時,get()不會阻塞;也不保證qsize() < maxsize時,put()不會阻塞。
  • Queue.empty()
    返回True時,不保證put()時不會阻塞;返回False時不保證get()不會阻塞。
  • Queue.full()
    返回True時,不保證get()時不會阻塞;返回False時不保證put()不會阻塞。
  • Queue.put(item[, block[, timeout]])
    block默認值爲False,指定爲True時表明能夠阻塞,若同時指定timeout,在超時時返回Full exception。
  • Queue.put_nowait(item)
    等同put(item, False)
  • Queue.get([block[, timeout]])
  • Queue.get_nowait()
    等同get(item, False)
  • Queue.task_done()
    消費者線程調用。調用get()後,可調用task_done()告訴隊列該任務已經處理完畢。
    若是當前一個join()正在阻塞,它將在隊列中的全部任務都處理完時恢復執行(即每個由put()調用入隊的任務都有一個對應的task_done()調用)。
  • Queue.join()
    阻塞調用線程,直到隊列中的全部任務被處理掉。
    只要有數據被加入隊列,未完成的任務數就會增長。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減小。當未完成的任務數降到0,join()解除阻塞。

應用

UpdateThread是單一消費者進程,獲取FIFO隊列中的數據處理,GrpcThread是multi生產者線程,須要對往隊列中丟數據這個操做加鎖保證數據前後順序。學習

import threading
import Queue
import time

q = Queue.Queue()
q_lock = threading.Lock()


class UpdateThread(threading.Thread):

    def __init__(self):
        super(self.__class__, self).__init__()
        self.setName(self.__class__.__name__)
        self._setName = self.setName

    @staticmethod
    def update_stat():
        global q
        while not q.empty():
            stat = q.get()
            print 'Update stat (%s) in db' % stat

    def run(self):
        while True:
            self.update_stat()
            time.sleep(0.1)


class GrpcThread(threading.Thread):

    def compose_stat(self, stat):
        global q
        q_lock.acquire()
        q.put('%d: %s' % (stat, self.name))
        q_lock.release()
        return

    def run(self):
        for i in range(10):
            self.compose_stat(i)
            time.sleep(0.1)


def launch_update_thread():
    UpdateThread().start()


if __name__ == '__main__':
    launch_update_thread()
    thread1 = GrpcThread()
    thread2 = GrpcThread()

    thread1.start()
    thread2.start()
相關文章
相關標籤/搜索