import queue import threading import time class ThreadPool(object): #建立線程池類 def __init__(self, max_num=20): #建立一個最大長度爲20的隊列 self.queue = queue.Queue(max_num) #建立一個隊列 for i in range(max_num): #循環把線程對象加入到隊列中 self.queue.put(threading.Thread) #把線程的類名放進去,執行完這個Queue def get_thread(self): #定義方法從隊列裏獲取線程 return self.queue.get() #在隊列中獲取值 def add_thread(self): #線程執行完任務後,在隊列裏添加線程 self.queue.put(threading.Thread) def func(pool,a1): time.sleep(1) print(a1) pool.add_thread() #線程執行完任務後,隊列裏再加一個線程 p = ThreadPool(10) #執行init方法; 一次最多執行10個線程 for i in range(100): thread = p.get_thread() #線程池10個線程,每一次循環拿走一個拿到類名,沒有就等待 t = thread(target=func, args=(p, i,)) #建立線程; 線程執行func函數的這個任務;args是給函數傳入參數 t.start() #激活線程
線程池要點:
1,建立線程池時,是在須要執行線程的時候建立線程,而不是建立好最大隊列等待執行
2,建立一個回調函數,檢查出剩餘隊列的任務,當線程執行完函數的時候通知線程池,
3,使用線程池時讓其循環獲取任務,並執行
4,線程池,讓其自行的去激活線程,執行完成後,關閉退出html
import queue import threading import time import contextlib StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() # 最多建立的線程數(線程池最大容量) self.max_num = max_num self.terminal = False #若是爲True 終止全部線程,不在獲取新任務 self.generate_list = [] # 真實建立的線程列表 self.free_list = []# 空閒線程數量 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() #建立線程 w = (func, args, callback,) #把參數封裝成元祖 self.q.put(w) #添加到任務隊列 def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread # 獲取當前線程 self.generate_list.append(current_thread) #添加到已經建立的線程裏 event = self.q.get() # 取任務並執行 while event != StopEvent: # 是元組=》是任務;若是不爲中止信號 執行任務 func, arguments, callback = event #解開任務包; 分別取出值 try: result = func(*arguments) #運行函數,把結果賦值給result status = True #運行結果是否正常 except Exception as e: status = False #表示運行不正常 result = e #結果爲錯誤信息 if callback is not None: #是否存在回調函數 try: callback(status, result) #執行回調函數 except Exception as e: pass if self.terminal: # 默認爲False,若是調用terminal方法 event = StopEvent #等於全局變量,表示中止信號 else: # self.free_list.append(current_thread) #執行完畢任務,添加到閒置列表 # event = self.q.get() #獲取任務 # self.free_list.remove(current_thread) # 獲取到任務以後,從閒置列表中刪除;不是元組,就不是任務 with self.worker_state(self.free_list, current_thread): event = self.q.get() else: self.generate_list.remove(current_thread) #若是收到終止信號,就從已經建立的線程列表中刪除 def close(self): #終止線程 num = len(self.generate_list) #獲取總共建立的線程數 while num: self.q.put(StopEvent) #添加中止信號,有多少線程添加多少表示終止的信號 num -= 1 def terminate(self): #終止線程(清空隊列) self.terminal = True #把默認的False更改爲True while self.generate_list: #若是有已經建立線程存活 self.q.put(StopEvent) #有幾個線程就發幾個終止信號 self.q.empty() #清空隊列 @contextlib.contextmanager def worker_state(self, state_list, worker_thread): state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) def work(i): print(i) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,)) # 將任務放在隊列中 # 着手開始處理任務 # - 建立線程 # - 有空閒線程,擇再也不建立線程 # - 不能高於線程池的限制 # - 根據任務個數判斷 # - 線程去隊列中取任務 pool.terminate()
詳細參考:http://www.cnblogs.com/wupeiqi/articles/4839959.htmlapp