Python 自定義線程池

"""思路1,將任務放在隊列    1)建立隊列:(初始化)    2)設置大小,線程池的最大容量    3)真實建立的線程 列表    4)空閒的線程數量2,着手開始處理任務    1)建立線程        2)空閒線程數量大於0,則再也不建立線程        3)建立線程池的數量 不能高於線程池的限制        4)根據任務個數判斷  建立線程的數量    2)線程去隊列中取任務        1)取任務包(任務包是一個元祖)        2)任務爲空時,再也不取(終止)"""import timeimport threadingimport queuestopEvent = object()  # 中止任務的標誌class ThreadPool(object):    def __init__(self, max_thread):        # 建立任務隊列,能夠放無限個任務        self.queue = queue.Queue()        # 指定最大線程數        self.max_thread = max_thread        # 中止標誌        self.terminal = False        # 建立真實線程數        self.generate_list = []        # 空閒線程數        self.free_thread = []    def run(self, action, args, callback=None):        """        線程池執行一個任務        :param action:任務函數        :param args:任務參數        :param callback:執行完任務的回調函數,成功或者失敗的返回值。        :return:        """        # 線程池運行的條件:1)        if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:            self.generate_thread()        task = (action, args, callback)        self.queue.put(task)    def callback(self):        """        回調函數:循環取獲取任務,並執行任務函數        :return:        """        # 獲取當前線程        current_thread = threading.current_thread()        self.generate_list.append(current_thread)        # 取任務並執行        event = self.queue.get()        # 事件類型是任務        while event != stopEvent:  # 重點是這個判斷  使任務終止            # 解開任務包 ,(任務是一個元祖)            # 執行任務            # 標記:執行任務前的狀態,執行任務後的狀態            action, args, callback = event            try:                ret = action(*args)                success = True            except Exception as x:                success = False                ret = x            if callback is not None:                try:                    callback(success, ret)                except Exception as e:                    print(e)            else:                pass            if not self.terminal:                self.free_thread.append(current_thread)                event = self.queue.get()                self.free_thread.remove(current_thread)            else:                # 中止進行取任務                event = stopEvent        else:            # 不是元祖,不是任務,則清空當前線程,不在去取任務            self.generate_list.remove(current_thread)    def generate_thread(self):        """        建立一個線程        :return:        """        t = threading.Thread(target=self.callback)        t.start()    # 終止取任務    def terminals(self):        """        不管是否還有任務,終止線程        :return:        """        self.terminal = True    def close(self):        """        執行完全部的任務後,全部線程中止        :return:        """        num = len(self.generate_list)        self.queue.empty()        while num:            self.queue.put(stopEvent)            num -= 1def test(pi):    time.sleep(0.5)    print(pi)pool = ThreadPool(10)for i in range(100):    pool.run(action=test, args=(i,))pool.terminals()pool.close()
相關文章
相關標籤/搜索