"""思路1,將任務放在隊列 1)建立隊列:(初始化) 2)設置大小,線程池的最大容量 3)真實建立的線程 列表 4)空閒的線程數量2,着手開始處理任務 1)建立線程 2)空閒線程數量大於0,則再也不建立線程 3)建立線程池的數量 不能高於線程池的限制 4)根據任務個數判斷 建立線程的數量 2)線程去隊列中取任務 1)取任務包(任務包是一個元祖) 2)任務爲空時,再也不取(終止)"""import timeimport threadingimport queuestopEvent = object() # 中止任務的標誌class ThreadPool(object): def __init__(self, max_thread): # 建立任務隊列,能夠放無限個任務 self.queue = queue.Queue() # 指定最大線程數 self.max_thread = max_thread # 中止標誌 self.terminal = False # 建立真實線程數 self.generate_list = [] # 空閒線程數 self.free_thread = [] def run(self, action, args, callback=None): """ 線程池執行一個任務 :param action:任務函數 :param args:任務參數 :param callback:執行完任務的回調函數,成功或者失敗的返回值。 :return: """ # 線程池運行的條件:1) if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread: self.generate_thread() task = (action, args, callback) self.queue.put(task) def callback(self): """ 回調函數:循環取獲取任務,並執行任務函數 :return: """ # 獲取當前線程 current_thread = threading.current_thread() self.generate_list.append(current_thread) # 取任務並執行 event = self.queue.get() # 事件類型是任務 while event != stopEvent: # 重點是這個判斷 使任務終止 # 解開任務包 ,(任務是一個元祖) # 執行任務 # 標記:執行任務前的狀態,執行任務後的狀態 action, args, callback = event try: ret = action(*args) success = True except Exception as x: success = False ret = x if callback is not None: try: callback(success, ret) except Exception as e: print(e) else: pass if not self.terminal: self.free_thread.append(current_thread) event = self.queue.get() self.free_thread.remove(current_thread) else: # 中止進行取任務 event = stopEvent else: # 不是元祖,不是任務,則清空當前線程,不在去取任務 self.generate_list.remove(current_thread) def generate_thread(self): """ 建立一個線程 :return: """ t = threading.Thread(target=self.callback) t.start() # 終止取任務 def terminals(self): """ 不管是否還有任務,終止線程 :return: """ self.terminal = True def close(self): """ 執行完全部的任務後,全部線程中止 :return: """ num = len(self.generate_list) self.queue.empty() while num: self.queue.put(stopEvent) num -= 1def test(pi): time.sleep(0.5) print(pi)pool = ThreadPool(10)for i in range(100): pool.run(action=test, args=(i,))pool.terminals()pool.close()