本文經過文章同步功能推送至博客園,顯示排版可能會有所錯誤,請見諒!
寫在前文:在Python中給多進程提供了進程池類,對於線程,Python2並無直接提供線程池類(Python3中提供了線程池功能),而線程池在並行中應用較普遍,所以實現一個進程池的功能十分必要。本文基於隊列(queue)功能來實現線程池功能。 python
在Python3標準庫中提供了線程池、進程池功能,推薦使用標準庫。 app
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor
實現代碼: 函數
#!/usr/bin/env python3 # -*- coding:utf-8 -*- __auth__ = "SongWei" import threading,queue,time class Threadpool: '''基於隊列queue實現的線程池''' def __init__(self,max_thread=1): '''建立進程隊列''' self.queue = queue.Queue(maxsize=max_thread) def apply(self,target=None,args=(),callback=None,calljoin=True,**kwargs): ''':param callback 回調函數 當子線程函數運行結束後將返回值傳入回調函數 :param calljoin 布爾值 回調函數是否阻塞進程池 默認True 只有當目標函數和回調函數都執行結束後才視爲該線程結束 其餘參數同threading.Thread類 注意:只有當目標函數和回調函數都執行結束後,消息隊列纔會取回值(即回調函數會阻塞線程池) ''' if not callback: callback = self._callback t = threading.Thread(target=self._decorate(target,callback,calljoin),args=args,**kwargs) self.queue.put(t) t.start() def join(self): ''' 當線程池中還有未執行結束的子線程時 阻塞主線程 注意:當calljoin=False時 因回調函數在消息隊列取回後才執行 故join不會等待回調函數 ''' while self.queue.qsize(): time.sleep(0.05) def _decorate(self,target,callback,calljoin): ''':param target 接收一個目標函數 :param callback 接受一個回調函數 :param backjoin 布爾值 若爲真 則當回調函數執行結束後才釋放隊列 不然 當目標函數執行結束後就會釋放隊列 本函數本質上是一個裝飾器,即運行目標函數後,執行隊列取回(self.queque.get()),並將返回值做爲參數執行回調函數。 ''' def wrapper(*args,**kwargs): res = target(*args,**kwargs) if calljoin: callback(res) self.queue.get() else: self.queue.get() callback(res) return res return wrapper def _callback(self,*args,**kwargs): '''沒有傳入回調函數時 什麼也不幹''' pass 調用示例: result_list = [] def func(arg): print('正在等待執行%s' % arg) time.sleep(10) return arg def back(res): print('我已經取回了數據:%s' % res) result_list.append(res) pool = Threadpool(max_thread=20) for i in range(40): pool.apply(target=func,args=(i,),callback=back) pool.join() print(result_list)