1.今天來利用queue來實現一個線程安全隊列。python
2.場景:有時咱們須要將一個大任務劃分紅不少小任務,每一個小任務執行完獲得結果須要放入一個安全隊列裏面,一個收集結果的線程就實時從隊列中取結果將子任務結果聚合,造成大任務計算的結果。安全
3.代碼實現app
import queue import threading import time ## 封裝的安全隊列類(十分通用) class SafeQueue(threading.Thread): # 退出隊列的信號 SIG_QUIT = 'sig_quit' def __init__(self,recv_calback): threading.Thread.__init__(self) ## 構造線程安全隊列 self.Q = queue.Queue() self.recv_calback = recv_calback self.start() #放入隊列 def put(self,datas): threadName = threading.currentThread().name self.Q.put(datas) #關閉隊列 def close(self): self.put(SafeQueue.SIG_QUIT) ##主循環,處理隊列接收 def run(self): while True: try: datas = self.Q.get() if datas == SafeQueue.SIG_QUIT: #收到退出隊列信號 break #回調客戶端 self.recv_calback(datas) except: # 拋出打斷異常 break ##隊列回調函數 def queue_callback(datas): print("接收到數據:",datas) ## 將子任務結果加入 全局集合 try: array_mutex.acquire()#鎖定 datas_array.append(datas) if len(datas_array) == 4: safeQueue.close() print("=======大任務計算結束=========== result:", datas_array) finally: array_mutex.release()#釋放 ## 子任務計算函數 def calclulate(): threadName = threading.currentThread().name print(threadName , ' 正在計算') time.sleep(2) print(threadName, ' 計算完成,加入隊列') #將結果放入隊列 safeQueue.put(threadName+"' result") #### ---------- main start ---------- #建立鎖 array_mutex = threading.Lock() ## 存儲 子任務計算結果的 集合 datas_array = [] ##構造安全隊列 safeQueue = SafeQueue(queue_callback) ##開啓4 個子任務,開始計算 for i in range(1,5): threading.Thread(target=calclulate).start()
這個SafeQueue模型在項目開發中很是常見,這裏有什麼封裝的不足的謝謝指出。今天隊列就到這裏,謝謝你們。函數