發一個可伸縮線程池大小的python線程池。已經過測試。

發一個可伸縮線程池大小的線程池。java

當任務很少時候,不開那麼多線程,當任務多的時候開更多線程。當長時間沒任務時候,將線程數量減少到必定數量。python

 java的Threadpoolexcutor能夠這樣,py的不行,修改爲具有這樣特性的線程池。多線程

 

 

"""
可自動實時調節線程數量的線程池。

"""

import atexit
import queue
import sys
import threading
import time
import weakref

from app.utils_ydf import LoggerMixin, nb_print, LoggerLevelSetterMixin

# noinspection PyShadowingBuiltins
# print = nb_print

_shutdown = False
_threads_queues = weakref.WeakKeyDictionary()


def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()


atexit.register(_python_exit)


class _WorkItem(LoggerMixin):
    def __init__(self, fn, args, kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        # noinspection PyBroadException
        try:
            self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.logger.exception(f'函數 {self.fn.__name__} 中發生錯誤,錯誤緣由是 {type(exc)} {exc} ')

    def __str__(self):
        return f'{(self.fn.__name__, self.args, self.kwargs)}'


class CustomThreadPoolExecutor(LoggerMixin, LoggerLevelSetterMixin):
    def __init__(self, max_workers=None, thread_name_prefix=''):
        """
        最好須要兼容官方concurren.futures.ThreadPoolExecutor 和改版的BoundedThreadPoolExecutor,入參名字和個數保持了一致。
        :param max_workers:
        :param thread_name_prefix:
        """
        self._max_workers = max_workers or 4
        self._min_workers = 5
        self._thread_name_prefix = thread_name_prefix
        self.work_queue = queue.Queue(max_workers)
        # self._threads = set()
        self._threads = weakref.WeakSet()
        self._lock_compute_threads_free_count = threading.Lock()
        self.threads_free_count = 0
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    def set_min_workers(self, min_workers=5):
        self._min_workers = min_workers
        return self

    def change_threads_free_count(self, change_num):
        with self._lock_compute_threads_free_count:
            self.threads_free_count += change_num

    def submit(self, func, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('不能添加新的任務到線程池')
        self.work_queue.put(_WorkItem(func, args, kwargs))
        self._adjust_thread_count()

    def _adjust_thread_count(self):
        # if len(self._threads) < self._threads_num:
        self.logger.debug((self.threads_free_count, len(self._threads), len(_threads_queues), get_current_threads_num()))
        if self.threads_free_count < self._min_workers and len(self._threads) < self._max_workers:
            # t = threading.Thread(target=_work,
            #                      args=(self._work_queue,self))
            t = _CustomThread(self).set_log_level(self.logger.level)
            t.setDaemon(True)    # 這裏注意是守護線程。由於線程池裏面的每一個線程內部進入while 1了,這樣可以隨時接受任務,若是不使用守護線程,會形成了程序主線程來結束了,但程序仍然沒法結束。使用守護線程既能無限得到要執行的任務,又能使代碼結束。
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self.work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self.work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False


class _CustomThread(threading.Thread, LoggerMixin, LoggerLevelSetterMixin):
    def __init__(self, executorx: CustomThreadPoolExecutor):
        super().__init__()
        self._executorx = executorx
        self._run_times = 0

    def _remove_thread(self, stop_resson=''):
        # noinspection PyUnresolvedReferences
        self.logger.debug(f'中止線程 {self._ident}, 觸發條件是 {stop_resson} ')
        self._executorx.change_threads_free_count(-1)
        self._executorx._threads.remove(self)
        _threads_queues.pop(self)

    # noinspection PyProtectedMember
    def run(self):
        # noinspection PyUnresolvedReferences
        self.logger.debug(f'新啓動線程 {self._ident} ')
        self._executorx.change_threads_free_count(1)
        while True:
            try:
                work_item = self._executorx.work_queue.get(block=True, timeout=60)
            except queue.Empty:
                # continue
                # self._remove_thread()
                # break
                if self._executorx.threads_free_count > self._executorx._min_workers:
                    self._remove_thread(f'當前線程超過60秒沒有任務,線程池中不在工做狀態中的線程數量是 {self._executorx.threads_free_count},超過了指定的數量 {self._executorx._min_workers}')
                    break
                else:
                    continue

            # nb_print(work_item)
            if work_item is not None:
                self._executorx.change_threads_free_count(-1)
                work_item.run()
                del work_item
                self._executorx.change_threads_free_count(1)
                self._run_times += 1
                if self._run_times == 50:
                    self._remove_thread(f'運行超過了50次,銷燬線程')
                    break
                continue
            if _shutdown or self._executorx._shutdown:
                self._executorx.work_queue.put(None)
                break


# @decorators.tomorrow_threads(20)
def show_current_threads_num(sleep_time=60, process_name='', block=False):
    process_name = sys.argv[0] if process_name == '' else process_name

    def _show_current_threads_num():
        while True:
            nb_print(f'{process_name} 進程 的 線程數量是 -->  {threading.active_count()}')
            time.sleep(sleep_time)

    if block:
        _show_current_threads_num()
    else:
        t = threading.Thread(target=_show_current_threads_num, daemon=True)
        t.start()


def get_current_threads_num():
    return threading.active_count()


if __name__ == '__main__':
    from app.utils_ydf import decorators, BoundedThreadPoolExecutor


    # @decorators.keep_circulating(1)
    def f1(a):
        time.sleep(0.2)
        nb_print(f'{a} 。。。。。。。')
        # raise Exception('拋個錯誤測試')


    # show_current_threads_num()
    pool = CustomThreadPoolExecutor(200).set_log_level(10).set_min_workers()
    # pool = BoundedThreadPoolExecutor(200)   # 測試對比原來寫的BoundedThreadPoolExecutor
    show_current_threads_num(sleep_time=5)
    for i in range(300):
        time.sleep(0.3)  # 這裏的間隔時間模擬,當任務來臨不密集,只須要少許線程就能搞定f1了,由於f1的消耗時間短,不須要開那麼多線程,CustomThreadPoolExecutor比BoundedThreadPoolExecutor 優點之一。
        pool.submit(f1, str(i))

    nb_print(6666)
    # pool.shutdown(wait=True)
    pool.submit(f1, 'yyyy')

    # 下面測試阻塞主線程退出的狀況。註釋掉能夠測主線程退出的狀況。
    while True:
        time.sleep(10)
相關文章
相關標籤/搜索