python rabbitmq的庫,rabbitpy代替pika

以前看網上都是清一色pika包的例子,就用的pika包,最大問題是非多線程安全,所以改成使用rabbitpy。大幅改善了pika多線程需要加鎖,以及外網推送延遲又不能開多線程導致推送慢的問題。

rabbitpy有個適配器,可以把rabbitpy包的channel適配成與pika包的channel相同的公有方法,減少了遷移難度。

 

高層次封裝,使用參數來控制使用什麼包來操作rabbitmq。

複製代碼
# -*- coding: utf-8 -*-
# @Author  : ydf
from collections import Callable
import time
from threading import Lock
import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from app.utils_ydf import LogManager
from app.utils_ydf.mixins import LoggerMixin
from app.utils_ydf import decorators
from app.utils_ydf import BoundedThreadPoolExecutor
from app import config as app_config

# Attach handlers to the loggers used by the underlying AMQP libraries.
# The integer is forwarded unchanged to get_logger_and_add_handlers
# (presumably a log level — TODO confirm against LogManager).
for _library_logger_name, _handler_arg in (
        ('pika.heartbeat', 1),
        ('rabbitpy', 2),
        ('rabbitpy.base', 2),
):
    LogManager(_library_logger_name).get_logger_and_add_handlers(_handler_arg)


class RabbitmqClientRabbitPy:
    """
    RabbitMQ client backed by the rabbitpy package.

    Opens one ``rabbitpy.Connection`` on construction and hands out channels
    wrapped in the ``rabbitpy.AMQP`` adapter.
    """

    # noinspection PyUnusedLocal
    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        """
        :param username: broker user name, given as plain (not URL-encoded) text.
        :param password: broker password, given as plain (not URL-encoded) text.
        :param host: broker host name or address.
        :param port: broker port.
        :param virtual_host: virtual host name, given as plain text (e.g. ``/``).
        :param heartbeat: accepted so the signature matches the pika client; not used here.
        """
        from urllib.parse import quote  # function-scope import keeps this block self-contained

        # Credentials and the virtual host are embedded in a URL, so characters such as
        # '@', '/', ':' or '#' must be percent-encoded or the URL is parsed incorrectly.
        encoded_username = quote(str(username), safe='')
        encoded_password = quote(str(password), safe='')
        encoded_virtual_host = quote(str(virtual_host), safe='')
        rabbit_url = f'amqp://{encoded_username}:{encoded_password}@{host}:{port}/{encoded_virtual_host}'
        self.connection = rabbitpy.Connection(rabbit_url)

    def creat_a_channel(self) -> rabbitpy.AMQP:
        """Open a new channel on the connection and return it wrapped in the AMQP adapter."""
        # The adapter makes the rabbitpy channel's public methods nearly match pika's channel methods.
        return rabbitpy.AMQP(self.connection.channel())


class RabbitmqClientPika:
    """
    RabbitMQ client backed by the pika package (noted by the author as not thread-safe).
    """

    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        """
        Open a blocking pika connection.

        :param username: broker user name.
        :param password: broker password.
        :param host: broker host name or address.
        :param port: broker port.
        :param virtual_host: virtual host name.
        :param heartbeat: heartbeat value forwarded to ``pika.ConnectionParameters``.
        """
        connection_parameters = pika.ConnectionParameters(
            host,
            port,
            virtual_host,
            pika.PlainCredentials(username, password),
            heartbeat=heartbeat,
        )
        self.connection = pika.BlockingConnection(connection_parameters)

    def creat_a_channel(self) -> BlockingChannel:
        """Open and return a new channel on the blocking connection."""
        new_channel = self.connection.channel()
        return new_channel


