See the comments below for the newly added middleware and concurrency modes.

For message-queue middleware, everything celery supports should eventually be supported here; the same goes for concurrency modes. Extracting a framework out of endlessly repeated boilerplate and turning it into something universally reusable is a productivity guarantee.

The template-method pattern makes it easy to add a new middleware inside the consumer framework without affecting the existing ones. The strategy pattern does the same for new concurrency modes. So although the framework code runs to over 1000 lines, modifying and extending it never triggers the walking-on-thin-ice feeling.

The factory pattern makes it easy to switch between different message middleware when calling the framework: changing a single number switches the middleware used by both the consuming and the publishing code. That makes quickly testing different kinds of middleware and concurrency modes very easy.

The 7 middleware options are: rabbitmq operated through the pika, rabbitpy, or amqpstorm packages; the redis list data structure; a mongodb message queue built on the mongo-queue package; a queue based on python's built-in Queue object (which disappears when the interpreter exits); and a local persistent sqlite3 queue built on the persistqueue package.

The 3 concurrency modes are thread, gevent, and eventlet. (A multiprocess-based distributed mode is also supported, but since processes must be started inside an `if __name__ == '__main__':` block, users have to write `Process(target=f).start()` themselves. When using multiprocessing, it is usually combined with one of the three modes above, e.g. multiprocess + gevent; only 100% pure cpu computation suits pure multiprocessing.)

The point of multiprocessing is cpu utilization: starting 32 processes can push a dual-socket e5 32-core machine to 3200% cpu usage, fully saturating it. Without multiprocessing, no matter how busy the program is, cpu usage will not exceed about 110%, wasting cpu resources.
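The template-method idea described above can be shown in a few lines. This is a minimal sketch, not the framework itself; it reuses the framework's names `AbstractPublisher` and `concrete_realization_of_publish`, but the `ListPublisher` toy middleware is invented for illustration:

```python
import abc
import json


class AbstractPublisher(metaclass=abc.ABCMeta):
    """Template method: publish() is fixed in the base class; only the broker-specific step varies."""

    def publish(self, msg: dict):
        # Shared steps (serialization, counting, logging) live here, once, for every middleware.
        self.concrete_realization_of_publish(json.dumps(msg))

    @abc.abstractmethod
    def concrete_realization_of_publish(self, msg: str):
        """Adding a new middleware means implementing only this hook."""


class ListPublisher(AbstractPublisher):
    """A toy middleware backed by a plain list, standing in for redis/rabbitmq."""

    def __init__(self):
        self.storage = []

    def concrete_realization_of_publish(self, msg: str):
        self.storage.append(msg)


p = ListPublisher()
p.publish({'a': 1})
print(p.storage)  # ['{"a": 1}']
```

Because existing subclasses never change when a new one is added, the shared `publish()` logic stays untouched no matter how many middleware are bolted on.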
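The change-one-number factory can be sketched like this. The stub publisher classes and the exact number-to-class mapping below are illustrative assumptions (the module's changelog only documents broker_kind 4, 5 and 6 for amqpstorm, mongo-queue and persistqueue):

```python
# Stub publishers standing in for the real ones; in the framework each wraps a middleware client.
class RedisPublisher:
    def __init__(self, queue_name):
        self.queue_name = queue_name


class LocalPythonQueuePublisher:
    def __init__(self, queue_name):
        self.queue_name = queue_name


def get_publisher(queue_name, broker_kind=0):
    # Illustrative mapping only: the real module maps other numbers too,
    # e.g. 4 -> amqpstorm, 5 -> mongo-queue, 6 -> persistqueue per its changelog.
    broker_kind__publisher_type_map = {
        2: RedisPublisher,
        3: LocalPythonQueuePublisher,
    }
    if broker_kind not in broker_kind__publisher_type_map:
        raise ValueError(f'unsupported broker_kind: {broker_kind}')
    return broker_kind__publisher_type_map[broker_kind](queue_name)


publisher = get_publisher('queue_test', broker_kind=3)  # change 3 -> 2 to switch middleware
print(type(publisher).__name__)  # LocalPythonQueuePublisher
```

Keeping `broker_kind` in one global spot is what lets the whole codebase switch middleware by editing a single digit.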
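The multiprocess pattern the user has to write themselves looks roughly like this; the real consumer calls are shown only as comments, and the `ff` body here is a placeholder that merely reports that the process came up:

```python
from multiprocessing import Process, Queue


def ff(result_queue: Queue):
    # In the real framework each process would start its consumers here, e.g.:
    # RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20).start_consuming_message()
    # AbstractConsumer.join_shedual_task_thread()  # needed on linux when consumers run in child processes
    result_queue.put('consumer process started')


if __name__ == '__main__':
    q = Queue()
    # Scale the range to 32 processes to saturate a dual e5 32-core box.
    processes = [Process(target=ff, args=(q,)) for _ in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(len([q.get() for _ in range(4)]))  # 4
```

Inside each process, the consumer's own thread/gevent/eventlet pool then provides the io concurrency, so the combination covers both cpu-bound and io-bound load.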
```python
# -*- coding: utf-8 -*-
# @Author : ydf

"""
A celery-like worker mode, usable anywhere distributed concurrency is needed, best suited to io-bound work. It can schedule any function in a distributed way.
This is a rabbitmq producer/consumer framework. It fully implements the features of celery's worker mode but is simpler to use. It supports automatic retry a
specified number of times, consume acknowledgement, a specified number of concurrent threads, and frequency control (run only n times per second); mongodb-type
exceptions also get special handling.
It was first written with the pika package, which is not thread safe; rabbitpy was added later, but rabbitpy loses part of the pushed messages, so pika is recommended.
A single leading underscore means protected, a double underscore means private. Only the public methods need attention; the rest are methods the class calls internally.


March 15
1) Added RedisConsumer, a consumer framework using redis as middleware. It does not support pausing the program at will or breakpoints; part of the in-flight
   tasks will be lost, so the rabbitmq flavour is recommended.
   get_consumer uses the factory pattern to create rabbit- or redis-based consumers, which makes switching the consuming middleware a bit more flexible: only
   one number needs changing.

March 20
2) Added filtering on function arguments. The same task can now safely be pushed to the middleware many times; the framework first checks whether the task
   still needs to run, avoiding wasted cpu and traffic and speeding up processing.
   Filtering by argument values requires setting do_task_filtering to True; the default is False.
3) Added support for consuming functions that take multiple parameters; set is_consuming_function_use_multi_params to True to enable it (default False for
   backward compatibility with old code).
   The consuming function used to require
   def f(body):  # exactly one parameter, a dict whose key/value pairs represent the argument values.
       print(body['a'])
       print(body['b'])

   Now it can be
   def f(a, b):
       print(a)
       print(b)

   Publishing is identical in both cases: push {"a": 1, "b": 2}

   Consumption always starts with get_consumer('queue_test', consuming_function=f).start_consuming_message()

June 3
1) Added the RedisPublisher class and a get_publisher factory, with the same methods as the mq publisher, for consistency. If each business's publishing and
   consuming code uses get_publisher and get_consumer to obtain its objects instead of using RedisPublisher, RedisConsumer, RabbitmqPublisher and
   RabbitMQConsumer directly, then changing one global broker_kind number switches the middleware for consuming and publishing across all platforms.
2) Added a configurable do-not-run time window, e.g. run only at night, not during the day.
3) Added a function timeout: when a function runs longer than n seconds it is automatically killed and an exception is raised.
4) Added a per-minute function run counter, used to estimate from the last minute's run count how long the remaining tasks in the queue will take to finish.
5) Added a blocking helper that checks whether the queue has been empty for n consecutive minutes, i.e. the tasks look finished.
6) Added a stop flag for consumers; once set, the message scheduling loop terminates.
7) The consumer object now has a built-in attribute holding a publisher instance for the same queue name.

June 29
1) Added message expiry: when consumption happens more than a given time after publishing, the task is discarded.
2) Added a publisher and consumer based on python's built-in Queue object as local middleware, with public methods used exactly like the redis and mq versions.
   Handy for testing all main features except distribution in environments without mq or redis installed. The built-in queue cannot be distributed and does
   not survive program restarts. The upside is that changing one number runs the code locally for testing, without pushing to or receiving messages from
   shared middleware: nobody affects you and you affect nobody, which is ideal for self-testing.
3) Publishers no longer connect to the middleware inside the init method; the connection is deferred to the first method call that really operates on it.
4) BoundedThreadpoolExecutor was replaced by the new CustomThreadpoolExecutor.


July 2
Added the gevent concurrency mode; set concurrent_mode to 2 to enable.

July 3
Added the eventlet concurrency mode; set concurrent_mode to 3 to enable.

July 4
1) Added a rabbit middleware implemented with amqpstorm; set broker_kind to 4. Supports consume acknowledgement.
2) Added a mongodb middleware queue implemented with mongo-queue; set broker_kind to 5. Supports consume acknowledgement.
3) Added a local persistent queue implemented with persistqueue on sqlite3, supporting multiple processes and scripts started in different interpreters, i.e.
   local distribution across launches. Compared to python's built-in Queue object it adds persistence and push/consume across different launches, and sqlite
   needs no middleware installation, making it easy to use. Set broker_kind to 6. Supports consume acknowledgement.

"""
# import functools
import abc
# import atexit
import atexit
import copy
from queue import Queue
import threading
import gevent
import eventlet
import traceback
import typing
import json
from collections import OrderedDict
from collections.abc import Callable  # importing Callable from collections directly is removed in python 3.10+.
import time
from functools import wraps
from threading import Lock, Thread
import unittest

from mongomq import MongoQueue  # pip install mongo-mq==0.0.1
import sqlite3
import persistqueue  # pip install persist-queue==0.4.2
import amqpstorm  # pip install AMQPStorm==2.7.1
from amqpstorm.basic import Basic as AmqpStormBasic
from amqpstorm.queue import Queue as AmqpStormQueue
import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from pika.exceptions import ChannelClosed
# from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pymongo.errors import PyMongoError
from app.utils_ydf import (LogManager, LoggerMixin, RedisMixin, RedisBulkWriteHelper, RedisOperation, decorators, time_util, LoggerLevelSetterMixin, nb_print, CustomThreadPoolExecutor, MongoMixin)
# noinspection PyUnresolvedReferences
from app.utils_ydf import BoundedThreadPoolExecutor, block_python_exit
from app.utils_ydf.custom_evenlet_pool_executor import CustomEventletPoolExecutor, check_evenlet_monkey_patch, evenlet_timeout_deco
from app.utils_ydf.custom_gevent_pool_executor import GeventPoolExecutor, check_gevent_monkey_patch, gevent_timeout_deco
from app import config as app_config

# LogManager('pika').get_logger_and_add_handlers(10)
# LogManager('pika.heartbeat').get_logger_and_add_handlers(10)
# LogManager('rabbitpy').get_logger_and_add_handlers(10)
# LogManager('rabbitpy.base').get_logger_and_add_handlers(10)
from app.utils_ydf.custom_threadpool_executor import check_not_monkey


def delete_keys_from_dict(dictx: dict, keys: list):
    for dict_key in keys:
        dictx.pop(dict_key)


def delete_keys_and_return_new_dict(dictx: dict, keys: list):
    dict_new = copy.copy(dictx)  # a shallow copy is enough; this mainly drops top-level keys such as publish_time.
    for dict_key in keys:
        try:
            dict_new.pop(dict_key)
        except KeyError:
            pass
    return dict_new


class ExceptionForRetry(Exception):
    """Raised to trigger a retry. Just a marker subclass; using it is optional."""


class ExceptionForRequeue(Exception):
    """When the framework detects this error, the message is put back into the queue."""


class ExceptionForRabbitmqRequeue(ExceptionForRequeue):  # to be removed later; raising the exception above is enough.
    """When this error occurs, the message is put back into the queue."""


class RabbitmqClientRabbitPy:
    """
    Client using the rabbitpy package.
    """

    # noinspection PyUnusedLocal
    def __init__(self, username, password, host, port, virtual_host, heartbeat=0):
        rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}?heartbeat={heartbeat}'
        self.connection = rabbitpy.Connection(rabbit_url)

    def creat_a_channel(self) -> rabbitpy.AMQP:
        return rabbitpy.AMQP(self.connection.channel())  # adapter that makes rabbitpy's public methods almost match pika's channel methods.


class RabbitmqClientPika:
    """
    Client using the pika package, which is not thread safe.
    """

    def __init__(self, username, password, host, port, virtual_host, heartbeat=0):
        """
        parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

        connection = pika.SelectConnection(parameters=parameters,
                                           on_open_callback=on_open)
        :param username:
        :param password:
        :param host:
        :param port:
        :param virtual_host:
        :param heartbeat:
        """
        credentials = pika.PlainCredentials(username, password)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host, port, virtual_host, credentials, heartbeat=heartbeat))
        # self.connection = pika.SelectConnection(pika.ConnectionParameters(
        #     host, port, virtual_host, credentials, heartbeat=heartbeat))

    def creat_a_channel(self) -> BlockingChannel:
        return self.connection.channel()


class RabbitMqFactory:
    def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10, is_use_rabbitpy=0):
        """
        :param username:
        :param password:
        :param port:
        :param virtual_host:
        :param heartbeat:
        :param is_use_rabbitpy: 0 uses pika, which is not thread safe; 1 uses rabbitpy, a thread-safe package.
        """
        if is_use_rabbitpy:
            self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
        else:
            self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)

    def get_rabbit_cleint(self):
        return self.rabbit_client


class AbstractPublisher(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ):
    has_init_broker = 0

    def __init__(self, queue_name, log_level_int=10, logger_prefix='', is_add_file_handler=True, clear_queue_within_init=False, is_add_publish_time=False, ):
        """
        :param queue_name:
        :param log_level_int:
        :param logger_prefix:
        :param is_add_file_handler:
        :param clear_queue_within_init:
        """
        self._queue_name = queue_name
        if logger_prefix != '':
            logger_prefix += '--'
        logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}'
        self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level_int, log_filename=f'{logger_name}.log' if is_add_file_handler else None)
        # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
        # self.channel = self.rabbit_client.creat_a_channel()
        # self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
        self._lock_for_pika = Lock()
        self._lock_for_count = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.custom_init()
        self.logger.info(f'{self.__class__} was instantiated')
        self.publish_msg_num_total = 0
        self._is_add_publish_time = is_add_publish_time
        # atexit.register(self.__at_exit)
        if clear_queue_within_init:
            self.clear()

    def set_is_add_publish_time(self, is_add_publish_time=True):
        self._is_add_publish_time = is_add_publish_time
        return self

    def _init_count(self):
        with self._lock_for_count:
            self._current_time = time.time()
            self.count_per_minute = 0

    def custom_init(self):
        pass

    def publish(self, msg: typing.Union[str, dict]):
        if isinstance(msg, str):
            msg = json.loads(msg)
        if self._is_add_publish_time:
            # msg.update({'publish_time': time.time(), 'publish_time_format': time_util.DatetimeConverter().datetime_str})
            msg.update({'publish_time': round(time.time(), 4), })
        t_start = time.time()
        decorators.handle_exception(retry_times=10, is_throw_error=True, time_sleep=0.1)(self.concrete_realization_of_publish)(json.dumps(msg))
        self.logger.debug(f'pushing a message to queue {self._queue_name} took {round(time.time() - t_start, 4)} seconds  {msg}')
        with self._lock_for_count:
            self.count_per_minute += 1
            self.publish_msg_num_total += 1
            if time.time() - self._current_time > 10:
                self.logger.info(f'pushed {self.count_per_minute} messages within 10 seconds; {self.publish_msg_num_total} messages pushed to {self._queue_name} in total')
                self._init_count()

    @abc.abstractmethod
    def concrete_realization_of_publish(self, msg):
        raise NotImplementedError

    @abc.abstractmethod
    def clear(self):
        raise NotImplementedError

    @abc.abstractmethod
    def get_message_count(self):
        raise NotImplementedError

    @abc.abstractmethod
    def close(self):
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        self.logger.warning(f'publisher connection closed automatically by the with statement; {self.publish_msg_num_total} messages pushed in total ')

    def __at_exit(self):
        self.logger.warning(f'before program exit, {self.publish_msg_num_total} messages were pushed to {self._queue_name} in total ')


def deco_mq_conn_error(f):
    @wraps(f)
    def _deco_mq_conn_error(self, *args, **kwargs):
        if not self.has_init_broker:
            self.logger.warning(f'method [{f.__name__}] uses the rabbitmq channel for the first time; running init_broker to initialize')
            self.init_broker()
            self.has_init_broker = 1
            return f(self, *args, **kwargs)
        # noinspection PyBroadException
        try:
            return f(self, *args, **kwargs)
        except (pika.exceptions.AMQPError, amqpstorm.AMQPError) as e:  # except Exception as e:  # the decorator is now used in most places; a single exception type is not enough.
            self.logger.error(f'rabbitmq connection error, method {f.__name__} failed: {e}')
            self.init_broker()
            return f(self, *args, **kwargs)

    return _deco_mq_conn_error


class RabbitmqPublisher(AbstractPublisher):
    """
    Implemented with pika.
    """

    # noinspection PyAttributeOutsideInit
    def init_broker(self):
        self.logger.warning(f'connecting to mq with pika')
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True)

    # noinspection PyAttributeOutsideInit
    @deco_mq_conn_error
    def concrete_realization_of_publish(self, msg):
        with self._lock_for_pika:  # verified: pika fails when publishing from multiple threads.
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent; 1 means non-persistent.
                                       )
                                       )

    @deco_mq_conn_error
    def clear(self):
        self.channel.queue_purge(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared successfully')

    @deco_mq_conn_error
    def get_message_count(self):
        with self._lock_for_pika:
            queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
            return queue.method.message_count

    # @deco_mq_conn_error
    def close(self):
        self.channel.close()
        self.rabbit_client.connection.close()
        self.logger.warning('pika connection closed')


class RabbitmqPublisherUsingRabbitpy(AbstractPublisher):
    """
    Implemented with the rabbitpy package.
    """

    # noinspection PyAttributeOutsideInit
    def init_broker(self):
        self.logger.warning(f'connecting to mq with the rabbitpy package')
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True)

    # @decorators.tomorrow_threads(10)
    @deco_mq_conn_error
    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.channel.basic_publish(
            exchange='',
            routing_key=self._queue_name,
            body=msg,
            properties={'delivery_mode': 2},
        )

    @deco_mq_conn_error
    def clear(self):
        self.channel.queue_purge(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared successfully')

    @deco_mq_conn_error
    def get_message_count(self):
        # noinspection PyUnresolvedReferences
        ch_raw_rabbity = self.channel.channel
        return rabbitpy.amqp_queue.Queue(ch_raw_rabbity, self._queue_name, durable=True)

    # @deco_mq_conn_error
    def close(self):
        self.channel.close()
        self.rabbit_client.connection.close()
        self.logger.warning('rabbitpy mq connection closed')


class RabbitmqPublisherUsingAmqpStorm(AbstractPublisher):
    # mq operations implemented with the amqpstorm package.
    # Instance attributes are not written in __init__, which makes completion painful; declared here as class attributes so pycharm can complete them.
    connection = amqpstorm.UriConnection
    channel = amqpstorm.Channel
    channel_wrapper_by_ampqstormbaic = AmqpStormBasic
    queue = AmqpStormQueue

    # noinspection PyAttributeOutsideInit
    # @decorators.synchronized
    def init_broker(self):
        # username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10
        self.logger.warning(f'connecting to mq with the AmqpStorm package')
        self.connection = amqpstorm.UriConnection(
            f'amqp://{app_config.RABBITMQ_USER}:{app_config.RABBITMQ_PASS}@{app_config.RABBITMQ_HOST}:{app_config.RABBITMQ_PORT}/{app_config.RABBITMQ_VIRTUAL_HOST}?heartbeat={60 * 10}'
        )
        self.channel = self.connection.channel()  # type:amqpstorm.Channel
        self.channel_wrapper_by_ampqstormbaic = AmqpStormBasic(self.channel)
        self.queue = AmqpStormQueue(self.channel)
        self.queue.declare(queue=self._queue_name, durable=True)

    # @decorators.tomorrow_threads(10)
    @deco_mq_conn_error
    def concrete_realization_of_publish(self, msg):
        self.channel_wrapper_by_ampqstormbaic.publish(exchange='',
                                                      routing_key=self._queue_name,
                                                      body=msg,
                                                      properties={'delivery_mode': 2}, )
        # nb_print(msg)

    @deco_mq_conn_error
    def clear(self):
        self.queue.purge(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared successfully')

    @deco_mq_conn_error
    def get_message_count(self):
        # noinspection PyUnresolvedReferences
        return self.queue.declare(queue=self._queue_name, durable=True)['message_count']

    # @deco_mq_conn_error
    def close(self):
        self.channel.close()
        self.connection.close()
        self.logger.warning('amqpstorm mq connection closed')


class RedisPublisher(AbstractPublisher, RedisMixin):
    """
    Publisher using redis as middleware.
    """

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.redis_db7.rpush(self._queue_name, msg)

    def clear(self):
        self.redis_db7.delete(self._queue_name)
        self.logger.warning(f'messages in queue {self._queue_name} cleared successfully')

    def get_message_count(self):
        # nb_print(self.redis_db7, self._queue_name)
        return self.redis_db7.llen(self._queue_name)

    def close(self):
        # self.redis_db7.connection_pool.disconnect()
        pass


class MongoMqPublisher(AbstractPublisher, MongoMixin):
    # A mongodb-backed queue implemented with the mongo-queue package.
    # noinspection PyAttributeOutsideInit
    def custom_init(self):
        self.queue = MongoQueue(
            self.mongo_16_client.get_database('conqume_queues').get_collection(self._queue_name),
            consumer_id=f"consumer-{time_util.DatetimeConverter().datetime_str}",
            timeout=600,
            max_attempts=3,
            ttl=0)

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.queue.put(json.loads(msg))

    def clear(self):
        self.queue.clear()
        self.logger.warning(f'messages in mongo queue {self._queue_name} cleared successfully')

    def get_message_count(self):
        return self.queue.size()

    def close(self):
        pass


class PersistQueuePublisher(AbstractPublisher):
    """
    Local persistent queue implemented with persistqueue.
    This persists locally and lets several locally started python scripts share queue tasks. Compared to LocalPythonQueuePublisher, tasks are not lost when
    the python interpreter exits.
    """

    # noinspection PyAttributeOutsideInit
    def custom_init(self):
        # noinspection PyShadowingNames
        def _my_new_db_connection(self, path, multithreading, timeout):  # mainly changes the sqlite file suffix so pycharm can recognize and open it.
            # noinspection PyUnusedLocal
            conn = None
            if path == self._MEMORY:
                conn = sqlite3.connect(path,
                                       check_same_thread=not multithreading)
            else:
                conn = sqlite3.connect('{}/data.sqlite'.format(path),
                                       timeout=timeout,
                                       check_same_thread=not multithreading)
            conn.execute('PRAGMA journal_mode=WAL;')
            return conn

        persistqueue.SQLiteAckQueue._new_db_connection = _my_new_db_connection  # monkey patch.
        # REMIND the official benchmark says sqlite-based local persistence is more than 3x faster than pure-file persistence on the same ssd and os, hence sqlite here.

        self.queue = persistqueue.SQLiteAckQueue(path='/sqllite_queues', name=self._queue_name, auto_commit=True, serializer=json, multithreading=True)

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.queue.put(msg)

    # noinspection PyProtectedMember
    def clear(self):
        sql = f'{"DELETE"} {"FROM"} ack_queue_{self._queue_name}'
        self.logger.info(sql)
        self.queue._getter.execute(sql)
        self.queue._getter.commit()
        self.logger.warning(f'messages in local persistent queue {self._queue_name} cleared successfully')

    def get_message_count(self):
        return self.queue.qsize()

    def close(self):
        pass


local_pyhton_queue_name__local_pyhton_queue_obj_map = dict()  # lets the local queue be used exactly like the other middleware: the map stores queue names, so consumers and publishers can find the queue object by name.


class LocalPythonQueuePublisher(AbstractPublisher):
    """
    Publisher using python's built-in queue as middleware.
    """

    # noinspection PyAttributeOutsideInit
    def custom_init(self):
        if self._queue_name not in local_pyhton_queue_name__local_pyhton_queue_obj_map:
            local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name] = Queue()
        self.queue = local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name]

    def concrete_realization_of_publish(self, msg):
        # noinspection PyTypeChecker
        self.queue.put(msg)

    def clear(self):
        # noinspection PyUnresolvedReferences
        self.queue.queue.clear()
        self.logger.warning(f'messages in the local queue cleared successfully')

    def get_message_count(self):
        return self.queue.qsize()

    def close(self):
        pass


class RedisFilter(RedisMixin):
    def __init__(self, redis_key_name):
        self._redis_key_name = redis_key_name

    @staticmethod
    def _get_ordered_str(value):
        """To filter json key/value pairs in redis, the pairs must be sorted first, otherwise filtering is inaccurate, e.g. {"a":1,"b":2} vs {"b":2,"a":1}"""
        if isinstance(value, str):
            value = json.loads(value)
        ordered_dict = OrderedDict()
        for k in sorted(value):
            ordered_dict[k] = value[k]
        return json.dumps(ordered_dict)

    def add_a_value(self, value: typing.Union[str, dict]):
        self.redis_db7.sadd(self._redis_key_name, self._get_ordered_str(value))

    def check_value_exists(self, value):
        return self.redis_db7.sismember(self._redis_key_name, self._get_ordered_str(value))


class AbstractConsumer(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ):
    time_interval_for_check_do_not_run_time = 60
    BROKER_KIND = None

    @property
    @decorators.synchronized
    def publisher_of_same_queue(self):
        if not self._publisher_of_same_queue:
            self._publisher_of_same_queue = get_publisher(self._queue_name, broker_kind=self.BROKER_KIND)
            if self._msg_expire_senconds:
                self._publisher_of_same_queue.set_is_add_publish_time()
        return self._publisher_of_same_queue

    @classmethod
    def join_shedual_task_thread(cls):
        """

        :return:
        """
        """
        def ff():
            RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy platform consumer', is_consuming_function_use_multi_params=True).start_consuming_message()
            RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz platform consumer', is_consuming_function_use_multi_params=True).start_consuming_message()
            AbstractConsumer.join_shedual_task_thread()  # when starting consumers in multiple processes on linux, this line is required.


        if __name__ == '__main__':
            [Process(target=ff).start() for _ in range(4)]

        """
        ConcurrentModeDispatcher.join()

    def __init__(self, queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool=None, concurrent_mode=1,
                 max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, msg_expire_senconds=0,
                 logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True,
                 is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False):
        """
        :param queue_name:
        :param consuming_function: the function that handles the message.
        :param function_timeout: timeout in seconds; a function running longer than this is automatically killed. 0 means no limit.
        :param threads_num:
        :param specify_threadpool: use the given thread pool, so several consumers can share one pool; when not None, threads_num is ignored.
        :param concurrent_mode: concurrency mode; thread, gevent and eventlet are supported for now. 1 thread, 2 gevent, 3 eventlet.
        :param max_retry_times:
        :param log_level:
        :param is_print_detail_exception:
        :param msg_schedule_time_intercal: interval between scheduling messages, used for frequency control.
        :param logger_prefix: log prefix, so different consumers produce distinguishable logs.
        :param create_logger_file: whether to create a file log.
        :param do_task_filtering: whether to filter tasks based on function arguments.
        :param is_consuming_function_use_multi_params: whether the function takes traditional multiple parameters rather than a single body dict representing them.
        :param is_do_not_run_by_specify_time_effect: whether the do-not-run time window takes effect.
        :param do_not_run_by_specify_time: the do-not-run time window.
        :param schedule_tasks_on_main_thread: schedule tasks directly on the main thread, which means two consumers cannot be started on the current main thread at the same time.
        """
        self._queue_name = queue_name
        self.queue_name = queue_name  # a public alias, to avoid warnings on external access.
        self.consuming_function = consuming_function
        self._function_timeout = function_timeout
        self._threads_num = threads_num
        self._specify_threadpool = specify_threadpool
        self._threadpool = None  # plus one extra thread that checks message counts and heartbeat.
        self._concurrent_mode = concurrent_mode
        self._max_retry_times = max_retry_times
        self._is_print_detail_exception = is_print_detail_exception
        self._msg_schedule_time_intercal = msg_schedule_time_intercal if msg_schedule_time_intercal > 0.001 else 0.001
        self._msg_expire_senconds = msg_expire_senconds

        if self._concurrent_mode not in (1, 2, 3):
            raise ValueError('invalid concurrency mode')
        self._concurrent_mode_dispatcher = ConcurrentModeDispatcher(self)

        self._logger_prefix = logger_prefix
        self._log_level = log_level
        if logger_prefix != '':
            logger_prefix += '--'
        logger_name = f'{logger_prefix}{self.__class__.__name__}--{self._concurrent_mode_dispatcher.concurrent_name}--{queue_name}'
        # nb_print(logger_name)
        self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level, log_filename=f'{logger_name}.log' if create_logger_file else None)
        self.logger.info(f'{self.__class__} was instantiated')

        self._do_task_filtering = do_task_filtering
        self._redis_filter_key_name = f'filter:{queue_name}'
        self._redis_filter = RedisFilter(self._redis_filter_key_name)

        self._is_consuming_function_use_multi_params = is_consuming_function_use_multi_params
        self._lock_for_pika = Lock()

        self._execute_task_times_every_minute = 0  # how many tasks were executed within the last minute.
        self._lock_for_count_execute_task_times_every_minute = Lock()
        self._current_time_for_execute_task_times_every_minute = time.time()

        self._msg_num_in_broker = 0
        self._last_timestamp_when_has_task_in_queue = 0
        self._last_timestamp_print_msg_num = 0

        self._is_do_not_run_by_specify_time_effect = is_do_not_run_by_specify_time_effect
        self._do_not_run_by_specify_time = do_not_run_by_specify_time  # a time window during which the consumer does not run.
        self._schedule_tasks_on_main_thread = schedule_tasks_on_main_thread

        self.stop_flag = False

        self._publisher_of_same_queue = None

    @property
    @decorators.synchronized
    def threadpool(self):
        return self._concurrent_mode_dispatcher.build_pool()

    def keep_circulating(self, time_sleep=0.001, exit_if_function_run_sucsess=False, is_display_detail_exception=True):
        """Decorator that keeps running a method in a loop with a pause between iterations.
        :param time_sleep: pause between loop iterations.
        :param is_display_detail_exception:
        :param exit_if_function_run_sucsess: exit the loop once the function succeeds.
        """

        def _keep_circulating(func):
            # noinspection PyBroadException
            @wraps(func)
            def __keep_circulating(*args, **kwargs):
                while 1:
                    if self.stop_flag:
                        break
                    try:
                        result = func(*args, **kwargs)
                        if exit_if_function_run_sucsess:
                            return result
                    except Exception as e:
                        msg = func.__name__ + ' failed\n ' + traceback.format_exc(limit=10) if is_display_detail_exception else str(e)
                        self.logger.error(msg)
                    finally:
                        time.sleep(time_sleep)

            return __keep_circulating

        return _keep_circulating

    def start_consuming_message(self):
        self.logger.warning(f'start consuming messages from {self._queue_name}')
        # self.threadpool.submit(decorators.keep_circulating(20)(self.check_heartbeat_and_message_count))
        self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count))
        if self._schedule_tasks_on_main_thread:
            # decorators.keep_circulating(1)(self._shedual_task)()
            self.keep_circulating(1)(self._shedual_task)()
        else:
            # t = Thread(target=decorators.keep_circulating(1)(self._shedual_task))
            self._concurrent_mode_dispatcher.schedulal_task_with_no_block()

    @abc.abstractmethod
    def _shedual_task(self):
        raise NotImplementedError

    def _run_consuming_function_with_confirm_and_retry(self, kw: dict, current_retry_times=0):
        if self._do_task_filtering and self._redis_filter.check_value_exists(kw['body']):  # check the function arguments and filter out tasks that already ran successfully.
            self.logger.info(f'task {kw["body"]} filtered by the redis key [{self._redis_filter_key_name}]')
            self._confirm_consume(kw)
            return
        with self._lock_for_count_execute_task_times_every_minute:
            self._execute_task_times_every_minute += 1
            if time.time() - self._current_time_for_execute_task_times_every_minute > 60:
                self.logger.info(
                    f'ran function [ {self.consuming_function.__name__} ] {self._execute_task_times_every_minute} times within one minute; an estimated '
                    f'{time_util.seconds_to_hour_minute_second(self._msg_num_in_broker / self._execute_task_times_every_minute * 60)} is still needed '
                    f'to finish the remaining {self._msg_num_in_broker} tasks ')
                self._current_time_for_execute_task_times_every_minute = time.time()
                self._execute_task_times_every_minute = 0

        if current_retry_times < self._max_retry_times + 1:
            # noinspection PyBroadException
            t_start = time.time()
            try:
                function_run = self.consuming_function if self._function_timeout == 0 else self._concurrent_mode_dispatcher.timeout_deco(self._function_timeout)(self.consuming_function)
                if self._is_consuming_function_use_multi_params:  # the consuming function takes traditional multiple parameters.
                    function_run(**delete_keys_and_return_new_dict(kw['body'], ['publish_time', 'publish_time_format']))
                else:
                    function_run(delete_keys_and_return_new_dict(kw['body'], ['publish_time', 'publish_time_format']))  # the consuming function takes a single dict parameter whose key/value pairs represent the arguments.
                self._confirm_consume(kw)
                if self._do_task_filtering:
                    self._redis_filter.add_a_value(kw['body'])  # after the function succeeds, add the sorted key/value string of its arguments to the set.

                self.logger.debug(f'{self._concurrent_mode_dispatcher.get_concurrent_info()} function {self.consuming_function.__name__} '
                                  f'run #{current_retry_times + 1} succeeded; it took {round(time.time() - t_start, 4)} seconds, arguments: 【 {kw["body"]} 】')
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRequeue)):  # mongo often cannot insert or is down during maintenance/backup; that, or deliberately raising an ExceptionForRequeue, requeues the message without counting against the retry limit.
                    self.logger.critical(f'error {type(e)} {e} occurred in function [{self.consuming_function.__name__}]')
                    return self._requeue(kw)
                self.logger.error(f'function {self.consuming_function.__name__} failed on attempt #{current_retry_times + 1} '
                                  f'after running {round(time.time() - t_start, 4)} seconds,\n arguments: 【 {kw["body"]} 】 \n reason: {type(e)} {e} ', exc_info=self._is_print_detail_exception)
                self._run_consuming_function_with_confirm_and_retry(kw, current_retry_times + 1)
        else:
            self.logger.critical(f'function {self.consuming_function.__name__} still failed after the maximum of {self._max_retry_times} retries, arguments: 【 {kw["body"]} 】')  # after failing more than the specified number of times, acknowledge the message anyway.
            self._confirm_consume(kw)

    @abc.abstractmethod
    def _confirm_consume(self, kw):
        """acknowledge consumption"""
        raise NotImplementedError

    # noinspection PyUnusedLocal

    def check_heartbeat_and_message_count(self):
        self._msg_num_in_broker = self.publisher_of_same_queue.get_message_count()
        if time.time() - self._last_timestamp_print_msg_num > 60:
            self.logger.info(f'queue [{self._queue_name}] still has [{self._msg_num_in_broker}] tasks')
            self._last_timestamp_print_msg_num = time.time()
        if self._msg_num_in_broker != 0:
            self._last_timestamp_when_has_task_in_queue = time.time()
        return self._msg_num_in_broker

    @abc.abstractmethod
    def _requeue(self, kw):
        """put the message back into the queue"""
        raise NotImplementedError

    def _submit_task(self, kw):
        if self._judge_is_daylight():
            self._requeue(kw)
            time.sleep(self.time_interval_for_check_do_not_run_time)
            return
        if self._msg_expire_senconds != 0 and time.time() - self._msg_expire_senconds > kw['body']['publish_time']:
            self.logger.warning(f'the message was published at {kw["body"]["publish_time"]} {kw["body"].get("publish_time_format", "")}, {round(time.time() - kw["body"]["publish_time"], 4)} seconds ago, '
                                f'exceeding the configured {self._msg_expire_senconds} seconds; discarding the task')
            self._confirm_consume(kw)
            return 0
        self.threadpool.submit(self._run_consuming_function_with_confirm_and_retry, kw)
        time.sleep(self._msg_schedule_time_intercal)

    def _judge_is_daylight(self):
        if self._is_do_not_run_by_specify_time_effect and self._do_not_run_by_specify_time[0] < time_util.DatetimeConverter().time_str < self._do_not_run_by_specify_time[1]:
            self.logger.warning(f'the current time {time_util.DatetimeConverter()} falls inside the do-not-run window {self._do_not_run_by_specify_time}; not running')
            return True

    def __str__(self):
        return f'consumer for queue {self.queue_name} with function {self.consuming_function}'


# noinspection PyProtectedMember
class ConcurrentModeDispatcher(LoggerMixin):
    schedulal_thread_to_be_join = []
    concurrent_mode = None
    schedual_task_always_use_thread = False

    def __init__(self, consumerx: AbstractConsumer):
        self.consumer = consumerx
        if self.__class__.concurrent_mode is not None and self.consumer._concurrent_mode != self.__class__.concurrent_mode:
            raise ValueError('two concurrency modes cannot be set in the same interpreter')
        self._concurrent_mode = self.__class__.concurrent_mode = self.consumer._concurrent_mode
        concurrent_name = ''
        self.timeout_deco = None
        if self._concurrent_mode == 1:
            concurrent_name = 'thread'
            self.timeout_deco = decorators.timeout
        elif self._concurrent_mode == 2:
            concurrent_name = 'gevent'
            self.timeout_deco = gevent_timeout_deco
        elif self._concurrent_mode == 3:
            concurrent_name = 'eventlet'
            self.timeout_deco = evenlet_timeout_deco
        self.concurrent_name = concurrent_name
        self.logger.warning(f'{self.consumer} concurrency mode set to {self.concurrent_name}')

    def build_pool(self):
        if self.consumer._threadpool:
            return self.consumer._threadpool

        pool_type = None  # three duck-typed classes modelled on ThreadpoolExecutor; their public method names and behaviour are identical, so they are interchangeable.
        if self._concurrent_mode == 1:
            pool_type = CustomThreadPoolExecutor
            check_not_monkey()
        elif self._concurrent_mode == 2:
            pool_type = GeventPoolExecutor
            check_gevent_monkey_patch()
        elif self._concurrent_mode == 3:
            pool_type = CustomEventletPoolExecutor
            check_evenlet_monkey_patch()
        self.consumer._threadpool = self.consumer._specify_threadpool if self.consumer._specify_threadpool else pool_type(self.consumer._threads_num + 1)  # one extra thread that checks message counts and heartbeat.
        self.logger.warning(f'{self.concurrent_name} {self.consumer._threadpool}')
        return self.consumer._threadpool

    def schedulal_task_with_no_block(self):
        if self.schedual_task_always_use_thread:
            t = Thread(target=self.consumer.keep_circulating(1)(self.consumer._shedual_task))
            self.__class__.schedulal_thread_to_be_join.append(t)
            t.start()
        else:
            if self._concurrent_mode == 1:
                t = Thread(target=self.consumer.keep_circulating(1)(self.consumer._shedual_task))
                self.__class__.schedulal_thread_to_be_join.append(t)
                t.start()
            elif self._concurrent_mode == 2:
                g = gevent.spawn(self.consumer.keep_circulating(1)(self.consumer._shedual_task), )
                self.__class__.schedulal_thread_to_be_join.append(g)
            elif self._concurrent_mode == 3:
                g = eventlet.spawn(self.consumer.keep_circulating(1)(self.consumer._shedual_task), )
                self.__class__.schedulal_thread_to_be_join.append(g)
        atexit.register(self.join)

    @classmethod
    def join(cls):
        nb_print((cls.schedulal_thread_to_be_join, len(cls.schedulal_thread_to_be_join), 'mode:', cls.concurrent_mode))
        if cls.schedual_task_always_use_thread:
            for t in cls.schedulal_thread_to_be_join:
                nb_print(t)
                t.join()
        else:
            if cls.concurrent_mode == 1:
                for t in cls.schedulal_thread_to_be_join:
                    nb_print(t)
                    t.join()
            elif cls.concurrent_mode == 2:
                # cls.logger.info()
                nb_print(cls.schedulal_thread_to_be_join)
                gevent.joinall(cls.schedulal_thread_to_be_join, raise_error=True, )
            elif cls.concurrent_mode == 3:
                for g in cls.schedulal_thread_to_be_join:
                    # eventlet.greenthread.GreenThread.
                    nb_print(g)
                    g.wait()

    def get_concurrent_info(self):
        concurrent_info = ''
        if self._concurrent_mode == 1:
            concurrent_info = f'[{threading.current_thread()} {threading.active_count()}]'
        elif self._concurrent_mode == 2:
            concurrent_info = f'[{gevent.getcurrent()} {threading.active_count()}]'
        elif self._concurrent_mode == 3:
            # noinspection PyArgumentList
            concurrent_info = f'[{eventlet.getcurrent()} {threading.active_count()}]'
        return concurrent_info


def wait_for_possible_has_finish_all_tasks(queue_name: str, minutes: int, send_stop_to_broker=0, broker_kind: int = 0, ):
    """
    Because consumption is asynchronous, the queue may be consumed while still being pushed to, and a few tail tasks may still be awaiting acknowledgement
    even though the consumer looks done. Yet it is sometimes necessary to judge whether all tasks have finished, so this provides an imprecise check;
    understand the cause and the scenario before relying on it.
    :param queue_name: queue name.
    :param minutes: after how many consecutive task-free minutes consumption is considered finished.
    :param send_stop_to_broker: send a stop flag to the middleware, which makes consumers exit their scheduling loop.
    :param broker_kind: kind of middleware.
    :return:
    """
    if minutes <= 1:
        raise ValueError('for a tasks-look-finished check the time must be at least 2 minutes; 10 minutes is best')
    pb = get_publisher(queue_name, broker_kind=broker_kind)
    no_task_time = 0
    while 1:
        # noinspection PyBroadException
        try:
```
message_count = pb.get_message_count() 931 except Exception as e: 932 nb_print(e) 933 message_count = -1 934 if message_count == 0: 935 no_task_time += 30 936 else: 937 no_task_time = 0 938 time.sleep(30) 939 if no_task_time > minutes * 60: 940 break 941 if send_stop_to_broker: 942 pb.publish({'stop': 1}) 943 pb.close() 944 945 946 class RabbitmqConsumer(AbstractConsumer): 947 """ 948 使用pika包實現的。 949 """ 950 BROKER_KIND = 0 951 952 def _shedual_task(self): 953 channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() 954 channel.queue_declare(queue=self._queue_name, durable=True) 955 channel.basic_qos(prefetch_count=self._threads_num) 956 957 def callback(ch, method, properties, body): 958 body = body.decode() 959 self.logger.debug(f'從rabbitmq的 [{self._queue_name}] 隊列中 取出的消息是: {body}') 960 body = json.loads(body) 961 kw = {'ch': ch, 'method': method, 'properties': properties, 'body': body} 962 self._submit_task(kw) 963 964 channel.basic_consume(callback, 965 queue=self._queue_name, 966 # no_ack=True 967 ) 968 channel.start_consuming() 969 970 def _confirm_consume(self, kw): 971 with self._lock_for_pika: 972 try: 973 kw['ch'].basic_ack(delivery_tag=kw['method'].delivery_tag) # 確認消費 974 except pika.exceptions.AMQPError as e: 975 self.logger.error(f'pika確認消費失敗 {e}') 976 977 def _requeue(self, kw): 978 with self._lock_for_pika: 979 # ch.connection.add_callback_threadsafe(functools.partial(self.__ack_message_pika, ch, method.delivery_tag)) 980 return kw['ch'].basic_nack(delivery_tag=kw['method'].delivery_tag) # 當即從新入隊。 981 982 @staticmethod 983 def __ack_message_pika(channelx, delivery_tagx): 984 """Note that `channel` must be the same pika channel instance via which 985 the message being ACKed was retrieved (AMQP protocol constraint). 
986 """ 987 if channelx.is_open: 988 channelx.basic_ack(delivery_tagx) 989 else: 990 # Channel is already closed, so we can't ACK this message; 991 # log and/or do something that makes sense for your app in this case. 992 pass 993 994 995 class RabbitmqConsumerAmqpStorm(AbstractConsumer): 996 """ 997 使用AmqpStorm實現的,多線程安全的,不用加鎖。 998 """ 999 BROKER_KIND = 4 1000 1001 def _shedual_task(self): 1002 # noinspection PyTypeChecker 1003 def callback(amqpstorm_message: amqpstorm.Message): 1004 body = amqpstorm_message.body 1005 self.logger.debug(f'從rabbitmq的 [{self._queue_name}] 隊列中 取出的消息是: {body}') 1006 body = json.loads(body) 1007 kw = {'amqpstorm_message': amqpstorm_message, 'body': body} 1008 self._submit_task(kw) 1009 1010 rp = RabbitmqPublisherUsingAmqpStorm(self.queue_name) 1011 rp.init_broker() 1012 rp.channel_wrapper_by_ampqstormbaic.qos(self._threads_num) 1013 rp.channel_wrapper_by_ampqstormbaic.consume(callback=callback, queue=self.queue_name, no_ack=False) 1014 rp.channel.start_consuming(auto_decode=True) 1015 1016 def _confirm_consume(self, kw): 1017 # noinspection PyBroadException 1018 try: 1019 kw['amqpstorm_message'].ack() # 確認消費 1020 except Exception as e: 1021 self.logger.error(f'AmqpStorm確認消費失敗 {type(e)} {e}') 1022 1023 def _requeue(self, kw): 1024 kw['amqpstorm_message'].nack(requeue=True) 1025 1026 1027 class RabbitmqConsumerRabbitpy(AbstractConsumer): 1028 """ 1029 使用rabbitpy實現的 1030 """ 1031 BROKER_KIND = 1 1032 1033 def _shedual_task(self): 1034 # noinspection PyTypeChecker 1035 channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel() # type: rabbitpy.AMQP # 1036 channel.queue_declare(queue=self._queue_name, durable=True) 1037 channel.basic_qos(prefetch_count=self._threads_num) 1038 for message in channel.basic_consume(self._queue_name, no_ack=False): 1039 body = message.body.decode() 1040 self.logger.debug(f'從rabbitmq {self._queue_name} 隊列中 取出的消息是: {body}') 1041 kw = {'message': message, 'body': 
json.loads(message.body.decode())} 1042 self._submit_task(kw) 1043 1044 def _confirm_consume(self, kw): 1045 kw['message'].ack() 1046 1047 def _requeue(self, kw): 1048 kw['message'].nack(requeue=True) 1049 1050 1051 class RedisConsumer(AbstractConsumer, RedisMixin): 1052 """ 1053 redis做爲中間件實現的。 1054 """ 1055 BROKER_KIND = 2 1056 1057 def _shedual_task(self): 1058 while True: 1059 t_start = time.time() 1060 task_bytes = self.redis_db7.blpop(self._queue_name)[1] # 使用db7 1061 if task_bytes: 1062 self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 4)} 消息是: {task_bytes.decode()} ') 1063 task_dict = json.loads(task_bytes) 1064 kw = {'body': task_dict} 1065 self._submit_task(kw) 1066 1067 def _confirm_consume(self, kw): 1068 pass # redis沒有確認消費的功能。 1069 1070 def _requeue(self, kw): 1071 self.redis_db7.rpush(self._queue_name, json.dumps(kw['body'])) 1072 1073 1074 class MongoMqConsumer(AbstractConsumer, MongoMixin): 1075 """ 1076 Mongo queue包實現的基於mongo的消息隊列,支持消費確認。 1077 """ 1078 BROKER_KIND = 5 1079 1080 def _shedual_task(self): 1081 mp = MongoMqPublisher(self.queue_name) 1082 while True: 1083 t_start = time.time() 1084 job = mp.queue.next() 1085 if job is not None: 1086 self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 4)} 消息是: {job.payload} ') 1087 kw = {'body': job.payload, 'job': job} 1088 self._submit_task(kw) 1089 else: 1090 time.sleep(self._msg_schedule_time_intercal) 1091 1092 def _confirm_consume(self, kw): 1093 kw['job'].complete() 1094 1095 def _requeue(self, kw): 1096 kw['job'].release() 1097 1098 1099 class PersistQueueConsumer(AbstractConsumer): 1100 """ 1101 persist queue包實現的本地持久化消息隊列。 1102 """ 1103 BROKER_KIND = 6 1104 1105 def _shedual_task(self): 1106 pub = PersistQueuePublisher(self.queue_name) 1107 while True: 1108 t_start = time.time() 1109 item = pub.queue.get() 1110 self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 4)} 消息是: {item} ') 1111 kw = {'body': json.loads(item), 'q': pub.queue, 'item': item} 1112 self._submit_task(kw) 
1113 1114 def _confirm_consume(self, kw): 1115 kw['q'].ack(kw['item']) 1116 1117 def _requeue(self, kw): 1118 kw['q'].nack(kw['item']) 1119 1120 1121 class LocalPythonQueueConsumer(AbstractConsumer): 1122 BROKER_KIND = 3 1123 1124 @property 1125 def local_python_queue(self) -> Queue: 1126 return local_pyhton_queue_name__local_pyhton_queue_obj_map[self._queue_name] 1127 1128 def _shedual_task(self): 1129 while True: 1130 t_start = time.time() 1131 task = self.local_python_queue.get() 1132 if isinstance(task, str): 1133 task = json.loads(task) 1134 self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 4)} 消息是: {json.dumps(task)} ') 1135 task_dict = task 1136 kw = {'body': task_dict} 1137 self._submit_task(kw) 1138 1139 def _confirm_consume(self, kw): 1140 pass 1141 1142 def _requeue(self, kw): 1143 self.local_python_queue.put(kw['body']) 1144 1145 1146 def get_publisher(queue_name, *, log_level_int=10, logger_prefix='', is_add_file_handler=False, clear_queue_within_init=False, is_add_publish_time=False, broker_kind=0): 1147 """ 1148 :param queue_name: 1149 :param log_level_int: 1150 :param logger_prefix: 1151 :param is_add_file_handler: 1152 :param clear_queue_within_init: 1153 :param is_add_publish_time:是否添加發布時間到中間件,若是設置了過時時間不爲0,須要設爲True 1154 :param broker_kind: 中間件或使用包的種類。 1155 :return: 1156 """ 1157 all_kwargs = copy.deepcopy(locals()) 1158 all_kwargs.pop('broker_kind') 1159 if broker_kind == 0: 1160 return RabbitmqPublisher(**all_kwargs) 1161 elif broker_kind == 1: 1162 return RabbitmqPublisherUsingRabbitpy(**all_kwargs) 1163 elif broker_kind == 2: 1164 return RedisPublisher(**all_kwargs) 1165 elif broker_kind == 3: 1166 return LocalPythonQueuePublisher(**all_kwargs) 1167 elif broker_kind == 4: 1168 return RabbitmqPublisherUsingAmqpStorm(**all_kwargs) 1169 elif broker_kind == 5: 1170 return MongoMqPublisher(**all_kwargs) 1171 elif broker_kind == 6: 1172 return PersistQueuePublisher(**all_kwargs) 1173 else: 1174 raise ValueError('設置的中間件種類數字不正確') 1175 1176 
1177 def get_consumer(queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool=None, concurrent_mode=1, 1178 max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, msg_expire_senconds=0, 1179 logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True, 1180 is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), 1181 schedule_tasks_on_main_thread=False, broker_kind=0): 1182 """ 1183 使用工廠模式再包一層,經過設置數字來生成基於不一樣中間件或包的consumer。 1184 :param queue_name: 1185 :param consuming_function: 處理消息的函數。 1186 :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。爲0是不限制。 1187 :param threads_num: 1188 :param specify_threadpool:使用指定的線程池,能夠多個消費者共使用一個線程池,不爲None時候。threads_num失效 1189 :param concurrent_mode:併發模式,1線程 2gevent 3eventlet 1190 :param max_retry_times: 1191 :param log_level: 1192 :param is_print_detail_exception: 1193 :param msg_schedule_time_intercal:消息調度的時間間隔,用於控頻 1194 :param msg_expire_senconds:消息過時時間,爲0永不過時,爲10則表明,10秒以前發佈的任務若是如今才輪到消費則丟棄任務。 1195 :param logger_prefix: 日誌前綴,可以使不一樣的消費者生成不一樣的日誌 1196 :param create_logger_file : 是否建立文件日誌 1197 :param do_task_filtering :是否執行基於函數參數的任務過濾 1198 :param is_consuming_function_use_multi_params 函數的參數是不是傳統的多參數,不爲單個body字典表示多個參數。 1199 :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效 1200 :param do_not_run_by_specify_time :不運行的時間段 1201 :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啓兩個消費者。 1202 :param broker_kind:中間件種類,,不要設置爲1。 0 使用pika連接mq,2使用redis,3使用python內置Queue 1203 :return 1204 """ 1205 all_kwargs = copy.copy(locals()) 1206 all_kwargs.pop('broker_kind') 1207 if broker_kind == 0: 1208 return RabbitmqConsumer(**all_kwargs) 1209 elif broker_kind == 1: 1210 return RabbitmqConsumerRabbitpy(**all_kwargs) 1211 elif broker_kind == 2: 1212 return RedisConsumer(**all_kwargs) 1213 elif broker_kind == 3: 1214 return 
LocalPythonQueueConsumer(**all_kwargs) 1215 elif broker_kind == 4: 1216 return RabbitmqConsumerAmqpStorm(**all_kwargs) 1217 elif broker_kind == 5: 1218 return MongoMqConsumer(**all_kwargs) 1219 elif broker_kind == 6: 1220 return PersistQueueConsumer(**all_kwargs) 1221 else: 1222 raise ValueError('設置的中間件種類數字不正確') 1223 1224 1225 # noinspection PyMethodMayBeStatic,PyShadowingNames 1226 class _Test(unittest.TestCase, LoggerMixin, RedisMixin): 1227 """ 1228 演示一個簡單求和的例子。 1229 """ 1230 1231 @unittest.skip 1232 def test_publisher_with(self): 1233 """ 1234 測試上下文管理器。 1235 :return: 1236 """ 1237 with RabbitmqPublisher('queue_test') as rp: 1238 for i in range(1000): 1239 rp.publish(str(i)) 1240 1241 @unittest.skip 1242 def test_publish_rabbit(self): 1243 """ 1244 測試mq推送 1245 :return: 1246 """ 1247 rabbitmq_publisher = RabbitmqPublisher('queue_test', log_level_int=10, logger_prefix='yy平臺推送') 1248 rabbitmq_publisher.clear() 1249 for i in range(500000): 1250 try: 1251 time.sleep(1) 1252 rabbitmq_publisher.publish({'a': i, 'b': 2 * i}) 1253 except Exception as e: 1254 print(e) 1255 1256 rabbitmq_publisher = RabbitmqPublisher('queue_test2', log_level_int=20, logger_prefix='zz平臺推送') 1257 rabbitmq_publisher.clear() 1258 [rabbitmq_publisher.publish({'somestr_to_be_print': str(i)}) for i in range(500000)] 1259 1260 @unittest.skip 1261 def test_publish_redis(self): 1262 # 若是須要批量推送 1263 for i in range(10007): 1264 # 最犀利的批量操做方式,自動聚合多條redis命令,支持多種redis混合命令批量操做。 1265 RedisBulkWriteHelper(self.redis_db7, 1000).add_task(RedisOperation('lpush', 'queue_test', json.dumps({'a': i, 'b': 2 * i}))) 1266 [self.redis_db7.lpush('queue_test', json.dumps({'a': j, 'b': 2 * j})) for j in range(500)] 1267 print('推送完畢') 1268 1269 @unittest.skip 1270 def test_consume(self): 1271 """ 1272 單參數表明全部傳參 1273 :return: 1274 """ 1275 1276 def f(body): 1277 self.logger.info(f'消費此消息 {body}') 1278 # print(body['a'] + body['b']) 1279 time.sleep(5) # 模擬作某事須要阻塞10秒種,必須用併發。 1280 1281 # 把消費的函數名傳給consuming_function,就這麼簡單。 1282 
rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20, msg_schedule_time_intercal=0.5, log_level=10, logger_prefix='yy平臺消費', 1283 is_consuming_function_use_multi_params=False) 1284 rabbitmq_consumer.start_consuming_message() 1285 1286 @unittest.skip 1287 def test_consume2(self): 1288 """ 1289 測試支持傳統參數形式,不是用一個字典裏面包含全部參數。 1290 :return: 1291 """ 1292 1293 def f2(a, b): 1294 self.logger.debug(f'a的值是 {a}') 1295 self.logger.debug(f'b的值是 {b}') 1296 print(f'{a} + {b} 的和是 {a + b}') 1297 time.sleep(3) # 模擬作某事須要阻塞10秒種,必須用併發。 1298 1299 # 把消費的函數名傳給consuming_function,就這麼簡單。 1300 RabbitmqConsumer('queue_test', consuming_function=f2, threads_num=60, msg_schedule_time_intercal=5, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() 1301 1302 @unittest.skip 1303 def test_redis_filter(self): 1304 """ 1305 測試基於redis set結構的過濾器。 1306 :return: 1307 """ 1308 redis_filter = RedisFilter('abcd') 1309 redis_filter.add_a_value({'a': 1, 'c': 3, 'b': 2}) 1310 redis_filter.check_value_exists({'a': 1, 'c': 3, 'b': 2}) 1311 redis_filter.check_value_exists({'a': 1, 'b': 2, 'c': 3}) 1312 with decorators.TimerContextManager(): 1313 print(redis_filter.check_value_exists('{"a": 1, "b": 2, "c": 3}')) 1314 with decorators.TimerContextManager(): 1315 # 實測百萬元素的set,過濾檢查不須要1毫秒,通常最多100萬個酒店。 1316 print(RedisFilter('filter:mafengwo-detail_task').check_value_exists({"_id": "69873340"})) 1317 1318 @unittest.skip 1319 def test_run_two_function(self): 1320 # 演示連續運行兩個consumer 1321 def f3(a, b): 1322 print(f'{a} + {b} = {a + b}') 1323 time.sleep(10) # 模擬作某事須要阻塞10秒種,必須用併發。 1324 1325 def f4(somestr_to_be_print): 1326 print(f'打印 {somestr_to_be_print}') 1327 time.sleep(20) # 模擬作某事須要阻塞10秒種,必須用併發。 1328 1329 RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() 1330 
RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() 1331 # AbstractConsumer.join_shedual_task_thread() 1332 1333 @unittest.skip 1334 def test_local_python_queue_as_broker(self): 1335 def f8(x, y): 1336 nb_print((x, y)) 1337 1338 consumer = get_consumer('queue_testlll', consuming_function=f8, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='uu平臺消費', 1339 function_timeout=20, is_print_detail_exception=True, msg_expire_senconds=5, broker_kind=3) # 經過設置broker_kind,一鍵切換中間件爲mq或redis 1340 get_publisher('queue_testlll', broker_kind=3, is_add_publish_time=True).publish({'x': 3, 'y': 4}) 1341 consumer.publisher_of_same_queue.set_is_add_publish_time(True).publish({'x': 1, 'y': 2}) 1342 nb_print(consumer.publisher_of_same_queue.get_message_count()) 1343 consumer.start_consuming_message() 1344 for i in range(10000): 1345 consumer.publisher_of_same_queue.publish({'x': i, 'y': i * 2}) 1346 time.sleep(2) 1347 1348 # @unittest.skip 1349 def test_factory_pattern_consumer(self): 1350 """ 1351 測試工廠模式來生成消費者 1352 :return: 1353 """ 1354 1355 def f2(a, b): 1356 # body_dict = json.loads(body) 1357 1358 self.logger.info(f'消費此消息 {a} {b} ,結果是 {a + b}') 1359 # print(body_dict['a'] + body_dict['b']) 1360 time.sleep(30) # 模擬作某事須要阻塞10秒種,必須用併發。 1361 # 把消費的函數名傳給consuming_function,就這麼簡單。 1362 1363 consumer = get_consumer('queue_test5', consuming_function=f2, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='zz平臺消費', 1364 function_timeout=20, is_print_detail_exception=True, msg_expire_senconds=500, broker_kind=0) # 經過設置broker_kind,一鍵切換中間件爲mq或redis 1365 consumer.publisher_of_same_queue.clear() 1366 [consumer.publisher_of_same_queue.publish({'a': i, 'b': 2 * i}) for i in range(1)] 1367 time.sleep(10) # sleep測試消息過時。 1368 get_publisher('queue_test5', 
broker_kind=0).set_is_add_publish_time().publish({'a': 1000, 'b': 2000}) 1369 consumer.start_consuming_message() 1370 # consumer.join_shedual_task_thread() 1371 # block_python_exit.just_block_python_exit() 1372 # show_current_threads_num(block=True) 1373 1374 1375 if __name__ == '__main__': 1376 # noinspection PyArgumentList 1377 unittest.main(sleep_time=1)
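The template-method design described above is what makes adding a new middleware cheap: the abstract base owns the whole consumption loop, and every broker class in the listing only fills in three hooks (`_shedual_task`, `_confirm_consume`, `_requeue`). Here is a minimal self-contained sketch of that shape; `MiniAbstractConsumer` and the deque-backed `DequeConsumer` are illustrative stand-ins invented for this example, not the framework's real classes:

```python
import abc
import collections
import json


class MiniAbstractConsumer(abc.ABC):
    """Cut-down stand-in for AbstractConsumer: the base class owns the
    consume/ack/requeue flow, subclasses supply only the middleware hooks."""

    def __init__(self, queue_name):
        self._queue_name = queue_name

    @abc.abstractmethod
    def _shedual_task(self):  # pull one message off the broker
        raise NotImplementedError

    @abc.abstractmethod
    def _confirm_consume(self, kw):  # ack after success
        raise NotImplementedError

    @abc.abstractmethod
    def _requeue(self, kw):  # put back after failure
        raise NotImplementedError

    def consume_one(self, consuming_function):
        kw = self._shedual_task()
        try:
            consuming_function(kw['body'])
            self._confirm_consume(kw)
            return True
        except Exception:
            self._requeue(kw)
            return False


class DequeConsumer(MiniAbstractConsumer):
    """A 'new middleware' added by implementing just the three hooks."""
    broker = collections.defaultdict(collections.deque)  # shared in-process broker

    def _shedual_task(self):
        raw = self.broker[self._queue_name].popleft()
        return {'body': json.loads(raw)}

    def _confirm_consume(self, kw):
        pass  # a plain deque has no ack concept, like the redis list broker

    def _requeue(self, kw):
        self.broker[self._queue_name].append(json.dumps(kw['body']))


if __name__ == '__main__':
    DequeConsumer.broker['queue_test'].append(json.dumps({'a': 1, 'b': 2}))
    results = []
    DequeConsumer('queue_test').consume_one(lambda body: results.append(body['a'] + body['b']))
    print(results)  # [3]
```

The point of the pattern is visible in `consume_one`: none of the retry/ack logic is repeated per broker, so a new middleware cannot break the existing ones.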
Screenshot: running in gevent concurrency mode.
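The strategy pattern behind the modes can be reduced to a registry that maps the mode number to a pool factory, mirroring how `ConcurrentModeDispatcher.build_pool` picks `CustomThreadPoolExecutor`, `GeventPoolExecutor`, or `CustomEventletPoolExecutor`. In this sketch only mode 1 is backed by a real stdlib pool; the gevent and eventlet entries are placeholders because those packages (and their monkey patching) may not be available, and `MODE_REGISTRY`/`build_pool` are names invented for the example:

```python
from concurrent.futures import ThreadPoolExecutor

# mode number -> (name, pool factory); None marks modes that need an extra package.
MODE_REGISTRY = {
    1: ('thread', lambda n: ThreadPoolExecutor(max_workers=n)),
    2: ('gevent', None),    # would be GeventPoolExecutor after check_gevent_monkey_patch()
    3: ('eventlet', None),  # would be CustomEventletPoolExecutor after check_evenlet_monkey_patch()
}


def build_pool(concurrent_mode: int, threads_num: int):
    name, factory = MODE_REGISTRY[concurrent_mode]
    if factory is None:
        raise RuntimeError(f'{name} mode needs its package installed and monkey patching applied first')
    # one extra worker, matching the framework's "+ 1" slot for the
    # message-count / heartbeat checking thread
    return name, factory(threads_num + 1)


if __name__ == '__main__':
    name, pool = build_pool(1, 4)
    fut = pool.submit(lambda: sum(range(10)))
    print(name, fut.result())  # thread 45
    pool.shutdown()
```

Because the three pool classes are duck-typed with identical public methods, the rest of the consumer never branches on the mode again after this single lookup.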
Screenshot: the local persistent queue, using sqlite3 to emulate a message queue.
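The idea behind the sqlite-backed local persistent queue can be sketched with nothing but the stdlib `sqlite3` module: rows are marked `ready` or `unacked`, `ack` deletes, `nack` returns the row to `ready`. This `SqliteMiniQueue` is a toy illustration of the concept only; the persistqueue package used by `PersistQueuePublisher`/`PersistQueueConsumer` has a different API:

```python
import json
import sqlite3


class SqliteMiniQueue:
    """Toy sqlite3-backed queue with ack/nack semantics (illustrative only)."""

    def __init__(self, path=':memory:'):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS q ("
            "id INTEGER PRIMARY KEY, body TEXT, status TEXT DEFAULT 'ready')")

    def put(self, obj):
        self.conn.execute('INSERT INTO q (body) VALUES (?)', (json.dumps(obj),))
        self.conn.commit()

    def get(self):
        # take the oldest ready row and hide it until it is acked or nacked
        row = self.conn.execute(
            "SELECT id, body FROM q WHERE status = 'ready' ORDER BY id LIMIT 1").fetchone()
        if row is None:
            return None
        self.conn.execute("UPDATE q SET status = 'unacked' WHERE id = ?", (row[0],))
        self.conn.commit()
        return row[0], json.loads(row[1])

    def ack(self, item_id):  # consumption confirmed: drop the row for good
        self.conn.execute('DELETE FROM q WHERE id = ?', (item_id,))
        self.conn.commit()

    def nack(self, item_id):  # consumption failed: make the row visible again
        self.conn.execute("UPDATE q SET status = 'ready' WHERE id = ?", (item_id,))
        self.conn.commit()
```

With a file path instead of `':memory:'`, pending and unacked tasks survive a process restart, which is exactly what makes this broker usable for local persistence without installing redis or rabbitmq.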
Screenshot: a message queue emulated with mongodb.