關於python的多線程,由與GIL的存在被廣大羣主所詬病,說python的多線程不是真正的多線程。但多線程處理IO密集的任務效率仍是能夠槓槓的。python
我實現的這個線程池實際上是根據銀角的思路來實現的。多線程
任務獲取和執行:app
一、任務加入隊列,等待線程來獲取並執行。ide
二、按需生成線程,每一個線程循環取任務。函數
線程銷燬:spa
一、獲取任務是終止符時,線程中止。線程
二、線程池close()時,向任務隊列加入和已生成線程等量的終止符。設計
三、線程池terminate()時,設置線程下次任務取到爲終止符。code
import threading import contextlib from Queue import Queue import time class ThreadPool(object): def __init__(self, max_num): self.StopEvent = 0#線程任務終止符,當線程從隊列獲取到StopEvent時,表明此線程能夠銷燬。可設置爲任意與任務有區別的值。 self.q = Queue() self.max_num = max_num #最大線程數 self.terminal = False #是否設置線程池強制終止 self.created_list = [] #已建立線程的線程列表 self.free_list = [] #空閒線程的線程列表 self.Deamon=False #線程是不是後臺線程 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: :return: 若是線程池已經終止,則返回True不然None """ if len(self.free_list) == 0 and len(self.created_list) < self.max_num: self.create_thread() task = (func, args, callback,) self.q.put(task) def create_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.setDaemon(self.Deamon) t.start() self.created_list.append(t)#將當前線程加入已建立線程列表created_list def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.current_thread() #獲取當前線程對象· event = self.q.get() #從任務隊列獲取任務 while event != self.StopEvent: #判斷獲取到的任務是不是終止符 func, arguments, callback = event#從任務中獲取函數名、參數、和回調函數名 try: result = func(*arguments) func_excute_status =True#func執行成功狀態 except Exception as e: func_excute_status = False result =None print '函數執行產生錯誤', e#打印錯誤信息 if func_excute_status:#func執行成功後才能執行回調函數 if callback is not None:#判斷回調函數是不是空的 try: callback(result) except Exception as e: print '回調函數執行產生錯誤', e # 打印錯誤信息 with self.worker_state(self.free_list,current_thread): #執行完一次任務後,將線程加入空閒列表。而後繼續去取任務,若是取到任務就將線程從空閒列表移除 if self.terminal:#判斷線程池終止命令,若是須要終止,則使下次取到的任務爲StopEvent。 event = self.StopEvent else: #不然繼續獲取任務 event = self.q.get() # 當線程等待任務時,q.get()方法阻塞住線程,使其持續等待 else:#若線程取到的任務是終止符,就銷燬線程 #將當前線程從已建立線程列表created_list移除 self.created_list.remove(current_thread) def close(self): """ 執行完全部的任務後,全部線程中止 """ full_size = len(self.created_list)#按已建立的線程數量往線程隊列加入終止符。 while full_size: self.q.put(self.StopEvent) full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程 """ self.terminal = True while self.created_list: self.q.put(self.StopEvent) self.q.queue.clear()#清空任務隊列 def join(self): """ 阻塞線程池上下文,使全部線程執行完後才能繼續 """ for t in self.created_list: t.join() @contextlib.contextmanager#上下文處理器,使其可使用with語句修飾 def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) if __name__ == '__main__': def Foo(arg): return arg # time.sleep(0.1) def Bar(res): print res pool=ThreadPool(5) # pool.Deamon=True#需在pool.run以前設置 for i in range(1000): pool.run(func=Foo,args=(i,),callback=Bar) pool.close() pool.join() # pool.terminate() print "任務隊列裏任務數%s" %pool.q.qsize() print "當前存活子線程數量:%d" % threading.activeCount() print "當前線程建立列表:%s" %pool.created_list print "當前線程建立列表:%s" %pool.free_list
import threading import contextlib from Queue import Queue import time class ThreadPool(object): def __init__(self, max_num): self.StopEvent = 0#線程任務終止符,當線程從隊列獲取到StopEvent時,表明此線程能夠銷燬。可設置爲任意與任務有區別的值。 self.q = Queue() self.max_num = max_num #最大線程數 self.terminal = False #是否設置線程池強制終止 self.created_list = [] #已建立線程的線程列表 self.free_list = [] #空閒線程的線程列表 self.Deamon=False #線程是不是後臺線程 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: :return: 若是線程池已經終止,則返回True不然None """ if len(self.free_list) == 0 and len(self.created_list) < self.max_num: self.create_thread() task = (func, args, callback,) self.q.put(task) def create_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.setDaemon(self.Deamon) t.start() self.created_list.append(t)#將當前線程加入已建立線程列表created_list def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.current_thread() #獲取當前線程對象· event = self.q.get() #從任務隊列獲取任務 while event != self.StopEvent: #判斷獲取到的任務是不是終止符 func, arguments, callback = event#從任務中獲取函數名、參數、和回調函數名 try: result = func(*arguments) func_excute_status =True#func執行成功狀態 except Exception as e: func_excute_status = False result =None print '函數執行產生錯誤', e#打印錯誤信息 if func_excute_status:#func執行成功後才能執行回調函數 if callback is not None:#判斷回調函數是不是空的 try: callback(result) except Exception as e: print '回調函數執行產生錯誤', e # 打印錯誤信息 with self.worker_state(self.free_list,current_thread): #執行完一次任務後,將線程加入空閒列表。而後繼續去取任務,若是取到任務就將線程從空閒列表移除 if self.terminal:#判斷線程池終止命令,若是須要終止,則使下次取到的任務爲StopEvent。 event = self.StopEvent else: #不然繼續獲取任務 event = self.q.get() # 當線程等待任務時,q.get()方法阻塞住線程,使其持續等待 else:#若線程取到的任務是終止符,就銷燬線程 #將當前線程從已建立線程列表created_list移除 self.created_list.remove(current_thread) def close(self): """ 執行完全部的任務後,全部線程中止 """ full_size = len(self.created_list)#按已建立的線程數量往線程隊列加入終止符。 while full_size: self.q.put(self.StopEvent) full_size -= 1 def terminate(self): """ 不管是否還有任務,終止線程 """ self.terminal = True while self.created_list: self.q.put(self.StopEvent) self.q.queue.clear()#清空任務隊列 def join(self): """ 阻塞線程池上下文,使全部線程執行完後才能繼續 """ for t in self.created_list: t.join() @contextlib.contextmanager#上下文處理器,使其可使用with語句修飾 def worker_state(self, state_list, worker_thread): """ 用於記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) if __name__ == '__main__': def Foo(arg): return arg # time.sleep(0.1) def Bar(res): print res pool=ThreadPool(5) # pool.Deamon=True#需在pool.run以前設置 for i in range(1000): pool.run(func=Foo,args=(i,),callback=Bar) pool.close() pool.join() # pool.terminate() print "任務隊列裏任務數%s" %pool.q.qsize() print "當前存活子線程數量:%d" % threading.activeCount() print "當前線程建立列表:%s" %pool.created_list print "當前線程建立列表:%s" %pool.free_list
來個簡單例子說明:
下面的代碼手動自定義了一個myopen方法,模擬咱們常見的with open() as f:語句。具體的contextlib模塊使用,會單獨開章來將。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# coding:utf-8
import
contextlib
@contextlib
.contextmanager
#定義該函數支持上下文with語句
def
myopen(filename,mode):
f
=
open
(filename,mode)
try
:
yield
f.readlines()
#正常執行返回f.readlines()
except
Exception as e:
print
e
finally
:
f.close()
#最後在with代碼快執行完畢後返回執行finally下的f.close()實現關閉文件
if
__name__
=
=
'__main__'
:
with myopen(r
'c:\ip1.txt'
,
'r'
) as f:
for
line
in
f:
print
line
|
實現這個線程池我吐血三升啊。對象