class RabbitMqFactory:
    """Builds a rabbitpy-backed or a pika-backed client, selected by a flag."""

    def __init__(
            self,
            username=app_config.RABBITMQ_USER,
            password=app_config.RABBITMQ_PASS,
            host=app_config.RABBITMQ_HOST,
            port=app_config.RABBITMQ_PORT,
            virtual_host=app_config.RABBITMQ_VIRTUAL_HOST,
            heartbeat=60,
            is_use_rabbitpy=1,
    ):
        """
        All connection settings default to the values in the application config.

        :param username: broker user name.
        :param password: broker password.
        :param host: broker host name or address.
        :param port: broker port.
        :param virtual_host: virtual host name.
        :param heartbeat: heartbeat value handed to the chosen client.
        :param is_use_rabbitpy: falsy selects pika (not thread-safe per the author);
            truthy selects rabbitpy (thread-safe per the author).
        """
        client_class = RabbitmqClientRabbitPy if is_use_rabbitpy else RabbitmqClientPika
        self.rabbit_client = client_class(username, password, host, port, virtual_host, heartbeat)

    def get_rabbit_cleint(self):
        """Return the client created in ``__init__``."""
        return self.rabbit_client


class RabbitmqPublisher(LoggerMixin):
    """
    Publishes messages to one durable queue through the default exchange.

    The backend (rabbitpy or pika) is chosen at construction time. The publisher
    also counts published messages and logs the total roughly once per minute.
    """

    def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10):
        """
        :param queue_name: name of the queue to declare (durable) and publish to.
        :param is_use_rabbitpy: truthy to use rabbitpy; falsy to use pika (not recommended by the author).
        :param log_level_int: level passed to ``self.logger.setLevel``.
        """
        self._queue_name = queue_name
        self._is_use_rabbitpy = is_use_rabbitpy
        self.logger.setLevel(log_level_int)
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
        self._lock_for_pika = Lock()
        self._lock_for_count = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.logger.info(f'{self.__class__} 被實例化了')

    def _init_count(self):
        """Start a new counting window: remember the current time and zero the counter."""
        with self._lock_for_count:
            self._current_time = time.time()
            self.count_per_minute = 0

    def publish(self, msg: str):
        """
        Publish one message through the configured backend and update the per-minute counter.

        :param msg: message body to publish.
        """
        if self._is_use_rabbitpy:
            self._publish_rabbitpy(msg)
        else:
            self._publish_pika(msg)
        self.logger.debug(f'向{self._queue_name} 隊列,推送消息 {msg}')
        finished_window_count = None
        with self._lock_for_count:
            self.count_per_minute += 1
            if time.time() - self._current_time > 60:
                # Capture the total BEFORE resetting, otherwise the log below always reports 0.
                # The reset is done inline because Lock is not re-entrant, so _init_count()
                # cannot be called while the lock is held.
                finished_window_count = self.count_per_minute
                self._current_time = time.time()
                self.count_per_minute = 0
        if finished_window_count is not None:
            self.logger.info(f'一分鐘內推送了 {finished_window_count} 條消息到 {self.channel.connection} 中')

    # NOTE(review): tomorrow_threads(100) presumably runs the call on a pool of up to
    # 100 threads, making the publish asynchronous — confirm against the decorator.
    @decorators.tomorrow_threads(100)
    def _publish_rabbitpy(self, msg: str):
        """Publish ``msg`` via the rabbitpy adapter with delivery_mode 2 (persistent)."""
        # noinspection PyTypeChecker
        self.channel.basic_publish(
            exchange='',
            routing_key=self._queue_name,
            body=msg,
            properties={'delivery_mode': 2},
        )

    def _publish_pika(self, msg: str):
        """Publish ``msg`` via pika with delivery_mode 2 (persistent), serialized by a lock."""
        with self._lock_for_pika:  # The author observed errors when publishing with pika from multiple threads.
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent
                                       )
                                       )

    def clear(self):
        """Purge all messages from the queue."""
        self.channel.queue_purge(self._queue_name)

    def get_message_count(self):
        """Return the number of messages in the queue, using the configured backend."""
        if self._is_use_rabbitpy:
            return self._get_message_count_rabbitpy()
        else:
            return self._get_message_count_pika()

    def _get_message_count_pika(self):
        """Re-declare the durable queue with pika and read the message count from the reply."""
        queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
        return queue.method.message_count

    def _get_message_count_rabbitpy(self):
        """Passively declare the queue on a temporary rabbitpy channel and return the first reply item."""
        ch = self.rabbit_client.connection.channel()
        try:
            q = rabbitpy.amqp_queue.Queue(ch, self._queue_name)
            q.durable = True
            return q.declare(passive=True)[0]
        finally:
            # Always release the temporary channel, even when the declare raises.
            ch.close()


