基於隊列queue實現的線程池

本文經過文章同步功能推送至博客園,顯示排版可能會有所錯誤,請見諒!

寫在前文:在Python中給多進程提供了進程池類,對於線程,Python2並無直接提供線程池類(Python3中提供了線程池功能),而線程池在並行中應用較普遍,所以實現一個進程池的功能十分必要。本文基於隊列(queue)功能來實現線程池功能。 python

在Python3標準庫中提供了線程池、進程池功能,推薦使用標準庫。 app

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

實現代碼: 函數

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
__auth__ = "SongWei"
import threading,queue,time
 
class Threadpool:
    '''基於隊列queue實現的線程池'''
 
    def __init__(self,max_thread=1):
        '''建立進程隊列'''
        self.queue = queue.Queue(maxsize=max_thread)
 
    def apply(self,target=None,args=(),callback=None,calljoin=True,**kwargs):
        ''':param callback 回調函數 當子線程函數運行結束後將返回值傳入回調函數
            :param calljoin 布爾值  回調函數是否阻塞進程池 默認True 只有當目標函數和回調函數都執行結束後才視爲該線程結束
            其餘參數同threading.Thread類
            注意:只有當目標函數和回調函數都執行結束後,消息隊列纔會取回值(即回調函數會阻塞線程池)
        '''
        if not callback:
            callback = self._callback
        t = threading.Thread(target=self._decorate(target,callback,calljoin),args=args,**kwargs)
        self.queue.put(t)
        t.start()
 
    def join(self):
        '''
            當線程池中還有未執行結束的子線程時 阻塞主線程
            注意:當calljoin=False時 因回調函數在消息隊列取回後才執行 故join不會等待回調函數
        '''
        while self.queue.qsize():
            time.sleep(0.05)
 
    def _decorate(self,target,callback,calljoin):
        ''':param target 接收一個目標函數
            :param callback 接受一個回調函數
            :param backjoin 布爾值 若爲真 則當回調函數執行結束後才釋放隊列 不然 當目標函數執行結束後就會釋放隊列
            本函數本質上是一個裝飾器,即運行目標函數後,執行隊列取回(self.queque.get()),並將返回值做爲參數執行回調函數。
        '''
        def wrapper(*args,**kwargs):
            res = target(*args,**kwargs)
            if calljoin:
                callback(res)
                self.queue.get()
            else:
                self.queue.get()
                callback(res)
            return res
        return wrapper
 
    def _callback(self,*args,**kwargs):
        '''沒有傳入回調函數時 什麼也不幹'''
        pass
 
調用示例:
result_list = []
def func(arg):
    print('正在等待執行%s' % arg)
    time.sleep(10)
    return arg
 
def back(res):
    print('我已經取回了數據:%s' % res)
    result_list.append(res)
 
pool = Threadpool(max_thread=20)
for i in range(40):
    pool.apply(target=func,args=(i,),callback=back)
pool.join()
print(result_list)
相關文章
相關標籤/搜索