#!/user/bin/evn python # -*- coding:utf-8 -*- import threading import queue,time ''' 線程池的思路: 將任務依次放在隊列裏面 而後從隊列取出任務交給線程執行 ''' stopEvent=object()#任務完了的標誌---下面咱們將任務包封裝到元組中 class ThreadPool(object): def __init__(self,max_num): #建立隊列 self.q=queue.Queue() #建立線程最大的數量(線程池的最大容量) self.max_num=max_num #空閒線程列表(數量) self.free_list=[] #真實建立的線程列表(數量) self.gemerate_list=[] #中斷任務標誌 self.terminal=False self.num=0 def run(self,func,args,callback=None): #func:任務函數 #args:任務函數的參數 #callback:線程執行成功或者失敗後執行的回調函數 task=(func,args,callback)#將任務封裝到元組中 ==任務包 #將任務包放到隊列中 self.q.put(task) #建立線程 if len(self.free_list)==0 and len(self.gemerate_list)<self.max_num : self.generate_thread() #建立線程 def generate_thread(self): #建立一個線程 t=threading.Thread(target=self.call) t.start() def call(self): ''' 循環去獲取任務函數並執行任務函數 ''' #獲取當前線程 current_thread=threading.currentThread #將當前線程添加到列表中 self.gemerate_list.append(current_thread) #獲取任務 Event=self.q.get() while Event!=stopEvent :#表示是任務 #分解任務包 func,args,callable=Event status=True #標誌執行成功 try: #執行任務 ret=func(*args) except Exception as e: status=False ret=e #執行回調函數callback if callback==None: pass else: callback(status,ret) if self.terminal :#不終止任務 Event=stopEvent else: #標記:我空閒了 self.free_list.append(current_thread) #再從隊列去取任務 Event=self.q.get() #將線程從空閒列表中移除 self.free_list.remove(current_thread) else:#表示不是任務 self.gemerate_list.remove(current_thread) #任務執行完畢後 中止運行 def close(self): time.sleep(2) num=len(self.gemerate_list) print(num) while num: self.q.put(stopEvent)#往隊列添加中止標誌(建立了多少線程,就添加多少) num-=1 def terminals(self): self.terminal=True#標記任務終止 while self.gemerate_list: self.q.put(stopEvent) self.q.empty()#清空隊列 #回調函數 def callback(statue,result): # print(statue) # print(result) pass #任務函數 def action(args): time.sleep(1) print(args) return args #建立線程池對象 pool=ThreadPool(10) for item in range(100): ''' #將任務放在隊列中 #着手開始處理任務(線程處理) --建立線程 有空閒線程,則再也不建立線程 沒有空閒線程,開始建立線程 1.不能高於線程池的限制 2.根據任務來判斷 --去隊列取任務 ''' pool.run(func=action,args=(item,),callback=callback) # pool.close()#任務執行完後 #pool.terminals()終止任務