class RabbitmqConsumer(LoggerMixin):
    """
    Consumes messages from one durable queue and runs a user function on each
    message in a bounded thread pool, retrying failures a limited number of times.

    A message is acknowledged once the function succeeds, or once the retry
    budget is exhausted (so a poison message is dropped rather than redelivered).
    """

    def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=10, is_print_detail_exception=True, is_use_rabbitpy=1):
        """
        :param queue_name: name of the queue to consume from.
        :param consuming_function: function that handles a message; it is called with exactly
            one argument, the decoded message body. For simplicity no strategy/template
            pattern is used to enforce that signature.
        :param threads_num: size of the worker pool; also used as the channel prefetch count.
        :param max_retry_times: maximum number of attempts per message.
        :param log_level: level passed to ``self.logger.setLevel``.
        :param is_print_detail_exception: passed as ``exc_info`` when logging failures.
        :param is_use_rabbitpy: truthy to use rabbitpy; falsy to use pika (not recommended by the author).
        """
        self._queue_name = queue_name
        self.consuming_function = consuming_function
        self._threads_num = threads_num
        self.threadpool = BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times = max_retry_times
        self.logger.setLevel(log_level)
        self.logger.info(f'{self.__class__} 被實例化')
        self._is_print_detail_exception = is_print_detail_exception
        self._is_use_rabbitpy = is_use_rabbitpy

    def start_consuming_message(self):
        """Start consuming with the backend selected at construction time."""
        if self._is_use_rabbitpy:
            self.start_consuming_message_rabbitpy()
        else:
            self.start_consuming_message_pika()

    # Author's note: keep_circulating is here so that, however long RabbitMQ is interrupted,
    # consumption resumes after the broker recovers without restarting the program.
    @decorators.keep_circulating(1)
    def start_consuming_message_rabbitpy(self):
        """Consume with rabbitpy: declare the queue, set prefetch, and hand each message to the pool."""
        # noinspection PyArgumentEqualDefault
        channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type:  rabbitpy.AMQP         #   This path always uses rabbitpy.
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)
        for message in channel.basic_consume(self._queue_name):
            body = message.body.decode()
            self.logger.debug(f'從rabbitmq取出的消息是:  {body}')
            self.threadpool.submit(self.__consuming_function_rabbitpy, message)

    def _run_with_retries(self, attempt, current_retry_times=0):
        """
        Call ``attempt()`` until it succeeds or the retry budget is used up.

        :param attempt: zero-argument callable performing one processing attempt.
        :param current_retry_times: number of attempts already made.
        :return: True if an attempt succeeded, False if all attempts failed.
        """
        while current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                attempt()
            except Exception as e:
                self.logger.error(f'函數 {self.consuming_function}  第{current_retry_times+1}次發生錯誤,\n 緣由是{e}', exc_info=self._is_print_detail_exception)
                current_retry_times += 1
            else:
                return True
        # Failed more than the allowed number of times; the caller still acknowledges the message.
        self.logger.critical(f'達到最大重試次數 {self._max_retry_times} 後,仍然失敗')
        return False

    def _call_ack(self, do_ack):
        """Run the acknowledgement callable and log (rather than propagate) any failure."""
        # noinspection PyBroadException
        try:
            do_ack()
        except Exception as e:
            self.logger.error(f'ack failed for a message from queue {self._queue_name}: {e}', exc_info=self._is_print_detail_exception)

    def __consuming_function_rabbitpy(self, message, current_retry_times=0):
        """Process one rabbitpy message with retries, then acknowledge it."""
        # Decoding stays inside the attempt so a decode error is logged and retried like any other failure.
        self._run_with_retries(lambda: self.consuming_function(message.body.decode()), current_retry_times)
        # The ack is outside the retry loop: a failing ack must not re-run a function that already succeeded.
        self._call_ack(message.ack)

    # Author's note: keep_circulating is here so that, however long RabbitMQ is interrupted,
    # consumption resumes after the broker recovers without restarting the program.
    @decorators.keep_circulating(1)
    def start_consuming_message_pika(self):
        """Consume with pika: declare the queue, set prefetch, and hand each delivery to the pool."""
        channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()  # This path always uses pika.
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)

        def callback(ch, method, properties, body):
            body = body.decode()
            self.logger.debug(f'從rabbitmq取出的消息是:  {body}')
            self.threadpool.submit(self.__consuming_function_pika, ch, method, properties, body)

        channel.basic_consume(callback,
                              queue=self._queue_name,
                              # no_ack=True
                              )
        channel.start_consuming()

    @staticmethod
    def ack_message_pika(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if channelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            # Channel is already closed, so we can't ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass

    def _ack_pika_from_worker(self, ch, delivery_tag):
        """
        Acknowledge a pika delivery from a worker thread.

        pika channels are not thread-safe, so when the connection offers
        ``add_callback_threadsafe`` the ack is scheduled onto the connection's own
        thread; otherwise the channel is acked directly, as the code did before.
        """
        schedule = getattr(getattr(ch, 'connection', None), 'add_callback_threadsafe', None)
        if schedule is None:
            ch.basic_ack(delivery_tag=delivery_tag)
        else:
            schedule(lambda: self.ack_message_pika(ch, delivery_tag))

    def __consuming_function_pika(self, ch, method, properties, body, current_retry_times=0):
        """Process one pika delivery with retries, then acknowledge it."""
        self._run_with_retries(lambda: self.consuming_function(body), current_retry_times)
        # The ack is outside the retry loop: a failing ack must not re-run a function that already succeeded.
        self._call_ack(lambda: self._ack_pika_from_worker(ch, method.delivery_tag))


if __name__ == '__main__':
    # Manual demo: build a publisher for 'queue_test' inside the timer context manager
    # (presumably it reports the elapsed time of the enclosed block — TODO confirm).
    with decorators.TimerContextManager():
        # noinspection PyArgumentEqualDefault
        rabbitmq_publisher = RabbitmqPublisher(queue_name='queue_test', is_use_rabbitpy=1)

        # Disabled publishing example:
        # print(rabbitmq_publisher.get_message_count())
        #
        # def pub(msg):
        #     # print(msg)
        #     rabbitmq_publisher.publish(msg)
        #
        #
        # [pub(str(i)) for i in range(200000)]
        # time.sleep(10)

    # Disabled consuming example:
    # def f(body):
    #     print('....  ', body)
    #     time.sleep(10)  # Simulate work that blocks for 10 seconds, which is why concurrency is required.
    #
    #
    # rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=200, is_use_rabbitpy=0)
    # rabbitmq_consumer.start_consuming_message()
複製代碼

 

反對極端面向過程的編程思維方式,喜歡面向對象和設計模式的解讀,喜歡對比極端面向過程編程和oop編程所消耗代碼行數的區別和原因。致力於使用oop和36種設計模式寫出最高可複用的框架級代碼,並使用最少的代碼行數完成任務;致力於使用oop和設計模式使部分代碼減少90%的行數,使絕大部分py文件至少減少50%-80%的行數。
相關文章
相關標籤/搜索