A universal Python consumer framework, now with 7 kinds of middleware (i.e., packages that operate the message queue) and 3 concurrency modes.

The newly added middleware and concurrency modes are described in the comments.

On the message-queue middleware side, everything celery supports should be supported; the same goes for concurrency modes.

Extracting a framework from endlessly repeated similar code, and making it universally reusable, is a guarantee of productivity.


The template method pattern makes it easy to add a new middleware to the consumer-framework implementation without affecting the existing middleware.
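A minimal sketch of the template-method idea described above (the `concrete_realization_of_publish` hook name comes from the framework below; the `InMemoryPublisher` subclass is a hypothetical stand-in for a real middleware):

```python
import abc


class AbstractPublisher(abc.ABC):
    """Template method: publish() carries the shared logic; only the
    broker-specific step varies per middleware subclass."""

    def publish(self, msg: str) -> None:
        # shared logic (logging, counting, retries) would live here
        self.concrete_realization_of_publish(msg)

    @abc.abstractmethod
    def concrete_realization_of_publish(self, msg: str) -> None:
        raise NotImplementedError


class InMemoryPublisher(AbstractPublisher):
    """Hypothetical middleware: adding it only requires implementing the hook."""

    def __init__(self):
        self.sent = []

    def concrete_realization_of_publish(self, msg: str) -> None:
        self.sent.append(msg)


p = InMemoryPublisher()
p.publish('{"a": 1}')
```

Adding a new middleware means writing one subclass; the shared `publish()` logic and all existing subclasses stay untouched.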

The strategy pattern makes it easy to add a new concurrency mode to the consumer-framework implementation without affecting the existing concurrency modes.
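A minimal sketch of the strategy idea, assuming a numeric `concurrent_mode` switch as in the framework below (the `SerialExecutor` class and `build_pool` helper are illustrative stand-ins; the real framework dispatches to thread, gevent, or eventlet pools):

```python
from concurrent.futures import ThreadPoolExecutor


class SerialExecutor:
    """Stand-in strategy: run the task inline. In the real framework a
    gevent or eventlet pool would sit here instead."""

    def submit(self, fn, *args):
        fn(*args)


def build_pool(concurrent_mode: int):
    # 1 = thread pool; other modes fall back to the serial stand-in in this sketch
    return ThreadPoolExecutor(max_workers=4) if concurrent_mode == 1 else SerialExecutor()


results = []
pool = build_pool(2)
pool.submit(results.append, 'done')  # consumer code only ever calls submit()
```

Because the consumer only calls `submit()`, a new concurrency mode is just another executor class plus one entry in the dispatcher.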

So although the framework implementation is long, over 1000 lines, modifying or extending it does not feel like walking on thin ice.


The factory pattern makes it very easy to switch between message middleware when calling the framework: changing a single number switches the middleware used by both the publishing and the consuming code.

This makes it easy to quickly test different kinds of middleware and concurrency modes.
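A minimal sketch of the factory described above (the two publisher classes here are empty stand-ins and the numeric mapping is illustrative; the real framework maps `broker_kind` to RabbitmqPublisher, RedisPublisher, MongoMqPublisher, and so on):

```python
class RabbitmqPublisher:
    def __init__(self, queue_name):
        self.queue_name = queue_name


class RedisPublisher:
    def __init__(self, queue_name):
        self.queue_name = queue_name


BROKER_KIND = 2  # change this single number to switch middleware everywhere


def get_publisher(queue_name, broker_kind=BROKER_KIND):
    # illustrative mapping from number to publisher class
    broker_kind__publisher_cls = {0: RabbitmqPublisher, 2: RedisPublisher}
    return broker_kind__publisher_cls[broker_kind](queue_name)


pub = get_publisher('queue_test')
```

All calling code uses `get_publisher` (and, symmetrically, `get_consumer`), so no call site mentions a concrete middleware class.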


The 7 kinds of middleware are: rabbitmq operated via pika, rabbitpy, or amqpstorm; a queue based on the redis list data structure; a mongodb message queue implemented with the mongo-queue package; a queue based on Python's built-in Queue object (which disappears when the Python interpreter exits); and a local persistent sqlite3 queue implemented with the persistqueue package.

The 3 concurrency modes are thread, gevent, and eventlet. (A multiprocess-based distributed mode is also supported, but since processes must be started under `if __name__ == '__main__':`, users have to write `Process(target=f).start()` themselves. When multiprocessing is used, it is usually combined with one of the three modes above, e.g. multiprocess + gevent; only 100% pure CPU computation suits pure multiprocessing.)

The point of multiprocessing: with 32 processes, a dual-socket 32-core E5 machine can reach 3200% CPU usage, saturating the CPU and making full use of its resources. Without multiprocessing, even a frantically busy program will not exceed about 110% CPU usage, wasting CPU resources.

# -*- coding: utf-8 -*-
# @Author  : ydf

"""
A celery-like worker pattern, usable anywhere distributed concurrency is needed, ideally for IO-bound work. It can schedule any function in a distributed way.
A rabbitmq producer and consumer framework. It fully implements everything celery's worker mode does, but is simpler to use. It supports automatic retry for a specified number of times,
consume acknowledgement, a specified number of concurrent threads, and rate limiting (run only n times per second); mongodb-type exceptions also get special handling.
pika was used first, but it is not thread safe; rabbitpy was added later, but rabbitpy loses some data when publishing, so pika is recommended.
A single leading underscore means protected, a double underscore means private. Only the public methods matter; the rest are methods the class calls internally.



March 15
1) Added RedisConsumer, a consumer framework based on the redis middleware. It does not support pausing the program at will or breakpoints; some in-flight tasks will be lost, so the rabbitmq mode is recommended.
get_consumer uses the factory pattern to create rabbit- and redis-based consumers, making it slightly more flexible to use different middleware: only one number needs to change.

March 20
2) Added task filtering based on function arguments. The same task can safely be pushed to the middleware multiple times; the framework first checks whether the task still needs to run, avoiding wasted cpu and bandwidth and speeding up processing.
Filtering on argument values only takes effect when do_task_filtering is set to True; the default is False.
3) Added support for consuming functions with multiple parameters; set is_consuming_function_use_multi_params to True to enable it. The default is False for backward compatibility.
The difference: the consuming function previously had to be

def f(body):   # the function must take exactly one argument, a dict whose key-value pairs represent the parameter values.
    print(body['a'])
    print(body['b'])

Now it can be

def f(a, b):
    print(a)
    print(b)

The publishing side is unchanged either way: publish {"a":1,"b":2}

Consuming is always started with   get_consumer('queue_test', consuming_function=f).start_consuming_message()

June 3
1) Added the RedisPublisher class and a get_publisher factory.
Its methods match the mq publisher, for consistency. For every business's publishing and consuming,
if you use get_publisher and get_consumer to obtain publisher and consumer objects instead of using the RedisPublisher, RedisConsumer, RabbitmqPublisher, RabbitMQConsumer classes directly,
then changing a single global broker_kind number switches the middleware used by publishing and consuming across the whole platform.
2) Added a configurable do-not-run time window, e.g. run only at night, not during the day.
3) Added a function timeout option: when a function runs longer than n seconds it is killed automatically and an exception is raised.
4) Added a per-minute execution counter, and an estimate (based on the last minute's rate) of how long the remaining tasks in the queue will take to finish.
5) Added a blocking check for how many consecutive minutes the queue has been empty, to judge whether the tasks appear to be finished.
6) Added a stop flag for consumers; once set, the message-scheduling loop terminates.
7) The consumer object now has a built-in property holding a publisher instance for the same queue name.

June 29
1) Added a message expiry option: a task is discarded when the time between publishing and consuming exceeds a threshold.
2) Added a publisher and consumer using Python's built-in Queue object as a local middleware. The public methods behave exactly like the redis and mq versions,
so environments without mq or redis installed can test all of the main features except distribution. The built-in queue cannot be distributed and does not resume tasks across restarts.
The upside: change one number and the code runs locally for testing, without sending or receiving messages through shared middleware; you affect nobody and nobody affects you, which is ideal for self-testing.
3) Publishers no longer connect to the middleware in the init method; the connection is deferred until a middleware-touching method is first used.
4) BoundedThreadpoolExecutor was replaced with the new CustomThreadpoolExecutor.


July 2
Added the gevent concurrency mode; set concurrent_mode to 2 to enable it.

July 3
Added the eventlet concurrency mode; set concurrent_mode to 3 to enable it.

July 4
1) Added rabbitmq middleware implemented with amqpstorm; set broker_kind to 4 to enable it. Supports consume acknowledgement.
2) Added a mongodb-backed queue implemented with mongo-queue; set broker_kind to 5 to enable it. Supports consume acknowledgement.
3) Added a local persistent queue implemented with persistqueue on sqlite3, which supports multiple processes and local "distribution" across scripts started in different interpreters. Compared to Python's built-in Queue object it adds persistence and lets scripts from different launches publish and consume. sqlite requires no middleware installation, making it even easier to use. Set broker_kind to 6 to enable it. Supports consume acknowledgement.

"""
# import functools
import abc
import atexit
import copy
from queue import Queue
import threading
import gevent
import eventlet
import traceback
import typing
import json
from collections.abc import Callable  # importing Callable from collections directly is removed in Python 3.10
from collections import OrderedDict
import time
from functools import wraps
from threading import Lock, Thread
import unittest

from mongomq import MongoQueue  # pip install mongo-mq==0.0.1
import sqlite3
import persistqueue  # pip install persist-queue==0.4.2
import amqpstorm  # pip install AMQPStorm==2.7.1
from amqpstorm.basic import Basic as AmqpStormBasic
from amqpstorm.queue import Queue as AmqpStormQueue
import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from pika.exceptions import ChannelClosed
# from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pymongo.errors import PyMongoError
from app.utils_ydf import (LogManager, LoggerMixin, RedisMixin, RedisBulkWriteHelper, RedisOperation, decorators, time_util, LoggerLevelSetterMixin, nb_print, CustomThreadPoolExecutor, MongoMixin)
# noinspection PyUnresolvedReferences
from app.utils_ydf import BoundedThreadPoolExecutor, block_python_exit
from app.utils_ydf.custom_evenlet_pool_executor import CustomEventletPoolExecutor, check_evenlet_monkey_patch, evenlet_timeout_deco
from app.utils_ydf.custom_gevent_pool_executor import GeventPoolExecutor, check_gevent_monkey_patch, gevent_timeout_deco
from app import config as app_config

# LogManager('pika').get_logger_and_add_handlers(10)
# LogManager('pika.heartbeat').get_logger_and_add_handlers(10)
# LogManager('rabbitpy').get_logger_and_add_handlers(10)
# LogManager('rabbitpy.base').get_logger_and_add_handlers(10)
from app.utils_ydf.custom_threadpool_executor import check_not_monkey

def delete_keys_from_dict(dictx: dict, keys: list):
    for dict_key in keys:
        dictx.pop(dict_key)


def delete_keys_and_return_new_dict(dictx: dict, keys: list):
    dict_new = copy.copy(dictx)  # only top-level keys such as publish_time are removed, so a shallow copy is enough.
    for dict_key in keys:
        try:
            dict_new.pop(dict_key)
        except KeyError:
            pass
    return dict_new


class ExceptionForRetry(Exception):
    """Raised to trigger a retry. Just a subclass definition; using it is optional."""


class ExceptionForRequeue(Exception):
    """When the framework sees this error, the message is put back into the queue."""


class ExceptionForRabbitmqRequeue(ExceptionForRequeue):  # to be removed later; raising the exception above is enough.
    """On this error, the message is put back into the queue."""

class RabbitmqClientRabbitPy:
    """
    Client using the rabbitpy package.
    """

    # noinspection PyUnusedLocal
    def __init__(self, username, password, host, port, virtual_host, heartbeat=0):
        rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}?heartbeat={heartbeat}'
        self.connection = rabbitpy.Connection(rabbit_url)

    def creat_a_channel(self) -> rabbitpy.AMQP:
        return rabbitpy.AMQP(self.connection.channel())  # adapter that makes rabbitpy's public channel methods almost match pika's.


class RabbitmqClientPika:
    """
    Client using the pika package, which is not thread safe.
    """

    def __init__(self, username, password, host, port, virtual_host, heartbeat=0):
        """
        parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

        connection = pika.SelectConnection(parameters=parameters,
                                  on_open_callback=on_open)
        :param username:
        :param password:
        :param host:
        :param port:
        :param virtual_host:
        :param heartbeat:
        """
        credentials = pika.PlainCredentials(username, password)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host, port, virtual_host, credentials, heartbeat=heartbeat))
        # self.connection = pika.SelectConnection(pika.ConnectionParameters(
        #     host, port, virtual_host, credentials, heartbeat=heartbeat))

    def creat_a_channel(self) -> BlockingChannel:
        return self.connection.channel()


class RabbitMqFactory:
    def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10, is_use_rabbitpy=0):
        """
        :param username:
        :param password:
        :param port:
        :param virtual_host:
        :param heartbeat:
        :param is_use_rabbitpy: 0 uses pika, which is not thread safe; 1 uses rabbitpy, which is thread safe.
        """
        if is_use_rabbitpy:
            self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
        else:
            self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)

    def get_rabbit_cleint(self):
        return self.rabbit_client

class AbstractPublisher(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ):
    has_init_broker = 0

    def __init__(self, queue_name, log_level_int=10, logger_prefix='', is_add_file_handler=True, clear_queue_within_init=False, is_add_publish_time=False, ):
        """
        :param queue_name:
        :param log_level_int:
        :param logger_prefix:
        :param is_add_file_handler:
        :param clear_queue_within_init:
        """
        self._queue_name = queue_name
        if logger_prefix != '':
            logger_prefix += '--'
        logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}'
        self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level_int, log_filename=f'{logger_name}.log' if is_add_file_handler else None)  #
        # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
        # self.channel = self.rabbit_client.creat_a_channel()
        # self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
        self._lock_for_pika = Lock()
        self._lock_for_count = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.custom_init()
        self.logger.info(f'{self.__class__} instantiated')
        self.publish_msg_num_total = 0
        self._is_add_publish_time = is_add_publish_time
        # atexit.register(self.__at_exit)
        if clear_queue_within_init:
            self.clear()

    def set_is_add_publish_time(self, is_add_publish_time=True):
        self._is_add_publish_time = is_add_publish_time
        return self

    def _init_count(self):
        with self._lock_for_count:
            self._current_time = time.time()
            self.count_per_minute = 0

    def custom_init(self):
        pass

    def publish(self, msg: typing.Union[str, dict]):
        if isinstance(msg, str):
            msg = json.loads(msg)
        if self._is_add_publish_time:
            # msg.update({'publish_time': time.time(), 'publish_time_format': time_util.DatetimeConverter().datetime_str})
            msg.update({'publish_time': round(time.time(), 4), })
        t_start = time.time()
        decorators.handle_exception(retry_times=10, is_throw_error=True, time_sleep=0.1)(self.concrete_realization_of_publish)(json.dumps(msg))
        self.logger.debug(f'publishing to queue {self._queue_name} took {round(time.time() - t_start, 4)} seconds  {msg}')
        with self._lock_for_count:
            self.count_per_minute += 1
            self.publish_msg_num_total += 1
        if time.time() - self._current_time > 10:
            self.logger.info(f'published {self.count_per_minute} messages in the last 10 seconds, {self.publish_msg_num_total} messages in total, to {self._queue_name}')
            self._init_count()

    @abc.abstractmethod
    def concrete_realization_of_publish(self, msg):
        raise NotImplementedError

    @abc.abstractmethod
    def clear(self):
        raise NotImplementedError

    @abc.abstractmethod
    def get_message_count(self):
        raise NotImplementedError

    @abc.abstractmethod
    def close(self):
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        self.logger.warning(f'publisher connection closed automatically by the with statement; {self.publish_msg_num_total} messages published in total ')

    def __at_exit(self):
        self.logger.warning(f'before shutdown, {self.publish_msg_num_total} messages in total were published to {self._queue_name}')

def deco_mq_conn_error(f):
    @wraps(f)
    def _deco_mq_conn_error(self, *args, **kwargs):
        if not self.has_init_broker:
            self.logger.warning(f'method [{f.__name__}] is using the rabbitmq channel for the first time; calling init_broker to initialize')
            self.init_broker()
            self.has_init_broker = 1
            return f(self, *args, **kwargs)
        # noinspection PyBroadException
        try:
            return f(self, *args, **kwargs)
        except (pika.exceptions.AMQPError, amqpstorm.AMQPError) as e:  # except Exception as e:   # the decorator is now used in most places, so a single exception type is not enough.
            self.logger.error(f'rabbitmq connection error in method {f.__name__}: {e}')
            self.init_broker()
            return f(self, *args, **kwargs)

    return _deco_mq_conn_error

class RabbitmqPublisher(AbstractPublisher):
    """
    Implemented with pika.
    """

    # noinspection PyAttributeOutsideInit
    def init_broker(self):
        self.logger.warning(f'connecting to mq with pika')
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True)

    # noinspection PyAttributeOutsideInit
    @deco_mq_conn_error
    def concrete_realization_of_publish(self, msg):
        with self._lock_for_pika:  # verified first-hand: multithreaded publish with pika fails without a lock.
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # 2 makes the message persistent (1 is non-persistent)
                                       )
                                       )

    @deco_mq_conn_error
    def clear(self):
        self.channel.queue_purge(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared')

    @deco_mq_conn_error
    def get_message_count(self):
        with self._lock_for_pika:
            queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
            return queue.method.message_count

    # @deco_mq_conn_error
    def close(self):
        self.channel.close()
        self.rabbit_client.connection.close()
        self.logger.warning('pika connection closed')

class RabbitmqPublisherUsingRabbitpy(AbstractPublisher):
    """
    Implemented with the rabbitpy package.
    """

    # noinspection PyAttributeOutsideInit
    def init_broker(self):
        self.logger.warning(f'connecting to mq with the rabbitpy package')
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True)

    # @decorators.tomorrow_threads(10)
    @deco_mq_conn_error
    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.channel.basic_publish(
            exchange='',
            routing_key=self._queue_name,
            body=msg,
            properties={'delivery_mode': 2},
        )

    @deco_mq_conn_error
    def clear(self):
        self.channel.queue_purge(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared')

    @deco_mq_conn_error
    def get_message_count(self):
        # noinspection PyUnresolvedReferences
        ch_raw_rabbity = self.channel.channel
        # len() on a rabbitpy Queue does a passive declare and returns the message count;
        # returning the Queue object itself (as the original code did) was a bug.
        return len(rabbitpy.amqp_queue.Queue(ch_raw_rabbity, self._queue_name, durable=True))

    # @deco_mq_conn_error
    def close(self):
        self.channel.close()
        self.rabbit_client.connection.close()
        self.logger.warning('rabbitpy mq connection closed')

class RabbitmqPublisherUsingAmqpStorm(AbstractPublisher):
    # mq operations implemented with the amqpstorm package.
    # The instance attributes are not assigned in init, which breaks completion;
    # declaring them here as class attributes makes pycharm completion work.
    connection = amqpstorm.UriConnection
    channel = amqpstorm.Channel
    channel_wrapper_by_ampqstormbaic = AmqpStormBasic
    queue = AmqpStormQueue

    # noinspection PyAttributeOutsideInit
    # @decorators.synchronized
    def init_broker(self):
        # username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10
        self.logger.warning(f'connecting to mq with the AmqpStorm package')
        self.connection = amqpstorm.UriConnection(
            f'amqp://{app_config.RABBITMQ_USER}:{app_config.RABBITMQ_PASS}@{app_config.RABBITMQ_HOST}:{app_config.RABBITMQ_PORT}/{app_config.RABBITMQ_VIRTUAL_HOST}?heartbeat={60 * 10}'
        )
        self.channel = self.connection.channel()  # type:amqpstorm.Channel
        self.channel_wrapper_by_ampqstormbaic = AmqpStormBasic(self.channel)
        self.queue = AmqpStormQueue(self.channel)
        self.queue.declare(queue=self._queue_name, durable=True)

    # @decorators.tomorrow_threads(10)
    @deco_mq_conn_error
    def concrete_realization_of_publish(self, msg):
        self.channel_wrapper_by_ampqstormbaic.publish(exchange='',
                                                      routing_key=self._queue_name,
                                                      body=msg,
                                                      properties={'delivery_mode': 2}, )
        # nb_print(msg)

    @deco_mq_conn_error
    def clear(self):
        self.queue.purge(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared')

    @deco_mq_conn_error
    def get_message_count(self):
        # noinspection PyUnresolvedReferences
        return self.queue.declare(queue=self._queue_name, durable=True)['message_count']

    # @deco_mq_conn_error
    def close(self):
        self.channel.close()
        self.connection.close()
        self.logger.warning('amqpstorm mq connection closed')  # the original log message wrongly said rabbitpy

class RedisPublisher(AbstractPublisher, RedisMixin):
    """
    Uses redis as the middleware.
    """

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.redis_db7.rpush(self._queue_name, msg)

    def clear(self):
        self.redis_db7.delete(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared')

    def get_message_count(self):
        # nb_print(self.redis_db7,self._queue_name)
        return self.redis_db7.llen(self._queue_name)

    def close(self):
        # self.redis_db7.connection_pool.disconnect()
        pass

class MongoMqPublisher(AbstractPublisher, MongoMixin):
    # A mongodb-backed queue implemented with the mongo-queue package.
    # noinspection PyAttributeOutsideInit
    def custom_init(self):
        self.queue = MongoQueue(
            self.mongo_16_client.get_database('conqume_queues').get_collection(self._queue_name),
            consumer_id=f"consumer-{time_util.DatetimeConverter().datetime_str}",
            timeout=600,
            max_attempts=3,
            ttl=0)

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.queue.put(json.loads(msg))

    def clear(self):
        self.queue.clear()
        self.logger.warning(f'messages in mongo queue {self._queue_name} cleared')

    def get_message_count(self):
        return self.queue.size()

    def close(self):
        pass

class PersistQueuePublisher(AbstractPublisher):
    """
    A local persistent queue implemented with persistqueue.
    Persistence is local, so several separately started python scripts can share the queue's tasks. Unlike LocalPythonQueuePublisher, tasks are not lost when the python interpreter exits.
    """

    # noinspection PyAttributeOutsideInit
    def custom_init(self):
        # noinspection PyShadowingNames
        def _my_new_db_connection(self, path, multithreading, timeout):  # mainly changes the sqlite file suffix so pycharm recognizes and opens it.
            # noinspection PyUnusedLocal
            conn = None
            if path == self._MEMORY:
                conn = sqlite3.connect(path,
                                       check_same_thread=not multithreading)
            else:
                conn = sqlite3.connect('{}/data.sqlite'.format(path),
                                       timeout=timeout,
                                       check_same_thread=not multithreading)
            conn.execute('PRAGMA journal_mode=WAL;')
            return conn

        persistqueue.SQLiteAckQueue._new_db_connection = _my_new_db_connection  # monkey patch.
        # REMIND the official benchmarks show sqlite-based local persistence to be more than 3x faster than pure-file persistence on the same ssd and os, hence sqlite is used here.

        self.queue = persistqueue.SQLiteAckQueue(path='/sqllite_queues', name=self._queue_name, auto_commit=True, serializer=json, multithreading=True)

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.queue.put(msg)

    # noinspection PyProtectedMember
    def clear(self):
        sql = f'{"DELETE"}  {"FROM"} ack_queue_{self._queue_name}'
        self.logger.info(sql)
        self.queue._getter.execute(sql)
        self.queue._getter.commit()
        self.logger.warning(f'messages in local persistent queue {self._queue_name} cleared')

    def get_message_count(self):
        return self.queue.qsize()

    def close(self):
        pass

local_pyhton_queue_name__local_pyhton_queue_obj_map = dict()  # makes the local queue usable exactly like the other middleware: the mapping stores queue objects by name, so consuming and publishing can find the queue object from the queue name.


class LocalPythonQueuePublisher(AbstractPublisher):
    """
    Uses python's built-in Queue object as the middleware.
    """

    # noinspection PyAttributeOutsideInit
    def custom_init(self):
        if self._queue_name not in local_pyhton_queue_name__local_pyhton_queue_obj_map:
            local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name] = Queue()
        self.queue = local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name]

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.queue.put(msg)

    def clear(self):
        # noinspection PyUnresolvedReferences
        self.queue.queue.clear()
        self.logger.warning(f'messages in local queue cleared')

    def get_message_count(self):
        return self.queue.qsize()

    def close(self):
        pass

class RedisFilter(RedisMixin):
    def __init__(self, redis_key_name):
        self._redis_key_name = redis_key_name

    @staticmethod
    def _get_ordered_str(value):
        """To filter json key-value pairs in redis, the pairs must be sorted first, otherwise filtering is inaccurate, e.g. {"a":1,"b":2} versus {"b":2,"a":1}."""
        if isinstance(value, str):
            value = json.loads(value)
        ordered_dict = OrderedDict()
        for k in sorted(value):
            ordered_dict[k] = value[k]
        return json.dumps(ordered_dict)

    def add_a_value(self, value: typing.Union[str, dict]):
        self.redis_db7.sadd(self._redis_key_name, self._get_ordered_str(value))

    def check_value_exists(self, value):
        return self.redis_db7.sismember(self._redis_key_name, self._get_ordered_str(value))

 583 class AbstractConsumer(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ):
 584     time_interval_for_check_do_not_run_time = 60
 585     BROKER_KIND = None
 586 
 587     @property
 588     @decorators.synchronized
 589     def publisher_of_same_queue(self):
 590         if not self._publisher_of_same_queue:
 591             self._publisher_of_same_queue = get_publisher(self._queue_name, broker_kind=self.BROKER_KIND)
 592             if self._msg_expire_senconds:
 593                 self._publisher_of_same_queue.set_is_add_publish_time()
 594         return self._publisher_of_same_queue
 595 
 596     @classmethod
 597     def join_shedual_task_thread(cls):
 598         """
 599 
 600         :return:
 601         """
 602         """
 603         def ff():
 604             RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message()
 605             RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message()
 606             AbstractConsumer.join_shedual_task_thread()            # 若是開多進程啓動消費者,在linux上須要這樣寫下這一行。
 607 
 608 
 609         if __name__ == '__main__':
 610             [Process(target=ff).start() for _ in range(4)]
 611 
 612         """
 613         ConcurrentModeDispatcher.join()
 614 
 615     def __init__(self, queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool=None, concurrent_mode=1,
 616                  max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, msg_expire_senconds=0,
 617                  logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True,
 618                  is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False):
 619         """
 620         :param queue_name:
 621         :param consuming_function: 處理消息的函數。
 622         :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。爲0是不限制。
 623         :param threads_num:
 624         :param specify_threadpool:使用指定的線程池,能夠多個消費者共使用一個線程池,不爲None時候。threads_num失效
 625         :param concurrent_mode:併發模式,暫時支持 線程 、gevent、eventlet三種模式。  1線程  2 gevent 3 evenlet
 626         :param max_retry_times:
 627         :param log_level:
 628         :param is_print_detail_exception:
 629         :param msg_schedule_time_intercal:消息調度的時間間隔,用於控頻
 630         :param logger_prefix: 日誌前綴,可以使不一樣的消費者生成不一樣的日誌
 631         :param create_logger_file : 是否建立文件日誌
 632         :param do_task_filtering :是否執行基於函數參數的任務過濾
 633         :is_consuming_function_use_multi_params  函數的參數是不是傳統的多參數,不爲單個body字典表示多個參數。
 634         :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效
 635         :param do_not_run_by_specify_time   :不運行的時間段
 636         :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啓兩個消費者。
 637         """
 638         self._queue_name = queue_name
 639         self.queue_name = queue_name  # 能夠換成公有的,省得外部訪問有警告。
 640         self.consuming_function = consuming_function
 641         self._function_timeout = function_timeout
 642         self._threads_num = threads_num
 643         self._specify_threadpool = specify_threadpool
 644         self._threadpool = None  # 單獨加一個檢測消息數量和心跳的線程
 645         self._concurrent_mode = concurrent_mode
 646         self._max_retry_times = max_retry_times
 647         self._is_print_detail_exception = is_print_detail_exception
 648         self._msg_schedule_time_intercal = msg_schedule_time_intercal if msg_schedule_time_intercal > 0.001 else 0.001
 649         self._msg_expire_senconds = msg_expire_senconds
 650 
 651         if self._concurrent_mode not in (1, 2, 3):
 652             raise ValueError('設置的併發模式不正確')
 653         self._concurrent_mode_dispatcher = ConcurrentModeDispatcher(self)
 654 
 655         self._logger_prefix = logger_prefix
 656         self._log_level = log_level
 657         if logger_prefix != '':
 658             logger_prefix += '--'
 659         logger_name = f'{logger_prefix}{self.__class__.__name__}--{self._concurrent_mode_dispatcher.concurrent_name}--{queue_name}'
 660         # nb_print(logger_name)
 661         self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level, log_filename=f'{logger_name}.log' if create_logger_file else None)
 662         self.logger.info(f'{self.__class__} 被實例化')
 663 
 664         self._do_task_filtering = do_task_filtering
 665         self._redis_filter_key_name = f'filter:{queue_name}'
 666         self._redis_filter = RedisFilter(self._redis_filter_key_name)
 667 
 668         self._is_consuming_function_use_multi_params = is_consuming_function_use_multi_params
 669         self._lock_for_pika = Lock()
 670 
 671         self._execute_task_times_every_minute = 0  # 每分鐘執行了多少次任務。
 672         self._lock_for_count_execute_task_times_every_minute = Lock()
 673         self._current_time_for_execute_task_times_every_minute = time.time()
 674 
 675         self._msg_num_in_broker = 0
 676         self._last_timestamp_when_has_task_in_queue = 0
 677         self._last_timestamp_print_msg_num = 0
 678 
 679         self._is_do_not_run_by_specify_time_effect = is_do_not_run_by_specify_time_effect
 680         self._do_not_run_by_specify_time = do_not_run_by_specify_time  # 能夠設置在指定的時間段不運行。
 681         self._schedule_tasks_on_main_thread = schedule_tasks_on_main_thread
 682 
 683         self.stop_flag = False
 684 
 685         self._publisher_of_same_queue = None
 686 
 687     @property
 688     @decorators.synchronized
 689     def threadpool(self):
 690         return self._concurrent_mode_dispatcher.build_pool()
 691 
 692     def keep_circulating(self, time_sleep=0.001, exit_if_function_run_sucsess=False, is_display_detail_exception=True):
 693         """間隔一段時間,一直循環運行某個方法的裝飾器
 694         :param time_sleep :循環的間隔時間
 695         :param is_display_detail_exception
 696         :param exit_if_function_run_sucsess :若是成功了就退出循環
 697         """
 698 
 699         def _keep_circulating(func):
 700             # noinspection PyBroadException
 701             @wraps(func)
 702             def __keep_circulating(*args, **kwargs):
 703                 while 1:
 704                     if self.stop_flag:
 705                         break
 706                     try:
 707                         result = func(*args, **kwargs)
 708                         if exit_if_function_run_sucsess:
 709                             return result
 710                     except Exception as e:
 711                         msg = func.__name__ + '   運行出錯\n ' + traceback.format_exc(limit=10) if is_display_detail_exception else str(e)
 712                         self.logger.error(msg)
 713                     finally:
 714                         time.sleep(time_sleep)
 715 
 716             return __keep_circulating
 717 
 718         return _keep_circulating
 719 
 720     def start_consuming_message(self):
 721         self.logger.warning(f'開始消費 {self._queue_name} 中的消息')
 722         # self.threadpool.submit(decorators.keep_circulating(20)(self.check_heartbeat_and_message_count))
 723         self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count))
 724         if self._schedule_tasks_on_main_thread:
 725             # decorators.keep_circulating(1)(self._shedual_task)()
 726             self.keep_circulating(1)(self._shedual_task)()
 727         else:
 728             # t = Thread(target=decorators.keep_circulating(1)(self._shedual_task))
 729             self._concurrent_mode_dispatcher.schedulal_task_with_no_block()
 730 
 731     @abc.abstractmethod
 732     def _shedual_task(self):
 733         raise NotImplementedError
 734 
 735     def _run_consuming_function_with_confirm_and_retry(self, kw: dict, current_retry_times=0):
 736         if self._do_task_filtering and self._redis_filter.check_value_exists(kw['body']):  # check the function's arguments and filter out tasks that already ran successfully.
 737             self.logger.info(f'task {kw["body"]} filtered out by the redis key [{self._redis_filter_key_name}]')
 738             self._confirm_consume(kw)
 739             return
 740         with self._lock_for_count_execute_task_times_every_minute:
 741             self._execute_task_times_every_minute += 1
 742             if time.time() - self._current_time_for_execute_task_times_every_minute > 60:
 743                 self.logger.info(
 744                     f'executed the function [ {self.consuming_function.__name__} ] {self._execute_task_times_every_minute} times in the last minute; '
 745                     f'an estimated {time_util.seconds_to_hour_minute_second(self._msg_num_in_broker / self._execute_task_times_every_minute * 60)} '
 746                     f'is still needed to finish the remaining {self._msg_num_in_broker} tasks ')
 747                 self._current_time_for_execute_task_times_every_minute = time.time()
 748                 self._execute_task_times_every_minute = 0
 749 
 750         if current_retry_times < self._max_retry_times + 1:
 751             # noinspection PyBroadException
 752             t_start = time.time()
 753             try:
 754                 function_run = self.consuming_function if self._function_timeout == 0 else self._concurrent_mode_dispatcher.timeout_deco(self._function_timeout)(self.consuming_function)
 755                 if self._is_consuming_function_use_multi_params:  # the consuming function takes conventional multiple parameters
 756                     function_run(**delete_keys_and_return_new_dict(kw['body'], ['publish_time', 'publish_time_format']))
 757                 else:
 758                     function_run(delete_keys_and_return_new_dict(kw['body'], ['publish_time', 'publish_time_format']))  # the consuming function takes a single dict parameter whose key-value pairs are the arguments.
 759                 self._confirm_consume(kw)
 760                 if self._do_task_filtering:
 761                     self._redis_filter.add_a_value(kw['body'])  # after the function succeeds, add the sorted key-value string of its arguments to the set.
 762 
 763                 self.logger.debug(f'{self._concurrent_mode_dispatcher.get_concurrent_info()}  function {self.consuming_function.__name__}  '
 764                                   f'run #{current_retry_times + 1} succeeded; it took {round(time.time() - t_start, 4)} seconds, input was 【 {kw["body"]} 】')
 765             except Exception as e:
 766                 if isinstance(e, (PyMongoError, ExceptionForRequeue)):  # mongo often rejects inserts or goes down during maintenance/backup; that, or deliberately raising an ExceptionForRequeue, requeues the message, unconstrained by the retry limit.
 767                     self.logger.critical(f'error {type(e)}  {e} occurred in function [{self.consuming_function.__name__}]')
 768                     return self._requeue(kw)
 769                 self.logger.error(f'function {self.consuming_function.__name__} failed on attempt #{current_retry_times + 1}; '
 770                                   f'it took {round(time.time() - t_start, 4)} seconds,\n  input was 【 {kw["body"]} 】   \n cause: {type(e)} {e} ', exc_info=self._is_print_detail_exception)
 771                 self._run_consuming_function_with_confirm_and_retry(kw, current_retry_times + 1)
 772         else:
 773             self.logger.critical(f'function {self.consuming_function.__name__} still failed after reaching the maximum of {self._max_retry_times} retries; input was 【 {kw["body"]} 】')  # after exceeding the retry limit, the message is acknowledged anyway.
 774             self._confirm_consume(kw)
 775 
 776     @abc.abstractmethod
 777     def _confirm_consume(self, kw):
 778         """Acknowledge the message."""
 779         raise NotImplementedError
 780 
 781     # noinspection PyUnusedLocal
 782 
 783     def check_heartbeat_and_message_count(self):
 784         self._msg_num_in_broker = self.publisher_of_same_queue.get_message_count()
 785         if time.time() - self._last_timestamp_print_msg_num > 60:
 786             self.logger.info(f'queue [{self._queue_name}] still has [{self._msg_num_in_broker}] tasks')
 787             self._last_timestamp_print_msg_num = time.time()
 788         if self._msg_num_in_broker != 0:
 789             self._last_timestamp_when_has_task_in_queue = time.time()
 790         return self._msg_num_in_broker
 791 
 792     @abc.abstractmethod
 793     def _requeue(self, kw):
 794         """Requeue the message."""
 795         raise NotImplementedError
 796 
 797     def _submit_task(self, kw):
 798         if self._judge_is_daylight():
 799             self._requeue(kw)
 800             time.sleep(self.time_interval_for_check_do_not_run_time)
 801             return
 802         if self._msg_expire_senconds != 0 and time.time() - self._msg_expire_senconds > kw['body']['publish_time']:
 803             self.logger.warning(f'the message publish timestamp is {kw["body"]["publish_time"]} {kw["body"].get("publish_time_format", "")}, which is {round(time.time() - kw["body"]["publish_time"], 4)} seconds ago, '
 804                                 f'exceeding the configured {self._msg_expire_senconds} seconds; discarding the task')
 805             self._confirm_consume(kw)
 806             return 0
 807         self.threadpool.submit(self._run_consuming_function_with_confirm_and_retry, kw)
 808         time.sleep(self._msg_schedule_time_intercal)
 809 
 810     def _judge_is_daylight(self):
 811         if self._is_do_not_run_by_specify_time_effect and self._do_not_run_by_specify_time[0] < time_util.DatetimeConverter().time_str < self._do_not_run_by_specify_time[1]:
 812             self.logger.warning(f'the current time {time_util.DatetimeConverter()} is within the no-run window {self._do_not_run_by_specify_time}; not running')
 813             return True
 814 
 815     def __str__(self):
 816         return f'consumer for queue {self.queue_name} with function {self.consuming_function}'
 817 
 818 
 819 # noinspection PyProtectedMember
 820 class ConcurrentModeDispatcher(LoggerMixin):
 821     schedulal_thread_to_be_join = []
 822     concurrent_mode = None
 823     schedual_task_always_use_thread = False
 824 
 825     def __init__(self, consumerx: AbstractConsumer):
 826         self.consumer = consumerx
 827         if self.__class__.concurrent_mode is not None and self.consumer._concurrent_mode != self.__class__.concurrent_mode:
 828             raise ValueError('two different concurrency modes cannot be set in the same interpreter')
 829         self._concurrent_mode = self.__class__.concurrent_mode = self.consumer._concurrent_mode
 830         concurrent_name = ''
 831         self.timeout_deco = None
 832         if self._concurrent_mode == 1:
 833             concurrent_name = 'thread'
 834             self.timeout_deco = decorators.timeout
 835         elif self._concurrent_mode == 2:
 836             concurrent_name = 'gevent'
 837             self.timeout_deco = gevent_timeout_deco
 838         elif self._concurrent_mode == 3:
 839             concurrent_name = 'eventlet'
 840             self.timeout_deco = evenlet_timeout_deco
 841         self.concurrent_name = concurrent_name
 842         self.logger.warning(f'{self.consumer} concurrency mode set to {self.concurrent_name}')
 843 
 844     def build_pool(self):
 845         if self.consumer._threadpool:
 846             return self.consumer._threadpool
 847 
 848         pool_type = None  # three duck-typed pool classes modeled on ThreadPoolExecutor; their public method names and behavior are identical, so they are interchangeable.
 849         if self._concurrent_mode == 1:
 850             pool_type = CustomThreadPoolExecutor
 851             check_not_monkey()
 852         elif self._concurrent_mode == 2:
 853             pool_type = GeventPoolExecutor
 854             check_gevent_monkey_patch()
 855         elif self._concurrent_mode == 3:
 856             pool_type = CustomEventletPoolExecutor
 857             check_evenlet_monkey_patch()
 858         self.consumer._threadpool = self.consumer._specify_threadpool if self.consumer._specify_threadpool else pool_type(self.consumer._threads_num + 1)  # one extra thread for the message-count and heartbeat check
 859         self.logger.warning(f'{self.concurrent_name} {self.consumer._threadpool}')
 860         return self.consumer._threadpool
 861 
 862     def schedulal_task_with_no_block(self):
 863         if self.schedual_task_always_use_thread:
 864             t = Thread(target=self.consumer.keep_circulating(1)(self.consumer._shedual_task))
 865             self.__class__.schedulal_thread_to_be_join.append(t)
 866             t.start()
 867         else:
 868             if self._concurrent_mode == 1:
 869                 t = Thread(target=self.consumer.keep_circulating(1)(self.consumer._shedual_task))
 870                 self.__class__.schedulal_thread_to_be_join.append(t)
 871                 t.start()
 872             elif self._concurrent_mode == 2:
 873                 g = gevent.spawn(self.consumer.keep_circulating(1)(self.consumer._shedual_task), )
 874                 self.__class__.schedulal_thread_to_be_join.append(g)
 875             elif self._concurrent_mode == 3:
 876                 g = eventlet.spawn(self.consumer.keep_circulating(1)(self.consumer._shedual_task), )
 877                 self.__class__.schedulal_thread_to_be_join.append(g)
 878         atexit.register(self.join)
 879 
 880     @classmethod
 881     def join(cls):
 882         nb_print((cls.schedulal_thread_to_be_join, len(cls.schedulal_thread_to_be_join), 'mode:', cls.concurrent_mode))
 883         if cls.schedual_task_always_use_thread:
 884             for t in cls.schedulal_thread_to_be_join:
 885                 nb_print(t)
 886                 t.join()
 887         else:
 888             if cls.concurrent_mode == 1:
 889                 for t in cls.schedulal_thread_to_be_join:
 890                     nb_print(t)
 891                     t.join()
 892             elif cls.concurrent_mode == 2:
 893                 # cls.logger.info()
 894                 nb_print(cls.schedulal_thread_to_be_join)
 895                 gevent.joinall(cls.schedulal_thread_to_be_join, raise_error=True, )
 896             elif cls.concurrent_mode == 3:
 897                 for g in cls.schedulal_thread_to_be_join:
 898                     # eventlet.greenthread.GreenThread.
 899                     nb_print(g)
 900                     g.wait()
 901 
 902     def get_concurrent_info(self):
 903         concurrent_info = ''
 904         if self._concurrent_mode == 1:
 905             concurrent_info = f'[{threading.current_thread()}  {threading.active_count()}]'
 906         elif self._concurrent_mode == 2:
 907             concurrent_info = f'[{gevent.getcurrent()}  {threading.active_count()}]'
 908         elif self._concurrent_mode == 3:
 909             # noinspection PyArgumentList
 910             concurrent_info = f'[{eventlet.getcurrent()}  {threading.active_count()}]'
 911         return concurrent_info
 912 
 913 
 914 def wait_for_possible_has_finish_all_tasks(queue_name: str, minutes: int, send_stop_to_broker=0, broker_kind: int = 0, ):
 915     """
 916     Because consumption is asynchronous, the queue may be consumed while it is still being pushed to, and a few trailing tasks may still be running before their final acknowledgement. Judging whether all tasks are finished can therefore only be approximate; understand the cause and the scenario before relying on this check.
 917     :param queue_name: queue name
 918     :param minutes: consumption is judged finished after this many consecutive minutes with no tasks
 919     :param send_stop_to_broker: send a stop flag to the broker, which makes the consumer exit its scheduling loop.
 920     :param broker_kind: kind of broker
 921     :return:
 922     """
 923     if minutes <= 1:
 924         raise ValueError('the finished-tasks check needs at least 2 minutes; 10 minutes is recommended')
 925     pb = get_publisher(queue_name, broker_kind=broker_kind)
 926     no_task_time = 0
 927     while 1:
 928         # noinspection PyBroadException
 929         try:
 930             message_count = pb.get_message_count()
 931         except Exception as e:
 932             nb_print(e)
 933             message_count = -1
 934         if message_count == 0:
 935             no_task_time += 30
 936         else:
 937             no_task_time = 0
 938         time.sleep(30)
 939         if no_task_time > minutes * 60:
 940             break
 941     if send_stop_to_broker:
 942         pb.publish({'stop': 1})
 943     pb.close()
 944 
 945 
 946 class RabbitmqConsumer(AbstractConsumer):
 947     """
 948     Implemented with the pika package.
 949     """
 950     BROKER_KIND = 0
 951 
 952     def _shedual_task(self):
 953         channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()
 954         channel.queue_declare(queue=self._queue_name, durable=True)
 955         channel.basic_qos(prefetch_count=self._threads_num)
 956 
 957         def callback(ch, method, properties, body):
 958             body = body.decode()
 959             self.logger.debug(f'message taken from the rabbitmq queue [{self._queue_name}]:  {body}')
 960             body = json.loads(body)
 961             kw = {'ch': ch, 'method': method, 'properties': properties, 'body': body}
 962             self._submit_task(kw)
 963 
 964         channel.basic_consume(callback,
 965                               queue=self._queue_name,
 966                               # no_ack=True
 967                               )  # note: this positional-callback signature is for pika < 1.0; pika >= 1.0 renamed the argument to on_message_callback.
 968         channel.start_consuming()
 969 
 970     def _confirm_consume(self, kw):
 971         with self._lock_for_pika:
 972             try:
 973                 kw['ch'].basic_ack(delivery_tag=kw['method'].delivery_tag)  # acknowledge the message
 974             except pika.exceptions.AMQPError as e:
 975                 self.logger.error(f'pika failed to acknowledge the message  {e}')
 976 
 977     def _requeue(self, kw):
 978         with self._lock_for_pika:
 979             # ch.connection.add_callback_threadsafe(functools.partial(self.__ack_message_pika, ch, method.delivery_tag))
 980             return kw['ch'].basic_nack(delivery_tag=kw['method'].delivery_tag)  # requeue immediately.
 981 
 982     @staticmethod
 983     def __ack_message_pika(channelx, delivery_tagx):
 984         """Note that `channel` must be the same pika channel instance via which
 985         the message being ACKed was retrieved (AMQP protocol constraint).
 986         """
 987         if channelx.is_open:
 988             channelx.basic_ack(delivery_tagx)
 989         else:
 990             # Channel is already closed, so we can't ACK this message;
 991             # log and/or do something that makes sense for your app in this case.
 992             pass
 993 
 994 
 995 class RabbitmqConsumerAmqpStorm(AbstractConsumer):
 996     """
 997     Implemented with AmqpStorm; thread-safe, so no locking is needed.
 998     """
 999     BROKER_KIND = 4
1000 
1001     def _shedual_task(self):
1002         # noinspection PyTypeChecker
1003         def callback(amqpstorm_message: amqpstorm.Message):
1004             body = amqpstorm_message.body
1005             self.logger.debug(f'message taken from the rabbitmq queue [{self._queue_name}]:  {body}')
1006             body = json.loads(body)
1007             kw = {'amqpstorm_message': amqpstorm_message, 'body': body}
1008             self._submit_task(kw)
1009 
1010         rp = RabbitmqPublisherUsingAmqpStorm(self.queue_name)
1011         rp.init_broker()
1012         rp.channel_wrapper_by_ampqstormbaic.qos(self._threads_num)
1013         rp.channel_wrapper_by_ampqstormbaic.consume(callback=callback, queue=self.queue_name, no_ack=False)
1014         rp.channel.start_consuming(auto_decode=True)
1015 
1016     def _confirm_consume(self, kw):
1017         # noinspection PyBroadException
1018         try:
1019             kw['amqpstorm_message'].ack()  # acknowledge the message
1020         except Exception as e:
1021             self.logger.error(f'AmqpStorm failed to acknowledge the message  {type(e)} {e}')
1022 
1023     def _requeue(self, kw):
1024         kw['amqpstorm_message'].nack(requeue=True)
1025 
1026 
1027 class RabbitmqConsumerRabbitpy(AbstractConsumer):
1028     """
1029     Implemented with rabbitpy.
1030     """
1031     BROKER_KIND = 1
1032 
1033     def _shedual_task(self):
1034         # noinspection PyTypeChecker
1035         channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type:  rabbitpy.AMQP         #
1036         channel.queue_declare(queue=self._queue_name, durable=True)
1037         channel.basic_qos(prefetch_count=self._threads_num)
1038         for message in channel.basic_consume(self._queue_name, no_ack=False):
1039             body = message.body.decode()
1040             self.logger.debug(f'message taken from the rabbitmq queue {self._queue_name}:  {body}')
1041             kw = {'message': message, 'body': json.loads(message.body.decode())}
1042             self._submit_task(kw)
1043 
1044     def _confirm_consume(self, kw):
1045         kw['message'].ack()
1046 
1047     def _requeue(self, kw):
1048         kw['message'].nack(requeue=True)
1049 
1050 
1051 class RedisConsumer(AbstractConsumer, RedisMixin):
1052     """
1053     Implemented with redis as the broker.
1054     """
1055     BROKER_KIND = 2
1056 
1057     def _shedual_task(self):
1058         while True:
1059             t_start = time.time()
1060             task_bytes = self.redis_db7.blpop(self._queue_name)[1]  # uses db7
1061             if task_bytes:
1062                 self.logger.debug(f'fetching the task took {round(time.time() - t_start, 4)} s;   message:  {task_bytes.decode()}  ')
1063                 task_dict = json.loads(task_bytes)
1064                 kw = {'body': task_dict}
1065                 self._submit_task(kw)
1066 
1067     def _confirm_consume(self, kw):
1068         pass  # redis has no consume-acknowledgement capability.
1069 
1070     def _requeue(self, kw):
1071         self.redis_db7.rpush(self._queue_name, json.dumps(kw['body']))
1072 
1073 
1074 class MongoMqConsumer(AbstractConsumer, MongoMixin):
1075     """
1076     A mongo-backed message queue implemented with the mongo queue package; supports consume acknowledgement.
1077     """
1078     BROKER_KIND = 5
1079 
1080     def _shedual_task(self):
1081         mp = MongoMqPublisher(self.queue_name)
1082         while True:
1083             t_start = time.time()
1084             job = mp.queue.next()
1085             if job is not None:
1086                 self.logger.debug(f'fetching the task took {round(time.time() - t_start, 4)} s;   message:  {job.payload}  ')
1087                 kw = {'body': job.payload, 'job': job}
1088                 self._submit_task(kw)
1089             else:
1090                 time.sleep(self._msg_schedule_time_intercal)
1091 
1092     def _confirm_consume(self, kw):
1093         kw['job'].complete()
1094 
1095     def _requeue(self, kw):
1096         kw['job'].release()
1097 
1098 
1099 class PersistQueueConsumer(AbstractConsumer):
1100     """
1101     A local persistent message queue implemented with the persistqueue package.
1102     """
1103     BROKER_KIND = 6
1104 
1105     def _shedual_task(self):
1106         pub = PersistQueuePublisher(self.queue_name)
1107         while True:
1108             t_start = time.time()
1109             item = pub.queue.get()
1110             self.logger.debug(f'fetching the task took {round(time.time() - t_start, 4)} s;   message:  {item}  ')
1111             kw = {'body': json.loads(item), 'q': pub.queue, 'item': item}
1112             self._submit_task(kw)
1113 
1114     def _confirm_consume(self, kw):
1115         kw['q'].ack(kw['item'])
1116 
1117     def _requeue(self, kw):
1118         kw['q'].nack(kw['item'])
1119 
1120 
1121 class LocalPythonQueueConsumer(AbstractConsumer):
1122     BROKER_KIND = 3
1123 
1124     @property
1125     def local_python_queue(self) -> Queue:
1126         return local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name]
1127 
1128     def _shedual_task(self):
1129         while True:
1130             t_start = time.time()
1131             task = self.local_python_queue.get()
1132             if isinstance(task, str):
1133                 task = json.loads(task)
1134             self.logger.debug(f'fetching the task took {round(time.time() - t_start, 4)} s;   message:  {json.dumps(task)}  ')
1135             task_dict = task
1136             kw = {'body': task_dict}
1137             self._submit_task(kw)
1138 
1139     def _confirm_consume(self, kw):
1140         pass
1141 
1142     def _requeue(self, kw):
1143         self.local_python_queue.put(kw['body'])
1144 
1145 
1146 def get_publisher(queue_name, *, log_level_int=10, logger_prefix='', is_add_file_handler=False, clear_queue_within_init=False, is_add_publish_time=False, broker_kind=0):
1147     """
1148     :param queue_name:
1149     :param log_level_int:
1150     :param logger_prefix:
1151     :param is_add_file_handler:
1152     :param clear_queue_within_init:
1153     :param is_add_publish_time: whether to attach the publish time to the message; must be True if a non-zero expiry time is set
1154     :param broker_kind: kind of broker or client package to use.
1155     :return:
1156     """
1157     all_kwargs = copy.deepcopy(locals())
1158     all_kwargs.pop('broker_kind')
1159     if broker_kind == 0:
1160         return RabbitmqPublisher(**all_kwargs)
1161     elif broker_kind == 1:
1162         return RabbitmqPublisherUsingRabbitpy(**all_kwargs)
1163     elif broker_kind == 2:
1164         return RedisPublisher(**all_kwargs)
1165     elif broker_kind == 3:
1166         return LocalPythonQueuePublisher(**all_kwargs)
1167     elif broker_kind == 4:
1168         return RabbitmqPublisherUsingAmqpStorm(**all_kwargs)
1169     elif broker_kind == 5:
1170         return MongoMqPublisher(**all_kwargs)
1171     elif broker_kind == 6:
1172         return PersistQueuePublisher(**all_kwargs)
1173     else:
1174         raise ValueError('the broker kind number is not valid')
1175 
1176 
1177 def get_consumer(queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool=None, concurrent_mode=1,
1178                  max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, msg_expire_senconds=0,
1179                  logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True,
1180                  is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'),
1181                  schedule_tasks_on_main_thread=False, broker_kind=0):
1182     """
1183     One more factory-pattern wrapper: generate a consumer based on a different broker or package by setting a number.
1184     :param queue_name:
1185     :param consuming_function: the function that processes messages.
1186     :param function_timeout: timeout in seconds; the function is killed automatically if it runs longer. 0 means no limit.
1187     :param threads_num:
1188     :param specify_threadpool: use the given thread pool, so several consumers can share one pool; when not None, threads_num is ignored
1189     :param concurrent_mode: concurrency mode, 1 = thread, 2 = gevent, 3 = eventlet
1190     :param max_retry_times:
1191     :param log_level:
1192     :param is_print_detail_exception:
1193     :param msg_schedule_time_intercal: interval between message dispatches, used for rate limiting
1194     :param msg_expire_senconds: message expiry time; 0 means never expire, 10 means a task published more than 10 seconds before its turn to be consumed is discarded.
1195     :param logger_prefix: log prefix, so different consumers produce distinguishable logs
1196     :param create_logger_file: whether to create a file log
1197     :param do_task_filtering: whether to filter tasks based on the function's arguments
1198     :param is_consuming_function_use_multi_params: whether the function takes conventional multiple parameters, rather than a single body dict representing them.
1199     :param is_do_not_run_by_specify_time_effect: whether the no-run time window takes effect
1200     :param do_not_run_by_specify_time: the time window during which not to run
1201     :param schedule_tasks_on_main_thread: schedule tasks directly on the main thread, which means two consumers cannot be started at once on the current main thread.
1202     :param broker_kind: kind of broker; do not set it to 1 (rabbitpy can lose pushed messages). 0 uses pika for rabbitmq, 2 uses redis, 3 uses the built-in python Queue
1203     :return
1204     """
1205     all_kwargs = copy.copy(locals())
1206     all_kwargs.pop('broker_kind')
1207     if broker_kind == 0:
1208         return RabbitmqConsumer(**all_kwargs)
1209     elif broker_kind == 1:
1210         return RabbitmqConsumerRabbitpy(**all_kwargs)
1211     elif broker_kind == 2:
1212         return RedisConsumer(**all_kwargs)
1213     elif broker_kind == 3:
1214         return LocalPythonQueueConsumer(**all_kwargs)
1215     elif broker_kind == 4:
1216         return RabbitmqConsumerAmqpStorm(**all_kwargs)
1217     elif broker_kind == 5:
1218         return MongoMqConsumer(**all_kwargs)
1219     elif broker_kind == 6:
1220         return PersistQueueConsumer(**all_kwargs)
1221     else:
1222         raise ValueError('the broker kind number is not valid')
1223 
1224 
1225 # noinspection PyMethodMayBeStatic,PyShadowingNames
1226 class _Test(unittest.TestCase, LoggerMixin, RedisMixin):
1227     """
1228     Demonstrates a simple summation example.
1229     """
1230 
1231     @unittest.skip
1232     def test_publisher_with(self):
1233         """
1234         Test the context manager.
1235         :return:
1236         """
1237         with RabbitmqPublisher('queue_test') as rp:
1238             for i in range(1000):
1239                 rp.publish(str(i))
1240 
1241     @unittest.skip
1242     def test_publish_rabbit(self):
1243         """
1244         Test publishing to mq.
1245         :return:
1246         """
1247         rabbitmq_publisher = RabbitmqPublisher('queue_test', log_level_int=10, logger_prefix='yy platform publish')
1248         rabbitmq_publisher.clear()
1249         for i in range(500000):
1250             try:
1251                 time.sleep(1)
1252                 rabbitmq_publisher.publish({'a': i, 'b': 2 * i})
1253             except Exception as e:
1254                 print(e)
1255 
1256         rabbitmq_publisher = RabbitmqPublisher('queue_test2', log_level_int=20, logger_prefix='zz platform publish')
1257         rabbitmq_publisher.clear()
1258         [rabbitmq_publisher.publish({'somestr_to_be_print': str(i)}) for i in range(500000)]
1259 
1260     @unittest.skip
1261     def test_publish_redis(self):
1262         # if batch pushing is needed
1263         for i in range(10007):
1264             # the sharpest way to batch: automatically aggregates multiple redis commands, and supports mixing different kinds of redis commands in one batch.
1265             RedisBulkWriteHelper(self.redis_db7, 1000).add_task(RedisOperation('lpush', 'queue_test', json.dumps({'a': i, 'b': 2 * i})))
1266         [self.redis_db7.lpush('queue_test', json.dumps({'a': j, 'b': 2 * j})) for j in range(500)]
1267         print('publishing finished')
1268 
1269     @unittest.skip
1270     def test_consume(self):
1271         """
1272         A single parameter carries all the arguments.
1273         :return:
1274         """
1275 
1276         def f(body):
1277             self.logger.info(f'consuming this message {body}')
1278             # print(body['a'] + body['b'])
1279             time.sleep(5)  # simulate work that blocks for several seconds; concurrency is a must.
1280 
1281         # pass the consuming function as consuming_function; that is all.
1282         rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20, msg_schedule_time_intercal=0.5, log_level=10, logger_prefix='yy platform consume',
1283                                              is_consuming_function_use_multi_params=False)
1284         rabbitmq_consumer.start_consuming_message()
1285 
1286     @unittest.skip
1287     def test_consume2(self):
1288         """
1289         Test the conventional parameter form, instead of one dict that contains all the arguments.
1290         :return:
1291         """
1292 
1293         def f2(a, b):
1294             self.logger.debug(f'the value of a is {a}')
1295             self.logger.debug(f'the value of b is {b}')
1296             print(f'the sum of {a} + {b} is  {a + b}')
1297             time.sleep(3)  # simulate work that blocks for several seconds; concurrency is a must.
1298 
1299         # pass the consuming function as consuming_function; that is all.
1300         RabbitmqConsumer('queue_test', consuming_function=f2, threads_num=60, msg_schedule_time_intercal=5, log_level=10, logger_prefix='yy platform consume', is_consuming_function_use_multi_params=True).start_consuming_message()
1301 
1302     @unittest.skip
1303     def test_redis_filter(self):
1304         """
1305         Test the filter based on the redis set structure.
1306         :return:
1307         """
1308         redis_filter = RedisFilter('abcd')
1309         redis_filter.add_a_value({'a': 1, 'c': 3, 'b': 2})
1310         redis_filter.check_value_exists({'a': 1, 'c': 3, 'b': 2})
1311         redis_filter.check_value_exists({'a': 1, 'b': 2, 'c': 3})
1312         with decorators.TimerContextManager():
1313             print(redis_filter.check_value_exists('{"a": 1, "b": 2, "c": 3}'))
1314         with decorators.TimerContextManager():
1315             # measured: a filter check against a set of a million elements takes under 1 millisecond; there are at most about a million hotels.
1316             print(RedisFilter('filter:mafengwo-detail_task').check_value_exists({"_id": "69873340"}))
1317 
1318     @unittest.skip
1319     def test_run_two_function(self):
1320         # demonstrate running two consumers one after the other
1321         def f3(a, b):
1322             print(f'{a} + {b} = {a + b}')
1323             time.sleep(10)  # simulate work that blocks for 10 seconds; concurrency is a must.
1324 
1325         def f4(somestr_to_be_print):
1326             print(f'printing {somestr_to_be_print}')
1327             time.sleep(20)  # simulate work that blocks for several seconds; concurrency is a must.
1328 
1329         RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy platform consume', is_consuming_function_use_multi_params=True).start_consuming_message()
1330         RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz platform consume', is_consuming_function_use_multi_params=True).start_consuming_message()
1331         # AbstractConsumer.join_shedual_task_thread()
1332 
1333     @unittest.skip
1334     def test_local_python_queue_as_broker(self):
1335         def f8(x, y):
1336             nb_print((x, y))
1337 
1338         consumer = get_consumer('queue_testlll', consuming_function=f8, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='uu platform consume',
1339                                 function_timeout=20, is_print_detail_exception=True, msg_expire_senconds=5, broker_kind=3)  # switch the broker to mq or redis in one keystroke by setting broker_kind
1340         get_publisher('queue_testlll', broker_kind=3, is_add_publish_time=True).publish({'x': 3, 'y': 4})
1341         consumer.publisher_of_same_queue.set_is_add_publish_time(True).publish({'x': 1, 'y': 2})
1342         nb_print(consumer.publisher_of_same_queue.get_message_count())
1343         consumer.start_consuming_message()
1344         for i in range(10000):
1345             consumer.publisher_of_same_queue.publish({'x': i, 'y': i * 2})
1346             time.sleep(2)
1347 
1348     # @unittest.skip
1349     def test_factory_pattern_consumer(self):
1350         """
1351         Test generating a consumer via the factory pattern.
1352         :return:
1353         """
1354 
1355         def f2(a, b):
1356             # body_dict = json.loads(body)
1357 
1358             self.logger.info(f'consuming this message {a}  {b}, the result is  {a + b}')
1359             # print(body_dict['a'] + body_dict['b'])
1360             time.sleep(30)  # simulate work that blocks; concurrency is a must.
1361             # pass the consuming function as consuming_function; that is all.
1362 
1363         consumer = get_consumer('queue_test5', consuming_function=f2, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='zz platform consume',
1364                                 function_timeout=20, is_print_detail_exception=True, msg_expire_senconds=500, broker_kind=0)  # switch the broker to mq or redis in one keystroke by setting broker_kind
1365         consumer.publisher_of_same_queue.clear()
1366         [consumer.publisher_of_same_queue.publish({'a': i, 'b': 2 * i}) for i in range(1)]
1367         time.sleep(10)  # sleep to test message expiry.
1368         get_publisher('queue_test5', broker_kind=0).set_is_add_publish_time().publish({'a': 1000, 'b': 2000})
1369         consumer.start_consuming_message()
1370         # consumer.join_shedual_task_thread()
1371         # block_python_exit.just_block_python_exit()
1372         # show_current_threads_num(block=True)
1373 
1374 
1375 if __name__ == '__main__':
1376     # noinspection PyArgumentList
1377     unittest.main(sleep_time=1)

 

 

gevent concurrency mode: (original screenshot not preserved)
Local persistent queue, using sqlite3 to emulate a message queue: (original screenshot not preserved)
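The lost screenshot showed persistqueue storing the local queue in sqlite3. As a rough illustration of the idea only (not the real persistqueue implementation; the class and method names below are invented for this sketch), a queue with ack/nack semantics can be built on the stdlib sqlite3 module: items are rows with a status column, get() marks a row in-flight, ack() deletes it, and nack() returns it to pending.

```python
import json
import sqlite3


class MiniSqliteQueue:
    """A toy persistqueue-style queue backed by sqlite3 (illustration only)."""

    def __init__(self, path=':memory:'):  # a file path makes it persistent
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS queue ('
            'id INTEGER PRIMARY KEY AUTOINCREMENT, '
            "body TEXT, status TEXT DEFAULT 'pending')")

    def put(self, obj):
        self.conn.execute('INSERT INTO queue (body) VALUES (?)', (json.dumps(obj),))
        self.conn.commit()

    def get(self):
        # take the oldest pending row and mark it in-flight
        row = self.conn.execute(
            "SELECT id, body FROM queue WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        self.conn.execute("UPDATE queue SET status = 'inflight' WHERE id = ?", (row[0],))
        self.conn.commit()
        return row[0], json.loads(row[1])

    def ack(self, item_id):
        # acknowledged: remove the row for good
        self.conn.execute('DELETE FROM queue WHERE id = ?', (item_id,))
        self.conn.commit()

    def nack(self, item_id):
        # not acknowledged: return the row to the pending pool
        self.conn.execute("UPDATE queue SET status = 'pending' WHERE id = ?", (item_id,))
        self.conn.commit()


q = MiniSqliteQueue()
q.put({'a': 1, 'b': 2})
item_id, body = q.get()
q.ack(item_id)
```

Because the rows survive in the database file until they are acked, a crash between get() and ack() loses nothing; the in-flight rows only need to be reset to pending on restart.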


A message queue emulated with mongodb: (original screenshot not preserved)
