使用gevent包實現concurrent.futures.executor 相同的公有方法。組成鴨子類

類名不一樣,但公有方法的名字和提供的基本功能大體相同,但兩個類沒有共同繼承的祖先或者抽象類 接口來規定他,叫鴨子類。併發

 

使併發核心池可以在 threadpoolexetor和geventpoolexecutor自由選一種切換。app

 

 

實現方式。socket

# -*- coding: utf-8 -*-
# @Author  : ydf
# @Time    : 2019/7/2 14:11
import atexit
import time
import warnings
from collections import Callable

import gevent
from gevent import pool as gevent_pool
from gevent import monkey

from gevent.queue import JoinableQueue

from app.utils_ydf import LoggerMixin, nb_print, LogManager


def check_gevent_monkey_patch(raise_exc=True):
    if not monkey.is_module_patched('socket'):
        if raise_exc:
            warnings.warn(f'檢測到 你尚未打gevent包的猴子補丁,請在所運行的腳本第一行寫上  【import gevent.monkey;gevent.monkey.patch_all()】  這句話。')
            raise Exception(f'檢測到 你尚未打gevent包的猴子補丁,請在所運行的腳本第一行寫上  【import gevent.monkey;gevent.monkey.patch_all()】  這句話。')
    else:
        return 1


logger_gevent_timeout_deco = LogManager('logger_gevent_timeout_deco').get_logger_and_add_handlers()


def gevent_timeout_deco(timeout_t):
    def _gevent_timeout_deco(f):
        def __gevent_timeout_deceo(*args, **kwargs):
            timeout = gevent.Timeout(timeout_t, )
            timeout.start()
            try:
                f(*args, **kwargs)
            except gevent.Timeout as t:
                logger_gevent_timeout_deco.error(f'函數 {f} 運行超過了 {timeout_t} 秒')
                if t is not timeout:
                    nb_print(t)
                    # raise  # not my timeout
            finally:
                timeout.close()

        return __gevent_timeout_deceo

    return _gevent_timeout_deco


class GeventPoolExecutor(gevent_pool.Pool):
    def __init__(self, size=None, ):
        check_gevent_monkey_patch()
        super().__init__(size, )

    def submit(self, *args, **kwargs):
        self.spawn(*args, **kwargs)

    def shutdown(self):
        self.join()


if __name__ == '__main__':
    monkey.patch_all()


    def f2(x):

        time.sleep(1)
        nb_print(x)


    pool = GeventPoolExecutor(4)
  
    for i in range(15):
        nb_print(f'放入{i}')
        pool.submit(gevent_timeout_deco(8)(f2), i)
    nb_print(66666666)

 

對於收尾任務,threadpoolexecutor和這個還有少許不一樣,這個geventpool在腳本退出前不去主動join(shutdown)他,最後四個任務就會丟失 。 函數

threadpoolexecutor起的是守護線程,按道理也會出現這樣的結果,可是concurrent包裏面作了atexit處理。這裏也能夠使用atexit.register註冊shutdown達到一樣的目的,不須要手動調用join防止腳本提早退出。spa

 

 

實現eventlet的核心池,同理。線程

相關文章
相關標籤/搜索