python線程安全隊列用法

1.今天來利用queue來實現一個線程安全隊列。python

2.場景:有時咱們須要將一個大任務劃分紅不少小任務,每一個小任務執行完獲得結果須要放入一個安全隊列裏面,一個收集結果的線程就實時從隊列中取結果將子任務結果聚合,造成大任務計算的結果。安全

3.代碼實現app

import queue
import threading
import time

## 封裝的安全隊列類(十分通用)
class SafeQueue(threading.Thread):
    # 退出隊列的信號
    SIG_QUIT = 'sig_quit'
    def __init__(self,recv_calback):
        threading.Thread.__init__(self)
        ## 構造線程安全隊列
        self.Q = queue.Queue()
        self.recv_calback = recv_calback
        self.start()

    #放入隊列
    def put(self,datas):
        threadName = threading.currentThread().name
        self.Q.put(datas)

    #關閉隊列
    def close(self):
        self.put(SafeQueue.SIG_QUIT)

    ##主循環,處理隊列接收
    def run(self):
        while True:
           try:
               datas = self.Q.get()
               if datas == SafeQueue.SIG_QUIT: #收到退出隊列信號
                   break
                #回調客戶端
               self.recv_calback(datas)
           except: # 拋出打斷異常
               break

##隊列回調函數
def queue_callback(datas):
    print("接收到數據:",datas)
    ## 將子任務結果加入 全局集合
    try:
        array_mutex.acquire()#鎖定
        datas_array.append(datas)
        if len(datas_array) == 4:
            safeQueue.close()
            print("=======大任務計算結束===========  result:", datas_array)
    finally:
        array_mutex.release()#釋放

## 子任務計算函數
def calclulate():
    threadName = threading.currentThread().name
    print(threadName , ' 正在計算')
    time.sleep(2)
    print(threadName, ' 計算完成,加入隊列')
    #將結果放入隊列
    safeQueue.put(threadName+"' result")

####  ----------  main start   ----------

#建立鎖
array_mutex = threading.Lock()
## 存儲 子任務計算結果的 集合
datas_array = []

##構造安全隊列
safeQueue = SafeQueue(queue_callback)

##開啓4 個子任務,開始計算
for i in range(1,5):
    threading.Thread(target=calclulate).start()

這個SafeQueue模型在項目開發中很是常見,這裏有什麼封裝的不足的謝謝指出。今天隊列就到這裏,謝謝你們。函數

相關文章
相關標籤/搜索