python實現線程池

線程池

 

簡單線程池

import queue
import threading
import time

class ThreadPool(object):  #建立線程池類

    def __init__(self, max_num=20):  #建立一個最大長度爲20的隊列
        self.queue = queue.Queue(max_num)  #建立一個隊列
        for i in range(max_num):  #循環把線程對象加入到隊列中
            self.queue.put(threading.Thread)  #把線程的類名放進去,執行完這個Queue

    def get_thread(self):  #定義方法從隊列裏獲取線程
        return self.queue.get()  #在隊列中獲取值

    def add_thread(self):  #線程執行完任務後,在隊列裏添加線程
        self.queue.put(threading.Thread)




def func(pool,a1):
    time.sleep(1)
    print(a1)
    pool.add_thread()  #線程執行完任務後,隊列裏再加一個線程

p = ThreadPool(10)  #執行init方法;  一次最多執行10個線程

for i in range(100):
    thread = p.get_thread()  #線程池10個線程,每一次循環拿走一個拿到類名,沒有就等待
    t = thread(target=func, args=(p, i,))  #建立線程;  線程執行func函數的這個任務;args是給函數傳入參數
    t.start()  #激活線程

 

複雜線程池

線程池要點:
1,建立線程池時,是在須要執行線程的時候建立線程,而不是建立好最大隊列等待執行
2,建立一個回調函數,檢查出剩餘隊列的任務,當線程執行完函數的時候通知線程池,
3,使用線程池時讓其循環獲取任務,並執行
4,線程池,讓其自行的去激活線程,執行完成後,關閉退出html

import queue
import threading
import time
import contextlib

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()  # 最多建立的線程數(線程池最大容量)
        self.max_num = max_num

        self.terminal = False  #若是爲True 終止全部線程,不在獲取新任務
        self.generate_list = []  # 真實建立的線程列表
        self.free_list = []# 空閒線程數量

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()  #建立線程
        w = (func, args, callback,)  #把參數封裝成元祖
        self.q.put(w)  #添加到任務隊列

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread  # 獲取當前線程
        self.generate_list.append(current_thread)  #添加到已經建立的線程裏

        event = self.q.get()  # 取任務並執行
        while event != StopEvent:  # 是元組=》是任務;若是不爲中止信號  執行任務

            func, arguments, callback = event  #解開任務包; 分別取出值
            try:
                result = func(*arguments)  #運行函數,把結果賦值給result
                status = True  #運行結果是否正常
            except Exception as e:
                status = False  #表示運行不正常
                result = e  #結果爲錯誤信息

            if callback is not None:  #是否存在回調函數
                try:
                    callback(status, result)  #執行回調函數
                except Exception as e:
                    pass

            if self.terminal:  # 默認爲False,若是調用terminal方法
                event = StopEvent  #等於全局變量,表示中止信號
            else:
                # self.free_list.append(current_thread)  #執行完畢任務,添加到閒置列表
                # event = self.q.get()  #獲取任務
                # self.free_list.remove(current_thread)  # 獲取到任務以後,從閒置列表中刪除;不是元組,就不是任務
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()

        else:
            self.generate_list.remove(current_thread)  #若是收到終止信號,就從已經建立的線程列表中刪除

    def close(self):  #終止線程
        num = len(self.generate_list)  #獲取總共建立的線程數
        while num:
            self.q.put(StopEvent)  #添加中止信號,有多少線程添加多少表示終止的信號
            num -= 1


    def terminate(self):   #終止線程(清空隊列)

        self.terminal = True  #把默認的False更改爲True

        while self.generate_list:  #若是有已經建立線程存活
            self.q.put(StopEvent)  #有幾個線程就發幾個終止信號
        self.q.empty()  #清空隊列

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)




def work(i):
    print(i)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,))
# 將任務放在隊列中
#      着手開始處理任務
#         - 建立線程
#                 - 有空閒線程,擇再也不建立線程
#                 - 不能高於線程池的限制
#                 - 根據任務個數判斷
#         - 線程去隊列中取任務

pool.terminate()

 

 

詳細參考:http://www.cnblogs.com/wupeiqi/articles/4839959.htmlapp

相關文章
相關標籤/搜索