python自定義線程池

關於python的多線程,由與GIL的存在被廣大羣主所詬病,說python的多線程不是真正的多線程。但多線程處理IO密集的任務效率仍是能夠槓槓的。python

我實現的這個線程池實際上是根據銀角的思路來實現的。多線程

主要思路:


 

  任務獲取和執行:app

  一、任務加入隊列,等待線程來獲取並執行。ide

  二、按需生成線程,每一個線程循環取任務。函數

  線程銷燬:spa

  一、獲取任務是終止符時,線程中止。線程

  二、線程池close()時,向任務隊列加入和已生成線程等量的終止符。設計

  三、線程池terminate()時,設置線程下次任務取到爲終止符。code


流程概要設計:


詳細代碼:

  


 

import threading
import contextlib
from Queue import Queue
import time

class ThreadPool(object):
    def __init__(self, max_num):
        self.StopEvent = 0#線程任務終止符,當線程從隊列獲取到StopEvent時,表明此線程能夠銷燬。可設置爲任意與任務有區別的值。
        self.q = Queue()
        self.max_num = max_num  #最大線程數
        self.terminal = False   #是否設置線程池強制終止
        self.created_list = [] #已建立線程的線程列表
        self.free_list = [] #空閒線程的線程列表
        self.Deamon=False #線程是不是後臺線程

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback:
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.created_list) < self.max_num:
            self.create_thread()
        task = (func, args, callback,)
        self.q.put(task)

    def create_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.setDaemon(self.Deamon)
        t.start()
        self.created_list.append(t)#將當前線程加入已建立線程列表created_list

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.current_thread()   #獲取當前線程對象·
        event = self.q.get()    #從任務隊列獲取任務
        while event != self.StopEvent:   #判斷獲取到的任務是不是終止符

            func, arguments, callback = event#從任務中獲取函數名、參數、和回調函數名
            try:
                result = func(*arguments)
                func_excute_status =True#func執行成功狀態
            except Exception as e:
                func_excute_status = False
                result =None
                print '函數執行產生錯誤', e#打印錯誤信息

            if func_excute_status:#func執行成功後才能執行回調函數
                if callback is not None:#判斷回調函數是不是空的
                    try:
                        callback(result)
                    except Exception as e:
                        print '回調函數執行產生錯誤', e  # 打印錯誤信息


            with self.worker_state(self.free_list,current_thread):
                #執行完一次任務後,將線程加入空閒列表。而後繼續去取任務,若是取到任務就將線程從空閒列表移除
                if self.terminal:#判斷線程池終止命令,若是須要終止,則使下次取到的任務爲StopEvent。
                    event = self.StopEvent
                else: #不然繼續獲取任務
                    event = self.q.get()  # 當線程等待任務時,q.get()方法阻塞住線程,使其持續等待

        else:#若線程取到的任務是終止符,就銷燬線程
            #將當前線程從已建立線程列表created_list移除
            self.created_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        full_size = len(self.created_list)#按已建立的線程數量往線程隊列加入終止符。
        while full_size:
            self.q.put(self.StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True
        while self.created_list:
            self.q.put(self.StopEvent)

        self.q.queue.clear()#清空任務隊列

    def join(self):
        """
        阻塞線程池上下文,使全部線程執行完後才能繼續
        """
        for t in self.created_list:
            t.join()


    @contextlib.contextmanager#上下文處理器,使其可使用with語句修飾
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)






if __name__ == '__main__':
    def Foo(arg):
        return arg
        # time.sleep(0.1)

    def Bar(res):
        print res

    pool=ThreadPool(5)
    # pool.Deamon=True#需在pool.run以前設置
    for i in range(1000):
        pool.run(func=Foo,args=(i,),callback=Bar)
    pool.close()
    pool.join()
    # pool.terminate()

    print "任務隊列裏任務數%s" %pool.q.qsize()
    print "當前存活子線程數量:%d" % threading.activeCount()
    print "當前線程建立列表:%s" %pool.created_list
    print "當前線程建立列表:%s" %pool.free_list
複製代碼
import threading
import contextlib
from Queue import Queue
import time

