以前看網上都是清一色pika包的例子,就用的pika包,最大問題是非多線程安全,改成使用rabbitpy。大幅改善了pika多線程須要加鎖,和外網推送延遲又不能開多線程致使推送慢的問題。編程
rabbitpy有個適配器,能夠把rabbitpy包的channel適配成與pika包的channel的相同公有方法,減小了難度。設計模式
高層次封裝,使用參數來控制使用什麼包來操做rabbitmq。安全
# -*- coding: utf-8 -*- # @Author : ydf from collections import Callable import time from threading import Lock import rabbitpy from pika import BasicProperties # noinspection PyUnresolvedReferences from rabbitpy.message import Properties import pika from pika.adapters.blocking_connection import BlockingChannel from app.utils_ydf import LogManager from app.utils_ydf.mixins import LoggerMixin from app.utils_ydf import decorators from app.utils_ydf import BoundedThreadPoolExecutor from app import config as app_config LogManager('pika.heartbeat').get_logger_and_add_handlers(1) LogManager('rabbitpy').get_logger_and_add_handlers(2) LogManager('rabbitpy.base').get_logger_and_add_handlers(2) class RabbitmqClientRabbitPy: """ 使用rabbitpy包。 """ # noinspection PyUnusedLocal def __init__(self, username, password, host, port, virtual_host, heartbeat=60): rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}' self.connection = rabbitpy.Connection(rabbit_url) def creat_a_channel(self) -> rabbitpy.AMQP: return rabbitpy.AMQP(self.connection.channel()) # 使用適配器,使rabbitpy包的公有方法幾乎接近pika包的channel的方法。 class RabbitmqClientPika: """ 使用pika包,多線程不安全的包。 """ def __init__(self, username, password, host, port, virtual_host, heartbeat=60): credentials = pika.PlainCredentials(username, password) self.connection = pika.BlockingConnection(pika.ConnectionParameters( host, port, virtual_host, credentials, heartbeat=heartbeat)) def creat_a_channel(self) -> BlockingChannel: return self.connection.channel() class RabbitMqFactory: def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60, is_use_rabbitpy=1): """ :param username: :param password: :param port: :param virtual_host: :param heartbeat: :param is_use_rabbitpy: 爲0使用pika,多線程不安全。爲1使用rabbitpy,多線程安全的包。 """ if is_use_rabbitpy: self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat) else: self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat) def get_rabbit_cleint(self): return self.rabbit_client class RabbitmqPublisher(LoggerMixin): def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10): """ :param queue_name: :param is_use_rabbitpy: 是否使用rabbitpy包。不推薦使用pika。 :param log_level_int: """ self._queue_name = queue_name self._is_use_rabbitpy = is_use_rabbitpy self.logger.setLevel(log_level_int) self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint() self.channel = self.rabbit_client.creat_a_channel() self.queue = self.channel.queue_declare(queue=queue_name, durable=True) self._lock_for_pika = Lock() self._lock_for_count = Lock() self._current_time = None self.count_per_minute = None self._init_count() self.logger.info(f'{self.__class__} 被實例化了') def _init_count(self): with self._lock_for_count: self._current_time = time.time() self.count_per_minute = 0 def publish(self, msg: str): if self._is_use_rabbitpy: self._publish_rabbitpy(msg) else: self._publish_pika(msg) self.logger.debug(f'向{self._queue_name} 隊列,推送消息 {msg}') with self._lock_for_count: self.count_per_minute += 1 if time.time() - self._current_time > 60: self._init_count() self.logger.info(f'一分鐘內推送了 {self.count_per_minute} 條消息到 {self.channel.connection} 中') @decorators.tomorrow_threads(100) def _publish_rabbitpy(self, msg: str): # noinspection PyTypeChecker self.channel.basic_publish( exchange='', routing_key=self._queue_name, body=msg, properties={'delivery_mode': 2}, ) def _publish_pika(self, msg: str): with self._lock_for_pika: # 親測pika多線程publish會出錯。 self.channel.basic_publish(exchange='', routing_key=self._queue_name, body=msg, properties=BasicProperties( delivery_mode=2, # make message persistent ) ) def clear(self): self.channel.queue_purge(self._queue_name) def get_message_count(self): if self._is_use_rabbitpy: return self._get_message_count_rabbitpy() else: return self._get_message_count_pika() def _get_message_count_pika(self): queue = self.channel.queue_declare(queue=self._queue_name, durable=True) return queue.method.message_count def _get_message_count_rabbitpy(self): ch = self.rabbit_client.connection.channel() q = rabbitpy.amqp_queue.Queue(ch, self._queue_name) q.durable = True msg_count = q.declare(passive=True)[0] ch.close() return msg_count class RabbitmqConsumer(LoggerMixin): def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=10, is_print_detail_exception=True, is_use_rabbitpy=1): """ :param queue_name: :param consuming_function: 處理消息的函數,函數有且只能有一個參數,參數表示消息。是爲了簡單,放棄策略和模板來強制參數。 :param threads_num: :param max_retry_times: :param log_level: :param is_print_detail_exception: :param is_use_rabbitpy: 是否使用rabbitpy包。不推薦使用pika. """ self._queue_name = queue_name self.consuming_function = consuming_function self._threads_num = threads_num self.threadpool = BoundedThreadPoolExecutor(threads_num) self._max_retry_times = max_retry_times self.logger.setLevel(log_level) self.logger.info(f'{self.__class__} 被實例化') self._is_print_detail_exception = is_print_detail_exception self._is_use_rabbitpy = is_use_rabbitpy def start_consuming_message(self): if self._is_use_rabbitpy: self.start_consuming_message_rabbitpy() else: self.start_consuming_message_pika() @decorators.keep_circulating(1) # 是爲了保證不管rabbitmq異常中斷多久,無需重啓程序就能保證恢復後,程序正常。 def start_consuming_message_rabbitpy(self): # noinspection PyArgumentEqualDefault channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel() # type: rabbitpy.AMQP # 此處先固定使用pika. channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) for message in channel.basic_consume(self._queue_name): body = message.body.decode() self.logger.debug(f'從rabbitmq取出的消息是: {body}') self.threadpool.submit(self.__consuming_function_rabbitpy, message) def __consuming_function_rabbitpy(self, message, current_retry_times=0): if current_retry_times < self._max_retry_times: # noinspection PyBroadException try: self.consuming_function(message.body.decode()) message.ack() except Exception as e: self.logger.error(f'函數 {self.consuming_function} 第{current_retry_times+1}次發生錯誤,\n 緣由是{e}', exc_info=self._is_print_detail_exception) self.__consuming_function_rabbitpy(message, current_retry_times + 1) else: self.logger.critical(f'達到最大重試次數 {self._max_retry_times} 後,仍然失敗') # 錯得超過指定的次數了,就確認消費了。 message.ack() @decorators.keep_circulating(1) # 是爲了保證不管rabbitmq異常中斷多久,無需重啓程序就能保證恢復後,程序正常。 def start_consuming_message_pika(self): channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() # 此處先固定使用pika. channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) def callback(ch, method, properties, body): body = body.decode() self.logger.debug(f'從rabbitmq取出的消息是: {body}') self.threadpool.submit(self.__consuming_function_pika, ch, method, properties, body) channel.basic_consume(callback, queue=self._queue_name, # no_ack=True ) channel.start_consuming() @staticmethod def ack_message_pika(channelx, delivery_tagx): """Note that `channel` must be the same pika channel instance via which the message being ACKed was retrieved (AMQP protocol constraint). """ if channelx.is_open: channelx.basic_ack(delivery_tagx) else: # Channel is already closed, so we can't ACK this message; # log and/or do something that makes sense for your app in this case. pass def __consuming_function_pika(self, ch, method, properties, body, current_retry_times=0): if current_retry_times < self._max_retry_times: # noinspection PyBroadException try: self.consuming_function(body) ch.basic_ack(delivery_tag=method.delivery_tag) # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag)) except Exception as e: self.logger.error(f'函數 {self.consuming_function} 第{current_retry_times+1}次發生錯誤,\n 緣由是{e}', exc_info=self._is_print_detail_exception) self.__consuming_function_pika(ch, method, properties, body, current_retry_times + 1) else: self.logger.critical(f'達到最大重試次數 {self._max_retry_times} 後,仍然失敗') # 錯得超過指定的次數了,就確認消費了。 ch.basic_ack(delivery_tag=method.delivery_tag) # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag)) if __name__ == '__main__': with decorators.TimerContextManager(): # noinspection PyArgumentEqualDefault rabbitmq_publisher = RabbitmqPublisher('queue_test', is_use_rabbitpy=1) # print(rabbitmq_publisher.get_message_count()) # def pub(msg): # # print(msg) # rabbitmq_publisher.publish(msg) # # # [pub(str(i)) for i in range(200000)] # time.sleep(10) # def f(body): # print('.... ', body) # time.sleep(10) # 模擬作某事須要阻塞10秒種,必須用併發。 # # # rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200, is_use_rabbitpy=0) # rabbitmq_consumer.start_consuming_message()