class ThreadPool(object):
    def __init__(self, max_num):
        self.StopEvent = 0#線程任務終止符,當線程從隊列獲取到StopEvent時,表明此線程能夠銷燬。可設置爲任意與任務有區別的值。
        self.q = Queue()
        self.max_num = max_num  #最大線程數
        self.terminal = False   #是否設置線程池強制終止
        self.created_list = [] #已建立線程的線程列表
        self.free_list = [] #空閒線程的線程列表
        self.Deamon=False #線程是不是後臺線程

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback:
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.created_list) < self.max_num:
            self.create_thread()
        task = (func, args, callback,)
        self.q.put(task)

    def create_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.setDaemon(self.Deamon)
        t.start()
        self.created_list.append(t)#將當前線程加入已建立線程列表created_list

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.current_thread()   #獲取當前線程對象·
        event = self.q.get()    #從任務隊列獲取任務
        while event != self.StopEvent:   #判斷獲取到的任務是不是終止符

            func, arguments, callback = event#從任務中獲取函數名、參數、和回調函數名
            try:
                result = func(*arguments)
                func_excute_status =True#func執行成功狀態
            except Exception as e:
                func_excute_status = False
                result =None
                print '函數執行產生錯誤', e#打印錯誤信息

            if func_excute_status:#func執行成功後才能執行回調函數
                if callback is not None:#判斷回調函數是不是空的
                    try:
                        callback(result)
                    except Exception as e:
                        print '回調函數執行產生錯誤', e  # 打印錯誤信息


            with self.worker_state(self.free_list,current_thread):
                #執行完一次任務後,將線程加入空閒列表。而後繼續去取任務,若是取到任務就將線程從空閒列表移除
                if self.terminal:#判斷線程池終止命令,若是須要終止,則使下次取到的任務爲StopEvent。
                    event = self.StopEvent
                else: #不然繼續獲取任務
                    event = self.q.get()  # 當線程等待任務時,q.get()方法阻塞住線程,使其持續等待

        else:#若線程取到的任務是終止符,就銷燬線程
            #將當前線程從已建立線程列表created_list移除
            self.created_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        full_size = len(self.created_list)#按已建立的線程數量往線程隊列加入終止符。
        while full_size:
            self.q.put(self.StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True
        while self.created_list:
            self.q.put(self.StopEvent)

        self.q.queue.clear()#清空任務隊列

    def join(self):
        """
        阻塞線程池上下文,使全部線程執行完後才能繼續
        """
        for t in self.created_list:
            t.join()


    @contextlib.contextmanager#上下文處理器,使其可使用with語句修飾
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)






if __name__ == '__main__':
    def Foo(arg):
        return arg
        # time.sleep(0.1)

    def Bar(res):
        print res

    pool=ThreadPool(5)
    # pool.Deamon=True#需在pool.run以前設置
    for i in range(1000):
        pool.run(func=Foo,args=(i,),callback=Bar)
    pool.close()
    pool.join()
    # pool.terminate()

    print "任務隊列裏任務數%s" %pool.q.qsize()
    print "當前存活子線程數量:%d" % threading.activeCount()
    print "當前線程建立列表:%s" %pool.created_list
    print "當前線程建立列表:%s" %pool.free_list
複製代碼

 

 關於上下文處理:


 

來個簡單例子說明:
下面的代碼手動自定義了一個myopen方法,模擬咱們常見的with open() as f:語句。具體的contextlib模塊使用,會單獨開章來將。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# coding:utf-8
import  contextlib
 
@contextlib .contextmanager   #定義該函數支持上下文with語句
def  myopen(filename,mode):
     f = open (filename,mode)
     try :
         yield  f.readlines()   #正常執行返回f.readlines()
     except  Exception as e:
         print  e
 
     finally :
         f.close()   #最後在with代碼快執行完畢後返回執行finally下的f.close()實現關閉文件
 
if  __name__  = =  '__main__' :
     with myopen(r 'c:\ip1.txt' , 'r' ) as f:
         for  line  in  f:
             print  line

總結


  實現這個線程池我吐血三升啊。對象

相關文章
相關標籤/搜索